From f8779d1ed2709ab0c98920243c540be1e040f3f1 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 2 Jul 2024 21:21:35 +0200 Subject: [PATCH] lib/streamaggr: follow-up for the commit c0e4ccb7b50c3f0473888a7b45c3c3ba6b4a5cd7 - Clarify docs for `Ignore aggregation intervals on start` feature. - Make more clear the code dealing with ignoreFirstIntervals at aggregator.runFlusher() functions. It is better from readability and maintainability PoV using distinct a.flush() calls for distinct cases instead of merging them into a single a.flush() call. - Take into account the first incomplete interval when tracking the number of skipped aggregation intervals, since this behaviour is easier to understand by the end users. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6137 --- docs/stream-aggregation.md | 44 +++++++++++++++++++++++------------- lib/streamaggr/streamaggr.go | 27 +++++++++++----------- 2 files changed, 42 insertions(+), 29 deletions(-) diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index b8adb18ede..971f6275c2 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -22,6 +22,8 @@ after applying all the configured [relabeling stages](https://docs.victoriametri _By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples). It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples)._ +## Configuration + Stream aggregation can be configured via the following command-line flags: - `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) @@ -128,25 +130,30 @@ outside the current [aggregation interval](#stream-aggregation-config) must be i ## Ignore aggregation intervals on start -Stream aggregation may yield inaccurate results if it processes incomplete data. This issue can arise when data is -received from clients that maintain a queue of unsent data, such as Prometheus or vmagent. If the queue isn't fully -cleared within the aggregation `interval`, only a portion of the time series may be processed, leading to distorted -calculations. To mitigate this, consider the following options: +Streaming aggregation results may be incorrect for some time after the restart of [vmagent](https://docs.victoriametrics.com/vmagent/) +or [single-node VictoriaMetrics](https://docs.victoriametrics.com/) until all the buffered [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples) +are sent from remote sources to the `vmagent` or single-node VictoriaMetrics via [supported data ingestion protocols](https://docs.victoriametrics.com/vmagent/#how-to-push-data-to-vmagent). +In this case it may be a good idea to drop the aggregated data during the first `N` [aggrgation intervals](#stream-aggregation-config) +just after the restart of `vmagent` or single-node VictoriaMetrics. This can be done via the following options: -- Set `-streamAggr.ignoreFirstIntervals=` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/) - or to [vmagent](https://docs.victoriametrics.com/vmagent/) to skip first `` [aggregation intervals](#stream-aggregation-config) - from persisting to the storage. At [vmagent](https://docs.victoriametrics.com/vmagent/) - `-remoteWrite.streamAggr.ignoreFirstIntervals=` flag can be specified individually per each `-remoteWrite.url`. - It is expected that all incomplete or queued data will be processed during specified `` - and all subsequent aggregation intervals will produce correct data. +- The `-streamAggr.ignoreFirstIntervals=N` command-line flag at `vmagent` and single-node VictoriaMetrics. This flag instructs skipping the first `N` + [aggregation intervals](#stream-aggregation-config) just after the restart accross all the [configured stream aggregation configs](#configuration). -- Set `ignore_first_intervals: ` option individually per [aggregation config](#stream-aggregation-config). - This enables ignoring first `` aggregation intervals for that particular aggregation config. + The `-remoteWrite.streamAggr.ignoreFirstIntervals=N` command-line flag can be specified individually per each `-remoteWrite.url` at [vmagent](https://docs.victoriametrics.com/vmagent/). + +- The `ignore_first_intervals: N` option at the particular [aggregation config](#stream-aggregation-config). + +See also: + +- [Flush time alignment](#flush-time-alignment) +- [Ignoring old samples](#ignoring-old-samples) ## Flush time alignment By default, the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config). + For example: + - if `interval: 1m` is set, then the aggregated data is flushed to the storage at the end of every minute - if `interval: 1h` is set, then the aggregated data is flushed to the storage at the end of every hour @@ -157,6 +164,11 @@ The aggregated data on the first and the last interval is dropped during `vmagen since the first and the last aggregation intervals are incomplete, so they usually contain incomplete confusing data. If you need preserving the aggregated data on these intervals, then set `flush_on_shutdown: true` option in the [aggregate config](#stream-aggregation-config). +See also: + +- [Ignore aggregation intervals on start](#ignore-aggregation-intervals-on-start) +- [Ignoring old samples](#ignoring-old-samples) + ## Use cases Stream aggregation can be used in the following cases: @@ -994,15 +1006,15 @@ specified individually per each `-remoteWrite.url`: # ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval. # See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples - # See also -remoteWrite.streamAggr.ignoreOldSamples or -streamAggr.ignoreOldSamples command-line flag. + # See also -remoteWrite.streamAggr.ignoreOldSamples and -streamAggr.ignoreOldSamples command-line flag. # # ignore_old_samples: false - # ignore_first_intervals instructs ignoring first N aggregation intervals after process start. + # ignore_first_intervals instructs ignoring the first N aggregation intervals after process start. # See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start - # See also -remoteWrite.streamAggr.ignoreFirstIntervals or -streamAggr.ignoreFirstIntervals command-line flag. + # See also -remoteWrite.streamAggr.ignoreFirstIntervals and -streamAggr.ignoreFirstIntervals command-line flags. # - # ignore_first_intervals: false + # ignore_first_intervals: N # drop_input_labels instructs dropping the given labels from input samples. # The labels' dropping is performed before input_relabel_configs are applied. diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 831cd500fe..c020793c3f 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -130,9 +130,9 @@ type Options struct { // This option can be overridden individually per each aggregation via ignore_old_samples option. IgnoreOldSamples bool - // IgnoreFirstIntervals sets amount of aggregation intervals to ignore on start. + // IgnoreFirstIntervals sets the number of aggregation intervals to be ignored on start. // - // By default, no intervals will be ignored. + // By default, zero intervals are ignored. // // This option can be overridden individually per each aggregation via ignore_first_intervals option. IgnoreFirstIntervals int @@ -715,15 +715,16 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if alignFlushToInterval && skipIncompleteFlush { a.flush(nil, interval, true) + ignoreFirstIntervals-- } for tickerWait(t) { - pf := pushFunc if ignoreFirstIntervals > 0 { - pf = nil + a.flush(nil, interval, true) ignoreFirstIntervals-- + } else { + a.flush(pushFunc, interval, true) } - a.flush(pf, interval, true) if alignFlushToInterval { select { @@ -744,17 +745,17 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc ct := time.Now() if ct.After(flushDeadline) { - pf := pushFunc - if ignoreFirstIntervals > 0 { - pf = nil - ignoreFirstIntervals-- - } // It is time to flush the aggregated state if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { - pf = nil + a.flush(nil, interval, true) + ignoreFirstIntervals-- isSkippedFirstFlush = true + } else if ignoreFirstIntervals > 0 { + a.flush(nil, interval, true) + ignoreFirstIntervals-- + } else { + a.flush(pushFunc, interval, true) } - a.flush(pf, interval, true) for ct.After(flushDeadline) { flushDeadline = flushDeadline.Add(interval) } @@ -769,7 +770,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } } - if !skipIncompleteFlush && ignoreFirstIntervals == 0 { + if !skipIncompleteFlush && ignoreFirstIntervals <= 0 { a.dedupFlush(dedupInterval) a.flush(pushFunc, interval, true) }