mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-19 07:01:02 +01:00
bc1f92d7f5
- Drop samples and return true from remotewrite.TryPush() at fast path when all the remote storage systems are configured with the disabled on-disk queue, every in-memory queue is full and -remoteWrite.dropSamplesOnOverload is set to true. This case is quite common, so it should be optimized. Previously additional CPU time was spent on per-remoteWriteCtx relabeling and other processing in this case. - Properly count the number of dropped samples inside remoteWriteCtx.pushInternalTrackDropped(). Previously dropped samples were counted only if -remoteWrite.dropSamplesOnOverload flag is set. In reality, the samples are dropped when they couldn't be sent to the queue because in-memory queue is full and on-disk queue is disabled. The remoteWriteCtx.pushInternalTrackDropped() function is called by streaming aggregation for pushing the aggregated data to the remote storage. Streaming aggregation cannot wait until the remote storage processes pending data, so it drops aggregated samples in this case. - Clarify the description for -remoteWrite.disableOnDiskQueue command-line flag at -help output, so it is clear that this flag can be set individually per each -remoteWrite.url. - Make the -remoteWrite.dropSamplesOnOverload flag global. If some of the remote storage systems are configured with the disabled on-disk queue, then there is no sense in keeping samples on some of these systems, while dropping samples on the remaining systems, since this will result in global stall on the remote storage system with the disabled on-disk queue and with the -remoteWrite.dropSamplesOnOverload=false flag. vmagent will always return false from remotewrite.TryPush() in this case. This will result in infinite duplicate samples written to the remaining remote storage systems. That's why the -remoteWrite.dropSamplesOnOverload is forcibly set to true if more than one -remoteWrite.disableOnDiskQueue flag is set. This allows proceeding with newly scraped / pushed samples by sending them to the remaining remote storage systems, while dropping them on overloaded systems with the -remoteWrite.disableOnDiskQueue flag set. - Verify that the remoteWriteCtx.TryPush() returns true in the TestRemoteWriteContext_TryPush_ImmutableTimeseries test. - Mention in vmagent docs that the -remoteWrite.disableOnDiskQueue command-line flag can be set individually per each -remoteWrite.url. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6248 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065
1031 lines
38 KiB
Go
1031 lines
38 KiB
Go
package remotewrite
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"path/filepath"
|
|
"slices"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/cespare/xxhash/v2"
|
|
)
|
|
|
|
var (
|
|
remoteWriteURLs = flagutil.NewArrayString("remoteWrite.url", "Remote storage URL to write data to. It must support either VictoriaMetrics remote write protocol "+
|
|
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
|
|
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. "+
|
|
"The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set")
|
|
enableMultitenantHandlers = flag.Bool("enableMultitenantHandlers", false, "Whether to process incoming data via multitenant insert handlers according to "+
|
|
"https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+
|
|
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ."+
|
|
"See https://docs.victoriametrics.com/vmagent/#multitenancy for details")
|
|
|
|
shardByURL = flag.Bool("remoteWrite.shardByURL", false, "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . "+
|
|
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . "+
|
|
"See also -remoteWrite.shardByURLReplicas")
|
|
shardByURLReplicas = flag.Int("remoteWrite.shardByURLReplicas", 1, "How many copies of data to make among remote storage systems enumerated via -remoteWrite.url "+
|
|
"when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages")
|
|
shardByURLLabels = flagutil.NewArrayString("remoteWrite.shardByURL.labels", "Optional list of labels, which must be used for sharding outgoing samples "+
|
|
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
|
|
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels")
|
|
shardByURLIgnoreLabels = flagutil.NewArrayString("remoteWrite.shardByURL.ignoreLabels", "Optional list of labels, which must be ignored when sharding outgoing samples "+
|
|
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
|
|
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels")
|
|
|
|
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . "+
|
|
"See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue")
|
|
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
|
|
"Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.")
|
|
queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
|
|
"isn't enough for sending high volume of collected data to remote storage. "+
|
|
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage")
|
|
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
|
|
"It is hidden by default, since it can contain sensitive info such as auth key")
|
|
maxPendingBytesPerURL = flagutil.NewArrayBytes("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
|
|
"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. "+
|
|
"Buffered data is stored in ~500MB chunks. It is recommended to set the value for this flag to a multiple of the block size 500MB. "+
|
|
"Disk usage is unlimited if the value is set to 0")
|
|
significantFigures = flagutil.NewArrayInt("remoteWrite.significantFigures", 0, "The number of significant figures to leave in metric values before writing them "+
|
|
"to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. "+
|
|
"This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits")
|
|
roundDigits = flagutil.NewArrayInt("remoteWrite.roundDigits", 100, "Round metric values to this number of decimal digits after the point before "+
|
|
"writing them to remote storage. "+
|
|
"Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. "+
|
|
"By default, digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. "+
|
|
"This option may be used for improving data compression for the stored metrics")
|
|
sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. `+
|
|
`This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+
|
|
`For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+
|
|
`Enabled sorting for labels can slow down ingestion performance a bit`)
|
|
maxHourlySeries = flag.Int("remoteWrite.maxHourlySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last hour. "+
|
|
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter")
|
|
maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
|
|
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter")
|
|
maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. "+
|
|
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit")
|
|
|
|
disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
|
|
"when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. "+
|
|
"See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload")
|
|
dropSamplesOnOverload = flag.Bool("remoteWrite.dropSamplesOnOverload", false, "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+
|
|
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence")
|
|
)
|
|
|
|
var (
|
|
// rwctxs contains statically populated entries when -remoteWrite.url is specified.
|
|
rwctxs []*remoteWriteCtx
|
|
|
|
// Data without tenant id is written to defaultAuthToken if -enableMultitenantHandlers is specified.
|
|
defaultAuthToken = &auth.Token{}
|
|
|
|
// ErrQueueFullHTTPRetry must be returned when TryPush() returns false.
|
|
ErrQueueFullHTTPRetry = &httpserver.ErrorWithStatusCode{
|
|
Err: fmt.Errorf("remote storage systems cannot keep up with the data ingestion rate; retry the request later " +
|
|
"or remove -remoteWrite.disableOnDiskQueue from vmagent command-line flags, so it could save pending data to -remoteWrite.tmpDataPath; " +
|
|
"see https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence"),
|
|
StatusCode: http.StatusTooManyRequests,
|
|
}
|
|
|
|
// disableOnDiskQueueAll is set to true if all -remoteWrite.url were configured to disable persistent queue via -remoteWrite.disableOnDiskQueue
|
|
disableOnDiskQueueAll bool
|
|
|
|
// dropSamplesOnFailureGlobal is set to true if -remoteWrite.dropSamplesOnOverload is set or if multiple -remoteWrite.disableOnDiskQueue options are set.
|
|
dropSamplesOnFailureGlobal bool
|
|
)
|
|
|
|
// MultitenancyEnabled returns true if -enableMultitenantHandlers is specified.
|
|
func MultitenancyEnabled() bool {
|
|
return *enableMultitenantHandlers
|
|
}
|
|
|
|
// Contains the current relabelConfigs.
|
|
var allRelabelConfigs atomic.Pointer[relabelConfigs]
|
|
|
|
// Contains the current global stream aggregators.
|
|
var sasGlobal atomic.Pointer[streamaggr.Aggregators]
|
|
|
|
// Contains the current global deduplicator.
|
|
var deduplicatorGlobal *streamaggr.Deduplicator
|
|
|
|
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
|
|
// since it may lead to high memory usage due to big number of buffers.
|
|
var maxQueues = cgroup.AvailableCPUs() * 16
|
|
|
|
const persistentQueueDirname = "persistent-queue"
|
|
|
|
// InitSecretFlags must be called after flag.Parse and before any logging.
|
|
func InitSecretFlags() {
|
|
if !*showRemoteWriteURL {
|
|
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
|
|
flagutil.RegisterSecretFlag("remoteWrite.url")
|
|
}
|
|
}
|
|
|
|
var (
|
|
shardByURLLabelsMap map[string]struct{}
|
|
shardByURLIgnoreLabelsMap map[string]struct{}
|
|
)
|
|
|
|
// Init initializes remotewrite.
|
|
//
|
|
// It must be called after flag.Parse().
|
|
//
|
|
// Stop must be called for graceful shutdown.
|
|
func Init() {
|
|
if len(*remoteWriteURLs) == 0 {
|
|
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
|
|
}
|
|
if *maxHourlySeries > 0 {
|
|
hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour)
|
|
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
|
|
return float64(hourlySeriesLimiter.MaxItems())
|
|
})
|
|
_ = metrics.NewGauge(`vmagent_hourly_series_limit_current_series`, func() float64 {
|
|
return float64(hourlySeriesLimiter.CurrentItems())
|
|
})
|
|
}
|
|
if *maxDailySeries > 0 {
|
|
dailySeriesLimiter = bloomfilter.NewLimiter(*maxDailySeries, 24*time.Hour)
|
|
_ = metrics.NewGauge(`vmagent_daily_series_limit_max_series`, func() float64 {
|
|
return float64(dailySeriesLimiter.MaxItems())
|
|
})
|
|
_ = metrics.NewGauge(`vmagent_daily_series_limit_current_series`, func() float64 {
|
|
return float64(dailySeriesLimiter.CurrentItems())
|
|
})
|
|
}
|
|
|
|
if *queues > maxQueues {
|
|
*queues = maxQueues
|
|
}
|
|
if *queues <= 0 {
|
|
*queues = 1
|
|
}
|
|
|
|
if len(*shardByURLLabels) > 0 && len(*shardByURLIgnoreLabels) > 0 {
|
|
logger.Fatalf("-remoteWrite.shardByURL.labels and -remoteWrite.shardByURL.ignoreLabels cannot be set simultaneously; " +
|
|
"see https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages")
|
|
}
|
|
shardByURLLabelsMap = newMapFromStrings(*shardByURLLabels)
|
|
shardByURLIgnoreLabelsMap = newMapFromStrings(*shardByURLIgnoreLabels)
|
|
|
|
initLabelsGlobal()
|
|
|
|
// Register SIGHUP handler for config reload before loadRelabelConfigs.
|
|
// This guarantees that the config will be re-read if the signal arrives just after loadRelabelConfig.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
|
sighupCh := procutil.NewSighupChan()
|
|
|
|
rcs, err := loadRelabelConfigs()
|
|
if err != nil {
|
|
logger.Fatalf("cannot load relabel configs: %s", err)
|
|
}
|
|
allRelabelConfigs.Store(rcs)
|
|
relabelConfigSuccess.Set(1)
|
|
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
|
|
|
|
sasFile, sasOpts := getStreamAggrOpts(-1)
|
|
if sasFile != "" {
|
|
sas, err := newStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
|
|
if err != nil {
|
|
logger.Fatalf("cannot initialize stream aggregators from -streamAggr.config=%q: %s", sasFile, err)
|
|
}
|
|
sasGlobal.Store(sas)
|
|
} else if sasOpts.DedupInterval > 0 {
|
|
deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesDropFailed, sasOpts.DedupInterval, sasOpts.DropInputLabels, sasOpts.Alias)
|
|
}
|
|
|
|
if len(*remoteWriteURLs) > 0 {
|
|
rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
|
}
|
|
|
|
disableOnDiskQueues := []bool(*disableOnDiskQueue)
|
|
disableOnDiskQueueAll = !slices.Contains(disableOnDiskQueues, false)
|
|
|
|
// Samples must be dropped if multiple -remoteWrite.disableOnDiskQueue options are configured and at least a single is set to true.
|
|
// In this case it is impossible to prevent from sending many duplicates of samples passed to TryPush() to all the configured -remoteWrite.url
|
|
// if these samples couldn't be sent to the -remoteWrite.url with the disabled persistent queue. So it is better sending samples
|
|
// to the remaining -remoteWrite.url and dropping them on the blocked queue.
|
|
dropSamplesOnFailureGlobal = *dropSamplesOnOverload || len(disableOnDiskQueues) > 1 && slices.Contains(disableOnDiskQueues, true)
|
|
|
|
dropDanglingQueues()
|
|
|
|
// Start config reloader.
|
|
configReloaderWG.Add(1)
|
|
go func() {
|
|
defer configReloaderWG.Done()
|
|
for {
|
|
select {
|
|
case <-sighupCh:
|
|
case <-configReloaderStopCh:
|
|
return
|
|
}
|
|
reloadRelabelConfigs()
|
|
reloadStreamAggrConfigs()
|
|
}
|
|
}()
|
|
}
|
|
|
|
func dropDanglingQueues() {
|
|
if *keepDanglingQueues {
|
|
return
|
|
}
|
|
// Remove dangling persistent queues, if any.
|
|
// This is required for the case when the number of queues has been changed or URL have been changed.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
|
|
//
|
|
// In case if there were many persistent queues with identical *remoteWriteURLs
|
|
// the queue with the last index will be dropped.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6140
|
|
existingQueues := make(map[string]struct{}, len(rwctxs))
|
|
for _, rwctx := range rwctxs {
|
|
existingQueues[rwctx.fq.Dirname()] = struct{}{}
|
|
}
|
|
|
|
queuesDir := filepath.Join(*tmpDataPath, persistentQueueDirname)
|
|
files := fs.MustReadDir(queuesDir)
|
|
removed := 0
|
|
for _, f := range files {
|
|
dirname := f.Name()
|
|
if _, ok := existingQueues[dirname]; !ok {
|
|
logger.Infof("removing dangling queue %q", dirname)
|
|
fullPath := filepath.Join(queuesDir, dirname)
|
|
fs.MustRemoveAll(fullPath)
|
|
removed++
|
|
}
|
|
}
|
|
if removed > 0 {
|
|
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs))
|
|
}
|
|
}
|
|
|
|
func reloadRelabelConfigs() {
|
|
relabelConfigReloads.Inc()
|
|
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
|
rcs, err := loadRelabelConfigs()
|
|
if err != nil {
|
|
relabelConfigReloadErrors.Inc()
|
|
relabelConfigSuccess.Set(0)
|
|
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
|
|
return
|
|
}
|
|
allRelabelConfigs.Store(rcs)
|
|
relabelConfigSuccess.Set(1)
|
|
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
|
|
logger.Infof("successfully reloaded relabel configs")
|
|
}
|
|
|
|
var (
|
|
relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
|
|
relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
|
|
relabelConfigSuccess = metrics.NewGauge(`vmagent_relabel_config_last_reload_successful`, nil)
|
|
relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
|
|
)
|
|
|
|
func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
|
|
if len(urls) == 0 {
|
|
logger.Panicf("BUG: urls must be non-empty")
|
|
}
|
|
|
|
maxInmemoryBlocks := memory.Allowed() / len(urls) / *maxRowsPerBlock / 100
|
|
if maxInmemoryBlocks / *queues > 100 {
|
|
// There is no much sense in keeping higher number of blocks in memory,
|
|
// since this means that the producer outperforms consumer and the queue
|
|
// will continue growing. It is better storing the queue to file.
|
|
maxInmemoryBlocks = 100 * *queues
|
|
}
|
|
if maxInmemoryBlocks < 2 {
|
|
maxInmemoryBlocks = 2
|
|
}
|
|
rwctxs := make([]*remoteWriteCtx, len(urls))
|
|
for i, remoteWriteURLRaw := range urls {
|
|
remoteWriteURL, err := url.Parse(remoteWriteURLRaw)
|
|
if err != nil {
|
|
logger.Fatalf("invalid -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
|
}
|
|
sanitizedURL := fmt.Sprintf("%d:secret-url", i+1)
|
|
if at != nil {
|
|
// Construct full remote_write url for the given tenant according to https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
|
|
remoteWriteURL.Path = fmt.Sprintf("%s/insert/%d:%d/prometheus/api/v1/write", remoteWriteURL.Path, at.AccountID, at.ProjectID)
|
|
sanitizedURL = fmt.Sprintf("%s:%d:%d", sanitizedURL, at.AccountID, at.ProjectID)
|
|
}
|
|
if *showRemoteWriteURL {
|
|
sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL)
|
|
}
|
|
rwctxs[i] = newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
|
|
}
|
|
return rwctxs
|
|
}
|
|
|
|
var (
|
|
configReloaderStopCh = make(chan struct{})
|
|
configReloaderWG sync.WaitGroup
|
|
)
|
|
|
|
// StartIngestionRateLimiter starts ingestion rate limiter.
|
|
//
|
|
// Ingestion rate limiter must be started before Init() call.
|
|
//
|
|
// StopIngestionRateLimiter must be called before Stop() call in order to unblock all the callers
|
|
// to ingestion rate limiter. Otherwise deadlock may occur at Stop() call.
|
|
func StartIngestionRateLimiter() {
|
|
if *maxIngestionRate <= 0 {
|
|
return
|
|
}
|
|
ingestionRateLimitReached := metrics.NewCounter(`vmagent_max_ingestion_rate_limit_reached_total`)
|
|
ingestionRateLimiterStopCh = make(chan struct{})
|
|
ingestionRateLimiter = ratelimiter.New(int64(*maxIngestionRate), ingestionRateLimitReached, ingestionRateLimiterStopCh)
|
|
}
|
|
|
|
// StopIngestionRateLimiter stops ingestion rate limiter.
|
|
func StopIngestionRateLimiter() {
|
|
if ingestionRateLimiterStopCh == nil {
|
|
return
|
|
}
|
|
close(ingestionRateLimiterStopCh)
|
|
ingestionRateLimiterStopCh = nil
|
|
}
|
|
|
|
var (
|
|
ingestionRateLimiter *ratelimiter.RateLimiter
|
|
ingestionRateLimiterStopCh chan struct{}
|
|
)
|
|
|
|
// Stop stops remotewrite.
|
|
//
|
|
// It is expected that nobody calls TryPush during and after the call to this func.
|
|
func Stop() {
|
|
close(configReloaderStopCh)
|
|
configReloaderWG.Wait()
|
|
|
|
sasGlobal.Load().MustStop()
|
|
if deduplicatorGlobal != nil {
|
|
deduplicatorGlobal.MustStop()
|
|
deduplicatorGlobal = nil
|
|
}
|
|
|
|
for _, rwctx := range rwctxs {
|
|
rwctx.MustStop()
|
|
}
|
|
rwctxs = nil
|
|
|
|
if sl := hourlySeriesLimiter; sl != nil {
|
|
sl.MustStop()
|
|
}
|
|
if sl := dailySeriesLimiter; sl != nil {
|
|
sl.MustStop()
|
|
}
|
|
}
|
|
|
|
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
|
|
//
|
|
// PushDropSamplesOnFailure drops wr samples if they cannot be sent to -remoteWrite.url by any reason.
|
|
//
|
|
// PushDropSamplesOnFailure can modify wr contents.
|
|
func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
|
_ = tryPush(at, wr, true)
|
|
}
|
|
|
|
// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url
|
|
//
|
|
// TryPush can modify wr contents, so the caller must re-initialize wr before calling TryPush() after unsuccessful attempt.
|
|
// TryPush may send partial data from wr on unsuccessful attempt, so repeated call for the same wr may send the data multiple times.
|
|
//
|
|
// The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false.
|
|
func TryPush(at *auth.Token, wr *prompbmarshal.WriteRequest) bool {
|
|
return tryPush(at, wr, dropSamplesOnFailureGlobal)
|
|
}
|
|
|
|
func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnFailure bool) bool {
|
|
tss := wr.Timeseries
|
|
|
|
if at == nil && MultitenancyEnabled() {
|
|
// Write data to default tenant if at isn't set when multitenancy is enabled.
|
|
at = defaultAuthToken
|
|
}
|
|
|
|
var tenantRctx *relabelCtx
|
|
if at != nil {
|
|
// Convert at to (vm_account_id, vm_project_id) labels.
|
|
tenantRctx = getRelabelCtx()
|
|
defer putRelabelCtx(tenantRctx)
|
|
}
|
|
rowsCount := getRowsCount(tss)
|
|
|
|
// Quick check whether writes to configured remote storage systems are blocked.
|
|
// This allows saving CPU time spent on relabeling and block compression
|
|
// if some of remote storage systems cannot keep up with the data ingestion rate.
|
|
// this shortcut is only applicable if all remote writes have disableOnDiskQueue = true
|
|
if disableOnDiskQueueAll {
|
|
skippedQueues := 0
|
|
for _, rwctx := range rwctxs {
|
|
if rwctx.fq.IsWriteBlocked() {
|
|
rwctx.pushFailures.Inc()
|
|
if !forceDropSamplesOnFailure {
|
|
return false
|
|
}
|
|
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
|
skippedQueues++
|
|
}
|
|
}
|
|
if skippedQueues == len(rwctxs) {
|
|
// All the queues are skipped because they are blocked and dropSamplesOnFailure is set to true.
|
|
// Return true to the caller, so it doesn't re-send the samples again.
|
|
return true
|
|
}
|
|
}
|
|
|
|
var rctx *relabelCtx
|
|
rcs := allRelabelConfigs.Load()
|
|
pcsGlobal := rcs.global
|
|
if pcsGlobal.Len() > 0 {
|
|
rctx = getRelabelCtx()
|
|
defer putRelabelCtx(rctx)
|
|
}
|
|
globalRowsPushedBeforeRelabel.Add(rowsCount)
|
|
maxSamplesPerBlock := *maxRowsPerBlock
|
|
// Allow up to 10x of labels per each block on average.
|
|
maxLabelsPerBlock := 10 * maxSamplesPerBlock
|
|
|
|
sas := sasGlobal.Load()
|
|
|
|
for len(tss) > 0 {
|
|
// Process big tss in smaller blocks in order to reduce the maximum memory usage
|
|
samplesCount := 0
|
|
labelsCount := 0
|
|
i := 0
|
|
for i < len(tss) {
|
|
samplesCount += len(tss[i].Samples)
|
|
labelsCount += len(tss[i].Samples) * len(tss[i].Labels)
|
|
i++
|
|
if samplesCount >= maxSamplesPerBlock || labelsCount >= maxLabelsPerBlock {
|
|
break
|
|
}
|
|
}
|
|
|
|
ingestionRateLimiter.Register(samplesCount)
|
|
|
|
tssBlock := tss
|
|
if i < len(tss) {
|
|
tssBlock = tss[:i]
|
|
tss = tss[i:]
|
|
} else {
|
|
tss = nil
|
|
}
|
|
if tenantRctx != nil {
|
|
tenantRctx.tenantToLabels(tssBlock, at.AccountID, at.ProjectID)
|
|
}
|
|
if rctx != nil {
|
|
rowsCountBeforeRelabel := getRowsCount(tssBlock)
|
|
tssBlock = rctx.applyRelabeling(tssBlock, pcsGlobal)
|
|
rowsCountAfterRelabel := getRowsCount(tssBlock)
|
|
rowsDroppedByGlobalRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
|
|
}
|
|
sortLabelsIfNeeded(tssBlock)
|
|
tssBlock = limitSeriesCardinality(tssBlock)
|
|
if sas.IsEnabled() {
|
|
matchIdxs := matchIdxsPool.Get()
|
|
matchIdxs.B = sas.Push(tssBlock, matchIdxs.B)
|
|
if !*streamAggrGlobalKeepInput {
|
|
tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput)
|
|
}
|
|
matchIdxsPool.Put(matchIdxs)
|
|
} else if deduplicatorGlobal != nil {
|
|
deduplicatorGlobal.Push(tssBlock)
|
|
tssBlock = tssBlock[:0]
|
|
}
|
|
if !tryPushBlockToRemoteStorages(tssBlock, forceDropSamplesOnFailure) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func pushToRemoteStoragesDropFailed(tss []prompbmarshal.TimeSeries) {
|
|
if tryPushBlockToRemoteStorages(tss, true) {
|
|
return
|
|
}
|
|
}
|
|
|
|
func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
|
|
if len(tssBlock) == 0 {
|
|
// Nothing to push
|
|
return true
|
|
}
|
|
|
|
if len(rwctxs) == 1 {
|
|
// Fast path - just push data to the configured single remote storage
|
|
return rwctxs[0].TryPush(tssBlock, forceDropSamplesOnFailure)
|
|
}
|
|
|
|
// We need to push tssBlock to multiple remote storages.
|
|
// This is either sharding or replication depending on -remoteWrite.shardByURL command-line flag value.
|
|
if *shardByURL && *shardByURLReplicas < len(rwctxs) {
|
|
// Shard tssBlock samples among rwctxs.
|
|
replicas := *shardByURLReplicas
|
|
if replicas <= 0 {
|
|
replicas = 1
|
|
}
|
|
return tryShardingBlockAmongRemoteStorages(tssBlock, replicas, forceDropSamplesOnFailure)
|
|
}
|
|
|
|
// Replicate tssBlock samples among rwctxs.
|
|
// Push tssBlock to remote storage systems in parallel in order to reduce
|
|
// the time needed for sending the data to multiple remote storage systems.
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(rwctxs))
|
|
var anyPushFailed atomic.Bool
|
|
for _, rwctx := range rwctxs {
|
|
go func(rwctx *remoteWriteCtx) {
|
|
defer wg.Done()
|
|
if !rwctx.TryPush(tssBlock, forceDropSamplesOnFailure) {
|
|
anyPushFailed.Store(true)
|
|
}
|
|
}(rwctx)
|
|
}
|
|
wg.Wait()
|
|
return !anyPushFailed.Load()
|
|
}
|
|
|
|
func tryShardingBlockAmongRemoteStorages(tssBlock []prompbmarshal.TimeSeries, replicas int, forceDropSamplesOnFailure bool) bool {
|
|
x := getTSSShards(len(rwctxs))
|
|
defer putTSSShards(x)
|
|
|
|
shards := x.shards
|
|
tmpLabels := promutils.GetLabels()
|
|
for _, ts := range tssBlock {
|
|
hashLabels := ts.Labels
|
|
if len(shardByURLLabelsMap) > 0 {
|
|
hashLabels = tmpLabels.Labels[:0]
|
|
for _, label := range ts.Labels {
|
|
if _, ok := shardByURLLabelsMap[label.Name]; ok {
|
|
hashLabels = append(hashLabels, label)
|
|
}
|
|
}
|
|
tmpLabels.Labels = hashLabels
|
|
} else if len(shardByURLIgnoreLabelsMap) > 0 {
|
|
hashLabels = tmpLabels.Labels[:0]
|
|
for _, label := range ts.Labels {
|
|
if _, ok := shardByURLIgnoreLabelsMap[label.Name]; !ok {
|
|
hashLabels = append(hashLabels, label)
|
|
}
|
|
}
|
|
tmpLabels.Labels = hashLabels
|
|
}
|
|
h := getLabelsHash(hashLabels)
|
|
idx := h % uint64(len(shards))
|
|
i := 0
|
|
for {
|
|
shards[idx] = append(shards[idx], ts)
|
|
i++
|
|
if i >= replicas {
|
|
break
|
|
}
|
|
idx++
|
|
if idx >= uint64(len(shards)) {
|
|
idx = 0
|
|
}
|
|
}
|
|
}
|
|
promutils.PutLabels(tmpLabels)
|
|
|
|
// Push sharded samples to remote storage systems in parallel in order to reduce
|
|
// the time needed for sending the data to multiple remote storage systems.
|
|
var wg sync.WaitGroup
|
|
var anyPushFailed atomic.Bool
|
|
for i, rwctx := range rwctxs {
|
|
shard := shards[i]
|
|
if len(shard) == 0 {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
|
|
defer wg.Done()
|
|
if !rwctx.TryPush(tss, forceDropSamplesOnFailure) {
|
|
anyPushFailed.Store(true)
|
|
}
|
|
}(rwctx, shard)
|
|
}
|
|
wg.Wait()
|
|
return !anyPushFailed.Load()
|
|
}
|
|
|
|
type tssShards struct {
|
|
shards [][]prompbmarshal.TimeSeries
|
|
}
|
|
|
|
func getTSSShards(n int) *tssShards {
|
|
v := tssShardsPool.Get()
|
|
if v == nil {
|
|
v = &tssShards{}
|
|
}
|
|
x := v.(*tssShards)
|
|
if cap(x.shards) < n {
|
|
x.shards = make([][]prompbmarshal.TimeSeries, n)
|
|
}
|
|
x.shards = x.shards[:n]
|
|
return x
|
|
}
|
|
|
|
func putTSSShards(x *tssShards) {
|
|
shards := x.shards
|
|
for i := range shards {
|
|
clear(shards[i])
|
|
shards[i] = shards[i][:0]
|
|
}
|
|
tssShardsPool.Put(x)
|
|
}
|
|
|
|
var tssShardsPool sync.Pool
|
|
|
|
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
|
|
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {
|
|
if !*sortLabels {
|
|
return
|
|
}
|
|
for i := range tss {
|
|
promrelabel.SortLabels(tss[i].Labels)
|
|
}
|
|
}
|
|
|
|
func limitSeriesCardinality(tss []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
|
|
if hourlySeriesLimiter == nil && dailySeriesLimiter == nil {
|
|
return tss
|
|
}
|
|
dst := make([]prompbmarshal.TimeSeries, 0, len(tss))
|
|
for i := range tss {
|
|
labels := tss[i].Labels
|
|
h := getLabelsHash(labels)
|
|
if hourlySeriesLimiter != nil && !hourlySeriesLimiter.Add(h) {
|
|
hourlySeriesLimitRowsDropped.Add(len(tss[i].Samples))
|
|
logSkippedSeries(labels, "-remoteWrite.maxHourlySeries", hourlySeriesLimiter.MaxItems())
|
|
continue
|
|
}
|
|
if dailySeriesLimiter != nil && !dailySeriesLimiter.Add(h) {
|
|
dailySeriesLimitRowsDropped.Add(len(tss[i].Samples))
|
|
logSkippedSeries(labels, "-remoteWrite.maxDailySeries", dailySeriesLimiter.MaxItems())
|
|
continue
|
|
}
|
|
dst = append(dst, tss[i])
|
|
}
|
|
return dst
|
|
}
|
|
|
|
var (
|
|
hourlySeriesLimiter *bloomfilter.Limiter
|
|
dailySeriesLimiter *bloomfilter.Limiter
|
|
|
|
hourlySeriesLimitRowsDropped = metrics.NewCounter(`vmagent_hourly_series_limit_rows_dropped_total`)
|
|
dailySeriesLimitRowsDropped = metrics.NewCounter(`vmagent_daily_series_limit_rows_dropped_total`)
|
|
)
|
|
|
|
func getLabelsHash(labels []prompbmarshal.Label) uint64 {
|
|
bb := labelsHashBufPool.Get()
|
|
b := bb.B[:0]
|
|
for _, label := range labels {
|
|
b = append(b, label.Name...)
|
|
b = append(b, label.Value...)
|
|
}
|
|
h := xxhash.Sum64(b)
|
|
bb.B = b
|
|
labelsHashBufPool.Put(bb)
|
|
return h
|
|
}
|
|
|
|
var labelsHashBufPool bytesutil.ByteBufferPool
|
|
|
|
func logSkippedSeries(labels []prompbmarshal.Label, flagName string, flagValue int) {
|
|
select {
|
|
case <-logSkippedSeriesTicker.C:
|
|
// Do not use logger.WithThrottler() here, since this will increase CPU usage
|
|
// because every call to logSkippedSeries will result to a call to labelsToString.
|
|
logger.Warnf("skip series %s because %s=%d reached", labelsToString(labels), flagName, flagValue)
|
|
default:
|
|
}
|
|
}
|
|
|
|
var logSkippedSeriesTicker = time.NewTicker(5 * time.Second)
|
|
|
|
func labelsToString(labels []prompbmarshal.Label) string {
|
|
var b []byte
|
|
b = append(b, '{')
|
|
for i, label := range labels {
|
|
b = append(b, label.Name...)
|
|
b = append(b, '=')
|
|
b = strconv.AppendQuote(b, label.Value)
|
|
if i+1 < len(labels) {
|
|
b = append(b, ',')
|
|
}
|
|
}
|
|
b = append(b, '}')
|
|
return string(b)
|
|
}
|
|
|
|
var (
|
|
globalRowsPushedBeforeRelabel = metrics.NewCounter("vmagent_remotewrite_global_rows_pushed_before_relabel_total")
|
|
rowsDroppedByGlobalRelabel = metrics.NewCounter("vmagent_remotewrite_global_relabel_metrics_dropped_total")
|
|
)
|
|
|
|
type remoteWriteCtx struct {
|
|
idx int
|
|
fq *persistentqueue.FastQueue
|
|
c *client
|
|
|
|
sas atomic.Pointer[streamaggr.Aggregators]
|
|
deduplicator *streamaggr.Deduplicator
|
|
|
|
streamAggrKeepInput bool
|
|
streamAggrDropInput bool
|
|
|
|
pss []*pendingSeries
|
|
pssNextIdx atomic.Uint64
|
|
|
|
rowsPushedAfterRelabel *metrics.Counter
|
|
rowsDroppedByRelabel *metrics.Counter
|
|
|
|
pushFailures *metrics.Counter
|
|
rowsDroppedOnPushFailure *metrics.Counter
|
|
}
|
|
|
|
func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx {
|
|
// strip query params, otherwise changing params resets pq
|
|
pqURL := *remoteWriteURL
|
|
pqURL.RawQuery = ""
|
|
pqURL.Fragment = ""
|
|
h := xxhash.Sum64([]byte(pqURL.String()))
|
|
queuePath := filepath.Join(*tmpDataPath, persistentQueueDirname, fmt.Sprintf("%d_%016X", argIdx+1, h))
|
|
maxPendingBytes := maxPendingBytesPerURL.GetOptionalArg(argIdx)
|
|
if maxPendingBytes != 0 && maxPendingBytes < persistentqueue.DefaultChunkFileSize {
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4195
|
|
logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize)
|
|
maxPendingBytes = persistentqueue.DefaultChunkFileSize
|
|
}
|
|
|
|
isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx)
|
|
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
|
|
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
|
return float64(fq.GetPendingBytes())
|
|
})
|
|
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
|
return float64(fq.GetInmemoryQueueLen())
|
|
})
|
|
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queue_blocked{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
|
if fq.IsWriteBlocked() {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
|
|
var c *client
|
|
switch remoteWriteURL.Scheme {
|
|
case "http", "https":
|
|
c = newHTTPClient(argIdx, remoteWriteURL.String(), sanitizedURL, fq, *queues)
|
|
default:
|
|
logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL)
|
|
}
|
|
c.init(argIdx, *queues, sanitizedURL)
|
|
|
|
// Initialize pss
|
|
sf := significantFigures.GetOptionalArg(argIdx)
|
|
rd := roundDigits.GetOptionalArg(argIdx)
|
|
pssLen := *queues
|
|
if n := cgroup.AvailableCPUs(); pssLen > n {
|
|
// There is no sense in running more than availableCPUs concurrent pendingSeries,
|
|
// since every pendingSeries can saturate up to a single CPU.
|
|
pssLen = n
|
|
}
|
|
pss := make([]*pendingSeries, pssLen)
|
|
for i := range pss {
|
|
pss[i] = newPendingSeries(fq, c.useVMProto, sf, rd)
|
|
}
|
|
|
|
rwctx := &remoteWriteCtx{
|
|
idx: argIdx,
|
|
fq: fq,
|
|
c: c,
|
|
pss: pss,
|
|
|
|
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
|
|
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
|
|
|
|
pushFailures: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
|
|
rowsDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_samples_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
|
|
}
|
|
|
|
// Initialize sas
|
|
sasFile, sasOpts := getStreamAggrOpts(argIdx)
|
|
if sasFile != "" {
|
|
sas, err := newStreamAggrConfig(argIdx, rwctx.pushInternalTrackDropped)
|
|
if err != nil {
|
|
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
|
|
}
|
|
rwctx.sas.Store(sas)
|
|
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
|
|
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx)
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
|
|
} else if sasOpts.DedupInterval > 0 {
|
|
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, sasOpts.DedupInterval, sasOpts.DropInputLabels, sasOpts.Alias)
|
|
}
|
|
|
|
return rwctx
|
|
}
|
|
|
|
func (rwctx *remoteWriteCtx) MustStop() {
|
|
// sas and deduplicator must be stopped before rwctx is closed
|
|
// because they can write pending series to rwctx.pss if there are any
|
|
sas := rwctx.sas.Swap(nil)
|
|
sas.MustStop()
|
|
|
|
if rwctx.deduplicator != nil {
|
|
rwctx.deduplicator.MustStop()
|
|
rwctx.deduplicator = nil
|
|
}
|
|
|
|
for _, ps := range rwctx.pss {
|
|
ps.MustStop()
|
|
}
|
|
rwctx.idx = 0
|
|
rwctx.pss = nil
|
|
rwctx.fq.UnblockAllReaders()
|
|
rwctx.c.MustStop()
|
|
rwctx.c = nil
|
|
|
|
rwctx.fq.MustClose()
|
|
rwctx.fq = nil
|
|
|
|
rwctx.rowsPushedAfterRelabel = nil
|
|
rwctx.rowsDroppedByRelabel = nil
|
|
}
|
|
|
|
// TryPush sends tss series to the configured remote write endpoint
|
|
//
|
|
// TryPush doesn't modify tss, so tss can be passed concurrently to TryPush across distinct rwctx instances.
|
|
func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
|
|
var rctx *relabelCtx
|
|
var v *[]prompbmarshal.TimeSeries
|
|
defer func() {
|
|
if rctx == nil {
|
|
return
|
|
}
|
|
*v = prompbmarshal.ResetTimeSeries(tss)
|
|
tssPool.Put(v)
|
|
putRelabelCtx(rctx)
|
|
}()
|
|
|
|
// Apply relabeling
|
|
rcs := allRelabelConfigs.Load()
|
|
pcs := rcs.perURL[rwctx.idx]
|
|
if pcs.Len() > 0 {
|
|
rctx = getRelabelCtx()
|
|
// Make a copy of tss before applying relabeling in order to prevent
|
|
// from affecting time series for other remoteWrite.url configs.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
|
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
|
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
|
|
tss = append(*v, tss...)
|
|
rowsCountBeforeRelabel := getRowsCount(tss)
|
|
tss = rctx.applyRelabeling(tss, pcs)
|
|
rowsCountAfterRelabel := getRowsCount(tss)
|
|
rwctx.rowsDroppedByRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
|
|
}
|
|
rowsCount := getRowsCount(tss)
|
|
rwctx.rowsPushedAfterRelabel.Add(rowsCount)
|
|
|
|
// Apply stream aggregation or deduplication if they are configured
|
|
sas := rwctx.sas.Load()
|
|
if sas.IsEnabled() {
|
|
matchIdxs := matchIdxsPool.Get()
|
|
matchIdxs.B = sas.Push(tss, matchIdxs.B)
|
|
if !rwctx.streamAggrKeepInput {
|
|
if rctx == nil {
|
|
rctx = getRelabelCtx()
|
|
// Make a copy of tss before dropping aggregated series
|
|
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
|
|
tss = append(*v, tss...)
|
|
}
|
|
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
|
|
}
|
|
matchIdxsPool.Put(matchIdxs)
|
|
} else if rwctx.deduplicator != nil {
|
|
rwctx.deduplicator.Push(tss)
|
|
return true
|
|
}
|
|
|
|
// Try pushing tss to remote storage
|
|
if rwctx.tryPushInternal(tss) {
|
|
return true
|
|
}
|
|
|
|
// Couldn't push tss to remote storage
|
|
rwctx.pushFailures.Inc()
|
|
if forceDropSamplesOnFailure {
|
|
rowsCount := getRowsCount(tss)
|
|
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
var matchIdxsPool bytesutil.ByteBufferPool
|
|
|
|
func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, dropInput bool) []prompbmarshal.TimeSeries {
|
|
dst := src[:0]
|
|
if !dropInput {
|
|
for i, match := range matchIdxs {
|
|
if match == 1 {
|
|
continue
|
|
}
|
|
dst = append(dst, src[i])
|
|
}
|
|
}
|
|
tail := src[len(dst):]
|
|
clear(tail)
|
|
return dst
|
|
}
|
|
|
|
func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSeries) {
|
|
if rwctx.tryPushInternal(tss) {
|
|
return
|
|
}
|
|
if !rwctx.fq.IsPersistentQueueDisabled() {
|
|
logger.Panicf("BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set")
|
|
}
|
|
rwctx.pushFailures.Inc()
|
|
rowsCount := getRowsCount(tss)
|
|
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
|
}
|
|
|
|
func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) bool {
|
|
var rctx *relabelCtx
|
|
var v *[]prompbmarshal.TimeSeries
|
|
defer func() {
|
|
if rctx == nil {
|
|
return
|
|
}
|
|
*v = prompbmarshal.ResetTimeSeries(tss)
|
|
tssPool.Put(v)
|
|
putRelabelCtx(rctx)
|
|
}()
|
|
|
|
if len(labelsGlobal) > 0 {
|
|
// Make a copy of tss before adding extra labels in order to prevent
|
|
// from affecting time series for other remoteWrite.url configs.
|
|
rctx = getRelabelCtx()
|
|
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
|
|
tss = append(*v, tss...)
|
|
rctx.appendExtraLabels(tss, labelsGlobal)
|
|
}
|
|
|
|
pss := rwctx.pss
|
|
idx := rwctx.pssNextIdx.Add(1) % uint64(len(pss))
|
|
|
|
return pss[idx].TryPush(tss)
|
|
}
|
|
|
|
var tssPool = &sync.Pool{
|
|
New: func() any {
|
|
a := []prompbmarshal.TimeSeries{}
|
|
return &a
|
|
},
|
|
}
|
|
|
|
func getRowsCount(tss []prompbmarshal.TimeSeries) int {
|
|
rowsCount := 0
|
|
for _, ts := range tss {
|
|
rowsCount += len(ts.Samples)
|
|
}
|
|
return rowsCount
|
|
}
|
|
|
|
func newMapFromStrings(a []string) map[string]struct{} {
|
|
m := make(map[string]struct{}, len(a))
|
|
for _, s := range a {
|
|
m[s] = struct{}{}
|
|
}
|
|
return m
|
|
}
|