diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index b8adb18ede..971f6275c2 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -22,6 +22,8 @@ after applying all the configured [relabeling stages](https://docs.victoriametri _By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples). It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples)._ +## Configuration + Stream aggregation can be configured via the following command-line flags: - `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) @@ -128,25 +130,30 @@ outside the current [aggregation interval](#stream-aggregation-config) must be i ## Ignore aggregation intervals on start -Stream aggregation may yield inaccurate results if it processes incomplete data. This issue can arise when data is -received from clients that maintain a queue of unsent data, such as Prometheus or vmagent. If the queue isn't fully -cleared within the aggregation `interval`, only a portion of the time series may be processed, leading to distorted -calculations. To mitigate this, consider the following options: +Streaming aggregation results may be incorrect for some time after the restart of [vmagent](https://docs.victoriametrics.com/vmagent/) +or [single-node VictoriaMetrics](https://docs.victoriametrics.com/) until all the buffered [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples) +are sent from remote sources to the `vmagent` or single-node VictoriaMetrics via [supported data ingestion protocols](https://docs.victoriametrics.com/vmagent/#how-to-push-data-to-vmagent). 
+In this case it may be a good idea to drop the aggregated data during the first `N` [aggregation intervals](#stream-aggregation-config) +just after the restart of `vmagent` or single-node VictoriaMetrics. This can be done via the following options: -- Set `-streamAggr.ignoreFirstIntervals=` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/) - or to [vmagent](https://docs.victoriametrics.com/vmagent/) to skip first `` [aggregation intervals](#stream-aggregation-config) - from persisting to the storage. At [vmagent](https://docs.victoriametrics.com/vmagent/) - `-remoteWrite.streamAggr.ignoreFirstIntervals=` flag can be specified individually per each `-remoteWrite.url`. - It is expected that all incomplete or queued data will be processed during specified `` - and all subsequent aggregation intervals will produce correct data. +- The `-streamAggr.ignoreFirstIntervals=N` command-line flag at `vmagent` and single-node VictoriaMetrics. This flag instructs skipping the first `N` + [aggregation intervals](#stream-aggregation-config) just after the restart across all the [configured stream aggregation configs](#configuration). -- Set `ignore_first_intervals: ` option individually per [aggregation config](#stream-aggregation-config). - This enables ignoring first `` aggregation intervals for that particular aggregation config. + The `-remoteWrite.streamAggr.ignoreFirstIntervals=N` command-line flag can be specified individually per each `-remoteWrite.url` at [vmagent](https://docs.victoriametrics.com/vmagent/). + +- The `ignore_first_intervals: N` option at the particular [aggregation config](#stream-aggregation-config). + +See also: + +- [Flush time alignment](#flush-time-alignment) +- [Ignoring old samples](#ignoring-old-samples) ## Flush time alignment By default, the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config). 
+ For example: + - if `interval: 1m` is set, then the aggregated data is flushed to the storage at the end of every minute - if `interval: 1h` is set, then the aggregated data is flushed to the storage at the end of every hour @@ -157,6 +164,11 @@ The aggregated data on the first and the last interval is dropped during `vmagen since the first and the last aggregation intervals are incomplete, so they usually contain incomplete confusing data. If you need preserving the aggregated data on these intervals, then set `flush_on_shutdown: true` option in the [aggregate config](#stream-aggregation-config). +See also: + +- [Ignore aggregation intervals on start](#ignore-aggregation-intervals-on-start) +- [Ignoring old samples](#ignoring-old-samples) + ## Use cases Stream aggregation can be used in the following cases: @@ -994,15 +1006,15 @@ specified individually per each `-remoteWrite.url`: # ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval. # See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples - # See also -remoteWrite.streamAggr.ignoreOldSamples or -streamAggr.ignoreOldSamples command-line flag. + # See also -remoteWrite.streamAggr.ignoreOldSamples and -streamAggr.ignoreOldSamples command-line flags. # # ignore_old_samples: false - # ignore_first_intervals instructs ignoring first N aggregation intervals after process start. + # ignore_first_intervals instructs ignoring the first N aggregation intervals after process start. # See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start - # See also -remoteWrite.streamAggr.ignoreFirstIntervals or -streamAggr.ignoreFirstIntervals command-line flags. + # See also -remoteWrite.streamAggr.ignoreFirstIntervals and -streamAggr.ignoreFirstIntervals command-line flags. # - # ignore_first_intervals: false + # ignore_first_intervals: N # drop_input_labels instructs dropping the given labels from input samples. 
# The labels' dropping is performed before input_relabel_configs are applied. diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 831cd500fe..c020793c3f 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -130,9 +130,9 @@ type Options struct { // This option can be overridden individually per each aggregation via ignore_old_samples option. IgnoreOldSamples bool - // IgnoreFirstIntervals sets amount of aggregation intervals to ignore on start. + // IgnoreFirstIntervals sets the number of aggregation intervals to be ignored on start. // - // By default, no intervals will be ignored. + // By default, zero intervals are ignored. // // This option can be overridden individually per each aggregation via ignore_first_intervals option. IgnoreFirstIntervals int @@ -715,15 +715,16 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if alignFlushToInterval && skipIncompleteFlush { a.flush(nil, interval, true) + ignoreFirstIntervals-- } for tickerWait(t) { - pf := pushFunc if ignoreFirstIntervals > 0 { - pf = nil + a.flush(nil, interval, true) ignoreFirstIntervals-- + } else { + a.flush(pushFunc, interval, true) } - a.flush(pf, interval, true) if alignFlushToInterval { select { @@ -744,17 +745,17 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc ct := time.Now() if ct.After(flushDeadline) { - pf := pushFunc - if ignoreFirstIntervals > 0 { - pf = nil - ignoreFirstIntervals-- - } // It is time to flush the aggregated state if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { - pf = nil + a.flush(nil, interval, true) + ignoreFirstIntervals-- isSkippedFirstFlush = true + } else if ignoreFirstIntervals > 0 { + a.flush(nil, interval, true) + ignoreFirstIntervals-- + } else { + a.flush(pushFunc, interval, true) } - a.flush(pf, interval, true) for ct.After(flushDeadline) { flushDeadline = flushDeadline.Add(interval) } @@ -769,7 +770,7 @@ func (a 
*aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } } - if !skipIncompleteFlush && ignoreFirstIntervals == 0 { + if !skipIncompleteFlush && ignoreFirstIntervals <= 0 { a.dedupFlush(dedupInterval) a.flush(pushFunc, interval, true) }