diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 09f2dca29a..05f484cb19 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -37,6 +37,7 @@ The sandbox cluster installation is running under the constant load generated by * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to set `attach_metadata.node=true` option for all the [`kubernetes_sd_configs`](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) defined at [`-promscrape.config`](https://docs.victoriametrics.com/vmagent.html#quick-start) via `-promscrape.kubernetes.attachNodeMetadataAll` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4640). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5593). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not send unfinished [aggregation state](https://docs.victoriametrics.com/stream-aggregation.html) on shutdown or [hot config reload](https://docs.victoriametrics.com/stream-aggregation.html#configuration-update) by default, as it tend to produce unexpected anomalies with lower values. The old behavior can be restored by specifying `flush_on_shutdown: true` setting in streaming aggregation config. See more details [here](https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs). * FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation.html): expand `%{ENV_VAR}` placeholders in config files with the corresponding environment variable values. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags: - `-datasource.oauth2.endpointParams` for `-datasource.url` diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index cf705538d2..25670012a6 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -410,11 +410,15 @@ For example, the following config removes the `:1m_sum_samples` suffix added [to ## Aggregation outputs The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config) -and then sent to the storage. +and then sent to the storage once per `interval`. If `by` and `without` lists are specified in the [config](#stream-aggregation-config), then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`. +On vmagent shutdown or [configuration reload](#configuration-update) unfinished aggregated states are discarded, +as they might produce lower values than user expects. It is possible to specify `flush_on_shutdown: true` setting in +aggregation config to make vmagent to send unfinished states to the remote storage. + Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config). ### total @@ -641,6 +645,12 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # The parameter is only relevant for outputs: total, increase and histogram_bucket. # # staleness_interval: 2m + + # flush_on_shutdown defines whether to flush the unfinished aggregation states on process restarts + # or config reloads. It is not recommended changing this setting, unless unfinished aggregations states + # are preferred to missing data points. + # Is `false` by default. + # flush_on_shutdown: false # without is an optional list of labels, which must be removed from the output aggregation. # See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 836d21bd15..b0c00284b0 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -131,6 +131,10 @@ type Config struct { // OutputRelabelConfigs is an optional relabeling rules, which are applied // on the aggregated output before being sent to remote storage. OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"` + + // FlushOnShutdown defines whether to flush the aggregation state on process termination + // or config reload. Is `false` by default. + FlushOnShutdown bool `yaml:"flush_on_shutdown,omitempty"` } // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data. @@ -240,6 +244,10 @@ type aggregator struct { // for `interval: 1m`, `by: [job]` suffix string + // flushOnShutdown defines whether to flush the state of aggregation + // on MustStop call. + flushOnShutdown bool + wg sync.WaitGroup stopCh chan struct{} } @@ -393,7 +401,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) aggrStates: aggrStates, pushFunc: pushFunc, - suffix: suffix, + suffix: suffix, + flushOnShutdown: cfg.FlushOnShutdown, stopCh: make(chan struct{}), } @@ -506,6 +515,10 @@ func (a *aggregator) MustStop() { close(a.stopCh) a.wg.Wait() + if !a.flushOnShutdown { + return + } + // Flush the remaining data from the last interval if needed. flushConcurrencyCh <- struct{}{} if a.dedupAggr != nil { diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 20f2d43f00..4df257d8b9 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -155,6 +155,15 @@ func TestAggregatorsEqual(t *testing.T) { `, ` - outputs: [total] interval: 5m +`, false) + f(` +- outputs: [total] + interval: 5m + flush_on_shutdown: true +`, ` +- outputs: [total] + interval: 5m + flush_on_shutdown: false `, false) } @@ -181,6 +190,11 @@ func TestAggregatorsSuccess(t *testing.T) { if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } + for _, ag := range a.as { + // explicitly set flushOnShutdown, so aggregations results + // are immediately available after a.MustStop() call. + ag.flushOnShutdown = true + } // Push the inputMetrics to Aggregators tssInput := mustParsePromMetrics(inputMetrics)