From d9cddf1ad8af2a690b75b600139771cf80be2da8 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Mon, 13 May 2024 16:39:49 +0300 Subject: [PATCH] lib/streamaggr: added rate and rate_avg output (#6243) Added `rate` and `rate_avg` output Resource usage is the same as for increase output, tested on a benchmark --------- Signed-off-by: hagen1778 Co-authored-by: hagen1778 (cherry picked from commit 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0) --- docs/CHANGELOG.md | 1 + docs/stream-aggregation.md | 42 ++++++--- lib/streamaggr/rate.go | 147 ++++++++++++++++++++++++++++++ lib/streamaggr/streamaggr.go | 6 ++ lib/streamaggr/streamaggr_test.go | 19 +++- lib/streamaggr/total.go | 7 +- 6 files changed, 205 insertions(+), 17 deletions(-) create mode 100644 lib/streamaggr/rate.go diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ed08b0825b..1563262f2a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -43,6 +43,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementaion! * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`. 
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205). diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 820ca4eef2..879187d54f 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -513,6 +513,8 @@ Below are aggregation functions that can be put in the `outputs` list at [stream * [count_series](#count_series) * [increase](#increase) * [increase_prometheus](#increase_prometheus) +* [rate_sum](#rate_sum) +* [rate_avg](#rate_avg) * [histogram_bucket](#histogram_bucket) * [last](#last) * [max](#max) @@ -577,7 +579,7 @@ The results of `increase` is equal to the following [MetricsQL](https://docs.vic sum(increase_pure(some_counter[interval])) ``` -`increase` assumes that all the counters start from 0. For example, if the fist seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) +`increase` assumes that all the counters start from 0. For example, if the first seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) is `10`, then `increase` assumes that the time series has been increased by `10`. 
If you need ignoring the first sample for new time series, then take a look at [increase_prometheus](#increase_prometheus). @@ -585,21 +587,37 @@ For example, see below time series produced by config with aggregation interval increase aggregation -`increase` can be used as an alternative for [rate](https://docs.victoriametrics.com/metricsql/#rate) function. -For example, if `increase` is calculated for `some_counter` with `interval: 5m`, then `rate` can be calculated -by dividing the resulting aggregation by `5m`: - -```metricsql -some_counter:5m_increase / 5m -``` - -This is similar to `rate(some_counter[5m])`. - Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. See also [increase_prometheus](#increase_prometheus) and [total](#total). +### rate_sum + +`rate_sum` returns the sum of average per-second change of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`. +`rate_sum` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter). + +The results of `rate_sum` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +sum(rate(some_counter[interval])) +``` + +See also [rate_avg](#rate_avg) and [total](#total) outputs. + +### rate_avg + +`rate_avg` returns the average of average per-second change of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`. +`rate_avg` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter). + +The results of `rate_avg` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +avg(rate(some_counter[interval])) +``` + +See also [rate_sum](#rate_sum) and [total](#total) outputs. 
+ ### increase_prometheus `increase_prometheus` returns the increase of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`. @@ -741,7 +759,7 @@ The results of `total` is roughly equal to the the following [MetricsQL](https:/ sum(running_sum(increase_pure(some_counter))) ``` -`total` assumes that all the counters start from 0. For example, if the fist seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) +`total` assumes that all the counters start from 0. For example, if the first seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) is `10`, then `total` assumes that the time series has been increased by `10`. If you need ignoring the first sample for new time series, then take a look at [total_prometheus](#total_prometheus). diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go new file mode 100644 index 0000000000..af8fe0786f --- /dev/null +++ b/lib/streamaggr/rate.go @@ -0,0 +1,147 @@ +package streamaggr + +import ( + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +// rateAggrState calculates output=rate_sum and output=rate_avg, i.e. the per-second change of input counters. +type rateAggrState struct { + m sync.Map + + suffix string + + // Time series state is dropped if no new samples are received during stalenessSecs. 
+ stalenessSecs uint64 +} + +type rateStateValue struct { + mu sync.Mutex + lastValues map[string]*rateLastValueState + deleteDeadline uint64 + deleted bool +} + +type rateLastValueState struct { + value float64 + timestamp int64 + deleteDeadline uint64 + + // total stores cumulative difference between registered values + // in the aggregation interval + total float64 + // prevTimestamp stores timestamp of the last registered value + // in the previous aggregation interval + prevTimestamp int64 +} + +func newRateAggrState(stalenessInterval time.Duration, suffix string) *rateAggrState { + stalenessSecs := roundDurationToSecs(stalenessInterval) + return &rateAggrState{ + suffix: suffix, + stalenessSecs: stalenessSecs, + } +} + +func (as *rateAggrState) pushSamples(samples []pushSample) { + currentTime := fasttime.UnixTimestamp() + deleteDeadline := currentTime + as.stalenessSecs + for i := range samples { + s := &samples[i] + inputKey, outputKey := getInputOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &rateStateValue{ + lastValues: make(map[string]*rateLastValueState), + } + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if loaded { + // Use the entry created by a concurrent goroutine. 
+ v = vNew + } + } + sv := v.(*rateStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + lv, ok := sv.lastValues[inputKey] + if ok { + if s.timestamp < lv.timestamp { + // Skip out of order sample + sv.mu.Unlock() + continue + } + if lv.prevTimestamp == 0 { + lv.prevTimestamp = lv.timestamp + } + if s.value >= lv.value { + lv.total += s.value - lv.value + } else { + // counter reset + lv.total += s.value + } + } else { + lv = &rateLastValueState{} + } + lv.value = s.value + lv.timestamp = s.timestamp + lv.deleteDeadline = deleteDeadline + sv.lastValues[inputKey] = lv + sv.deleteDeadline = deleteDeadline + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to flushState + // Try obtaining and updating the entry again. + goto again + } + } +} + +func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { + currentTime := fasttime.UnixTimestamp() + currentTimeMsec := int64(currentTime) * 1000 + + m := &as.m + m.Range(func(k, v interface{}) bool { + sv := v.(*rateStateValue) + sv.mu.Lock() + + // check for stale entries + deleted := currentTime > sv.deleteDeadline + if deleted { + // Mark the current entry as deleted + sv.deleted = deleted + sv.mu.Unlock() + m.Delete(k) + return true + } + + // Delete outdated entries in sv.lastValues + var rate float64 + m := sv.lastValues + for k1, v1 := range m { + if currentTime > v1.deleteDeadline { + delete(m, k1) + } else if v1.prevTimestamp > 0 { + rate += v1.total * 1000 / float64(v1.timestamp-v1.prevTimestamp) + v1.prevTimestamp = v1.timestamp + v1.total = 0 + } + } + if as.suffix == "rate_avg" { + // note: capture m length after deleted items were removed + rate /= float64(len(m)) + } + sv.mu.Unlock() + + key := k.(string) + ctx.appendSeries(key, as.suffix, currentTimeMsec, rate) + return true + }) +} diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 3e5b72ecd9..2619678537 100644 --- a/lib/streamaggr/streamaggr.go +++ 
b/lib/streamaggr/streamaggr.go @@ -164,6 +164,8 @@ type Config struct { // // The following names are allowed: // + // - rate_sum - calculates sum of rate for input counters + // - rate_avg - calculates average of rate for input counters // - total - aggregates input counters // - total_prometheus - aggregates input counters, ignoring the first sample in new time series // - increase - calculates the increase over input series @@ -530,6 +532,10 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option aggrStates[i] = newTotalAggrState(stalenessInterval, true, true) case "increase_prometheus": aggrStates[i] = newTotalAggrState(stalenessInterval, true, false) + case "rate_sum": + aggrStates[i] = newRateAggrState(stalenessInterval, "rate_sum") + case "rate_avg": + aggrStates[i] = newRateAggrState(stalenessInterval, "rate_avg") case "count_series": aggrStates[i] = newCountSeriesAggrState() case "count_samples": diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index ae234bb76e..f3e73ba511 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -828,7 +828,7 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90 `, ` foo{abc="123"} 4 bar 5 -foo{abc="123"} 8.5 +foo{abc="123"} 8.5 10 foo{abc="456",de="fg"} 8 `, `bar-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 1 bar-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1 @@ -836,6 +836,20 @@ bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5 foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2 foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1 foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5 +`, "1111") + + // test rate_sum and rate_avg + f(` +- interval: 1m + by: [cde] + outputs: [rate_sum, rate_avg] +`, ` +foo{abc="123", cde="1"} 4 +foo{abc="123", cde="1"} 8.5 10 +foo{abc="456", cde="1"} 8 +foo{abc="456", cde="1"} 10 10 +`, 
`foo:1m_by_cde_rate_avg{cde="1"} 0.325 +foo:1m_by_cde_rate_sum{cde="1"} 0.65 `, "1111") // keep_metric_names @@ -979,6 +993,7 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries { } rows.UnmarshalWithErrLogger(s, errLogger) var tss []prompbmarshal.TimeSeries + now := time.Now().UnixMilli() samples := make([]prompbmarshal.Sample, 0, len(rows.Rows)) for _, row := range rows.Rows { labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1) @@ -994,7 +1009,7 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries { } samples = append(samples, prompbmarshal.Sample{ Value: row.Value, - Timestamp: row.Timestamp, + Timestamp: now + row.Timestamp, }) ts := prompbmarshal.TimeSeries{ Labels: labels, diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index a0576245ed..b53d326d32 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -34,13 +34,13 @@ type totalAggrState struct { type totalStateValue struct { mu sync.Mutex - lastValues map[string]lastValueState + lastValues map[string]totalLastValueState total float64 deleteDeadline uint64 deleted bool } -type lastValueState struct { +type totalLastValueState struct { value float64 timestamp int64 deleteDeadline uint64 @@ -78,7 +78,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &totalStateValue{ - lastValues: make(map[string]lastValueState), + lastValues: make(map[string]totalLastValueState), } vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { @@ -97,6 +97,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { sv.mu.Unlock() continue } + if s.value >= lv.value { sv.total += s.value - lv.value } else {