lib/streamaggr: skip unfinished aggregation state on shutdown by default (#5689)

Sending unfinished aggregate states tend to produce unexpected anomalies with lower values than expected.
The old behavior can be restored by specifying `flush_on_shutdown: true` setting in streaming aggregation config

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2024-01-26 22:45:23 +01:00 committed by Aliaksandr Valialkin
parent 562edb72ea
commit 9e9f170fe7
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
4 changed files with 40 additions and 2 deletions

View File

@ -37,6 +37,7 @@ The sandbox cluster installation is running under the constant load generated by
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to set `attach_metadata.node=true` option for all the [`kubernetes_sd_configs`](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) defined at [`-promscrape.config`](https://docs.victoriametrics.com/vmagent.html#quick-start) via `-promscrape.kubernetes.attachNodeMetadataAll` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4640). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5593).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not send unfinished [aggregation state](https://docs.victoriametrics.com/stream-aggregation.html) on shutdown or [hot config reload](https://docs.victoriametrics.com/stream-aggregation.html#configuration-update) by default, as it tend to produce unexpected anomalies with lower values. The old behavior can be restored by specifying `flush_on_shutdown: true` setting in streaming aggregation config. See more details [here](https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs).
* FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation.html): expand `%{ENV_VAR}` placeholders in config files with the corresponding environment variable values.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags:
- `-datasource.oauth2.endpointParams` for `-datasource.url`

View File

@ -410,11 +410,15 @@ For example, the following config removes the `:1m_sum_samples` suffix added [to
## Aggregation outputs
The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config)
and then sent to the storage.
and then sent to the storage once per `interval`.
If `by` and `without` lists are specified in the [config](#stream-aggregation-config),
then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`.
On vmagent shutdown or [configuration reload](#configuration-update) unfinished aggregated states are discarded,
as they might produce lower values than user expects. It is possible to specify `flush_on_shutdown: true` setting in
aggregation config to make vmagent to send unfinished states to the remote storage.
Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config).
### total
@ -642,6 +646,12 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
#
# staleness_interval: 2m
# flush_on_shutdown defines whether to flush the unfinished aggregation states on process restarts
# or config reloads. It is not recommended changing this setting, unless unfinished aggregations states
# are preferred to missing data points.
# Is `false` by default.
# flush_on_shutdown: false
# without is an optional list of labels, which must be removed from the output aggregation.
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
#

View File

@ -131,6 +131,10 @@ type Config struct {
// OutputRelabelConfigs is an optional relabeling rules, which are applied
// on the aggregated output before being sent to remote storage.
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
// FlushOnShutdown defines whether to flush the aggregation state on process termination
// or config reload. Is `false` by default.
FlushOnShutdown bool `yaml:"flush_on_shutdown,omitempty"`
}
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
@ -240,6 +244,10 @@ type aggregator struct {
// for `interval: 1m`, `by: [job]`
suffix string
// flushOnShutdown defines whether to flush the state of aggregation
// on MustStop call.
flushOnShutdown bool
wg sync.WaitGroup
stopCh chan struct{}
}
@ -393,7 +401,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
aggrStates: aggrStates,
pushFunc: pushFunc,
suffix: suffix,
suffix: suffix,
flushOnShutdown: cfg.FlushOnShutdown,
stopCh: make(chan struct{}),
}
@ -506,6 +515,10 @@ func (a *aggregator) MustStop() {
close(a.stopCh)
a.wg.Wait()
if !a.flushOnShutdown {
return
}
// Flush the remaining data from the last interval if needed.
flushConcurrencyCh <- struct{}{}
if a.dedupAggr != nil {

View File

@ -155,6 +155,15 @@ func TestAggregatorsEqual(t *testing.T) {
`, `
- outputs: [total]
interval: 5m
`, false)
f(`
- outputs: [total]
interval: 5m
flush_on_shutdown: true
`, `
- outputs: [total]
interval: 5m
flush_on_shutdown: false
`, false)
}
@ -181,6 +190,11 @@ func TestAggregatorsSuccess(t *testing.T) {
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
for _, ag := range a.as {
// explicitly set flushOnShutdown, so aggregations results
// are immediately available after a.MustStop() call.
ag.flushOnShutdown = true
}
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)