vmagent: fix vm_streamaggr_flushed_samples_total counter (#6604)

We use `vm_streamaggr_flushed_samples_total` to show the number of
produced samples by aggregation rule, previously it was overcounted, and
doesn't account for `output_relabel_configs`.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit 2eb1bc4f81)
This commit is contained in:
Hui Wang 2024-07-12 16:56:07 +08:00 committed by hagen1778
parent 7c97cef95c
commit f3cbd62823
No known key found for this signature in database
GPG Key ID: 3BF75F3741CA9640
4 changed files with 50 additions and 32 deletions

View File

@ -32,6 +32,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
**Update note 1: support for snap packages was removed due to lack of interest from community. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6543) for details. Please read about supported package types [here](https://docs.victoriametrics.com/#install).**
**Update note 2: [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) now prevents setting multiple identical outputs. For example, `outputs: [total, total]` will fail the validation phase. In addition, `outputs: ["quantiles(0.5)", "quantiles(0.9)"]` will fail the validation as well - use `outputs: ["quantiles(0.5, 0.9)"]` instead.**
* SECURITY: upgrade Go builder from Go1.22.4 to Go1.22.5. See the list of issues addressed in [Go1.22.5](https://github.com/golang/go/issues?q=milestone%3AGo1.22.5+label%3ACherryPickApproved).
* SECURITY: upgrade base docker image (Alpine) from 3.20.0 to 3.20.1. See [alpine 3.20.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.20.1-released.html).
@ -53,6 +55,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show compacted result in the JSON tab for query results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6559).
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup/index.html): add support of using Azure Managed Identity and default credentials lookup when performing backups. See configuration docs [here](https://docs.victoriametrics.com/vmbackup/#providing-credentials-via-env-variables). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5984) for the details. Thanks to @justinrush for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6518).
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup/index.html): allow overriding Azure storage domain when performing backups. See configuration docs [here](https://docs.victoriametrics.com/vmbackup/#providing-credentials-via-env-variables). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5984) for the details. Thanks to @justinrush for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6518).
* FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation/): prevent having duplicated aggregation function as `outputs` in one [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It also prevents using `outputs: ["quantiles(0.5)", "quantiles(0.9)"]` instead of `outputs: ["quantiles(0.5, 0.9)"]`, as the former has higher computation cost for producing the same result.
* BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).
* BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).

View File

@ -995,7 +995,7 @@ specified individually per each `-remoteWrite.url`:
#
# by: [job, vmrange]
# outputs is the list of aggregations to perform on the input data.
# outputs is the list of unique aggregations to perform on the input data.
# See https://docs.victoriametrics.com/stream-aggregation/#aggregation-outputs
#
outputs: [total]

View File

