From d523015f27832ffe0773245346d90bdecc28683e Mon Sep 17 00:00:00 2001 From: Hui Wang Date: Tue, 3 Sep 2024 16:47:05 +0800 Subject: [PATCH] =?UTF-8?q?stream=20aggregation:=20perform=20deduplication?= =?UTF-8?q?=20for=20all=20received=20data=20when=20=E2=80=A6=20(#6711)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …specifying `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval` command-line flag [The documentation](https://docs.victoriametrics.com/stream-aggregation/) contains conflicting descriptions regarding deduplication for non-matched series when `-remoteWrite.streamAggr.config` and / or `-streamAggr.config` are set: 1. Statement below says **all the received data** is deduplicated: >[vmagent](https://docs.victoriametrics.com/vmagent/) supports relabeling, deduplication and stream aggregation for all the received data, scraped or pushed. Then, the collected data will be forwarded to specified -remoteWrite.url destinations. The data processing order is the following: >1. all the received data is relabeled according to the specified [-remoteWrite.relabelConfig](https://docs.victoriametrics.com/vmagent/#relabeling) (if it is set) >2. all the received data is deduplicated according to specified [-streamAggr.dedupInterval](https://docs.victoriametrics.com/stream-aggregation/#deduplication) (if it is set to duration bigger than 0) 2. Another statement says the deduplication is performed individually for the **matching samples** >The de-deduplication is performed after applying [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling) and before performing the aggregation. If the -remoteWrite.streamAggr.config and / or -streamAggr.config is set, then the de-duplication is performed individually per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) for the matching samples after applying [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). Considering the following deduplication use cases: 1. To apply deduplication(globally or for specific remoteWrite destination) for all the received data, scraped or pushed --- using `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval`. 2. To deduplicate and aggregate metrics that match the rule `match` filters --- using `-remoteWrite.streamAggr.config` and specifiying `dedup_interval` option in [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). 3. To deduplicate all the received data while having `streamAggr.config` for some metrics --- no way for a single vmagent now, need to set up two level vmagents This PR implements case3. --------- Co-authored-by: Roman Khavronenko --- app/vmagent/remotewrite/remotewrite.go | 6 ++++-- app/vmagent/remotewrite/streamaggr.go | 20 +++++++++----------- docs/changelog/CHANGELOG.md | 4 ++++ docs/stream-aggregation.md | 5 ----- 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index bbc8663ae..ea74c4985 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -494,7 +494,8 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput) } matchIdxsPool.Put(matchIdxs) - } else if deduplicatorGlobal != nil { + } + if deduplicatorGlobal != nil { deduplicatorGlobal.Push(tssBlock) tssBlock = tssBlock[:0] } @@ -922,7 +923,8 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput) } matchIdxsPool.Put(matchIdxs) - } else if rwctx.deduplicator != nil { + } + if rwctx.deduplicator != nil { rwctx.deduplicator.Push(tss) return true } diff --git a/app/vmagent/remotewrite/streamaggr.go b/app/vmagent/remotewrite/streamaggr.go index 6275f95ee..8bdc8c3a7 100644 --- a/app/vmagent/remotewrite/streamaggr.go +++ b/app/vmagent/remotewrite/streamaggr.go @@ -130,11 +130,10 @@ func initStreamAggrConfigGlobal() { sasGlobal.Store(sas) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp()) - } else { - dedupInterval := streamAggrGlobalDedupInterval.Duration() - if dedupInterval > 0 { - deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, dedupInterval, *streamAggrGlobalDropInputLabels, "dedup-global") - } + } + dedupInterval := streamAggrGlobalDedupInterval.Duration() + if dedupInterval > 0 { + deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, dedupInterval, *streamAggrGlobalDropInputLabels, "dedup-global") } } @@ -152,12 +151,11 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() { rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(idx) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp()) - } else { - dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx) - if dedupInterval > 0 { - alias := fmt.Sprintf("dedup-%d", idx+1) - rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels, alias) - } + } + dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx) + if dedupInterval > 0 { + alias := fmt.Sprintf("dedup-%d", idx+1) + rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels, alias) } } diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md index 84995b1af..75f2da32b 100644 --- a/docs/changelog/CHANGELOG.md +++ b/docs/changelog/CHANGELOG.md @@ -18,6 +18,10 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip +**Update note 1: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): perform deduplication for all received data when specifying `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval` command-line flag. Previously, if the `-remoteWrite.streamAggr.config` or `-streamAggr.config` is set, only series that matched aggregation config were deduplicated. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6711#issuecomment-2288361213) for details.** + +* FEATURE [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): perform deduplication for all received data when specifying `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval` command-line flags are set. Previously, if the `-remoteWrite.streamAggr.config` or `-streamAggr.config` is set, only series that matched aggregation config were deduplicated. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6711#issuecomment-2288361213) for details. + ## [v1.103.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.103.0) Released at 2024-08-28 diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index f8a5cd8c1..7dba5e6d0 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -86,8 +86,6 @@ before sending them to the configured `-remoteWrite.url`. The de-duplication can only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds. The de-deduplication is performed after applying [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling) and before performing the aggregation. - If the `-remoteWrite.streamAggr.config` and / or `-streamAggr.config` is set, then the de-duplication is performed individually per each - [stream aggregation config](#stream-aggregation-config) for the matching samples after applying [input_relabel_configs](#relabeling). - By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) in `-remoteWrite.streamAggr.config` or `-streamAggr.config` configs. @@ -100,9 +98,6 @@ before sending them to the configured `-remoteWrite.url`. The de-duplication can seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds. The de-duplication is performed after applying `-relabelConfig` [relabeling](https://docs.victoriametrics.com/#relabeling). - If the `-streamAggr.config` is set, then the de-duplication is performed individually per each [stream aggregation config](#stream-aggregation-config) - for the matching samples after applying [input_relabel_configs](#relabeling). - - By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-streamAggr.config`. It is possible to drop the given labels before applying the de-duplication. See [these docs](#dropping-unneeded-labels).