From 1cedaf61cb0e4893200edd7e84696df1f6418fd3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 17 Mar 2024 23:01:44 +0200 Subject: [PATCH] app/{vmagent,vminsert}: add an ability to ignore input samples outside the current aggregation interval for stream aggregation See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples --- README.md | 2 ++ app/vmagent/remotewrite/remotewrite.go | 16 +++++++++--- app/vminsert/common/streamaggr.go | 15 ++++++++--- docs/CHANGELOG.md | 1 + docs/README.md | 19 ++++++++------ docs/Single-server-VictoriaMetrics.md | 17 +++++++----- docs/stream-aggregation.md | 17 ++++++++++++ docs/vmagent.md | 4 +++ lib/streamaggr/streamaggr.go | 36 +++++++++++++++++++++++--- 9 files changed, 101 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 1311e14d7..22c1e6c97 100644 --- a/README.md +++ b/README.md @@ -3116,6 +3116,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. + -streamAggr.ignoreOldSamples + Whether to ignore input samples with old timestamps outside the current aggregation interval. See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples -streamAggr.keepInput Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. 
See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html -tls array diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 4d64b100c..7ba891c9d 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -92,6 +92,8 @@ var ( "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+ "with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication") + streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+ + "for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples") streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ "before stream de-duplication and aggregation . 
See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels") @@ -745,10 +747,12 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in // Initialize sas sasFile := streamAggrConfig.GetOptionalArg(argIdx) dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx) + ignoreOldSamples := streamAggrIgnoreOldSamples.GetOptionalArg(argIdx) if sasFile != "" { opts := &streamaggr.Options{ - DedupInterval: dedupInterval, - DropInputLabels: *streamAggrDropInputLabels, + DedupInterval: dedupInterval, + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: ignoreOldSamples, } sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) if err != nil { @@ -916,7 +920,9 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() { logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc() opts := &streamaggr.Options{ - DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx), + DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx), + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(rwctx.idx), } sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) if err != nil { @@ -961,7 +967,9 @@ func CheckStreamAggrConfigs() error { continue } opts := &streamaggr.Options{ - DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx), + DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx), + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx), } sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, opts) if err != nil { diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 0a4862340..e6f80e503 100644 --- a/app/vminsert/common/streamaggr.go +++ 
b/app/vminsert/common/streamaggr.go @@ -32,6 +32,8 @@ var ( "See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication") streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels") + streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+ + "See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples") ) var ( @@ -54,7 +56,9 @@ func CheckStreamAggrConfig() error { } pushNoop := func(tss []prompbmarshal.TimeSeries) {} opts := &streamaggr.Options{ - DedupInterval: *streamAggrDedupInterval, + DedupInterval: *streamAggrDedupInterval, + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: *streamAggrIgnoreOldSamples, } sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts) if err != nil { @@ -80,8 +84,9 @@ func InitStreamAggr() { sighupCh := procutil.NewSighupChan() opts := &streamaggr.Options{ - DedupInterval: *streamAggrDedupInterval, - DropInputLabels: *streamAggrDropInputLabels, + DedupInterval: *streamAggrDedupInterval, + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: *streamAggrIgnoreOldSamples, } sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) if err != nil { @@ -112,7 +117,9 @@ func reloadStreamAggrConfig() { saCfgReloads.Inc() opts := &streamaggr.Options{ - DedupInterval: *streamAggrDedupInterval, + DedupInterval: *streamAggrDedupInterval, + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: *streamAggrIgnoreOldSamples, } sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) if err != nil { diff --git 
a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b744ab5ab..eb1694238 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -43,6 +43,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): use the same logic in [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) as in [the deduplication at VictoriaMetrics](https://docs.victoriametrics.com/#deduplication). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5643). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): ignore out of order samples when calculating [`increase`](https://docs.victoriametrics.com/stream-aggregation/#increase), [`increase_prometheus`](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus), [`total`](https://docs.victoriametrics.com/stream-aggregation/#total) and [`total_prometheus`](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs. Thanks to @edma2 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5931). +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add an ability to ignore input samples with old timestamps outside the current aggregation interval. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples) for details. 
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alignment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`. diff --git a/docs/README.md b/docs/README.md index 93c4b18e8..842de3fff 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1679,8 +1679,14 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical This allows saving CPU and RAM when executing unexpected heavy queries. - `-search.maxConcurrentRequests` limits the number of concurrent requests VictoriaMetrics can process. Bigger number of concurrent requests usually means bigger memory usage. For example, if a single query needs 100 MiB of additional memory during its execution, then 100 concurrent queries may need `100 * 100 MiB = 10 GiB` - of additional memory. 
So it is better to limit the number of concurrent queries, while suspending additional incoming queries if the concurrency limit is reached. - VictoriaMetrics provides `-search.maxQueueDuration` command-line flag for limiting the max wait time for suspended queries. See also `-search.maxMemoryPerQuery` command-line flag. + of additional memory. So it is better to limit the number of concurrent queries, while pausing additional incoming queries if the concurrency limit is reached. + VictoriaMetrics provides `-search.maxQueueDuration` command-line flag for limiting the max wait time for paused queries. See also `-search.maxMemoryPerQuery` command-line flag. +- `-search.maxQueueDuration` limits the maximum duration queries may wait for execution when `-search.maxConcurrentRequests` concurrent queries are executed. +- `-search.ignoreExtraFiltersAtLabelsAPI` enables ignoring of `match[]`, [`extra_filters[]` and `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements) + query args at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels) and + [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues). + This may be useful for reducing the load on VictoriaMetrics if the provided extra filters match too many time series. + The downside is that the endpoints can return labels and series, which do not match the provided extra filters. - `-search.maxSamplesPerSeries` limits the number of raw samples the query can process per each time series. VictoriaMetrics sequentially processes raw samples per each found time series during the query. It unpacks raw samples on the selected time range per each time series into memory and then applies the given [rollup function](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions). 
The `-search.maxSamplesPerSeries` command-line flag @@ -1723,11 +1729,6 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate). In this case it might be useful to set the `-search.maxLabelsAPIDuration` to quite low value in order to limit CPU and memory usage. See also `-search.maxLabelsAPISeries` and `-search.ignoreExtraFiltersAtLabelsAPI`. -- `-search.ignoreExtraFiltersAtLabelsAPI` enables ignoring of `match[]`, [`extra_filters[]` and `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements) - query args at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels) and - [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues). - This may be useful for reducing the load on VictoriaMetrics if the provided extra filters match too many time series. - The downside is that the endpoints can return labels and series, which do not match the provided extra filters. - `-search.maxTagValueSuffixesPerSearch` limits the number of entries, which may be returned from `/metrics/find` endpoint. See [Graphite Metrics API usage docs](#graphite-metrics-api-usage). See also [resource usage limits at VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#resource-usage-limits), @@ -2790,7 +2791,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -loggerWarnsPerSecondLimit int Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit -maxConcurrentInserts int - The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. 
The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 32) + The maximum number of concurrent insert requests. Default value depends on the number of CPU cores and should work for most cases since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration -maxInsertRequestSize size The maximum size in bytes of a single Prometheus remote_write API request Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 33554432) @@ -3118,6 +3119,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. + -streamAggr.ignoreOldSamples + Whether to ignore input samples with old timestamps outside the current aggregation interval. See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples -streamAggr.keepInput Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html -tls array diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 10908f415..1089860a3 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -1687,8 +1687,14 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical This allows saving CPU and RAM when executing unexpected heavy queries. 
- `-search.maxConcurrentRequests` limits the number of concurrent requests VictoriaMetrics can process. Bigger number of concurrent requests usually means bigger memory usage. For example, if a single query needs 100 MiB of additional memory during its execution, then 100 concurrent queries may need `100 * 100 MiB = 10 GiB` - of additional memory. So it is better to limit the number of concurrent queries, while suspending additional incoming queries if the concurrency limit is reached. - VictoriaMetrics provides `-search.maxQueueDuration` command-line flag for limiting the max wait time for suspended queries. See also `-search.maxMemoryPerQuery` command-line flag. + of additional memory. So it is better to limit the number of concurrent queries, while pausing additional incoming queries if the concurrency limit is reached. + VictoriaMetrics provides `-search.maxQueueDuration` command-line flag for limiting the max wait time for paused queries. See also `-search.maxMemoryPerQuery` command-line flag. +- `-search.maxQueueDuration` limits the maximum duration queries may wait for execution when `-search.maxConcurrentRequests` concurrent queries are executed. +- `-search.ignoreExtraFiltersAtLabelsAPI` enables ignoring of `match[]`, [`extra_filters[]` and `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements) + query args at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels) and + [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues). + This may be useful for reducing the load on VictoriaMetrics if the provided extra filters match too many time series. + The downside is that the endpoints can return labels and series, which do not match the provided extra filters. - `-search.maxSamplesPerSeries` limits the number of raw samples the query can process per each time series. VictoriaMetrics sequentially processes raw samples per each found time series during the query. 
It unpacks raw samples on the selected time range per each time series into memory and then applies the given [rollup function](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions). The `-search.maxSamplesPerSeries` command-line flag @@ -1731,11 +1737,6 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate). In this case it might be useful to set the `-search.maxLabelsAPIDuration` to quite low value in order to limit CPU and memory usage. See also `-search.maxLabelsAPISeries` and `-search.ignoreExtraFiltersAtLabelsAPI`. -- `-search.ignoreExtraFiltersAtLabelsAPI` enables ignoring of `match[]`, [`extra_filters[]` and `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements) - query args at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels) and - [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues). - This may be useful for reducing the load on VictoriaMetrics if the provided extra filters match too many time series. - The downside is that the endpoints can return labels and series, which do not match the provided extra filters. - `-search.maxTagValueSuffixesPerSearch` limits the number of entries, which may be returned from `/metrics/find` endpoint. See [Graphite Metrics API usage docs](#graphite-metrics-api-usage). See also [resource usage limits at VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#resource-usage-limits), @@ -3126,6 +3127,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li An optional list of labels to drop from samples before stream de-duplication and aggregation . 
See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. + -streamAggr.ignoreOldSamples + Whether to ignore input samples with old timestamps outside the current aggregation interval. See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples -streamAggr.keepInput Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html -tls array diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 8c1eb0ed1..2bde2c64a 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -80,6 +80,18 @@ It is possible to drop the given labels before applying the de-duplication. See The online de-duplication uses the same logic as [`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication) at VictoriaMetrics. +## Ignoring old samples + +By default all the input samples are taken into account during stream aggregation. If samples with old timestamps outside the current [aggregation interval](#stream-aggregation-config) +must be ignored, then the following options can be used: + +- Pass the `-remoteWrite.streamAggr.ignoreOldSamples` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/) + or the `-streamAggr.ignoreOldSamples` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/). + This enables ignoring old samples for all the [aggregation configs](#stream-aggregation-config). + +- Set the `ignore_old_samples: true` option at the particular [aggregation config](#stream-aggregation-config). 
+ This enables ignoring old samples for that particular aggregation config. + ## Flush time alignment By default the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config). @@ -915,6 +927,11 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # # keep_metric_names: false + # ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval. + # See also -streamAggr.ignoreOldSamples command-line flag. + # + # ignore_old_samples: false + # drop_input_labels instructs dropping the given labels from input samples. # The labels' dropping is performed before input_relabel_configs are applied. # This also means that the labels are dropped before de-duplication ( https://docs.victoriametrics.com/stream-aggregation.html#deduplication ) diff --git a/docs/vmagent.md b/docs/vmagent.md index 556accca6..652b0bfff 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -2125,6 +2125,10 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html Supports array of values separated by comma or specified via multiple flags. Empty values are set to false. + -remoteWrite.streamAggr.ignoreOldSamples array + Whether to ignore input samples with old timestamps outside the current aggregation interval for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples + Supports array of values separated by comma or specified via multiple flags. + Empty values are set to false. 
-remoteWrite.streamAggr.keepInput array Whether to keep all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html Supports array of values separated by comma or specified via multiple flags. diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index e1418c948..3aaee6ac8 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -101,8 +102,15 @@ type Options struct { // // input_name:[_by_][_without_]_ // - // This option can be overriden individually per each aggregation via keep_metric_names option. + // This option can be overridden individually per each aggregation via keep_metric_names option. KeepMetricNames bool + + // IgnoreOldSamples instructs to ignore samples with timestamps older than the current aggregation interval. + // + // By default all the samples are taken into account. + // + // This option can be overridden individually per each aggregation via ignore_old_samples option. + IgnoreOldSamples bool } // Config is a configuration for a single stream aggregation. @@ -164,6 +172,9 @@ type Config struct { // KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix. KeepMetricNames *bool `yaml:"keep_metric_names,omitempty"` + // IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. + IgnoreOldSamples *bool `yaml:"ignore_old_samples,omitempty"` + // By is an optional list of labels for grouping input series. // // See also Without. 
@@ -327,7 +338,8 @@ type aggregator struct { inputRelabeling *promrelabel.ParsedConfigs outputRelabeling *promrelabel.ParsedConfigs - keepMetricNames bool + keepMetricNames bool + ignoreOldSamples bool by []string without []string @@ -342,6 +354,9 @@ type aggregator struct { // lc is used for compressing series keys before passing them to dedupAggr and aggrState lc promutils.LabelsCompressor + // minTimestamp is used for ignoring old samples when ignoreOldSamples is set + minTimestamp atomic.Int64 + // suffix contains a suffix, which should be added to aggregate metric names // // It contains the interval, labels in (by, without), plus output name. @@ -458,6 +473,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option } } + // check cfg.IgnoreOldSamples + ignoreOldSamples := opts.IgnoreOldSamples + if v := cfg.IgnoreOldSamples; v != nil { + ignoreOldSamples = *v + } + // initialize outputs list if len(cfg.Outputs) == 0 { return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+ @@ -544,7 +565,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option inputRelabeling: inputRelabeling, outputRelabeling: outputRelabeling, - keepMetricNames: keepMetricNames, + keepMetricNames: keepMetricNames, + ignoreOldSamples: ignoreOldSamples, by: by, without: without, @@ -710,6 +732,8 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState } wg.Wait() + a.minTimestamp.Store(startTime.UnixMilli() - 5_000) + d := time.Since(startTime) a.flushDuration.Update(d.Seconds()) if d > interval { @@ -742,6 +766,8 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { outputLabels := &ctx.outputLabels dropLabels := a.dropInputLabels + ignoreOldSamples := a.ignoreOldSamples + minTimestamp := a.minTimestamp.Load() for idx, ts := range tss { if !a.match.Match(ts.Labels) { continue @@ -775,6 +801,10 @@ func (a *aggregator) Push(tss 
[]prompbmarshal.TimeSeries, matchIdxs []byte) { // Skip NaN values continue } + if ignoreOldSamples && sample.Timestamp < minTimestamp { + // Skip old samples outside the current aggregation interval + continue + } samples = append(samples, pushSample{ key: key, value: sample.Value,