lib/streamaggr: add option to ignore first N aggregation intervals (#6137)

Stream aggregation may yield inaccurate results if it processes incomplete data. 
This issue can arise when data is sourced from clients that maintain a queue of unsent data, such as Prometheus or vmagent.
 If the queue isn't fully cleared within the aggregation interval, only a portion of the time series may be included in that period, leading to distorted calculations. 
To mitigate this we add an option to ignore first N aggregation intervals. It is expected, that client queues
will be cleared during the time while aggregation ignores first N intervals and all subsequent aggregations
will be correct.
This commit is contained in:
Andrii Chubatiuk 2024-04-22 14:52:04 +03:00 committed by GitHub
parent 81b2fb88cc
commit c0e4ccb7b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 64 additions and 21 deletions

View File

@ -105,7 +105,8 @@ var (
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication") "with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+ streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+
"for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") "for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent.")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+ disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
@ -857,9 +858,10 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
ignoreOldSamples := streamAggrIgnoreOldSamples.GetOptionalArg(argIdx) ignoreOldSamples := streamAggrIgnoreOldSamples.GetOptionalArg(argIdx)
if sasFile != "" { if sasFile != "" {
opts := &streamaggr.Options{ opts := &streamaggr.Options{
DedupInterval: dedupInterval, DedupInterval: dedupInterval,
DropInputLabels: *streamAggrDropInputLabels, DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: ignoreOldSamples, IgnoreOldSamples: ignoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
} }
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
if err != nil { if err != nil {

View File

@ -32,7 +32,8 @@ var (
"See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication") "See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+ streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent.")
streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") "See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
) )
@ -56,9 +57,10 @@ func CheckStreamAggrConfig() error {
} }
pushNoop := func(_ []prompbmarshal.TimeSeries) {} pushNoop := func(_ []prompbmarshal.TimeSeries) {}
opts := &streamaggr.Options{ opts := &streamaggr.Options{
DedupInterval: *streamAggrDedupInterval, DedupInterval: *streamAggrDedupInterval,
DropInputLabels: *streamAggrDropInputLabels, DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
} }
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts) sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts)
if err != nil { if err != nil {
@ -84,9 +86,10 @@ func InitStreamAggr() {
sighupCh := procutil.NewSighupChan() sighupCh := procutil.NewSighupChan()
opts := &streamaggr.Options{ opts := &streamaggr.Options{
DedupInterval: *streamAggrDedupInterval, DedupInterval: *streamAggrDedupInterval,
DropInputLabels: *streamAggrDropInputLabels, DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
} }
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
if err != nil { if err != nil {
@ -117,9 +120,10 @@ func reloadStreamAggrConfig() {
saCfgReloads.Inc() saCfgReloads.Inc()
opts := &streamaggr.Options{ opts := &streamaggr.Options{
DedupInterval: *streamAggrDedupInterval, DedupInterval: *streamAggrDedupInterval,
DropInputLabels: *streamAggrDropInputLabels, DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
} }
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
if err != nil { if err != nil {

View File

@ -92,6 +92,18 @@ must be ignored, then the following options can be used:
- To set `ignore_old_samples: true` option at the particular [aggregation config](#stream-aggregation-config). - To set `ignore_old_samples: true` option at the particular [aggregation config](#stream-aggregation-config).
This enables ignoring old samples for that particular aggregation config. This enables ignoring old samples for that particular aggregation config.
## Ignore aggregation intervals on start
Stream aggregation may yield inaccurate results if it processes incomplete data. This issue can arise when data is sourced from clients that maintain a queue of unsent data, such as Prometheus or vmagent. If the queue isn't fully cleared within the aggregation interval, only a portion of the time series may be included in that period, leading to distorted calculations. To mitigate this, consider the following options:
- Set `-remoteWrite.streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/)
or `-streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/) to skip first `<intervalsCount>` [aggregation intervals](#stream-aggregation-config)
from persisting to the storage. It is expected that all incomplete or queued data will be processed during
specified `<intervalsCount>` and all subsequent aggregation intervals will produce correct data.
- To set `ignore_first_intervals: <intervalsCount>` option at the particular [aggregation config](#stream-aggregation-config).
This enables ignoring first `<intervalsCount>` aggregation intervals for that particular aggregation config.
## Flush time alignment ## Flush time alignment
By default the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config). By default the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config).

View File

@ -111,6 +111,13 @@ type Options struct {
// //
// This option can be overridden individually per each aggregation via ignore_old_samples option. // This option can be overridden individually per each aggregation via ignore_old_samples option.
IgnoreOldSamples bool IgnoreOldSamples bool
// IgnoreFirstIntervals sets amount of intervals to ignore on start
//
// By default no intervals will be ignored.
//
// This option can be overridden individually per each aggregation via ignore_intervals_on_start option.
IgnoreFirstIntervals int
} }
// Config is a configuration for a single stream aggregation. // Config is a configuration for a single stream aggregation.
@ -175,6 +182,9 @@ type Config struct {
// IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. // IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval.
IgnoreOldSamples *bool `yaml:"ignore_old_samples,omitempty"` IgnoreOldSamples *bool `yaml:"ignore_old_samples,omitempty"`
// IgnoreFirstIntervals sets number of aggregation intervals to be ignored on start.
IgnoreFirstIntervals *int `yaml:"ignore_first_intervals,omitempty"`
// By is an optional list of labels for grouping input series. // By is an optional list of labels for grouping input series.
// //
// See also Without. // See also Without.
@ -479,6 +489,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
ignoreOldSamples = *v ignoreOldSamples = *v
} }
// check cfg.IgnoreFirstIntervals
ignoreFirstIntervals := opts.IgnoreFirstIntervals
if v := cfg.IgnoreFirstIntervals; v != nil {
ignoreFirstIntervals = *v
}
// initialize outputs list // initialize outputs list
if len(cfg.Outputs) == 0 { if len(cfg.Outputs) == 0 {
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+ return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
@ -600,14 +616,14 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
a.wg.Add(1) a.wg.Add(1)
go func() { go func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval) a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals)
a.wg.Done() a.wg.Done()
}() }()
return a, nil return a, nil
} }
func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration) { func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration, ignoreFirstIntervals int) {
alignedSleep := func(d time.Duration) { alignedSleep := func(d time.Duration) {
if !alignFlushToInterval { if !alignFlushToInterval {
return return
@ -642,7 +658,12 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
} }
for tickerWait(t) { for tickerWait(t) {
a.flush(pushFunc, interval, true) pf := pushFunc
if ignoreFirstIntervals > 0 {
pf = nil
ignoreFirstIntervals--
}
a.flush(pf, interval, true)
if alignFlushToInterval { if alignFlushToInterval {
select { select {
@ -663,13 +684,17 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
ct := time.Now() ct := time.Now()
if ct.After(flushDeadline) { if ct.After(flushDeadline) {
pf := pushFunc
if ignoreFirstIntervals > 0 {
pf = nil
ignoreFirstIntervals--
}
// It is time to flush the aggregated state // It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, interval, true) pf = nil
isSkippedFirstFlush = true isSkippedFirstFlush = true
} else {
a.flush(pushFunc, interval, true)
} }
a.flush(pf, interval, true)
for ct.After(flushDeadline) { for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval) flushDeadline = flushDeadline.Add(interval)
} }
@ -684,7 +709,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
} }
} }
if !skipIncompleteFlush { if !skipIncompleteFlush && ignoreFirstIntervals == 0 {
a.dedupFlush(dedupInterval) a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval, true) a.flush(pushFunc, interval, true)
} }