From a8360d04c08cf3f2259c9572d17b1460f4398bf4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 3 Jan 2020 23:50:47 +0200 Subject: [PATCH] app/vmselect/promql: add `histogram_over_time(m[d])` rollup function --- app/vmselect/promql/eval.go | 29 ++++++-- app/vmselect/promql/exec_test.go | 114 +++++++++++++++++++++++++++++++ app/vmselect/promql/rollup.go | 83 +++++++++++++++++++++- docs/ExtendedPromQL.md | 3 + lib/metricsql/rollup.go | 1 + 5 files changed, 225 insertions(+), 5 deletions(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 7cdb76cf1..c47b53a67 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -491,6 +491,13 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re * values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps) preFunc(values, timestamps) for _, rc := range rcs { + if tsm := newTimeseriesMap(name, sharedTimestamps, &tsSQ.MetricName); tsm != nil { + rc.DoTimeseriesMap(tsm, values, timestamps) + tssLock.Lock() + tss = tsm.AppendTimeseriesTo(tss) + tssLock.Unlock() + continue + } var ts timeseries doRollupForTimeseries(rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps, removeMetricGroup) tssLock.Lock() @@ -638,9 +645,9 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, removeMetricGroup := !rollupFuncsKeepMetricGroup[name] var tss []*timeseries if iafc != nil { - tss, err = evalRollupWithIncrementalAggregate(iafc, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) + tss, err = evalRollupWithIncrementalAggregate(name, iafc, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) } else { - tss, err = evalRollupNoIncrementalAggregate(rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) + tss, err = evalRollupNoIncrementalAggregate(name, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) } if err != nil { return nil, err @@ -662,13 +669,20 @@ func getRollupMemoryLimiter() *memoryLimiter { return &rollupMemoryLimiter } -func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, +func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { preFunc(rs.Values, rs.Timestamps) ts := getTimeseries() defer putTimeseries(ts) for _, rc := range rcs { + if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil { + rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) + for _, ts := range tsm.m { + iafc.updateTimeseries(ts, workerID) + } + continue + } ts.Reset() doRollupForTimeseries(rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup) iafc.updateTimeseries(ts, workerID) @@ -685,13 +699,20 @@ func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *n return tss, nil } -func evalRollupNoIncrementalAggregate(rss *netstorage.Results, rcs []*rollupConfig, +func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { tss := make([]*timeseries, 0, rss.Len()*len(rcs)) var tssLock sync.Mutex err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { preFunc(rs.Values, rs.Timestamps) for _, rc := range rcs { + if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil { + rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) + tssLock.Lock() + tss = tsm.AppendTimeseriesTo(tss) + tssLock.Unlock() + continue + } var ts timeseries doRollupForTimeseries(rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup) tssLock.Lock() diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index b8eb3a019..a27024121 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -3083,6 +3083,120 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`histogram_over_time`, func(t *testing.T) { + t.Parallel() + q := `sort(histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))` + r1 := netstorage.Result{ + Values: []float64{13, 14, 12, 8, 12, 13}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("vmrange"), + Value: []byte("1.0e0...1.5e0"), + }, + } + r2 := netstorage.Result{ + Values: []float64{14, 15, 12, 13, 15, 11}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("vmrange"), + Value: []byte("2.0e0...2.5e0"), + }, + } + r3 := netstorage.Result{ + Values: []float64{13, 11, 16, 19, 13, 16}, + Timestamps: timestampsExpected, + } + r3.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("vmrange"), + Value: []byte("1.5e0...2.0e0"), + }, + } + resultExpected := []netstorage.Result{r1, r2, r3} + f(q, resultExpected) + }) + t.Run(`sum(histogram_over_time) by (vmrange)`, func(t *testing.T) { + t.Parallel() + q := `sort(sum(histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s])) by (vmrange))` + r1 := netstorage.Result{ + Values: []float64{13, 14, 12, 8, 12, 13}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("vmrange"), + Value: []byte("1.0e0...1.5e0"), + }, + } + r2 := netstorage.Result{ + Values: []float64{14, 15, 12, 13, 15, 11}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("vmrange"), + Value: []byte("2.0e0...2.5e0"), + }, + } + r3 := netstorage.Result{ + Values: []float64{13, 11, 16, 19, 13, 16}, + Timestamps: timestampsExpected, + } + r3.MetricName.Tags = []storage.Tag{ + { + Key: []byte("vmrange"), + Value: []byte("1.5e0...2.0e0"), + }, + } + resultExpected := []netstorage.Result{r1, r2, r3} + f(q, resultExpected) + }) + t.Run(`sum(histogram_over_time)`, func(t *testing.T) { + t.Parallel() + q := `sum(histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))` + r := netstorage.Result{ + Values: []float64{40, 40, 40, 40, 40, 40}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`topk_min(histogram_over_time)`, func(t *testing.T) { + t.Parallel() + q := `topk_min(1, histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))` + r := netstorage.Result{ + Values: []float64{14, 15, 12, 13, 15, 11}, + Timestamps: timestampsExpected, + } + r.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("vmrange"), + Value: []byte("2.0e0...2.5e0"), + }, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`share_gt_over_time`, func(t *testing.T) { t.Parallel() q := `share_gt_over_time(rand(0)[200s:10s], 0.7)` diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index bac431103..b74b150e8 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -8,6 +8,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/metrics" "github.com/valyala/histogram" ) @@ -51,6 +53,7 @@ var rollupFuncs = map[string]newRollupFunc{ "scrape_interval": newRollupFuncOneArg(rollupScrapeInterval), "share_le_over_time": newRollupShareLE, "share_gt_over_time": newRollupShareGT, + "histogram_over_time": newRollupFuncOneArg(rollupHistogram), "rollup": newRollupFuncOneArg(rollupFake), "rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets "rollup_deriv": newRollupFuncOneArg(rollupFake), @@ -119,6 +122,8 @@ type rollupFuncArg struct { // Real previous value even if it is located too far from the current window. // It matches prevValue if prevValue is not nan. realPrevValue float64 + + tsm *timeseriesMap } func (rfa *rollupFuncArg) reset() { @@ -131,6 +136,7 @@ func (rfa *rollupFuncArg) reset() { rfa.step = 0 rfa.scrapeInterval = 0 rfa.realPrevValue = nan + rfa.tsm = nil } // rollupFunc must return rollup value for the given rfa. @@ -169,6 +175,54 @@ var ( // The maximum interval without previous rows. const maxSilenceInterval = 5 * 60 * 1000 +type timeseriesMap struct { + origin *timeseries + labelName string + h metrics.Histogram + m map[string]*timeseries +} + +func newTimeseriesMap(funcName string, sharedTimestamps []int64, mnSrc *storage.MetricName) *timeseriesMap { + if funcName != "histogram_over_time" { + return nil + } + + values := make([]float64, len(sharedTimestamps)) + for i := range values { + values[i] = nan + } + var origin timeseries + origin.MetricName.CopyFrom(mnSrc) + origin.MetricName.ResetMetricGroup() + origin.Timestamps = sharedTimestamps + origin.Values = values + return ×eriesMap{ + origin: &origin, + labelName: "vmrange", + m: make(map[string]*timeseries), + } +} + +func (tsm *timeseriesMap) AppendTimeseriesTo(dst []*timeseries) []*timeseries { + for _, ts := range tsm.m { + dst = append(dst, ts) + } + return dst +} + +func (tsm *timeseriesMap) GetOrCreateTimeseries(labelValue string) *timeseries { + ts := tsm.m[labelValue] + if ts != nil { + return ts + } + ts = ×eries{} + ts.CopyFromShallowTimestamps(tsm.origin) + ts.MetricName.RemoveTag(tsm.labelName) + ts.MetricName.AddTag(tsm.labelName, labelValue) + tsm.m[labelValue] = ts + return ts +} + // Do calculates rollups for the given timestamps and values, appends // them to dstValues and returns results. // @@ -176,8 +230,19 @@ const maxSilenceInterval = 5 * 60 * 1000 // // timestamps must cover time range [rc.Start - rc.Window - maxSilenceInterval ... rc.End + rc.Step]. // -// Cannot be called from concurrent goroutines. +// Do cannot be called from concurrent goroutines. func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []int64) []float64 { + return rc.doInternal(dstValues, nil, values, timestamps) +} + +// DoTimeseriesMap calculates rollups for the given timestamps and values and puts them to tsm. +func (rc *rollupConfig) DoTimeseriesMap(tsm *timeseriesMap, values []float64, timestamps []int64) { + ts := getTimeseries() + ts.Values = rc.doInternal(ts.Values[:0], tsm, values, timestamps) + putTimeseries(ts) +} + +func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, values []float64, timestamps []int64) []float64 { // Sanity checks. if rc.Step <= 0 { logger.Panicf("BUG: Step must be bigger than 0; got %d", rc.Step) @@ -212,6 +277,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i rfa.step = rc.Step rfa.scrapeInterval = scrapeInterval rfa.realPrevValue = nan + rfa.tsm = tsm i := 0 j := 0 @@ -612,6 +678,21 @@ func newRollupQuantile(args []interface{}) (rollupFunc, error) { return rf, nil } +func rollupHistogram(rfa *rollupFuncArg) float64 { + values := rfa.values + tsm := rfa.tsm + tsm.h.Reset() + for _, v := range values { + tsm.h.Update(v) + } + idx := rfa.idx + tsm.h.VisitNonZeroBuckets(func(vmrange string, count uint64) { + ts := tsm.GetOrCreateTimeseries(vmrange) + ts.Values[idx] = float64(count) + }) + return nan +} + func rollupAvg(rfa *rollupFuncArg) float64 { // Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation, // since it is slower and has no significant benefits in precision. diff --git a/docs/ExtendedPromQL.md b/docs/ExtendedPromQL.md index 3f8c91937..6a7fdee53 100644 --- a/docs/ExtendedPromQL.md +++ b/docs/ExtendedPromQL.md @@ -79,6 +79,9 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `increases_over_time(m[d])` and `decreases_over_time(m[d])` - returns the number of `m` increases or decreases over the given duration `d`. - `prometheus_buckets(q)` - converts [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) buckets to Prometheus buckets with `le` labels. - `histogram(q)` - calculates aggregate histogram over `q` time series for each point on the graph. See [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for more details. +- `histogram_over_time(m[d])` - calculates [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) for `m` over `d`. + For example, the following query calculates median temperature by country over the last 24 hours: + `histogram_quantile(0.5, sum(histogram_over_time(temperature[24h])) by (vmbucket, country))`. - `topk_*` and `bottomk_*` aggregate functions, which return up to K time series. Note that the standard `topk` function may return more than K time series - see [this article](https://www.robustperception.io/graph-top-n-time-series-in-grafana) for details. - `topk_min(k, q)` - returns top K time series with the max minimums on the given time range diff --git a/lib/metricsql/rollup.go b/lib/metricsql/rollup.go index 6206907ca..606ac5163 100644 --- a/lib/metricsql/rollup.go +++ b/lib/metricsql/rollup.go @@ -43,6 +43,7 @@ var rollupFuncs = map[string]bool{ "scrape_interval": true, "share_le_over_time": true, "share_gt_over_time": true, + "histogram_over_time": true, "rollup": true, "rollup_rate": true, "rollup_deriv": true,