From 8bb254d96089defc9587762dde5a372ec7d7e3cc Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 24 Nov 2019 00:02:18 +0200 Subject: [PATCH] app/vmselect/promql: add `histogram` aggregate function, which is useful for building heatmaps from multiple time series --- app/vmselect/promql/aggr.go | 43 ++++++++-- app/vmselect/promql/exec.go | 2 +- app/vmselect/promql/exec_test.go | 131 ++++++++++++++++++++++++++++--- app/vmselect/promql/transform.go | 35 ++++++--- 4 files changed, 182 insertions(+), 29 deletions(-) diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index c4afc0af9e..828fec4eb5 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/metrics" ) var aggrFuncs = map[string]aggrFunc{ @@ -26,11 +27,12 @@ var aggrFuncs = map[string]aggrFunc{ "quantile": aggrFuncQuantile, // Extended PromQL funcs - "median": aggrFuncMedian, - "limitk": aggrFuncLimitK, - "distinct": newAggrFunc(aggrFuncDistinct), - "sum2": newAggrFunc(aggrFuncSum2), - "geomean": newAggrFunc(aggrFuncGeomean), + "median": aggrFuncMedian, + "limitk": aggrFuncLimitK, + "distinct": newAggrFunc(aggrFuncDistinct), + "sum2": newAggrFunc(aggrFuncSum2), + "geomean": newAggrFunc(aggrFuncGeomean), + "histogram": newAggrFunc(aggrFuncHistogram), } type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error) @@ -184,6 +186,37 @@ func aggrFuncGeomean(tss []*timeseries) []*timeseries { return tss[:1] } +func aggrFuncHistogram(tss []*timeseries) []*timeseries { + m := make(map[string]*timeseries) + for i := range tss[0].Values { + var h metrics.Histogram + for _, ts := range tss { + v := ts.Values[i] + h.Update(v) + } + h.VisitNonZeroBuckets(func(vmrange string, count uint64) { + ts := m[vmrange] + if ts == nil { + ts = &timeseries{} + ts.CopyFromShallowTimestamps(tss[0]) + 
ts.MetricName.RemoveTag("vmrange") + ts.MetricName.AddTag("vmrange", vmrange) + values := ts.Values + for k := range values { + values[k] = 0 + } + m[vmrange] = ts + } + ts.Values[i] = float64(count) + }) + } + rvs := make([]*timeseries, 0, len(m)) + for _, ts := range m { + rvs = append(rvs, ts) + } + return vmrangeBucketsToLE(rvs) +} + func aggrFuncMin(tss []*timeseries) []*timeseries { if len(tss) == 1 { // Fast path - nothing to min. diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index 6a26fad4b6..62ccb5fe9d 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -110,7 +110,7 @@ func timeseriesToResult(tss []*timeseries, maySort bool) ([]netstorage.Result, e for i, ts := range tss { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) if _, ok := m[string(bb.B)]; ok { - return nil, fmt.Errorf(`duplicate output timeseries: %s%s`, ts.MetricName.MetricGroup, stringMetricName(&ts.MetricName)) + return nil, fmt.Errorf(`duplicate output timeseries: %s`, stringMetricName(&ts.MetricName)) } m[string(bb.B)] = struct{}{} diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index b7efad70cd..432aec5f9b 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -2459,12 +2459,12 @@ func TestExecSuccess(t *testing.T) { t.Run(`prometheus_buckets(missing-vmrange)`, func(t *testing.T) { t.Parallel() q := `sort(prometheus_buckets(( - alias(label_set(time()/20, "foo", "bar", "le", "0.2"), "xxx"), + alias(label_set(time()/20, "foo", "bar", "le", "0.2"), "xyz"), alias(label_set(time()/100, "foo", "bar", "vmrange", "foobar"), "xxx"), alias(label_set(time()/100, "foo", "bar", "vmrange", "30...foobar"), "xxx"), alias(label_set(time()/100, "foo", "bar", "vmrange", "30...40"), "xxx"), alias(label_set(time()/80, "foo", "bar", "vmrange", "0...900", "le", "54"), "yyy"), - alias(label_set(time()/40, "foo", "bar", "vmrange", "900...1000", "le", "2343"), "yyy"), + 
alias(label_set(time()/40, "foo", "bar", "vmrange", "900...+Inf", "le", "2343"), "yyy"), )))` r1 := netstorage.Result{ MetricName: metricNameExpected, @@ -2500,10 +2500,10 @@ func TestExecSuccess(t *testing.T) { } r3 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{12.5, 15, 17.5, 20, 22.5, 25}, + Values: []float64{10, 12, 14, 16, 18, 20}, Timestamps: timestampsExpected, } - r3.MetricName.MetricGroup = []byte("yyy") + r3.MetricName.MetricGroup = []byte("xxx") r3.MetricName.Tags = []storage.Tag{ { Key: []byte("foo"), @@ -2511,12 +2511,12 @@ func TestExecSuccess(t *testing.T) { }, { Key: []byte("le"), - Value: []byte("900"), + Value: []byte("+Inf"), }, } r4 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{37.5, 45, 52.5, 60, 67.5, 75}, + Values: []float64{12.5, 15, 17.5, 20, 22.5, 25}, Timestamps: timestampsExpected, } r4.MetricName.MetricGroup = []byte("yyy") @@ -2527,16 +2527,32 @@ func TestExecSuccess(t *testing.T) { }, { Key: []byte("le"), - Value: []byte("1000"), + Value: []byte("900"), }, } r5 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{37.5, 45, 52.5, 60, 67.5, 75}, + Timestamps: timestampsExpected, + } + r5.MetricName.MetricGroup = []byte("yyy") + r5.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("le"), + Value: []byte("+Inf"), + }, + } + r6 := netstorage.Result{ MetricName: metricNameExpected, Values: []float64{50, 60, 70, 80, 90, 100}, Timestamps: timestampsExpected, } - r5.MetricName.MetricGroup = []byte("xxx") - r5.MetricName.Tags = []storage.Tag{ + r6.MetricName.MetricGroup = []byte("xyz") + r6.MetricName.Tags = []storage.Tag{ { Key: []byte("foo"), Value: []byte("bar"), @@ -2546,7 +2562,7 @@ func TestExecSuccess(t *testing.T) { Value: []byte("0.2"), }, } - resultExpected := []netstorage.Result{r1, r2, r3, r4, r5} + resultExpected := []netstorage.Result{r1, r2, r3, r4, r5, r6} f(q, resultExpected) }) 
t.Run(`prometheus_buckets(valid)`, func(t *testing.T) { @@ -2674,6 +2690,99 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`histogram(scalar)`, func(t *testing.T) { + t.Parallel() + q := `histogram(123)` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0, 0, 0, 0, 0, 0}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("1e2"), + }, + } + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("2e2"), + }, + } + r3 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + r3.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("+Inf"), + }, + } + resultExpected := []netstorage.Result{r1, r2, r3} + f(q, resultExpected) + }) + t.Run(`histogram(vector)`, func(t *testing.T) { + t.Parallel() + q := `sort(histogram(( + label_set(1, "foo", "bar"), + label_set(1.5, "xx", "yy"), + alias(1.9, "foobar"), + )))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0, 0, 0, 0, 0, 0}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("9e-1"), + }, + } + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("1"), + }, + } + r3 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{3, 3, 3, 3, 3, 3}, + Timestamps: timestampsExpected, + } + r3.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("2"), + }, + } + r4 := netstorage.Result{ + MetricName: metricNameExpected, + 
Values: []float64{3, 3, 3, 3, 3, 3}, + Timestamps: timestampsExpected, + } + r4.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("+Inf"), + }, + } + resultExpected := []netstorage.Result{r1, r2, r3, r4} + f(q, resultExpected) + }) t.Run(`avg(scalar) wiTHout (xx, yy)`, func(t *testing.T) { t.Parallel() q := `avg wiTHout (xx, yy) (123)` @@ -4520,7 +4629,7 @@ func testMetricNamesEqual(t *testing.T, mn, mnExpected *storage.MetricName, pos t.Fatalf(`unexpected tag key at #%d,%d; got %q; want %q`, pos, i, tag.Key, tagExpected.Key) } if string(tag.Value) != string(tagExpected.Value) { - t.Fatalf(`unexpected tag value at #%d,%d; got %q; want %q`, pos, i, tag.Value, tagExpected.Value) + t.Fatalf(`unexpected tag value for key %q at #%d,%d; got %q; want %q`, tag.Key, pos, i, tag.Value, tagExpected.Value) } } } diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index ac00a57d45..aacfa2bfbe 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -332,29 +332,40 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries { } // Convert `vmrange` label in each group of time series to `le` label. 
+ copyTS := func(src *timeseries, leStr string) *timeseries { + var ts timeseries + ts.CopyFromShallowTimestamps(src) + values := ts.Values + for i := range values { + values[i] = 0 + } + ts.MetricName.RemoveTag("le") + ts.MetricName.AddTag("le", leStr) + return &ts + } for _, xss := range m { sort.Slice(xss, func(i, j int) bool { return xss[i].end < xss[j].end }) - xssNew := make([]x, 0, len(xss)) - endStrPrev := "0" + xssNew := make([]x, 0, len(xss)+2) + var xsPrev x for _, xs := range xss { ts := xs.ts - if xs.startStr != endStrPrev { - var tsDummy timeseries - tsDummy.CopyFromShallowTimestamps(ts) - values := tsDummy.Values - for i := range values { - values[i] = 0 - } - tsDummy.MetricName.AddTag("le", xs.startStr) + if xs.start != xsPrev.end { xssNew = append(xssNew, x{ endStr: xs.startStr, end: xs.start, - ts: &tsDummy, + ts: copyTS(ts, xs.startStr), }) } ts.MetricName.AddTag("le", xs.endStr) xssNew = append(xssNew, xs) - endStrPrev = xs.endStr + xsPrev = xs + } + if !math.IsInf(xsPrev.end, 1) { + xssNew = append(xssNew, x{ + endStr: "+Inf", + end: math.Inf(1), + ts: copyTS(xsPrev.ts, "+Inf"), + }) } xss = xssNew for i := range xss[0].ts.Values {