diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index be856b7eb..ee9e6977e 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" + "github.com/valyala/histogram" ) var aggrFuncs = map[string]aggrFunc{ @@ -27,12 +28,20 @@ var aggrFuncs = map[string]aggrFunc{ "quantile": aggrFuncQuantile, // Extended PromQL funcs - "median": aggrFuncMedian, - "limitk": aggrFuncLimitK, - "distinct": newAggrFunc(aggrFuncDistinct), - "sum2": newAggrFunc(aggrFuncSum2), - "geomean": newAggrFunc(aggrFuncGeomean), - "histogram": newAggrFunc(aggrFuncHistogram), + "median": aggrFuncMedian, + "limitk": aggrFuncLimitK, + "distinct": newAggrFunc(aggrFuncDistinct), + "sum2": newAggrFunc(aggrFuncSum2), + "geomean": newAggrFunc(aggrFuncGeomean), + "histogram": newAggrFunc(aggrFuncHistogram), + "topk_min": newAggrFuncRangeTopK(minValue, false), + "topk_max": newAggrFuncRangeTopK(maxValue, false), + "topk_avg": newAggrFuncRangeTopK(avgValue, false), + "topk_median": newAggrFuncRangeTopK(medianValue, false), + "bottomk_min": newAggrFuncRangeTopK(minValue, true), + "bottomk_max": newAggrFuncRangeTopK(maxValue, true), + "bottomk_avg": newAggrFuncRangeTopK(avgValue, true), + "bottomk_median": newAggrFuncRangeTopK(medianValue, true), } type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error) @@ -459,37 +468,138 @@ func newAggrFuncTopK(isReverse bool) aggrFunc { return nil, err } afe := func(tss []*timeseries) []*timeseries { - rvs := tss - for n := range rvs[0].Values { - sort.Slice(rvs, func(i, j int) bool { - a := rvs[i].Values[n] - b := rvs[j].Values[n] - cmp := lessWithNaNs(a, b) + for n := range tss[0].Values { + sort.Slice(tss, func(i, j int) bool { + a := tss[i].Values[n] + b := tss[j].Values[n] if isReverse { - cmp = !cmp + a, b = b, a } - return cmp + return lessWithNaNs(a, b) }) - if math.IsNaN(ks[n]) { - ks[n] = 0 - } - k := int(ks[n]) - if k < 0 { - k = 0 - } - if k > len(rvs) { - k = len(rvs) - } - for _, ts := range rvs[:len(rvs)-k] { - ts.Values[n] = nan - } + fillNaNsAtIdx(n, ks[n], tss) } - return removeNaNs(rvs) + return removeNaNs(tss) } return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true) } } +type tsWithValue struct { + ts *timeseries + value float64 +} + +func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc { + return func(afa *aggrFuncArg) ([]*timeseries, error) { + args := afa.args + if err := expectTransformArgsNum(args, 2); err != nil { + return nil, err + } + ks, err := getScalar(args[0], 0) + if err != nil { + return nil, err + } + afe := func(tss []*timeseries) []*timeseries { + maxs := make([]tsWithValue, len(tss)) + for i, ts := range tss { + value := f(ts.Values) + maxs[i] = tsWithValue{ + ts: ts, + value: value, + } + } + sort.Slice(maxs, func(i, j int) bool { + a := maxs[i].value + b := maxs[j].value + if isReverse { + a, b = b, a + } + return lessWithNaNs(a, b) + }) + for i := range maxs { + tss[i] = maxs[i].ts + } + for i, k := range ks { + fillNaNsAtIdx(i, k, tss) + } + return removeNaNs(tss) + } + return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true) + } +} + +func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) { + if math.IsNaN(k) { + k = 0 + } + kn := int(k) + if kn < 0 { + kn = 0 + } + if kn > len(tss) { + kn = len(tss) + } + for _, ts := range tss[:len(tss)-kn] { + ts.Values[idx] = nan + } +} + +func minValue(values []float64) float64 { + if len(values) == 0 { + return nan + } + min := values[0] + for _, v := range values[1:] { + if v < min { + min = v + } + } + return min +} + +func maxValue(values []float64) float64 { + if len(values) == 0 { + return nan + } + max := values[0] + for _, v := range values[1:] { + if v > max { + max = v + } + } + return max +} + +func avgValue(values []float64) float64 { + sum := float64(0) + count := 0 + for _, v := range values { + if math.IsNaN(v) { + continue + } + count++ + sum += v + } + if count == 0 { + return nan + } + return sum / float64(count) +} + +func medianValue(values []float64) float64 { + h := histogram.GetFast() + for _, v := range values { + if math.IsNaN(v) { + continue + } + h.Update(v) + } + value := h.Quantile(0.5) + histogram.PutFast(h) + return value +} + func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { args := afa.args if err := expectTransformArgsNum(args, 2); err != nil { diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index e4019b4f2..67aee800e 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -3104,6 +3104,126 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r1, r2} f(q, resultExpected) }) + t.Run(`topk_min(1)`, func(t *testing.T) { + t.Parallel() + q := `sort(topk_min(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, nan, nan, nan}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) + t.Run(`bottomk_min(1)`, func(t *testing.T) { + t.Parallel() + q := `sort(bottomk_min(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) + t.Run(`topk_max(1)`, func(t *testing.T) { + t.Parallel() + q := `sort(topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) + t.Run(`bottomk_max(1)`, func(t *testing.T) { + t.Parallel() + q := `sort(bottomk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, nan, nan, nan}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) + t.Run(`topk_avg(1)`, func(t *testing.T) { + t.Parallel() + q := `sort(topk_avg(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) + t.Run(`bottomk_avg(1)`, func(t *testing.T) { + t.Parallel() + q := `sort(bottomk_avg(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, nan, nan, nan}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) + t.Run(`topk_median(1)`, func(t *testing.T) { + t.Parallel() + q := `sort(topk_median(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) + t.Run(`bottomk_median(1)`, func(t *testing.T) { + t.Parallel() + q := `sort(bottomk_median(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, nan, nan, nan}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) t.Run(`topk(1, nan_timeseries)`, func(t *testing.T) { t.Parallel() q := `topk(1, label_set(NaN, "foo", "bar") or label_set(time()/150, "baz", "sss")) default 0` @@ -4508,8 +4628,16 @@ func TestExecError(t *testing.T) { f(`count_values()`) f(`quantile()`) f(`topk()`) + f(`topk_min()`) + f(`topk_max()`) + f(`topk_avg()`) + f(`topk_median()`) f(`limitk()`) f(`bottomk()`) + f(`bottomk_min()`) + f(`bottomk_max()`) + f(`bottomk_avg()`) + f(`bottomk_median()`) f(`time(123)`) f(`start(1)`) f(`end(1)`) @@ -4552,6 +4680,7 @@ func TestExecError(t *testing.T) { f(`clamp_max(1, 1 or label_set(2, "xx", "foo"))`) f(`clamp_min(1, 1 or label_set(2, "xx", "foo"))`) f(`topk(label_set(2, "xx", "foo") or 1, 12)`) + f(`topk_avg(label_set(2, "xx", "foo") or 1, 12)`) f(`limitk(label_set(2, "xx", "foo") or 1, 12)`) f(`round(1, 1 or label_set(2, "xx", "foo"))`) f(`histogram_quantile(1 or label_set(2, "xx", "foo"), 1)`) diff --git a/docs/ExtendedPromQL.md b/docs/ExtendedPromQL.md index d1a23eeea..5624d2430 100644 --- a/docs/ExtendedPromQL.md +++ b/docs/ExtendedPromQL.md @@ -61,3 +61,13 @@ Try these extensions on [an editable Grafana dashboard](http://play-grafana.vict - `increases_over_time(m[d])` and `decreases_over_time(m[d])` - returns the number of `m` increases or decreases over the given duration `d`. - `prometheus_buckets(q)` - converts [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) buckets to Prometheus buckets with `le` labels. - `histogram(q)` - calculates aggregate histogram over `q` time series for each point on the graph. +- `topk_*` and `bottomk_*` aggregate functions, which return up to K time series. Note that the standard `topk` function may return more than K time series - + see [this article](https://www.robustperception.io/graph-top-n-time-series-in-grafana) for details. + - `topk_min(k, q)` - returns top K time series with the max minimums on the given time range + - `topk_max(k, q)` - returns top K time series with the max maximums on the given time range + - `topk_avg(k, q)` - returns top K time series with the max averages on the given time range + - `topk_median(k, q)` - returns top K time series with the max medians on the given time range + - `bottomk_min(k, q)` - returns bottom K time series with the min minimums on the given time range + - `bottomk_max(k, q)` - returns bottom K time series with the min maximums on the given time range + - `bottomk_avg(k, q)` - returns bottom K time series with the min averages on the given time range + - `bottomk_median(k, q)` - returns bottom K time series with the min medians on the given time range