From f3cbd62823513529dd9be14cbee7598f39f513f7 Mon Sep 17 00:00:00 2001
From: Hui Wang
Date: Fri, 12 Jul 2024 16:56:07 +0800
Subject: [PATCH] vmagent: fix `vm_streamaggr_flushed_samples_total` counter (#6604)

We use `vm_streamaggr_flushed_samples_total` to show the number of samples
produced by each aggregation rule. Previously it was overcounted and didn't
account for `output_relabel_configs`.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462

---------

Signed-off-by: hagen1778
Co-authored-by: hagen1778
(cherry picked from commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8)
---
 docs/CHANGELOG.md                 |  3 ++
 docs/stream-aggregation.md        |  2 +-
 lib/streamaggr/streamaggr.go      | 68 +++++++++++++++++--------------
 lib/streamaggr/streamaggr_test.go |  9 ++++
 4 files changed, 50 insertions(+), 32 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 7dfcaadf1b..c97af984d7 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -32,6 +32,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 
 **Update note 1: support for snap packages was removed due to lack of interest from community. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6543) for details. Please read about supported package types [here](https://docs.victoriametrics.com/#install).**
 
+**Update note 2: [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) now prevents setting multiple identical outputs. For example, `outputs: [total, total]` will fail the validation phase. In addition, `outputs: ["quantiles(0.5)", "quantiles(0.9)"]` will fail the validation as well - use `outputs: ["quantiles(0.5, 0.9)"]` instead.**
+
 * SECURITY: upgrade Go builder from Go1.22.4 to Go1.22.5. See the list of issues addressed in [Go1.22.5](https://github.com/golang/go/issues?q=milestone%3AGo1.22.5+label%3ACherryPickApproved).
 * SECURITY: upgrade base docker image (Alpine) from 3.20.0 to 3.20.1. See [alpine 3.20.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.20.1-released.html).
 
@@ -53,6 +55,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show compacted result in the JSON tab for query results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6559).
 * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup/index.html): add support of using Azure Managed Identity and default credentials lookup when performing backups. See configuration docs [here](https://docs.victoriametrics.com/vmbackup/#providing-credentials-via-env-variables). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5984) for the details. Thanks to @justinrush for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6518).
 * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup/index.html): allow overriding Azure storage domain when performing backups. See configuration docs [here](https://docs.victoriametrics.com/vmbackup/#providing-credentials-via-env-variables). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5984) for the details. Thanks to @justinrush for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6518).
+* FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation/): prevent having duplicated aggregation function as `outputs` in one [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It also prevents using `outputs: ["quantiles(0.5)", "quantiles(0.9)"]` instead of `outputs: ["quantiles(0.5, 0.9)"]`, as the former has higher computation cost for producing the same result.
 * BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).
 * BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).
diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md
index 7733c57339..b791aaeb21 100644
--- a/docs/stream-aggregation.md
+++ b/docs/stream-aggregation.md
@@ -995,7 +995,7 @@ specified individually per each `-remoteWrite.url`:
   #
   # by: [job, vmrange]
 
-  # outputs is the list of aggregations to perform on the input data.
+  # outputs is the list of unique aggregations to perform on the input data.
   # See https://docs.victoriametrics.com/stream-aggregation/#aggregation-outputs
   #
   outputs: [total]
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 3fa074fbc5..7691b9b4af 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -384,7 +384,7 @@ type aggregator struct {
 	da *dedupAggr
 
 	// aggrStates contains aggregate states for the given outputs
-	aggrStates []aggrState
+	aggrStates map[string]aggrState
 
 	// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
 	minTimestamp atomic.Int64
@@ -505,7 +505,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
 	}
 	if keepMetricNames {
 		if len(cfg.Outputs) != 1 {
-			return nil, fmt.Errorf("`ouputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs)
+			return nil, fmt.Errorf("`outputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs)
 		}
 		if cfg.Outputs[0] == "histogram_bucket" || strings.HasPrefix(cfg.Outputs[0], "quantiles(") && strings.Contains(cfg.Outputs[0], ",") {
 			return nil, fmt.Errorf("`keep_metric_names` cannot be applied to `outputs: %q`, since they can generate multiple time series", cfg.Outputs)
@@ -526,11 +526,14 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
 
 	// initialize outputs list
 	if len(cfg.Outputs) == 0 {
-		return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
-			"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
+		return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s", supportedOutputs)
 	}
-	aggrStates := make([]aggrState, len(cfg.Outputs))
-	for i, output := range cfg.Outputs {
+	aggrStates := make(map[string]aggrState, len(cfg.Outputs))
+	for _, output := range cfg.Outputs {
+		// check for duplicated output
+		if _, ok := aggrStates[output]; ok {
+			return nil, fmt.Errorf("`outputs` list contains duplicated aggregation function: %s", output)
+		}
 		if strings.HasPrefix(output, "quantiles(") {
 			if !strings.HasSuffix(output, ")") {
 				return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
@@ -552,47 +555,49 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
 				}
 				phis[j] = phi
 			}
-			aggrStates[i] = newQuantilesAggrState(phis)
+			if _, ok := aggrStates["quantiles"]; ok {
+				return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
+			}
+			aggrStates["quantiles"] = newQuantilesAggrState(phis)
 			continue
 		}
 		switch output {
 		case "total":
-			aggrStates[i] = newTotalAggrState(stalenessInterval, false, true)
+			aggrStates[output] = newTotalAggrState(stalenessInterval, false, true)
 		case "total_prometheus":
-			aggrStates[i] = newTotalAggrState(stalenessInterval, false, false)
+			aggrStates[output] = newTotalAggrState(stalenessInterval, false, false)
 		case "increase":
-			aggrStates[i] = newTotalAggrState(stalenessInterval, true, true)
+			aggrStates[output] = newTotalAggrState(stalenessInterval, true, true)
 		case "increase_prometheus":
-			aggrStates[i] = newTotalAggrState(stalenessInterval, true, false)
+			aggrStates[output] = newTotalAggrState(stalenessInterval, true, false)
 		case "rate_sum":
-			aggrStates[i] = newRateAggrState(stalenessInterval, "rate_sum")
+			aggrStates[output] = newRateAggrState(stalenessInterval, "rate_sum")
 		case "rate_avg":
-			aggrStates[i] = newRateAggrState(stalenessInterval, "rate_avg")
+			aggrStates[output] = newRateAggrState(stalenessInterval, "rate_avg")
 		case "count_series":
-			aggrStates[i] = newCountSeriesAggrState()
+			aggrStates[output] = newCountSeriesAggrState()
 		case "count_samples":
-			aggrStates[i] = newCountSamplesAggrState()
+			aggrStates[output] = newCountSamplesAggrState()
 		case "unique_samples":
-			aggrStates[i] = newUniqueSamplesAggrState()
+			aggrStates[output] = newUniqueSamplesAggrState()
 		case "sum_samples":
-			aggrStates[i] = newSumSamplesAggrState()
+			aggrStates[output] = newSumSamplesAggrState()
 		case "last":
-			aggrStates[i] = newLastAggrState()
+			aggrStates[output] = newLastAggrState()
 		case "min":
-			aggrStates[i] = newMinAggrState()
+			aggrStates[output] = newMinAggrState()
 		case "max":
-			aggrStates[i] = newMaxAggrState()
+			aggrStates[output] = newMaxAggrState()
 		case "avg":
-			aggrStates[i] = newAvgAggrState()
+			aggrStates[output] = newAvgAggrState()
 		case "stddev":
-			aggrStates[i] = newStddevAggrState()
+			aggrStates[output] = newStddevAggrState()
 		case "stdvar":
-			aggrStates[i] = newStdvarAggrState()
+			aggrStates[output] = newStdvarAggrState()
 		case "histogram_bucket":
-			aggrStates[i] = newHistogramBucketAggrState(stalenessInterval)
+			aggrStates[output] = newHistogramBucketAggrState(stalenessInterval)
 		default:
-			return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
-				"see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
+			return nil, fmt.Errorf("unsupported output=%q; supported values: %s;", output, supportedOutputs)
 		}
 	}
 
@@ -806,7 +811,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
 	a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
 
 	var wg sync.WaitGroup
-	for _, as := range a.aggrStates {
+	for output, as := range a.aggrStates {
 		flushConcurrencyCh <- struct{}{}
 		wg.Add(1)
 		go func(as aggrState) {
@@ -817,7 +822,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
 
 			ctx := getFlushCtx(a, pushFunc)
 			as.flushState(ctx, resetState)
-			ctx.flushSeries()
+			ctx.flushSeries(output)
 			ctx.resetSeries()
 			putFlushCtx(ctx)
 		}(as)
@@ -1074,7 +1079,7 @@ func (ctx *flushCtx) resetSeries() {
 	ctx.samples = ctx.samples[:0]
 }
 
-func (ctx *flushCtx) flushSeries() {
+func (ctx *flushCtx) flushSeries(aggrStateSuffix string) {
 	tss := ctx.tss
 	if len(tss) == 0 {
 		// nothing to flush
@@ -1086,6 +1091,7 @@ func (ctx *flushCtx) flushSeries() {
 		// Fast path - push the output metrics.
 		if ctx.pushFunc != nil {
 			ctx.pushFunc(tss)
+			ctx.a.flushedSamples[aggrStateSuffix].Add(len(tss))
 		}
 		return
 	}
@@ -1107,6 +1113,7 @@ func (ctx *flushCtx) flushSeries() {
 	}
 	if ctx.pushFunc != nil {
 		ctx.pushFunc(dst)
+		ctx.a.flushedSamples[aggrStateSuffix].Add(len(dst))
 	}
 	auxLabels.Labels = dstLabels
 	promutils.PutLabels(auxLabels)
@@ -1127,11 +1134,10 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo
 		Labels:  ctx.labels[labelsLen:],
 		Samples: ctx.samples[samplesLen:],
 	})
-	ctx.a.flushedSamples[suffix].Add(len(ctx.tss))
 
 	// Limit the maximum length of ctx.tss in order to limit memory usage.
 	if len(ctx.tss) >= 10_000 {
-		ctx.flushSeries()
+		ctx.flushSeries(suffix)
 		ctx.resetSeries()
 	}
 }
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index 105caa00c7..9bd7b4e3ff 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -149,6 +149,15 @@ func TestAggregatorsFailure(t *testing.T) {
 	f(`
 - interval: 1m
   outputs: ["quantiles(1.5)"]
+`)
+	f(`
+- interval: 1m
+  outputs: [total, total]
+`)
+	// "quantiles(0.5)", "quantiles(0.9)" should be set as "quantiles(0.5, 0.9)"
+	f(`
+- interval: 1m
+  outputs: ["quantiles(0.5)", "quantiles(0.9)"]
 `)
 }
 
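Reviewer note (not part of the patch): below is a minimal, self-contained Go sketch of the duplicate-output validation the patch introduces in newAggregator. It only illustrates the idea of keying outputs by name in a map, with every quantiles(...) variant collapsing onto a single "quantiles" key so that split phi lists are rejected. validateOutputs is a hypothetical helper used for illustration; it is not the streamaggr package API.

package main

import (
	"fmt"
	"strings"
)

// validateOutputs rejects duplicated outputs in the same spirit as the patch:
// each output name may be used at most once, and all quantiles(...) outputs
// share the single "quantiles" key, so `quantiles(0.5)` plus `quantiles(0.9)`
// is reported as a duplicate and must be written as `quantiles(0.5, 0.9)`.
func validateOutputs(outputs []string) error {
	seen := make(map[string]struct{}, len(outputs))
	for _, output := range outputs {
		key := output
		if strings.HasPrefix(output, "quantiles(") {
			key = "quantiles"
		}
		if _, ok := seen[key]; ok {
			if key == "quantiles" {
				return fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
			}
			return fmt.Errorf("`outputs` list contains duplicated aggregation function: %s", output)
		}
		seen[key] = struct{}{}
	}
	return nil
}

func main() {
	fmt.Println(validateOutputs([]string{"total", "max"}))                     // <nil>
	fmt.Println(validateOutputs([]string{"total", "total"}))                   // duplicated aggregation function
	fmt.Println(validateOutputs([]string{"quantiles(0.5)", "quantiles(0.9)"})) // duplicated quantiles() function
}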
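Reviewer note (not part of the patch): the other half of the change moves the vm_streamaggr_flushed_samples_total accounting from appendSeries into flushSeries, so the counter is incremented per output with the number of series actually handed to pushFunc, i.e. after output_relabel_configs may have dropped some of them. The sketch below models that accounting with plain atomic counters; flushedSamples and flushSeries here are illustrative stand-ins for the aggregator's per-output metrics counters and prompbmarshal time series, not the real types.

package main

import (
	"fmt"
	"sync/atomic"
)

// flushedSamples maps an output (aggregation state) name to its counter,
// mirroring the per-output counters behind vm_streamaggr_flushed_samples_total.
type flushedSamples map[string]*atomic.Uint64

// flushSeries increments the counter for the given output by the number of
// series handed to pushFunc, so only series that survive relabeling and are
// actually flushed are counted.
func flushSeries(fs flushedSamples, output string, series []string, pushFunc func([]string)) {
	if len(series) == 0 {
		// nothing to flush
		return
	}
	pushFunc(series)
	fs[output].Add(uint64(len(series)))
}

func main() {
	fs := flushedSamples{
		"total": new(atomic.Uint64),
		"max":   new(atomic.Uint64),
	}
	push := func(tss []string) { /* deliver to remote storage */ }

	flushSeries(fs, "total", []string{"foo:1m_total", "bar:1m_total"}, push)
	flushSeries(fs, "max", []string{"foo:1m_max"}, push)

	fmt.Println(fs["total"].Load(), fs["max"].Load()) // 2 1
}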