lib/streamaggr: add ignore_first_sample_interval param for streamaggr cfg (#7313)

### Describe Your Changes

As of right now by default aggregated output in streaming aggregation
takes a staleness interval and only starts sending first samples after
the staleness interval passes. We have a use case where we prefer to
start sending data as soon as we have any. This adds the option to
configure when we start sending first samples

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7116

### Checklist

The following checks are **mandatory**:

- [x] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Ivan Yurochko 2024-11-21 16:20:22 +01:00 committed by GitHub
parent 84b4b5f3e5
commit 5dd879cd17
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 38 additions and 9 deletions

View File

@ -22,6 +22,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert): revert the default value of `-remoteWrite.maxQueueSize` from `1_000_000` to `100_000`. It was bumped in [v1.104.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.104.0), which increases memory usage and is not needed for most setups. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7471).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add `Raw Query` tab for displaying raw data. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7024).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `ignore_first_sample_interval` param to [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows users to control the time interval when aggregation skips sending aggregated samples to avoid unexpected spikes in values. By default, this interval is set to x2 of `staleness_interval`. The new setting is applicable only to `total`, `total_prometheus`, `increase`, `increase_prometheus` and `histogram_bucket` outputs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7116) for details. Thanks to @iyuroch for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7313).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent): Properly return `200 OK` HTTP status code when importing data via [Pushgateway protocol](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format) using [multitenant URL format](https://docs.victoriametrics.com/cluster-victoriametrics/#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7571).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): properly return result for binary operation `^` aka pow at query requests for `NaN` values. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7359) for details.

View File

@ -2061,6 +2061,7 @@ _Appears in:_
| `output_relabel_configs` | OutputRelabelConfigs is an optional relabeling rules, which are applied<br />on the aggregated output before being sent to remote storage. | _[RelabelConfig](#relabelconfig) array_ | false |
| `outputs` | Outputs is a list of output aggregate functions to produce.<br /><br />The following names are allowed:<br /><br />- total - aggregates input counters<br />- increase - counts the increase over input counters<br />- count_series - counts the input series<br />- count_samples - counts the input samples<br />- sum_samples - sums the input samples<br />- last - the last biggest sample value<br />- min - the minimum sample value<br />- max - the maximum sample value<br />- avg - the average value across all the samples<br />- stddev - standard deviation across all the samples<br />- stdvar - standard variance across all the samples<br />- histogram_bucket - creates VictoriaMetrics histogram for input samples<br />- quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]<br /><br />The output time series will have the following names:<br /><br /> input_name:aggr_<interval>_<output> | _string array_ | true |
| `staleness_interval` | Staleness interval is interval after which the series state will be reset if no samples have been sent during it.<br />The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket. | _string_ | false |
| `ignore_first_sample_interval` | IgnoreFirstSampleInterval specifies the interval after which the agent begins sending samples.<br />By default, it is set to the `staleness_interval`. It helps reducing the initial sample load after the agent restart.<br />This parameter is relevant only for the following outputs: total, total_prometheus, increase, increase_prometheus, and histogram_bucket. We recommend setting it to 0s unless you observe unexpected spikes in produced values. | _string_ | false |
| `without` | Without is an optional list of labels, which must be excluded when grouping input series.<br /><br />See also By.<br /><br />If neither By nor Without are set, then the Outputs are calculated<br />individually per each input time series. | _string array_ | false |

View File

@ -174,6 +174,11 @@ type Config struct {
// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket.
StalenessInterval string `yaml:"staleness_interval,omitempty"`
// IgnoreFirstSampleInterval specifies the interval after which the agent begins sending samples.
// By default, it is set to the staleness interval, and it helps reduce the initial sample load after an agent restart.
// This parameter is relevant only for the following outputs: total, total_prometheus, increase, increase_prometheus, and histogram_bucket.
IgnoreFirstSampleInterval string `yaml:"ignore_first_sample_interval,omitempty"`
// Outputs is a list of output aggregate functions to produce.
//
// The following names are allowed:
@ -483,6 +488,16 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
}
// check cfg.IgnoreFirstSampleInterval
// by default, it equals to the staleness interval to have backward compatibility, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7116
ignoreFirstSampleInterval := stalenessInterval
if cfg.IgnoreFirstSampleInterval != "" {
ignoreFirstSampleInterval, err = time.ParseDuration(cfg.IgnoreFirstSampleInterval)
if err != nil {
return nil, fmt.Errorf("cannot parse `ignore_first_sample_interval: %q`: %w", cfg.IgnoreFirstSampleInterval, err)
}
}
// Check cfg.DropInputLabels
dropInputLabels := opts.DropInputLabels
if v := cfg.DropInputLabels; v != nil {
@ -557,7 +572,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
aggrOutputs := make([]aggrOutput, len(cfg.Outputs))
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
as, err := newAggrState(output, outputsSeen, stalenessInterval)
as, err := newAggrState(output, outputsSeen, stalenessInterval, ignoreFirstSampleInterval)
if err != nil {
return nil, err
}
@ -645,7 +660,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return a, nil
}
func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) {
func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval, ignoreFirstSampleInterval time.Duration) (aggrState, error) {
// check for duplicated output
if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@ -690,9 +705,9 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
case "histogram_bucket":
return newHistogramBucketAggrState(stalenessInterval), nil
case "increase":
return newTotalAggrState(stalenessInterval, true, true), nil
return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, true, true), nil
case "increase_prometheus":
return newTotalAggrState(stalenessInterval, true, false), nil
return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, true, false), nil
case "last":
return newLastAggrState(), nil
case "max":
@ -710,9 +725,9 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
case "sum_samples":
return newSumSamplesAggrState(), nil
case "total":
return newTotalAggrState(stalenessInterval, false, true), nil
return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, false, true), nil
case "total_prometheus":
return newTotalAggrState(stalenessInterval, false, false), nil
return newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval, false, false), nil
case "unique_samples":
return newUniqueSamplesAggrState(), nil
default:

View File

@ -554,6 +554,18 @@ foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
`, "11")
// total output for non-repeated series, ignore first sample 0s
f(`
- interval: 1m
outputs: [total]
ignore_first_sample_interval: 0s
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 4.34
foo:1m_total 123
`, "11")
// total_prometheus output for non-repeated series

View File

@ -45,9 +45,9 @@ type totalLastValueState struct {
deleteDeadline uint64
}
func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
func newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + roundDurationToSecs(ignoreFirstSampleInterval)
return &totalAggrState{
resetTotalOnFlush: resetTotalOnFlush,
@ -60,7 +60,7 @@ func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepF
func (as *totalAggrState) pushSamples(samples []pushSample) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline
keepFirstSample := as.keepFirstSample && currentTime >= as.ignoreFirstSampleDeadline
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)