@ -384,7 +384,7 @@ type aggregator struct {
da *dedupAggr
// aggrStates contains aggregate states for the given outputs
aggrStates []aggrState
aggrStates map[string]aggrState
// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
minTimestamp atomic.Int64
@ -505,7 +505,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
}
if keepMetricNames {
if len(cfg.Outputs) != 1 {
return nil, fmt.Errorf("`ouputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs)
return nil, fmt.Errorf("`outputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs)
}
if cfg.Outputs[0] == "histogram_bucket" || strings.HasPrefix(cfg.Outputs[0], "quantiles(") && strings.Contains(cfg.Outputs[0], ",") {
return nil, fmt.Errorf("`keep_metric_names` cannot be applied to `outputs: %q`, since they can generate multiple time series", cfg.Outputs)
@ -526,11 +526,14 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
// initialize outputs list
if len(cfg.Outputs) == 0 {
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s", supportedOutputs)
}
aggrStates := make([]aggrState, len(cfg.Outputs))
for i, output := range cfg.Outputs {
aggrStates := make(map[string]aggrState, len(cfg.Outputs))
for _, output := range cfg.Outputs {
// check for duplicated output
if _, ok := aggrStates[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicated aggregation function: %s", output)
}
if strings.HasPrefix(output, "quantiles(") {
if !strings.HasSuffix(output, ")") {
return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
@ -552,47 +555,49 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
}
phis[j] = phi
}
aggrStates[i] = newQuantilesAggrState(phis)
if _, ok := aggrStates["quantiles"]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
}
aggrStates["quantiles"] = newQuantilesAggrState(phis)
continue
}
switch output {
case "total":
aggrStates[i] = newTotalAggrState(stalenessInterval, false, true)
aggrStates[output] = newTotalAggrState(stalenessInterval, false, true)
case "total_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, false, false)
aggrStates[output] = newTotalAggrState(stalenessInterval, false, false)
case "increase":
aggrStates[i] = newTotalAggrState(stalenessInterval, true, true)
aggrStates[output] = newTotalAggrState(stalenessInterval, true, true)
case "increase_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, true, false)
aggrStates[output] = newTotalAggrState(stalenessInterval, true, false)
case "rate_sum":
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_sum")
aggrStates[output] = newRateAggrState(stalenessInterval, "rate_sum")
case "rate_avg":
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_avg")
aggrStates[output] = newRateAggrState(stalenessInterval, "rate_avg")
case "count_series":
aggrStates[i] = newCountSeriesAggrState()
aggrStates[output] = newCountSeriesAggrState()
case "count_samples":
aggrStates[i] = newCountSamplesAggrState()
aggrStates[output] = newCountSamplesAggrState()
case "unique_samples":
aggrStates[i] = newUniqueSamplesAggrState()
aggrStates[output] = newUniqueSamplesAggrState()
case "sum_samples":
aggrStates[i] = newSumSamplesAggrState()
aggrStates[output] = newSumSamplesAggrState()
case "last":
aggrStates[i] = newLastAggrState()
aggrStates[output] = newLastAggrState()
case "min":
aggrStates[i] = newMinAggrState()
aggrStates[output] = newMinAggrState()
case "max":
aggrStates[i] = newMaxAggrState()
aggrStates[output] = newMaxAggrState()
case "avg":
aggrStates[i] = newAvgAggrState()
aggrStates[output] = newAvgAggrState()
case "stddev":
aggrStates[i] = newStddevAggrState()
aggrStates[output] = newStddevAggrState()
case "stdvar":
aggrStates[i] = newStdvarAggrState()
aggrStates[output] = newStdvarAggrState()
case "histogram_bucket":
aggrStates[i] = newHistogramBucketAggrState(stalenessInterval)
aggrStates[output] = newHistogramBucketAggrState(stalenessInterval)
default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
"see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
return nil, fmt.Errorf("unsupported output=%q; supported values: %s;", output, supportedOutputs)
}
}
@ -806,7 +811,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
var wg sync.WaitGroup
for _, as := range a.aggrStates {
for output, as := range a.aggrStates {
flushConcurrencyCh <- struct{}{}
wg.Add(1)
go func(as aggrState) {
@ -817,7 +822,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
ctx := getFlushCtx(a, pushFunc)
as.flushState(ctx, resetState)
ctx.flushSeries()
ctx.flushSeries(output)
ctx.resetSeries()
putFlushCtx(ctx)
}(as)
@ -1074,7 +1079,7 @@ func (ctx *flushCtx) resetSeries() {
ctx.samples = ctx.samples[:0]
}
func (ctx *flushCtx) flushSeries() {
func (ctx *flushCtx) flushSeries(aggrStateSuffix string) {
tss := ctx.tss
if len(tss) == 0 {
// nothing to flush
@ -1086,6 +1091,7 @@ func (ctx *flushCtx) flushSeries() {
// Fast path - push the output metrics.
if ctx.pushFunc != nil {
ctx.pushFunc(tss)
ctx.a.flushedSamples[aggrStateSuffix].Add(len(tss))
}
return
}
@ -1107,6 +1113,7 @@ func (ctx *flushCtx) flushSeries() {
}
if ctx.pushFunc != nil {
ctx.pushFunc(dst)
ctx.a.flushedSamples[aggrStateSuffix].Add(len(dst))
}
auxLabels.Labels = dstLabels
promutils.PutLabels(auxLabels)
@ -1127,11 +1134,10 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo
Labels: ctx.labels[labelsLen:],
Samples: ctx.samples[samplesLen:],
})
ctx.a.flushedSamples[suffix].Add(len(ctx.tss))
// Limit the maximum length of ctx.tss in order to limit memory usage.
if len(ctx.tss) >= 10_000 {
ctx.flushSeries()
ctx.flushSeries(suffix)
ctx.resetSeries()
}
}

View File

@ -149,6 +149,15 @@ func TestAggregatorsFailure(t *testing.T) {
f(`
- interval: 1m
outputs: ["quantiles(1.5)"]
`)
f(`
- interval: 1m
outputs: [total, total]
`)
// "quantiles(0.5)", "quantiles(0.9)" should be set as "quantiles(0.5, 0.9)"
f(`
- interval: 1m
outputs: ["quantiles(0.5)", "quantiles(0.9)"]
`)
}