app/streamaggr: follow-up after c0e4ccb7b5

* rm vmagent mentions from vminsert flags
* improve documentation wording, add links to related sections
* mention `ignore_first_intervals` in the stream aggr options
* update flags description
* add basic test for config parsing validation

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
hagen1778 2024-04-22 14:22:59 +02:00
parent c0e4ccb7b5
commit bae3874e6a
No known key found for this signature in database
GPG Key ID: 3BF75F3741CA9640
9 changed files with 48 additions and 19 deletions

View File

@ -3159,6 +3159,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-streamAggr.ignoreFirstIntervals int
Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
-streamAggr.ignoreOldSamples
Whether to ignore input samples with old timestamps outside the current aggregation interval. See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
-streamAggr.keepInput

View File

@ -105,7 +105,8 @@ var (
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+
"for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent.")
streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")

View File

@ -32,7 +32,8 @@ var (
"See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent.")
streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
)

View File

@ -3162,6 +3162,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-streamAggr.ignoreFirstIntervals int
Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
-streamAggr.ignoreOldSamples
Whether to ignore input samples with old timestamps outside the current aggregation interval. See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
-streamAggr.keepInput

View File

@ -3170,6 +3170,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-streamAggr.ignoreFirstIntervals int
Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
-streamAggr.ignoreOldSamples
Whether to ignore input samples with old timestamps outside the current aggregation interval. See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
-streamAggr.keepInput

View File

