From f409f2d0501eac063b9eca1fc86f66550ecaad5e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 4 Jan 2020 12:44:09 +0200 Subject: [PATCH] app/vmselect/promql: add `histogram_share(le, buckets)` function --- app/vmselect/promql/exec_test.go | 248 ++++++++++++++++++++++++++++++- app/vmselect/promql/transform.go | 204 ++++++++++++++++++------- docs/ExtendedPromQL.md | 2 + lib/metricsql/transform.go | 1 + 4 files changed, 401 insertions(+), 54 deletions(-) diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index dd1fabf86..d1e8bed2b 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -2308,18 +2308,36 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{} f(q, resultExpected) }) + t.Run(`histogram_share(scalar)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(123, time())` + resultExpected := []netstorage.Result{} + f(q, resultExpected) + }) t.Run(`histogram_quantile(single-value-no-le)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0.6, label_set(100, "foo", "bar"))` resultExpected := []netstorage.Result{} f(q, resultExpected) }) + t.Run(`histogram_share(single-value-no-le)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(123, label_set(100, "foo", "bar"))` + resultExpected := []netstorage.Result{} + f(q, resultExpected) + }) t.Run(`histogram_quantile(single-value-invalid-le)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0.6, label_set(100, "le", "foobar"))` resultExpected := []netstorage.Result{} f(q, resultExpected) }) + t.Run(`histogram_share(single-value-invalid-le)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(50, label_set(100, "le", "foobar"))` + resultExpected := []netstorage.Result{} + f(q, resultExpected) + }) t.Run(`histogram_quantile(single-value-valid-le)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0.6, label_set(100, "le", "200"))` @@ -2331,6 +2349,39 @@ func 
TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`histogram_share(single-value-valid-le)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(80, label_set(100, "le", "200"))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.4, 0.4, 0.4, 0.4, 0.4, 0.4}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`histogram_share(single-value-valid-le)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(200, label_set(100, "le", "200"))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`histogram_share(single-value-valid-le)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(300, label_set(100, "le", "200"))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`histogram_quantile(single-value-valid-le, boundsLabel)`, func(t *testing.T) { t.Parallel() q := `sort(histogram_quantile(0.6, label_set(100, "le", "200"), "foobar"))` @@ -2349,7 +2400,6 @@ func TestExecSuccess(t *testing.T) { Timestamps: timestampsExpected, } r3 := netstorage.Result{ - MetricName: metricNameExpected, Values: []float64{200, 200, 200, 200, 200, 200}, Timestamps: timestampsExpected, } @@ -2360,6 +2410,34 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r1, r2, r3} f(q, resultExpected) }) + t.Run(`histogram_quantile(single-value-valid-le, boundsLabel)`, func(t *testing.T) { + t.Parallel() + q := `sort(histogram_share(120, label_set(100, "le", "200"), "foobar"))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0, 0, 0, 0, 0, 0}, + 
Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("foobar"), + Value: []byte("lower"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.6, 0.6, 0.6, 0.6, 0.6, 0.6}, + Timestamps: timestampsExpected, + } + r3 := netstorage.Result{ + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + r3.MetricName.Tags = []storage.Tag{{ + Key: []byte("foobar"), + Value: []byte("upper"), + }} + resultExpected := []netstorage.Result{r1, r2, r3} + f(q, resultExpected) + }) t.Run(`histogram_quantile(single-value-valid-le-max-phi)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(1, ( @@ -2374,6 +2452,20 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`histogram_share(single-value-valid-le-max-le)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(200, ( + label_set(100, "le", "200"), + label_set(0, "le", "55"), + ))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`histogram_quantile(single-value-valid-le-min-phi)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0, ( @@ -2388,6 +2480,48 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`histogram_share(single-value-valid-le-min-le)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(0, ( + label_set(100, "le", "200"), + label_set(0, "le", "55"), + ))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0, 0, 0, 0, 0, 0}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`histogram_share(single-value-valid-le-low-le)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(55, ( + label_set(100, "le", 
"200"), + label_set(0, "le", "55"), + ))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0, 0, 0, 0, 0, 0}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`histogram_share(single-value-valid-le-mid-le)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(105, ( + label_set(100, "le", "200"), + label_set(0, "le", "55"), + ))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.3448275862068966, 0.3448275862068966, 0.3448275862068966, 0.3448275862068966, 0.3448275862068966, 0.3448275862068966}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`histogram_quantile(single-value-valid-le-min-phi-no-zero-bucket)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0, label_set(100, "le", "200"))` @@ -2410,6 +2544,17 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`histogram_share(scalar-phi)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(time() / 8, label_set(100, "le", "200"))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.625, 0.75, 0.875, 1, 1, 1}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`histogram_quantile(valid)`, func(t *testing.T) { t.Parallel() q := `sort(histogram_quantile(0.6, @@ -2440,6 +2585,36 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r1, r2} f(q, resultExpected) }) + t.Run(`histogram_share(valid)`, func(t *testing.T) { + t.Parallel() + q := `sort(histogram_share(25, + label_set(90, "foo", "bar", "le", "10") + or label_set(100, "foo", "bar", "le", "30") + or label_set(300, "foo", "bar", "le", "+Inf") + or label_set(200, "tag", "xx", "le", "10") + or label_set(300, "tag", "xx", "le", "30") + ))` + r1 := netstorage.Result{ + 
MetricName: metricNameExpected, + Values: []float64{0.325, 0.325, 0.325, 0.325, 0.325, 0.325}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.9166666666666666, 0.9166666666666666, 0.9166666666666666, 0.9166666666666666, 0.9166666666666666, 0.9166666666666666}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{{ + Key: []byte("tag"), + Value: []byte("xx"), + }} + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) t.Run(`histogram_quantile(negative-bucket-count)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0.6, @@ -2468,7 +2643,7 @@ func TestExecSuccess(t *testing.T) { )` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{10, 10, 10, 10, 10, 10}, + Values: []float64{30, 30, 30, 30, 30, 30}, Timestamps: timestampsExpected, } r.MetricName.Tags = []storage.Tag{{ @@ -2497,6 +2672,25 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`histogram_share(normal-bucket-count)`, func(t *testing.T) { + t.Parallel() + q := `histogram_share(22, + label_set(0, "foo", "bar", "le", "10") + or label_set(100, "foo", "bar", "le", "30") + or label_set(300, "foo", "bar", "le", "+Inf") + )` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.2, 0.2, 0.2, 0.2, 0.2, 0.2}, + Timestamps: timestampsExpected, + } + r.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`histogram_quantile(normal-bucket-count, boundsLabel)`, func(t *testing.T) { t.Parallel() q := `sort(histogram_quantile(0.2, @@ -2547,6 +2741,56 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r1, r2, r3} f(q, resultExpected) }) + 
t.Run(`histogram_share(normal-bucket-count, boundsLabel)`, func(t *testing.T) { + t.Parallel() + q := `sort(histogram_share(22, + label_set(0, "foo", "bar", "le", "10") + or label_set(100, "foo", "bar", "le", "30") + or label_set(300, "foo", "bar", "le", "+Inf"), + "xxx" + ))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0, 0, 0, 0, 0, 0}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("xxx"), + Value: []byte("lower"), + }, + } + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.2, 0.2, 0.2, 0.2, 0.2, 0.2}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + r3 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333}, + Timestamps: timestampsExpected, + } + r3.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("xxx"), + Value: []byte("upper"), + }, + } + resultExpected := []netstorage.Result{r1, r2, r3} + f(q, resultExpected) + }) t.Run(`histogram_quantile(zero-bucket-count)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0.6, diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index 9e84556f3..4931e13d5 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -93,6 +93,7 @@ var transformFuncs = map[string]transformFunc{ "asin": newTransformFuncOneArg(transformAsin), "acos": newTransformFuncOneArg(transformAcos), "prometheus_buckets": transformPrometheusBuckets, + "histogram_share": transformHistogramShare, } func getTransformFunc(s string) transformFunc { @@ -399,6 +400,104 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries { return rvs } +func 
transformHistogramShare(tfa *transformFuncArg) ([]*timeseries, error) { + args := tfa.args + if len(args) < 2 || len(args) > 3 { + return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args)) + } + les, err := getScalar(args[0], 0) + if err != nil { + return nil, fmt.Errorf("cannot parse le: %s", err) + } + + // Convert buckets with `vmrange` labels to buckets with `le` labels. + tss := vmrangeBucketsToLE(args[1]) + + // Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details. + var boundsLabel string + if len(args) > 2 { + s, err := getString(args[2], 2) + if err != nil { + return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %s", err) + } + boundsLabel = s + } + + // Group metrics by all tags excluding "le" + m := groupLeTimeseries(tss) + + // Calculate share for les + + share := func(i int, les []float64, xss []leTimeseries) (q, lower, upper float64) { + leReq := les[i] + if math.IsNaN(leReq) || len(xss) == 0 { + return nan, nan, nan + } + fixBrokenBuckets(i, xss) + if leReq < 0 { + return 0, 0, 0 + } + if math.IsInf(leReq, 1) { + return 1, 1, 1 + } + for j, xs := range xss { + v := xs.ts.Values[i] + le := xs.le + if leReq < le { + continue + } + // precondition: leReq >= le + if j+1 >= len(xss) { + return 1, 1, 1 + } + vNext := xss[j+1].ts.Values[i] + leNext := xss[j+1].le + if math.IsInf(leNext, 1) { + return v / vNext, v / vNext, 1 + } + vLast := xss[len(xss)-1].ts.Values[i] + lower = v / vLast + upper = vNext / vLast + q = lower + (vNext-v)/vLast*(leReq-le)/(leNext-le) + return q, lower, upper + } + leLast := xss[len(xss)-1].le + return leReq / leLast, 0, 1 + } + rvs := make([]*timeseries, 0, len(m)) + for _, xss := range m { + sort.Slice(xss, func(i, j int) bool { + return xss[i].le < xss[j].le + }) + dst := xss[0].ts + var tsLower, tsUpper *timeseries + if len(boundsLabel) > 0 { + tsLower = &timeseries{} + tsLower.CopyFromShallowTimestamps(dst) + tsLower.MetricName.RemoveTag(boundsLabel) +
tsLower.MetricName.AddTag(boundsLabel, "lower") + tsUpper = &timeseries{} + tsUpper.CopyFromShallowTimestamps(dst) + tsUpper.MetricName.RemoveTag(boundsLabel) + tsUpper.MetricName.AddTag(boundsLabel, "upper") + } + for i := range dst.Values { + q, lower, upper := share(i, les, xss) + dst.Values[i] = q + if len(boundsLabel) > 0 { + tsLower.Values[i] = lower + tsUpper.Values[i] = upper + } + } + rvs = append(rvs, dst) + if len(boundsLabel) > 0 { + rvs = append(rvs, tsLower) + rvs = append(rvs, tsUpper) + } + } + return rvs, nil +} + func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 2 || len(args) > 3 { @@ -423,73 +522,35 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { } // Group metrics by all tags excluding "le" - type x struct { - le float64 - ts *timeseries - } - m := make(map[string][]x) - bb := bbPool.Get() - for _, ts := range tss { - tagValue := ts.MetricName.GetTagValue("le") - if len(tagValue) == 0 { - continue - } - le, err := strconv.ParseFloat(bytesutil.ToUnsafeString(tagValue), 64) - if err != nil { - continue - } - ts.MetricName.ResetMetricGroup() - ts.MetricName.RemoveTag("le") - bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName) - m[string(bb.B)] = append(m[string(bb.B)], x{ - le: le, - ts: ts, - }) - } - bbPool.Put(bb) + m := groupLeTimeseries(tss) // Calculate quantile for each group in m - lastNonInf := func(i int, xss []x) float64 { + lastNonInf := func(i int, xss []leTimeseries) float64 { for len(xss) > 0 { xsLast := xss[len(xss)-1] v := xsLast.ts.Values[i] if v == 0 { return nan } - if !math.IsNaN(v) && !math.IsInf(xsLast.le, 0) { + if !math.IsInf(xsLast.le, 0) { return xsLast.le } xss = xss[:len(xss)-1] } return nan } - quantile := func(i int, phis []float64, xss []x) (q, lower, upper float64) { + quantile := func(i int, phis []float64, xss []leTimeseries) (q, lower, upper float64) { phi := phis[i] if math.IsNaN(phi) { return nan, nan, nan } - //
Fix broken buckets. - // They are already sorted by le, so their values must be in ascending order, - // since the next bucket value includes all the previous buckets. - vPrev := float64(0) - for _, xs := range xss { - v := xs.ts.Values[i] - if v < vPrev { - xs.ts.Values[i] = vPrev - } else if !math.IsNaN(v) { - vPrev = v - } - } - vLast := nan - for len(xss) > 0 { + fixBrokenBuckets(i, xss) + vLast := float64(0) + if len(xss) > 0 { vLast = xss[len(xss)-1].ts.Values[i] - if !math.IsNaN(vLast) { - break - } - xss = xss[:len(xss)-1] } - if vLast == 0 || math.IsNaN(vLast) { + if vLast == 0 { return nan, nan, nan } if phi < 0 { @@ -499,15 +560,10 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { return inf, vLast, inf } vReq := vLast * phi - vPrev = 0 + vPrev := float64(0) lePrev := float64(0) for _, xs := range xss { v := xs.ts.Values[i] - if math.IsNaN(v) { - // Skip NaNs - they may appear if the selected time range - // contains multiple different bucket sets. - continue - } le := xs.le if v <= 0 { // Skip zero buckets. @@ -566,6 +622,50 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { return rvs, nil } +type leTimeseries struct { + le float64 + ts *timeseries +} + +func groupLeTimeseries(tss []*timeseries) map[string][]leTimeseries { + m := make(map[string][]leTimeseries) + bb := bbPool.Get() + for _, ts := range tss { + tagValue := ts.MetricName.GetTagValue("le") + if len(tagValue) == 0 { + continue + } + le, err := strconv.ParseFloat(bytesutil.ToUnsafeString(tagValue), 64) + if err != nil { + continue + } + ts.MetricName.ResetMetricGroup() + ts.MetricName.RemoveTag("le") + bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName) + m[string(bb.B)] = append(m[string(bb.B)], leTimeseries{ + le: le, + ts: ts, + }) + } + bbPool.Put(bb) + return m +} + +func fixBrokenBuckets(i int, xss []leTimeseries) { + // Fix broken buckets. 
+ // They are already sorted by le, so their values must be in ascending order, + // since the next bucket includes all the previous buckets. + vPrev := float64(0) + for _, xs := range xss { + v := xs.ts.Values[i] + if v < vPrev || math.IsNaN(v) { + xs.ts.Values[i] = vPrev + } else { + vPrev = v + } + } +} + func transformHour(t time.Time) int { return t.Hour() } diff --git a/docs/ExtendedPromQL.md b/docs/ExtendedPromQL.md index 6a7fdee53..182e4398f 100644 --- a/docs/ExtendedPromQL.md +++ b/docs/ExtendedPromQL.md @@ -82,6 +82,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `histogram_over_time(m[d])` - calculates [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) for `m` over `d`. For example, the following query calculates median temperature by country over the last 24 hours: `histogram_quantile(0.5, sum(histogram_over_time(temperature[24h])) by (vmbucket, country))`. +- `histogram_share(le, buckets)` - returns the share (in the range 0..1) of `buckets` observations whose values do not exceed `le`. Useful for calculating SLIs and SLOs. + For instance, the following query returns the share of requests that are served within 1.5 seconds: `histogram_share(1.5, sum(request_duration_seconds_bucket) by (le))`. - `topk_*` and `bottomk_*` aggregate functions, which return up to K time series. Note that the standard `topk` function may return more than K time series - see [this article](https://www.robustperception.io/graph-top-n-time-series-in-grafana) for details. - `topk_min(k, q)` - returns top K time series with the max minimums on the given time range diff --git a/lib/metricsql/transform.go b/lib/metricsql/transform.go index 44cf09c07..74f840f7e 100644 --- a/lib/metricsql/transform.go +++ b/lib/metricsql/transform.go @@ -72,6 +72,7 @@ var transformFuncs = map[string]bool{ "asin": true, "acos": true, "prometheus_buckets": true, + "histogram_share": true, } func isTransformFunc(s string) bool {