diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md
index 3383b7e96..4b53cc626 100644
--- a/docs/changelog/CHANGELOG.md
+++ b/docs/changelog/CHANGELOG.md
@@ -22,6 +22,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert): revert the default value of `-remoteWrite.maxQueueSize` from `1_000_000` to `100_000`. It was bumped in [v1.104.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.104.0), which increases memory usage and is not needed for most setups. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7471).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add `Raw Query` tab for displaying raw data. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7024).
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `ignore_first_sample_interval` param to [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows users to control the time interval during which aggregation skips sending aggregated samples in order to avoid unexpected spikes in values. By default, this interval is set to the `staleness_interval`. The new setting is applicable only to `total`, `total_prometheus`, `increase`, `increase_prometheus` and `histogram_bucket` outputs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7116) for details. Thanks to @iyuroch for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7313).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent): Properly return `200 OK` HTTP status code when importing data via [Pushgateway protocol](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format) using [multitenant URL format](https://docs.victoriametrics.com/cluster-victoriametrics/#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7571).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): properly return result for binary operation `^` aka pow at query requests for `NaN` values. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7359) for details.
diff --git a/docs/operator/api.md b/docs/operator/api.md
index 4b7e9c4d2..868c6f79d 100644
--- a/docs/operator/api.md
+++ b/docs/operator/api.md
@@ -2061,6 +2061,7 @@ _Appears in:_
| `output_relabel_configs` | OutputRelabelConfigs is an optional relabeling rules, which are applied&#13;
on the aggregated output before being sent to remote storage. | _[RelabelConfig](#relabelconfig) array_ | false |
| `outputs` | Outputs is a list of output aggregate functions to produce.&#13;

The following names are allowed:

- total - aggregates input counters
- increase - counts the increase over input counters
- count_series - counts the input series
- count_samples - counts the input samples
- sum_samples - sums the input samples
- last - the last biggest sample value
- min - the minimum sample value
- max - the maximum sample value
- avg - the average value across all the samples
- stddev - standard deviation across all the samples
- stdvar - standard variance across all the samples
- histogram_bucket - creates VictoriaMetrics histogram for input samples
- quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]

The output time series will have the following names:

input_name:aggr_<interval>_<output> | _string array_ | true |
| `staleness_interval` | Staleness interval is interval after which the series state will be reset if no samples have been sent during it.&#13;
The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket. | _string_ | false |
+| `ignore_first_sample_interval` | IgnoreFirstSampleInterval specifies the interval after which the agent begins sending samples.&#13;
By default, it is set to the `staleness_interval`. It helps reduce the initial sample load after an agent restart.&#13;
This parameter is relevant only for the following outputs: total, total_prometheus, increase, increase_prometheus, and histogram_bucket. We recommend setting it to 0s unless you observe unexpected spikes in produced values. | _string_ | false |
| `without` | Without is an optional list of labels, which must be excluded when grouping input series.&#13;

See also By.

If neither By nor Without are set, then the Outputs are calculated
individually per each input time series. | _string array_ | false |
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 6188f8b80..f16f9a10a 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -174,6 +174,11 @@ type Config struct {
	// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket.
	StalenessInterval string `yaml:"staleness_interval,omitempty"`

+	// IgnoreFirstSampleInterval specifies the interval after which the agent begins sending samples.
+	// By default, it is set to the staleness interval, and it helps reduce the initial sample load after an agent restart.
+	// This parameter is relevant only for the following outputs: total, total_prometheus, increase, increase_prometheus, and histogram_bucket.
+	IgnoreFirstSampleInterval string `yaml:"ignore_first_sample_interval,omitempty"`
+
	// Outputs is a list of output aggregate functions to produce.
	//
	// The following names are allowed:
@@ -483,6 +488,16 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
		}
	}

+	// check cfg.IgnoreFirstSampleInterval
+	// by default, it equals the staleness interval for backward compatibility, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7116
+	ignoreFirstSampleInterval := stalenessInterval
+	if cfg.IgnoreFirstSampleInterval != "" {
+		ignoreFirstSampleInterval, err = time.ParseDuration(cfg.IgnoreFirstSampleInterval)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse `ignore_first_sample_interval: %q`: %w", cfg.IgnoreFirstSampleInterval, err)
+		}
+	}
+
	// Check cfg.DropInputLabels
	dropInputLabels := opts.DropInputLabels
	if v := cfg.DropInputLabels; v != nil {
@@ -557,7 +572,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
	aggrOutputs := make([]aggrOutput, len(cfg.Outputs))
	outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
	for i, output := range cfg.Outputs {
-		as, err := newAggrState(output, outputsSeen, stalenessInterval)
+		as, err := newAggrState(output, outputsSeen, stalenessInterval, ignoreFirstSampleInterval)
		if err != nil {
			return nil, err
		}
@@ -645,7 +660,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
	return a, nil
}

-func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) {
+func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval, ignoreFirstSampleInterval time.Duration) (aggrState, error) {
	// check for duplicated output
	if _, ok := outputsSeen[output]; ok {
		return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@@ -690,9 +705,9 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
	case "histogram_bucket":
		return newHistogramBucketAggrState(stalenessInterval), nil
	case "increase":
-		return newTotalAggrState(stalenessInterval, true, true), nil
+		return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, true, true), nil
	case "increase_prometheus":
-		return newTotalAggrState(stalenessInterval, true, false), nil
+		return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, true, false), nil
	case "last":
		return newLastAggrState(), nil
	case "max":
@@ -710,9 +725,9 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
	case "sum_samples":
		return newSumSamplesAggrState(), nil
	case "total":
-		return newTotalAggrState(stalenessInterval, false, true), nil
+		return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, false, true), nil
	case "total_prometheus":
-		return newTotalAggrState(stalenessInterval, false, false), nil
+		return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, false, false), nil
	case "unique_samples":
		return newUniqueSamplesAggrState(), nil
	default:
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index ae64a6ac7..8338be12c 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -554,6 +554,18 @@ foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
+`, "11")
+
+	// total output for non-repeated series, ignore first sample 0s
+	f(`
+- interval: 1m
+  outputs: [total]
+  ignore_first_sample_interval: 0s
+`, `
+foo 123
+bar{baz="qwe"} 4.34
+`, `bar:1m_total{baz="qwe"} 4.34
+foo:1m_total 123
`, "11")

	// total_prometheus output for non-repeated series
diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go
index e905e4531..53dbcdc47 100644
--- a/lib/streamaggr/total.go
+++ b/lib/streamaggr/total.go
@@ -45,9 +45,9 @@ type totalLastValueState struct {
	deleteDeadline uint64
}

-func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
+func newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
	stalenessSecs := roundDurationToSecs(stalenessInterval)
-	ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs
+	ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + roundDurationToSecs(ignoreFirstSampleInterval)

	return &totalAggrState{
		resetTotalOnFlush: resetTotalOnFlush,
@@ -60,7 +60,7 @@ func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepF
func (as *totalAggrState) pushSamples(samples []pushSample) {
	currentTime := fasttime.UnixTimestamp()
	deleteDeadline := currentTime + as.stalenessSecs
-	keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline
+	keepFirstSample := as.keepFirstSample && currentTime >= as.ignoreFirstSampleDeadline
	for i := range samples {
		s := &samples[i]
		inputKey, outputKey := getInputOutputKey(s.key)
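The new test case above exercises the option through the same YAML aggregation config that users write. A minimal rule using it might look like the sketch below; the `match` selector and the concrete duration values are illustrative and not taken from this diff.

```yaml
# Illustrative stream aggregation rule; ignore_first_sample_interval is the option added in this change.
- match: 'http_requests_total'        # hypothetical input series selector
  interval: 1m                        # aggregation interval
  staleness_interval: 5m              # series state is reset after 5m without new samples
  ignore_first_sample_interval: 0s    # count the very first samples immediately instead of waiting out the default (staleness_interval)
  outputs: [total]
```

Leaving `ignore_first_sample_interval` unset keeps the previous behavior shown in the existing test case (the first samples of `foo` and `bar` produce `0` totals during the default interval, which equals `staleness_interval`), while `0s` makes the very first samples count immediately.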