@ -19,7 +19,7 @@ The aggregation is applied to all the metrics received via any [supported data i
and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter)
after applying all the configured [relabeling stages](https://docs.victoriametrics.com/vmagent/#relabeling).
By default stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples).
By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples).
It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples).
Stream aggregation can be configured via the following command-line flags:
@ -82,7 +82,7 @@ The online de-duplication uses the same logic as [`-dedup.minScrapeInterval` com
## Ignoring old samples
By default all the input samples are taken into account during stream aggregation. If samples with old timestamps outside the current [aggregation interval](#stream-aggregation-config)
By default, all the input samples are taken into account during stream aggregation. If samples with old timestamps outside the current [aggregation interval](#stream-aggregation-config)
must be ignored, then the following options can be used:
- To pass `-remoteWrite.streamAggr.ignoreOldSamples` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/)
@ -94,19 +94,23 @@ must be ignored, then the following options can be used:
## Ignore aggregation intervals on start
Stream aggregation may yield inaccurate results if it processes incomplete data. This issue can arise when data is sourced from clients that maintain a queue of unsent data, such as Prometheus or vmagent. If the queue isn't fully cleared within the aggregation interval, only a portion of the time series may be included in that period, leading to distorted calculations. To mitigate this, consider the following options:
Stream aggregation may yield inaccurate results if it processes incomplete data. This issue can arise when data is
received from clients that maintain a queue of unsent data, such as Prometheus or vmagent. If the queue isn't fully
cleared within the aggregation `interval`, only a portion of the time series may be processed, leading to distorted
calculations. To mitigate this, consider the following options:
- Set `-remoteWrite.streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/)
or `-streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/) to skip first `<intervalsCount>` [aggregation intervals](#stream-aggregation-config)
or `-streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/)
to skip first `<intervalsCount>` [aggregation intervals](#stream-aggregation-config)
from persisting to the storage. It is expected that all incomplete or queued data will be processed during
specified `<intervalsCount>` and all subsequent aggregation intervals will produce correct data.
- To set `ignore_first_intervals: <intervalsCount>` option at the particular [aggregation config](#stream-aggregation-config).
- Set `ignore_first_intervals: <intervalsCount>` option individually per [aggregation config](#stream-aggregation-config).
This enables ignoring first `<intervalsCount>` aggregation intervals for that particular aggregation config.
## Flush time alignment
By default the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config).
By default, the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config).
For example:
- if `interval: 1m` is set, then the aggregated data is flushed to the storage at the end of every minute
- if `interval: 1h` is set, then the aggregated data is flushed to the storage at the end of every hour
@ -887,7 +891,7 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-
# Samples are de-duplicated on a per-series basis. See https://docs.victoriametrics.com/keyconcepts/#time-series
# and https://docs.victoriametrics.com/#deduplication
# The deduplication is performed after input_relabel_configs relabeling is applied.
# By default the deduplication is disabled unless -remoteWrite.streamAggr.dedupInterval or -streamAggr.dedupInterval
# By default, the deduplication is disabled unless -remoteWrite.streamAggr.dedupInterval or -streamAggr.dedupInterval
# command-line flags are set.
#
# dedup_interval: 30s
@ -904,7 +908,7 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-
# staleness_interval: 2m
# no_align_flush_to_interval disables aligning of flush times for the aggregated data to multiples of interval.
# By default flush times for the aggregated data is aligned to multiples of interval.
# By default, flush times for the aggregated data is aligned to multiples of interval.
# For example:
# - if `interval: 1m` is set, then flushes happen at the end of every minute,
# - if `interval: 1h` is set, then flushes happen at the end of every hour
@ -934,16 +938,23 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-
# keep_metric_names instructs keeping the original metric names for the aggregated samples.
# This option can be set only if outputs list contains only a single output.
# By default a special suffix is added to original metric names in the aggregated samples.
# By default, a special suffix is added to original metric names in the aggregated samples.
# See https://docs.victoriametrics.com/stream-aggregation/#output-metric-names
#
# keep_metric_names: false
# ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval.
# See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
# See also -streamAggr.ignoreOldSamples command-line flag.
#
# ignore_old_samples: false
# ignore_first_intervals instructs ignoring first N aggregation intervals after process start.
# See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
# See also -remoteWrite.streamAggr.ignoreFirstIntervals or -streamAggr.ignoreFirstIntervals
#
# ignore_first_intervals: false
# drop_input_labels instructs dropping the given labels from input samples.
# The labels' dropping is performed before input_relabel_configs are applied.
# This also means that the labels are dropped before de-duplication ( https://docs.victoriametrics.com/stream-aggregation/#deduplication )
@ -1016,7 +1027,7 @@ These issues can be be fixed in the following ways:
- By increasing the `interval` option at [stream aggregation config](#stream-aggregation-config), so it covers the expected
delays in data ingestion pipelines.
- By specifying the `staleness_interval` option at [stream aggregation config](#stream-aggregation-config), so it covers the expected
delays in data ingestion pipelines. By default the `staleness_interval` equals to `2 x interval`.
delays in data ingestion pipelines. By default, the `staleness_interval` equals to `2 x interval`.
### High resource usage

View File

@ -2166,6 +2166,8 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/
Supports array of values separated by comma or specified via multiple flags.
Empty values are set to false.
-remoteWrite.streamAggr.ignoreFirstIntervals int
Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
-remoteWrite.streamAggr.ignoreOldSamples array
Whether to ignore input samples with old timestamps outside the current aggregation interval for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
Supports array of values separated by comma or specified via multiple flags.

View File

@ -112,11 +112,11 @@ type Options struct {
// This option can be overridden individually per each aggregation via ignore_old_samples option.
IgnoreOldSamples bool
// IgnoreFirstIntervals sets amount of intervals to ignore on start
// IgnoreFirstIntervals sets amount of aggregation intervals to ignore on start.
//
// By default no intervals will be ignored.
// By default, no intervals will be ignored.
//
// This option can be overridden individually per each aggregation via ignore_intervals_on_start option.
// This option can be overridden individually per each aggregation via ignore_first_intervals option.
IgnoreFirstIntervals int
}

View File

@ -199,6 +199,14 @@ func TestAggregatorsEqual(t *testing.T) {
interval: 5m
flush_on_shutdown: false
`, false)
f(`
- outputs: [total]
interval: 5m
ignore_first_intervals: 2
`, `
- outputs: [total]
interval: 5m
ignore_first_intervals: 4`, false)
}
func TestAggregatorsSuccess(t *testing.T) {