app/vmselect/promql: allow passing multiple args to aggregate functions such as avg(q1, q2, q3)

This commit is contained in:
Aliaksandr Valialkin 2020-08-15 01:15:01 +03:00
parent a2021d0dde
commit 285665e93b
3 changed files with 46 additions and 14 deletions

View File

@ -65,14 +65,25 @@ func getAggrFunc(s string) aggrFunc {
func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc { func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc {
return func(afa *aggrFuncArg) ([]*timeseries, error) { return func(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args tss, err := getAggrTimeseries(afa.args)
if err := expectTransformArgsNum(args, 1); err != nil { if err != nil {
return nil, err return nil, err
} }
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, false) return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false)
} }
} }
func getAggrTimeseries(args [][]*timeseries) ([]*timeseries, error) {
if len(args) == 0 {
return nil, fmt.Errorf("expecting at least one arg")
}
tss := args[0]
for _, arg := range args[1:] {
tss = append(tss, arg...)
}
return tss, nil
}
func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.ModifierExpr) { func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.ModifierExpr) {
groupOp := strings.ToLower(modifier.Op) groupOp := strings.ToLower(modifier.Op)
switch groupOp { switch groupOp {
@ -126,8 +137,8 @@ func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeserie
} }
func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) { func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args tss, err := getAggrTimeseries(afa.args)
if err := expectTransformArgsNum(args, 1); err != nil { if err != nil {
return nil, err return nil, err
} }
afe := func(tss []*timeseries) []*timeseries { afe := func(tss []*timeseries) []*timeseries {
@ -138,7 +149,7 @@ func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) {
// Only a single time series per group must be returned // Only a single time series per group must be returned
limit = 1 limit = 1
} }
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, limit, true) return aggrFuncExt(afe, tss, &afa.ae.Modifier, limit, true)
} }
func aggrFuncGroup(tss []*timeseries) []*timeseries { func aggrFuncGroup(tss []*timeseries) []*timeseries {
@ -434,8 +445,8 @@ func aggrFuncMode(tss []*timeseries) []*timeseries {
} }
func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) { func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args tss, err := getAggrTimeseries(afa.args)
if err := expectTransformArgsNum(args, 1); err != nil { if err != nil {
return nil, err return nil, err
} }
afe := func(tss []*timeseries) []*timeseries { afe := func(tss []*timeseries) []*timeseries {
@ -476,7 +487,7 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
} }
return tss return tss
} }
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, true) return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, true)
} }
// modeNoNaNs returns mode for a. // modeNoNaNs returns mode for a.
@ -811,13 +822,13 @@ func aggrFuncQuantile(afa *aggrFuncArg) ([]*timeseries, error) {
} }
func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) { func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args tss, err := getAggrTimeseries(afa.args)
if err := expectTransformArgsNum(args, 1); err != nil { if err != nil {
return nil, err return nil, err
} }
phis := evalNumber(afa.ec, 0.5)[0].Values phis := evalNumber(afa.ec, 0.5)[0].Values
afe := newAggrQuantileFunc(phis) afe := newAggrQuantileFunc(phis)
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, false) return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false)
} }
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries { func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries {

View File

@ -3380,6 +3380,28 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`sum(multi-args)`, func(t *testing.T) {
t.Parallel()
q := `sum(1, 2, 3)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{6, 6, 6, 6, 6, 6},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum(union-args)`, func(t *testing.T) {
t.Parallel()
q := `sum((1, 2, 3))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum(scalar) by ()`, func(t *testing.T) { t.Run(`sum(scalar) by ()`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `sum(123) by ()` q := `sum(123) by ()`
@ -5966,7 +5988,6 @@ func TestExecError(t *testing.T) {
f(`label_move()`) f(`label_move()`)
f(`median_over_time()`) f(`median_over_time()`)
f(`median()`) f(`median()`)
f(`median("foo", "bar")`)
f(`keep_last_value()`) f(`keep_last_value()`)
f(`keep_next_value()`) f(`keep_next_value()`)
f(`interpolate()`) f(`interpolate()`)
@ -6068,7 +6089,6 @@ func TestExecError(t *testing.T) {
) + 10`) ) + 10`)
// Invalid aggregates // Invalid aggregates
f(`sum(1, 2)`)
f(`sum(1) foo (bar)`) f(`sum(1) foo (bar)`)
f(`sum foo () (bar)`) f(`sum foo () (bar)`)
f(`sum(foo) by (1)`) f(`sum(foo) by (1)`)

View File

@ -37,6 +37,7 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `offset` may be negative. For example, `q offset -1h`. - `offset` may be negative. For example, `q offset -1h`.
- [Range duration](https://prometheus.io/docs/prometheus/latest/querying/basics/#range-vector-selectors) and [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be fractional. For instance, `rate(node_network_receive_bytes_total[1.5m] offset 0.5d)`. - [Range duration](https://prometheus.io/docs/prometheus/latest/querying/basics/#range-vector-selectors) and [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be fractional. For instance, `rate(node_network_receive_bytes_total[1.5m] offset 0.5d)`.
- `default` binary operator. `q1 default q2` fills gaps in `q1` with the corresponding values from `q2`. - `default` binary operator. `q1 default q2` fills gaps in `q1` with the corresponding values from `q2`.
- Most aggregate functions accept arbitrary number of args. For example, `avg(q1, q2, q3)` would return the average values for every point across `q1`, `q2` and `q3`.
- `histogram_quantile` accepts optional third arg - `boundsLabel`. In this case it returns `lower` and `upper` bounds for the estimated percentile. See [this issue for details](https://github.com/prometheus/prometheus/issues/5706). - `histogram_quantile` accepts optional third arg - `boundsLabel`. In this case it returns `lower` and `upper` bounds for the estimated percentile. See [this issue for details](https://github.com/prometheus/prometheus/issues/5706).
- `if` binary operator. `q1 if q2` removes values from `q1` for missing values from `q2`. - `if` binary operator. `q1 if q2` removes values from `q1` for missing values from `q2`.
- `ifnot` binary operator. `q1 ifnot q2` removes values from `q1` for existing values from `q2`. - `ifnot` binary operator. `q1 ifnot q2` removes values from `q1` for existing values from `q2`.