From 285665e93b42f687ecd3282437ee4bdafae254b3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 15 Aug 2020 01:15:01 +0300 Subject: [PATCH] app/vmselect/promql: allow passing multiple args to aggregate functions such as `avg(q1, q2, q3)` --- app/vmselect/promql/aggr.go | 35 +++++++++++++++++++++----------- app/vmselect/promql/exec_test.go | 24 ++++++++++++++++++++-- docs/MetricsQL.md | 1 + 3 files changed, 46 insertions(+), 14 deletions(-) diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index 3a78c65314..e38b8dd77a 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -65,14 +65,25 @@ func getAggrFunc(s string) aggrFunc { func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc { return func(afa *aggrFuncArg) ([]*timeseries, error) { - args := afa.args - if err := expectTransformArgsNum(args, 1); err != nil { + tss, err := getAggrTimeseries(afa.args) + if err != nil { return nil, err } - return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, false) + return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false) } } +func getAggrTimeseries(args [][]*timeseries) ([]*timeseries, error) { + if len(args) == 0 { + return nil, fmt.Errorf("expecting at least one arg") + } + tss := args[0] + for _, arg := range args[1:] { + tss = append(tss, arg...) + } + return tss, nil +} + func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.ModifierExpr) { groupOp := strings.ToLower(modifier.Op) switch groupOp { @@ -126,8 +137,8 @@ func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeserie } func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) { - args := afa.args - if err := expectTransformArgsNum(args, 1); err != nil { + tss, err := getAggrTimeseries(afa.args) + if err != nil { return nil, err } afe := func(tss []*timeseries) []*timeseries { @@ -138,7 +149,7 @@ func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) { // Only a single time series per group must be returned limit = 1 } - return aggrFuncExt(afe, args[0], &afa.ae.Modifier, limit, true) + return aggrFuncExt(afe, tss, &afa.ae.Modifier, limit, true) } func aggrFuncGroup(tss []*timeseries) []*timeseries { @@ -434,8 +445,8 @@ func aggrFuncMode(tss []*timeseries) []*timeseries { } func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) { - args := afa.args - if err := expectTransformArgsNum(args, 1); err != nil { + tss, err := getAggrTimeseries(afa.args) + if err != nil { return nil, err } afe := func(tss []*timeseries) []*timeseries { @@ -476,7 +487,7 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) { } return tss } - return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, true) + return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, true) } // modeNoNaNs returns mode for a. @@ -811,13 +822,13 @@ func aggrFuncQuantile(afa *aggrFuncArg) ([]*timeseries, error) { } func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) { - args := afa.args - if err := expectTransformArgsNum(args, 1); err != nil { + tss, err := getAggrTimeseries(afa.args) + if err != nil { return nil, err } phis := evalNumber(afa.ec, 0.5)[0].Values afe := newAggrQuantileFunc(phis) - return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, false) + return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false) } func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries { diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 18bec093fb..d17c8ef699 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -3380,6 +3380,28 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`sum(multi-args)`, func(t *testing.T) { + t.Parallel() + q := `sum(1, 2, 3)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{6, 6, 6, 6, 6, 6}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`sum(union-args)`, func(t *testing.T) { + t.Parallel() + q := `sum((1, 2, 3))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`sum(scalar) by ()`, func(t *testing.T) { t.Parallel() q := `sum(123) by ()` @@ -5966,7 +5988,6 @@ func TestExecError(t *testing.T) { f(`label_move()`) f(`median_over_time()`) f(`median()`) - f(`median("foo", "bar")`) f(`keep_last_value()`) f(`keep_next_value()`) f(`interpolate()`) @@ -6068,7 +6089,6 @@ func TestExecError(t *testing.T) { ) + 10`) // Invalid aggregates - f(`sum(1, 2)`) f(`sum(1) foo (bar)`) f(`sum foo () (bar)`) f(`sum(foo) by (1)`) diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index 904afae6a2..ede6645544 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -37,6 +37,7 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `offset` may be negative. For example, `q offset -1h`. - [Range duration](https://prometheus.io/docs/prometheus/latest/querying/basics/#range-vector-selectors) and [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be fractional. For instance, `rate(node_network_receive_bytes_total[1.5m] offset 0.5d)`. - `default` binary operator. `q1 default q2` fills gaps in `q1` with the corresponding values from `q2`. +- Most aggregate functions accept arbitrary number of args. For example, `avg(q1, q2, q3)` would return the average values for every point across `q1`, `q2` and `q3`. - `histogram_quantile` accepts optional third arg - `boundsLabel`. In this case it returns `lower` and `upper` bounds for the estimated percentile. See [this issue for details](https://github.com/prometheus/prometheus/issues/5706). - `if` binary operator. `q1 if q2` removes values from `q1` for missing values from `q2`. - `ifnot` binary operator. `q1 ifnot q2` removes values from `q1` for existing values from `q2`.