From ed523b5bbc48543d8e748a4c3204f50acb333044 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 5 Mar 2024 00:45:22 +0200 Subject: [PATCH] app/{vminsert,vmagent}: allow using -streamAggr.dedupInterval without -streamAggr.config This allows performing online de-duplication of incoming samples --- README.md | 10 +- app/victoria-metrics/main.go | 2 +- app/vmagent/remotewrite/remotewrite.go | 29 ++- app/vminsert/common/insert_ctx.go | 2 +- app/vminsert/common/streamaggr.go | 32 +++- docs/CHANGELOG.md | 3 +- docs/README.md | 15 +- docs/Single-server-VictoriaMetrics.md | 15 +- docs/stream-aggregation.md | 38 ++-- docs/vmagent.md | 26 ++- lib/promscrape/scrapework.go | 2 +- lib/streamaggr/dedup_test.go | 2 +- lib/streamaggr/deduplicator.go | 196 +++++++++++++++++++++ lib/streamaggr/deduplicator_test.go | 49 ++++++ lib/streamaggr/deduplicator_timing_test.go | 21 +++ lib/streamaggr/streamaggr.go | 36 ++-- lib/streamaggr/streamaggr_test.go | 35 ++-- 17 files changed, 433 insertions(+), 80 deletions(-) create mode 100644 lib/streamaggr/deduplicator.go create mode 100644 lib/streamaggr/deduplicator_test.go create mode 100644 lib/streamaggr/deduplicator_timing_test.go diff --git a/README.md b/README.md index 57d57b588..402c580c1 100644 --- a/README.md +++ b/README.md @@ -1810,6 +1810,12 @@ so the de-duplication consistently leaves samples for one `vmagent` instance and from other `vmagent` instances. See [these docs](https://docs.victoriametrics.com/vmagent.html#high-availability) for details. +VictoriaMetrics stores all the ingested samples to disk even if `-dedup.minScrapeInterval` command-line flag is set. +The ingested samples are de-duplicated during [background merges](#storage) and during query execution. +VictoriaMetrics also supports de-duplication during data ingestion before the data is stored to disk, via `-streamAggr.dedupInterval` command-line flag - +see [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication). + + ## Storage VictoriaMetrics buffers the ingested data in memory for up to a second. Then the buffered data is written to in-memory `parts`, @@ -2645,7 +2651,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) -dedup.minScrapeInterval duration - Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling + Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See also -streamAggr.dedupInterval and https://docs.victoriametrics.com/#deduplication -deleteAuthKey value authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries Flag value can be read from the given file when using -deleteAuthKey=file:///abs/path/to/file or -deleteAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -deleteAuthKey=http://host/path or -deleteAuthKey=https://host/path @@ -3101,7 +3107,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -streamAggr.config string Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . 
See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval duration - Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero + Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication -streamAggr.dropInput Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html -streamAggr.keepInput diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index a98327e39..30dfccaa1 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -31,7 +31,7 @@ var ( "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . "+ "With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing") minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+ - "equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling") + "equal to -dedup.minScrapeInterval > 0. See also -streamAggr.dedupInterval and https://docs.victoriametrics.com/#deduplication") dryRun = flag.Bool("dryRun", false, "Whether to check config files without running VictoriaMetrics. The following config files are checked: "+ "-promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. "+ "This can be changed with -promscrape.config.strictParse=false command-line flag") diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index ee29d77ce..8c6b12ca1 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -89,9 +89,8 @@ var ( streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+ "with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+ "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") - streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated "+ - "by stream aggregation. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html") + streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+ + "with -remoteWrite.streamAggr.config . 
See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication") disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+ "when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ."+ "See also -remoteWrite.dropSamplesOnOverload") @@ -666,7 +665,9 @@ type remoteWriteCtx struct { fq *persistentqueue.FastQueue c *client - sas atomic.Pointer[streamaggr.Aggregators] + sas atomic.Pointer[streamaggr.Aggregators] + deduplicator *streamaggr.Deduplicator + streamAggrKeepInput bool streamAggrDropInput bool @@ -739,9 +740,10 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in // Initialize sas sasFile := streamAggrConfig.GetOptionalArg(argIdx) + dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx) if sasFile != "" { opts := &streamaggr.Options{ - DedupInterval: streamAggrDedupInterval.GetOptionalArg(argIdx), + DedupInterval: dedupInterval, } sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) if err != nil { @@ -752,17 +754,24 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) + } else if dedupInterval > 0 { + rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval) } return rwctx } func (rwctx *remoteWriteCtx) MustStop() { - // sas must be stopped before rwctx is closed + // sas and deduplicator must be stopped before rwctx is closed // because sas can write pending series to rwctx.pss if there are any sas := rwctx.sas.Swap(nil) sas.MustStop() + if rwctx.deduplicator != nil { + rwctx.deduplicator.MustStop() + rwctx.deduplicator = nil + } + for _, ps := range rwctx.pss { ps.MustStop() } @@ -801,7 +810,7 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool { rowsCount := getRowsCount(tss) rwctx.rowsPushedAfterRelabel.Add(rowsCount) - // Apply stream aggregation if any + // Apply stream aggregation or deduplication if they are configured sas := rwctx.sas.Load() if sas != nil { matchIdxs := matchIdxsPool.Get() @@ -816,6 +825,10 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool { tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput) } matchIdxsPool.Put(matchIdxs) + } else if rwctx.deduplicator != nil { + rwctx.deduplicator.Push(tss) + clear(tss) + tss = tss[:0] } // Try pushing the data to remote storage @@ -844,7 +857,7 @@ func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, drop } } tail := src[len(dst):] - _ = prompbmarshal.ResetTimeSeries(tail) + clear(tail) return dst } diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index 27ac496a6..02be393ea 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -142,7 +142,7 @@ func (ctx *InsertCtx) ApplyRelabeling() { // FlushBufs flushes buffered rows to the underlying storage. 
func (ctx *InsertCtx) FlushBufs() error { sas := sasGlobal.Load() - if sas != nil && !ctx.skipStreamAggr { + if (sas != nil || deduplicator != nil) && !ctx.skipStreamAggr { matchIdxs := matchIdxsPool.Get() matchIdxs.B = ctx.streamAggrCtx.push(ctx.mrs, matchIdxs.B) if !*streamAggrKeepInput { diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index d5e0730c2..dc5402f12 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -27,9 +27,8 @@ var ( streamAggrDropInput = flag.Bool("streamAggr.dropInput", false, "Whether to drop all the input samples after the aggregation with -streamAggr.config. "+ "By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+ "See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") - streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated "+ - "by stream aggregation. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html") + streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . "+ + "See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication") ) var ( @@ -41,7 +40,8 @@ var ( saCfgSuccess = metrics.NewGauge(`vminsert_streamagg_config_last_reload_successful`, nil) saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`) - sasGlobal atomic.Pointer[streamaggr.Aggregators] + sasGlobal atomic.Pointer[streamaggr.Aggregators] + deduplicator *streamaggr.Deduplicator ) // CheckStreamAggrConfig checks config pointed by -stramaggr.config @@ -68,6 +68,9 @@ func InitStreamAggr() { saCfgReloaderStopCh = make(chan struct{}) if *streamAggrConfig == "" { + if *streamAggrDedupInterval > 0 { + deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval) + } return } @@ -80,6 +83,7 @@ func InitStreamAggr() { if err != nil { logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err) } + sasGlobal.Store(sas) saCfgSuccess.Set(1) saCfgTimestamp.Set(fasttime.UnixTimestamp()) @@ -133,6 +137,11 @@ func MustStopStreamAggr() { sas := sasGlobal.Swap(nil) sas.MustStop() + + if deduplicator != nil { + deduplicator.MustStop() + deduplicator = nil + } } type streamAggrCtx struct { @@ -210,12 +219,17 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte ctx.buf = buf tss = tss[tssLen:] - matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss)) - for i := 0; i < len(matchIdxs); i++ { - matchIdxs[i] = 0 - } + sas := sasGlobal.Load() - matchIdxs = sas.Push(tss, matchIdxs) + if sas != nil { + matchIdxs = sas.Push(tss, matchIdxs) + } else if deduplicator != nil { + matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss)) + for i := range matchIdxs { + matchIdxs[i] = 1 + } + deduplicator.Push(tss) + } ctx.Reset() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2099ea007..0f369d780 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -31,9 +31,10 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). 
## tip

* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). The memory usage reduction is most visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled. The downside is increased CPU usage by up to 30%.
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow using `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line flags without the need to specify `-streamAggr.config` and `-remoteWrite.streamAggr.config`. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
-* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `no_align_flush_to_interval` option for disabling time alignment for aggregated data flush. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alignment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs, which can be used for `increase` and `total` aggregations when the first sample of every new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) must be ignored.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_flush_timeouts_total` and `vm_streamaggr_dedup_flush_timeouts_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting flush timeouts for stream aggregation states. Expose also `vm_streamaggr_flush_duration_seconds` and `vm_streamaggr_dedup_flush_duration_seconds` [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the real flush durations of stream aggregation states. diff --git a/docs/README.md b/docs/README.md index 50ed0145c..e5f89e78e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -26,7 +26,8 @@ Documentation for the cluster version of VictoriaMetrics is available [here](htt Learn more about [key concepts](https://docs.victoriametrics.com/keyConcepts.html) of VictoriaMetrics and follow the [quick start guide](https://docs.victoriametrics.com/Quick-Start.html) for a better experience. -If you have questions about VictoriaMetrics, then feel free asking them in the [VictoriaMetrics community Slack chat](https://slack.victoriametrics.com/). +If you have questions about VictoriaMetrics, then feel free asking them in the [VictoriaMetrics community Slack chat](https://victoriametrics.slack.com/), +you can join it via [Slack Inviter](https://slack.victoriametrics.com/). [Contact us](mailto:info@victoriametrics.com) if you need enterprise support for VictoriaMetrics. See [features available in enterprise package](https://docs.victoriametrics.com/enterprise.html). @@ -1812,6 +1813,12 @@ so the de-duplication consistently leaves samples for one `vmagent` instance and from other `vmagent` instances. See [these docs](https://docs.victoriametrics.com/vmagent.html#high-availability) for details. +VictoriaMetrics stores all the ingested samples to disk even if `-dedup.minScrapeInterval` command-line flag is set. +The ingested samples are de-duplicated during [background merges](#storage) and during query execution. +VictoriaMetrics also supports de-duplication during data ingestion before the data is stored to disk, via `-streamAggr.dedupInterval` command-line flag - +see [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication). + + ## Storage VictoriaMetrics buffers the ingested data in memory for up to a second. Then the buffered data is written to in-memory `parts`, @@ -2551,7 +2558,7 @@ Contact us with any questions regarding VictoriaMetrics at [info@victoriametrics Feel free asking any questions regarding VictoriaMetrics: -* [Slack](https://slack.victoriametrics.com/) +* [Slack Inviter](https://slack.victoriametrics.com/) and [Slack channel](https://victoriametrics.slack.com/) * [Twitter](https://twitter.com/VictoriaMetrics/) * [Linkedin](https://www.linkedin.com/company/victoriametrics/) * [Reddit](https://www.reddit.com/r/VictoriaMetrics/) @@ -2647,7 +2654,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) -dedup.minScrapeInterval duration - Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. 
See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling + Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See also -streamAggr.dedupInterval and https://docs.victoriametrics.com/#deduplication -deleteAuthKey value authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries Flag value can be read from the given file when using -deleteAuthKey=file:///abs/path/to/file or -deleteAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -deleteAuthKey=http://host/path or -deleteAuthKey=https://host/path @@ -3103,7 +3110,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -streamAggr.config string Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval duration - Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero + Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication -streamAggr.dropInput Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html -streamAggr.keepInput diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 143eae6ee..075e805bb 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -34,7 +34,8 @@ Documentation for the cluster version of VictoriaMetrics is available [here](htt Learn more about [key concepts](https://docs.victoriametrics.com/keyConcepts.html) of VictoriaMetrics and follow the [quick start guide](https://docs.victoriametrics.com/Quick-Start.html) for a better experience. -If you have questions about VictoriaMetrics, then feel free asking them in the [VictoriaMetrics community Slack chat](https://slack.victoriametrics.com/). +If you have questions about VictoriaMetrics, then feel free asking them in the [VictoriaMetrics community Slack chat](https://victoriametrics.slack.com/), +you can join it via [Slack Inviter](https://slack.victoriametrics.com/). [Contact us](mailto:info@victoriametrics.com) if you need enterprise support for VictoriaMetrics. See [features available in enterprise package](https://docs.victoriametrics.com/enterprise.html). @@ -1820,6 +1821,12 @@ so the de-duplication consistently leaves samples for one `vmagent` instance and from other `vmagent` instances. See [these docs](https://docs.victoriametrics.com/vmagent.html#high-availability) for details. +VictoriaMetrics stores all the ingested samples to disk even if `-dedup.minScrapeInterval` command-line flag is set. +The ingested samples are de-duplicated during [background merges](#storage) and during query execution. 
+VictoriaMetrics also supports de-duplication during data ingestion before the data is stored to disk, via `-streamAggr.dedupInterval` command-line flag - +see [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication). + + ## Storage VictoriaMetrics buffers the ingested data in memory for up to a second. Then the buffered data is written to in-memory `parts`, @@ -2559,7 +2566,7 @@ Contact us with any questions regarding VictoriaMetrics at [info@victoriametrics Feel free asking any questions regarding VictoriaMetrics: -* [Slack](https://slack.victoriametrics.com/) +* [Slack Inviter](https://slack.victoriametrics.com/) and [Slack channel](https://victoriametrics.slack.com/) * [Twitter](https://twitter.com/VictoriaMetrics/) * [Linkedin](https://www.linkedin.com/company/victoriametrics/) * [Reddit](https://www.reddit.com/r/VictoriaMetrics/) @@ -2655,7 +2662,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) -dedup.minScrapeInterval duration - Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling + Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See also -streamAggr.dedupInterval and https://docs.victoriametrics.com/#deduplication -deleteAuthKey value authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries Flag value can be read from the given file when using -deleteAuthKey=file:///abs/path/to/file or -deleteAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -deleteAuthKey=http://host/path or -deleteAuthKey=https://host/path @@ -3111,7 +3118,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -streamAggr.config string Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval duration - Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero + Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication -streamAggr.dropInput Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html -streamAggr.keepInput diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 909984d20..2962ec1e6 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -50,17 +50,35 @@ This behaviour can be changed via the following command-line flags: ## Deduplication -By default, all the input samples are aggregated. 
Sometimes it is needed to de-duplicate samples for the same [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
-before the aggregation. For example, if the samples are received from replicated sources.
-In this case the [de-duplication](https://docs.victoriametrics.com/#deduplication) can be enabled via the following options:
+[vmagent](https://docs.victoriametrics.com/vmagent.html) supports de-duplication of samples before sending them
+to the configured `-remoteWrite.url`. The de-duplication can be enabled via the following options:
-- `-remoteWrite.streamAggr.dedupInterval` command-line flag at [vmagent](https://docs.victoriametrics.com/vmagent.html).
-  This flag can be specified individually per each `-remoteWrite.url`.
-  This allows setting different de-duplication intervals per each configured remote storage.
-- `-streamAggr.dedupInterval` command-line flag at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
-- `dedup_interval` option per each [aggregate config](#stream-aggregation-config).
+- By specifying the desired de-duplication interval via `-remoteWrite.streamAggr.dedupInterval` command-line flag for the particular `-remoteWrite.url`.
+  For example, `./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=30s` instructs `vmagent` to leave
+  only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
+  The de-duplication is performed after applying `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig` [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling).
-De-duplicatation is performed after performing the input relabeling with `input_relabel_configs` - see [these docs](#relabeling).
+  If the `-remoteWrite.streamAggr.config` is set, then the de-duplication is performed individually per each [stream aggregation config](#stream-aggregation-config)
+  for the matching samples after applying [input_relabel_configs](#relabeling).
+
+- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-remoteWrite.streamAggr.config`.
+
+[Single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) supports two types of de-duplication:
+- After storing the duplicate samples to local storage. See [`-dedup.minScrapeInterval`](https://docs.victoriametrics.com/#deduplication) command-line option.
+- Before storing the duplicate samples to local storage. This type of de-duplication can be enabled via the following options:
+  - By specifying the desired de-duplication interval via `-streamAggr.dedupInterval` command-line flag.
+    For example, `./victoria-metrics -streamAggr.dedupInterval=30s` instructs VictoriaMetrics to leave only the last sample per each
+    seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
+    The de-duplication is performed after applying `-relabelConfig` [relabeling](https://docs.victoriametrics.com/#relabeling).
+
+    If the `-streamAggr.config` is set, then the de-duplication is performed individually per each [stream aggregation config](#stream-aggregation-config)
+    for the matching samples after applying [input_relabel_configs](#relabeling).
+
+  - By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-streamAggr.config` (see the sketch after this list).
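For reference, a minimal sketch (not part of this patch) of a stream aggregation config entry using the per-config `dedup_interval` option mentioned in the last bullet above; the metric selector, intervals and output below are illustrative only:

```yaml
# Hypothetical -streamAggr.config / -remoteWrite.streamAggr.config entry:
# matching samples are de-duplicated every 30s before being aggregated into 1m totals.
- match: 'http_requests_total{env="prod"}'
  interval: 1m
  dedup_interval: 30s
  outputs: [total]
```

When only the `-streamAggr.dedupInterval` (or `-remoteWrite.streamAggr.dedupInterval`) command-line flag is set and no config file is given, this patch applies the same de-duplication to all incoming samples instead of per-config matches.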
+ +The online de-duplication doesn't take into account timestamps associated with the de-duplicated samples - it just leaves the last seen sample +on the configured deduplication interval. If you need taking into account timestamps during the de-duplication, +then use [`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication). ## Flush time alignment @@ -407,7 +425,7 @@ The `keep_metric_names` option can be used if only a single output is set in [`o It is possible to apply [arbitrary relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) to input and output metrics during stream aggregation via `input_relabel_configs` and `output_relabel_configs` options in [stream aggregation config](#stream-aggregation-config). -Relabeling rules inside `input_relabel_configs` are applied to samples matching the `match` filters. +Relabeling rules inside `input_relabel_configs` are applied to samples matching the `match` filters before optional [deduplication](#deduplication). Relabeling rules inside `output_relabel_configs` are applied to aggregated samples before sending them to the remote storage. For example, the following config removes the `:1m_sum_samples` suffix added [to the output metric name](#output-metric-names): diff --git a/docs/vmagent.md b/docs/vmagent.md index 7474fa967..08eb403e7 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -235,7 +235,6 @@ And to `http://` it will forward only metrics that have `env=prod` lab Please note, order of flags is important: 1st mentioned `-remoteWrite.urlRelabelConfig` will be applied to the 1st mentioned `-remoteWrite.url`, and so on. - ### Prometheus remote_write proxy `vmagent` can be used as a proxy for Prometheus data sent via Prometheus `remote_write` protocol. It can accept data via the `remote_write` API @@ -251,6 +250,28 @@ the `-remoteWrite.url` command-line flag should be configured as `:// 0 && !sw.seriesLimitExceeded { sw.seriesLimitExceeded = true diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go index 5e843c717..685bba6f7 100644 --- a/lib/streamaggr/dedup_test.go +++ b/lib/streamaggr/dedup_test.go @@ -53,7 +53,7 @@ func TestDedupAggrSerial(t *testing.T) { } } -func TestDedupAggrConcurrent(t *testing.T) { +func TestDedupAggrConcurrent(_ *testing.T) { const concurrency = 5 const seriesCount = 10_000 da := newDedupAggr() diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go new file mode 100644 index 000000000..1fd11e7ff --- /dev/null +++ b/lib/streamaggr/deduplicator.go @@ -0,0 +1,196 @@ +package streamaggr + +import ( + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" + "github.com/VictoriaMetrics/metrics" +) + +// Deduplicator deduplicates samples per each time series. +type Deduplicator struct { + da *dedupAggr + lc promutils.LabelsCompressor + + wg sync.WaitGroup + stopCh chan struct{} + + ms *metrics.Set +} + +// NewDeduplicator returns new deduplicator, which deduplicates samples per each time series. +// +// The de-duplicated samples are passed to pushFunc once per dedupInterval. +// +// MustStop must be called on the returned deduplicator in order to free up occupied resources. 
+func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration) *Deduplicator { + d := &Deduplicator{ + da: newDedupAggr(), + stopCh: make(chan struct{}), + ms: metrics.NewSet(), + } + + ms := d.ms + _ = ms.NewGauge(`vm_streamaggr_dedup_state_size_bytes`, func() float64 { + return float64(d.da.sizeBytes()) + }) + _ = ms.NewGauge(`vm_streamaggr_dedup_state_items_count`, func() float64 { + return float64(d.da.itemsCount()) + }) + + _ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 { + return float64(d.lc.SizeBytes()) + }) + _ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 { + return float64(d.lc.ItemsCount()) + }) + metrics.RegisterSet(ms) + + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.runFlusher(pushFunc, dedupInterval) + }() + + return d +} + +// MustStop stops d. +func (d *Deduplicator) MustStop() { + metrics.UnregisterSet(d.ms) + d.ms = nil + + close(d.stopCh) + d.wg.Wait() +} + +// Push pushes tss to d. +func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) { + ctx := getDeduplicatorPushCtx() + pss := ctx.pss + buf := ctx.buf + + for _, ts := range tss { + buf = d.lc.Compress(buf[:0], ts.Labels) + key := bytesutil.InternBytes(buf) + for _, s := range ts.Samples { + pss = append(pss, pushSample{ + key: key, + value: s.Value, + }) + } + } + + d.da.pushSamples(pss) + + ctx.pss = pss + ctx.buf = buf + putDeduplicatorPushCtx(ctx) +} + +func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration) { + t := time.NewTicker(dedupInterval) + defer t.Stop() + for { + select { + case <-d.stopCh: + return + case <-t.C: + d.flush(pushFunc) + } + } +} + +func (d *Deduplicator) flush(pushFunc PushFunc) { + timestamp := time.Now().UnixMilli() + d.da.flush(func(pss []pushSample) { + ctx := getDeduplicatorFlushCtx() + + tss := ctx.tss + labels := ctx.labels + samples := ctx.samples + for _, ps := range pss { + labelsLen := len(labels) + labels = decompressLabels(labels, &d.lc, ps.key) + + samplesLen := len(samples) + samples = append(samples, prompbmarshal.Sample{ + Value: ps.value, + Timestamp: timestamp, + }) + + tss = append(tss, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[samplesLen:], + }) + } + pushFunc(tss) + + ctx.tss = tss + ctx.labels = labels + ctx.samples = samples + putDeduplicatorFlushCtx(ctx) + }, true) +} + +type deduplicatorPushCtx struct { + pss []pushSample + buf []byte +} + +func (ctx *deduplicatorPushCtx) reset() { + clear(ctx.pss) + ctx.pss = ctx.pss[:0] + + ctx.buf = ctx.buf[:0] +} + +func getDeduplicatorPushCtx() *deduplicatorPushCtx { + v := deduplicatorPushCtxPool.Get() + if v == nil { + return &deduplicatorPushCtx{} + } + return v.(*deduplicatorPushCtx) +} + +func putDeduplicatorPushCtx(ctx *deduplicatorPushCtx) { + ctx.reset() + deduplicatorPushCtxPool.Put(ctx) +} + +var deduplicatorPushCtxPool sync.Pool + +type deduplicatorFlushCtx struct { + tss []prompbmarshal.TimeSeries + labels []prompbmarshal.Label + samples []prompbmarshal.Sample +} + +func (ctx *deduplicatorFlushCtx) reset() { + clear(ctx.tss) + ctx.tss = ctx.tss[:0] + + clear(ctx.labels) + ctx.labels = ctx.labels[:0] + + clear(ctx.samples) + ctx.samples = ctx.samples[:0] +} + +func getDeduplicatorFlushCtx() *deduplicatorFlushCtx { + v := deduplicatorFlushCtxPool.Get() + if v == nil { + return &deduplicatorFlushCtx{} + } + return v.(*deduplicatorFlushCtx) +} + +func putDeduplicatorFlushCtx(ctx *deduplicatorFlushCtx) { + ctx.reset() + deduplicatorFlushCtxPool.Put(ctx) +} + +var 
deduplicatorFlushCtxPool sync.Pool diff --git a/lib/streamaggr/deduplicator_test.go b/lib/streamaggr/deduplicator_test.go new file mode 100644 index 000000000..7f8abd8e6 --- /dev/null +++ b/lib/streamaggr/deduplicator_test.go @@ -0,0 +1,49 @@ +package streamaggr + +import ( + "sync" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestDeduplicator(t *testing.T) { + var tssResult []prompbmarshal.TimeSeries + var tssResultLock sync.Mutex + pushFunc := func(tss []prompbmarshal.TimeSeries) { + tssResultLock.Lock() + tssResult = appendClonedTimeseries(tssResult, tss) + tssResultLock.Unlock() + } + + tss := mustParsePromMetrics(` +foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123 +bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54 +x 8943 +baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34 +x 90984 +x 433 +asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322 +foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894 +baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3 +`) + + d := NewDeduplicator(pushFunc, time.Hour) + for i := 0; i < 10; i++ { + d.Push(tss) + } + d.flush(pushFunc) + d.MustStop() + + result := timeSeriessToString(tssResult) + resultExpected := `asfjkldsf{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 12322 +bar{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 34.54 +baz_aaa_aaa_fdd{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} -2.3 +foo{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 894 +x 433 +` + if result != resultExpected { + t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, resultExpected) + } +} diff --git a/lib/streamaggr/deduplicator_timing_test.go b/lib/streamaggr/deduplicator_timing_test.go new file mode 100644 index 000000000..c9bf0f3dc --- /dev/null +++ b/lib/streamaggr/deduplicator_timing_test.go @@ -0,0 +1,21 @@ +package streamaggr + +import ( + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func BenchmarkDeduplicatorPush(b *testing.B) { + pushFunc := func(tss []prompbmarshal.TimeSeries) {} + d := NewDeduplicator(pushFunc, time.Hour) + + b.ReportAllocs() + b.SetBytes(int64(len(benchSeries))) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + d.Push(benchSeries) + } + }) +} diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index f8f5c8c6b..f2de26a99 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -61,20 +61,12 @@ func LoadFromFile(path string, pushFunc PushFunc, opts *Options) (*Aggregators, as, err := newAggregatorsFromData(data, pushFunc, opts) if err != nil { - return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) + return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err) } return as, nil } -func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, 
error) { - var cfgs []*Config - if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { - return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) - } - return NewAggregators(cfgs, pushFunc, opts) -} - // Options contains optional settings for the Aggregators. type Options struct { // DedupInterval is deduplication interval for samples received for the same time series. @@ -194,25 +186,23 @@ type Config struct { OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"` } -// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data. +// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data. type Aggregators struct { as []*aggregator - // configData contains marshaled configs passed to NewAggregators(). + // configData contains marshaled configs. // It is used in Equal() for comparing Aggregators. configData []byte ms *metrics.Set } -// NewAggregators creates Aggregators from the given cfgs. -// -// pushFunc is called when the aggregated data must be flushed. -// -// opts can contain additional options. If opts is nil, then default options are used. -// -// MustStop must be called on the returned Aggregators when they are no longer needed. -func NewAggregators(cfgs []*Config, pushFunc PushFunc, opts *Options) (*Aggregators, error) { +func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) { + var cfgs []*Config + if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { + return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) + } + ms := metrics.NewSet() as := make([]*aggregator, len(cfgs)) for i, cfg := range cfgs { @@ -306,7 +296,7 @@ func (a *Aggregators) Equal(b *Aggregators) bool { // Otherwise it allocates new matchIdxs. func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte { matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss)) - for i := 0; i < len(matchIdxs); i++ { + for i := range matchIdxs { matchIdxs[i] = 0 } if a == nil { @@ -378,6 +368,9 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option } // check cfg.Interval + if cfg.Interval == "" { + return nil, fmt.Errorf("missing `interval` option") + } interval, err := time.ParseDuration(cfg.Interval) if err != nil { return nil, fmt.Errorf("cannot parse `interval: %q`: %w", cfg.Interval, err) @@ -910,7 +903,8 @@ func (ctx *flushCtx) reset() { } func (ctx *flushCtx) resetSeries() { - ctx.tss = prompbmarshal.ResetTimeSeries(ctx.tss) + clear(ctx.tss) + ctx.tss = ctx.tss[:0] clear(ctx.labels) ctx.labels = ctx.labels[:0] diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index d00bc4d85..2d1a344ca 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -210,14 +210,7 @@ func TestAggregatorsSuccess(t *testing.T) { var tssOutputLock sync.Mutex pushFunc := func(tss []prompbmarshal.TimeSeries) { tssOutputLock.Lock() - for _, ts := range tss { - labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...) - samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...) 
- tssOutput = append(tssOutput, prompbmarshal.TimeSeries{ - Labels: labelsCopy, - Samples: samplesCopy, - }) - } + tssOutput = appendClonedTimeseries(tssOutput, tss) tssOutputLock.Unlock() } opts := &Options{ @@ -244,12 +237,7 @@ func TestAggregatorsSuccess(t *testing.T) { } // Verify the tssOutput contains the expected metrics - tsStrings := make([]string, len(tssOutput)) - for i, ts := range tssOutput { - tsStrings[i] = timeSeriesToString(ts) - } - sort.Strings(tsStrings) - outputMetrics := strings.Join(tsStrings, "") + outputMetrics := timeSeriessToString(tssOutput) if outputMetrics != outputMetricsExpected { t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected) } @@ -925,6 +913,15 @@ foo:1m_sum_samples{baz="qwe"} 10 `, "11111111") } +func timeSeriessToString(tss []prompbmarshal.TimeSeries) string { + a := make([]string, len(tss)) + for i, ts := range tss { + a[i] = timeSeriesToString(ts) + } + sort.Strings(a) + return strings.Join(a, "") +} + func timeSeriesToString(ts prompbmarshal.TimeSeries) string { labelsString := promrelabel.LabelsToString(ts.Labels) if len(ts.Samples) != 1 { @@ -965,3 +962,13 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries { } return tss } + +func appendClonedTimeseries(dst, src []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries { + for _, ts := range src { + dst = append(dst, prompbmarshal.TimeSeries{ + Labels: append(ts.Labels[:0:0], ts.Labels...), + Samples: append(ts.Samples[:0:0], ts.Samples...), + }) + } + return dst +}
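For context, the new `streamaggr.Deduplicator` API introduced by this patch (`NewDeduplicator`, `Push`, `MustStop`) can also be exercised on its own. The following is a minimal sketch, not part of the patch; the 30-second interval, the metric name and the printing push callback are illustrative assumptions:

```go
package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	// pushFunc receives the de-duplicated samples once per dedupInterval.
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		fmt.Printf("flushed %d de-duplicated series\n", len(tss))
	}

	// Keep only the last sample per time series within every 30-second window.
	d := streamaggr.NewDeduplicator(pushFunc, 30*time.Second)
	defer d.MustStop() // free up the resources held by the deduplicator

	// Push may be called many times; duplicate samples for the same series
	// are collapsed in memory until the next periodic flush.
	d.Push([]prompbmarshal.TimeSeries{
		{
			Labels:  []prompbmarshal.Label{{Name: "__name__", Value: "foo"}},
			Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: time.Now().UnixMilli()}},
		},
	})

	// The de-duplicated data reaches pushFunc on the dedupInterval ticker,
	// so a short-lived program may exit before the first flush happens.
}
```

This mirrors the wiring added by the patch to `vmagent` and `vminsert`: when only the dedup interval flag is set, a `Deduplicator` is created instead of the full `Aggregators` pipeline, and its periodic flushes feed the same push path.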