diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 8e0883dec..b1ed9b337 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -359,6 +359,9 @@ func evalExprs(ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) { func evalRollupFuncArgs(ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) { var re *metricsql.RollupExpr rollupArgIdx := getRollupArgIdx(fe.Name) + if len(fe.Args) <= rollupArgIdx { + return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx, fe.Name, len(fe.Args), fe.AppendString(nil)) + } args := make([]interface{}, len(fe.Args)) for i, arg := range fe.Args { if i == rollupArgIdx { @@ -430,7 +433,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E if iafc != nil { logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil)) } - rvs, err = evalRollupFuncWithSubquery(ecNew, name, rf, re) + rvs, err = evalRollupFuncWithSubquery(ecNew, name, rf, expr, re) } if err != nil { return nil, err @@ -449,7 +452,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E return rvs, nil } -func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr) ([]*timeseries, error) { +func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) { // TODO: determine whether to use rollupResultCacheV here. var step int64 if len(re.Step) > 0 { @@ -490,7 +493,10 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re * } sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step) - preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) + preFunc, rcs, err := getRollupConfigs(name, rf, expr, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) + if err != nil { + return nil, err + } tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) var tssLock sync.Mutex removeMetricGroup := !rollupFuncsKeepMetricGroup[name] @@ -624,7 +630,11 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, return tss, nil } sharedTimestamps := getTimestamps(start, ec.End, ec.Step) - preFunc, rcs := getRollupConfigs(name, rf, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) + preFunc, rcs, err := getRollupConfigs(name, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) + if err != nil { + rss.Cancel() + return nil, err + } // Verify timeseries fit available memory after the rollup. // Take into account points from tssCached. @@ -751,62 +761,6 @@ func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.M tsDst.denyReuse = true } -func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) ( - func(values []float64, timestamps []int64), []*rollupConfig) { - preFunc := func(values []float64, timestamps []int64) {} - if rollupFuncsRemoveCounterResets[name] { - preFunc = func(values []float64, timestamps []int64) { - removeCounterResets(values) - } - } - newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig { - return &rollupConfig{ - TagValue: tagValue, - Func: rf, - Start: start, - End: end, - Step: step, - Window: window, - MayAdjustWindow: rollupFuncsMayAdjustWindow[name], - LookbackDelta: lookbackDelta, - Timestamps: sharedTimestamps, - } - } - appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig { - dst = append(dst, newRollupConfig(rollupMin, "min")) - dst = append(dst, newRollupConfig(rollupMax, "max")) - dst = append(dst, newRollupConfig(rollupAvg, "avg")) - return dst - } - var rcs []*rollupConfig - switch name { - case "rollup": - rcs = appendRollupConfigs(rcs) - case "rollup_rate", "rollup_deriv": - preFuncPrev := preFunc - preFunc = func(values []float64, timestamps []int64) { - preFuncPrev(values, timestamps) - derivValues(values, timestamps) - } - rcs = appendRollupConfigs(rcs) - case "rollup_increase", "rollup_delta": - preFuncPrev := preFunc - preFunc = func(values []float64, timestamps []int64) { - preFuncPrev(values, timestamps) - deltaValues(values) - } - rcs = appendRollupConfigs(rcs) - case "rollup_candlestick": - rcs = append(rcs, newRollupConfig(rollupFirst, "open")) - rcs = append(rcs, newRollupConfig(rollupLast, "close")) - rcs = append(rcs, newRollupConfig(rollupMin, "low")) - rcs = append(rcs, newRollupConfig(rollupMax, "high")) - default: - rcs = append(rcs, newRollupConfig(rf, "")) - } - return preFunc, rcs -} - var bbPool bytesutil.ByteBufferPool func evalNumber(ec *EvalConfig, n float64) []*timeseries { diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 1b1f1b91e..02180449c 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -4480,6 +4480,54 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`aggr_over_time(single-func)`, func(t *testing.T) { + t.Parallel() + q := `aggr_over_time("increase", rand(0)[:10s])` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{5.465672601448873, 6.642207999066246, 6.8400051805114295, 7.182425481980655, 5.1677922402706, 6.594060518641982}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("rollup"), + Value: []byte("increase"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) + t.Run(`aggr_over_time(multi-func)`, func(t *testing.T) { + t.Parallel() + q := `sort(aggr_over_time(("min_over_time", "count_over_time", "max_over_time"), round(rand(0),0.1)[:10s]))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0, 0, 0, 0, 0, 0}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("rollup"), + Value: []byte("min_over_time"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.9, 0.9, 1, 0.9, 1, 0.9}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{{ + Key: []byte("rollup"), + Value: []byte("max_over_time"), + }} + r3 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{20, 20, 20, 20, 20, 20}, + Timestamps: timestampsExpected, + } + r3.MetricName.Tags = []storage.Tag{{ + Key: []byte("rollup"), + Value: []byte("count_over_time"), + }} + resultExpected := []netstorage.Result{r1, r2, r3} + f(q, resultExpected) + }) t.Run(`rollup_candlestick()`, func(t *testing.T) { t.Parallel() q := `sort(rollup_candlestick(round(rand(0),0.01)[:10s]))` @@ -5181,6 +5229,9 @@ func TestExecError(t *testing.T) { f(`alias(1, "foo", "bar")`) f(`lifetime()`) f(`lag()`) + f(`aggr_over_time()`) + f(`aggr_over_time(foo)`) + f(`aggr_over_time("foo", bar, 1)`) // Invalid argument type f(`median_over_time({}, 2)`) @@ -5216,6 +5267,8 @@ func TestExecError(t *testing.T) { f(`label_transform(1, "foo", "bar", 4)`) f(`label_transform(1, "foo", "invalid(regexp", "baz`) f(`alias(1, 2)`) + f(`aggr_over_time(1, 2)`) + f(`aggr_over_time(("foo", "bar"), 3)`) // Duplicate timeseries f(`(label_set(1, "foo", "bar") or label_set(2, "foo", "baz")) diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index c03f4ddda..bd0733b3e 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" "github.com/valyala/histogram" @@ -63,6 +64,45 @@ var rollupFuncs = map[string]newRollupFunc{ "rollup_delta": newRollupFuncOneArg(rollupFake), "rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets "rollup_candlestick": newRollupFuncOneArg(rollupFake), + "aggr_over_time": newRollupFuncTwoArgs(rollupFake), +} + +// rollupAggrFuncs are functions that can be passed to `aggr_over_time()` +var rollupAggrFuncs = map[string]rollupFunc{ + // Standard rollup funcs from PromQL. + "changes": rollupChanges, + "delta": rollupDelta, + "deriv": rollupDerivSlow, + "deriv_fast": rollupDerivFast, + "idelta": rollupIdelta, + "increase": rollupIncrease, // + rollupFuncsRemoveCounterResets + "irate": rollupIderiv, // + rollupFuncsRemoveCounterResets + "rate": rollupDerivFast, // + rollupFuncsRemoveCounterResets + "resets": rollupResets, + "avg_over_time": rollupAvg, + "min_over_time": rollupMin, + "max_over_time": rollupMax, + "sum_over_time": rollupSum, + "count_over_time": rollupCount, + "stddev_over_time": rollupStddev, + "stdvar_over_time": rollupStdvar, + "absent_over_time": rollupAbsent, + + // Additional rollup funcs. + "sum2_over_time": rollupSum2, + "geomean_over_time": rollupGeomean, + "first_over_time": rollupFirst, + "last_over_time": rollupLast, + "distinct_over_time": rollupDistinct, + "increases_over_time": rollupIncreases, + "decreases_over_time": rollupDecreases, + "integrate": rollupIntegrate, + "ideriv": rollupIderiv, + "lifetime": rollupLifetime, + "lag": rollupLag, + "scrape_interval": rollupScrapeInterval, + "tmin_over_time": rollupTmin, + "tmax_over_time": rollupTmax, } var rollupFuncsMayAdjustWindow = map[string]bool{ @@ -95,15 +135,128 @@ var rollupFuncsKeepMetricGroup = map[string]bool{ "geomean_over_time": true, } +func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) { + fe, ok := expr.(*metricsql.FuncExpr) + if !ok { + logger.Panicf("BUG: unexpected expression; want metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil)) + } + if fe.Name != "aggr_over_time" { + logger.Panicf("BUG: unexpected function name: %q; want `aggr_over_time`", fe.Name) + } + if len(fe.Args) != 2 { + return nil, fmt.Errorf("unexpected number of args to aggr_over_time(); got %d; want %d", len(fe.Args), 2) + } + arg := fe.Args[0] + var aggrFuncNames []string + if se, ok := arg.(*metricsql.StringExpr); ok { + aggrFuncNames = append(aggrFuncNames, se.S) + } else { + fe, ok := arg.(*metricsql.FuncExpr) + if !ok || fe.Name != "" { + return nil, fmt.Errorf("%s cannot be passed to aggr_over_time(); expecting quoted aggregate function name or a list of quoted aggregate function names", + arg.AppendString(nil)) + } + for _, e := range fe.Args { + se, ok := e.(*metricsql.StringExpr) + if !ok { + return nil, fmt.Errorf("%s cannot be passed here; expecting quoted aggregate function name", e.AppendString(nil)) + } + aggrFuncNames = append(aggrFuncNames, se.S) + } + } + if len(aggrFuncNames) == 0 { + return nil, fmt.Errorf("aggr_over_time() must contain at least a single aggregate function name") + } + for _, s := range aggrFuncNames { + if rollupAggrFuncs[s] == nil { + return nil, fmt.Errorf("%q cannot be used in `aggr_over_time` function; expecting quoted aggregate function name", s) + } + } + return aggrFuncNames, nil +} + func getRollupArgIdx(funcName string) int { funcName = strings.ToLower(funcName) if rollupFuncs[funcName] == nil { logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", funcName) } - if funcName == "quantile_over_time" { + switch funcName { + case "quantile_over_time", "aggr_over_time": return 1 + default: + return 0 } - return 0 +} + +func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) ( + func(values []float64, timestamps []int64), []*rollupConfig, error) { + preFunc := func(values []float64, timestamps []int64) {} + if rollupFuncsRemoveCounterResets[name] { + preFunc = func(values []float64, timestamps []int64) { + removeCounterResets(values) + } + } + newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig { + return &rollupConfig{ + TagValue: tagValue, + Func: rf, + Start: start, + End: end, + Step: step, + Window: window, + MayAdjustWindow: rollupFuncsMayAdjustWindow[name], + LookbackDelta: lookbackDelta, + Timestamps: sharedTimestamps, + } + } + appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig { + dst = append(dst, newRollupConfig(rollupMin, "min")) + dst = append(dst, newRollupConfig(rollupMax, "max")) + dst = append(dst, newRollupConfig(rollupAvg, "avg")) + return dst + } + var rcs []*rollupConfig + switch name { + case "rollup": + rcs = appendRollupConfigs(rcs) + case "rollup_rate", "rollup_deriv": + preFuncPrev := preFunc + preFunc = func(values []float64, timestamps []int64) { + preFuncPrev(values, timestamps) + derivValues(values, timestamps) + } + rcs = appendRollupConfigs(rcs) + case "rollup_increase", "rollup_delta": + preFuncPrev := preFunc + preFunc = func(values []float64, timestamps []int64) { + preFuncPrev(values, timestamps) + deltaValues(values) + } + rcs = appendRollupConfigs(rcs) + case "rollup_candlestick": + rcs = append(rcs, newRollupConfig(rollupFirst, "open")) + rcs = append(rcs, newRollupConfig(rollupLast, "close")) + rcs = append(rcs, newRollupConfig(rollupMin, "low")) + rcs = append(rcs, newRollupConfig(rollupMax, "high")) + case "aggr_over_time": + aggrFuncNames, err := getRollupAggrFuncNames(expr) + if err != nil { + return nil, nil, fmt.Errorf("invalid args to %s: %s", expr.AppendString(nil), err) + } + for _, aggrFuncName := range aggrFuncNames { + if rollupFuncsRemoveCounterResets[aggrFuncName] { + // There is no need to save the previous preFunc, since it is either empty or the same. + preFunc = func(values []float64, timestamps []int64) { + removeCounterResets(values) + } + } + rf := rollupAggrFuncs[aggrFuncName] + rcs = append(rcs, newRollupConfig(rf, aggrFuncName)) + } + default: + rcs = append(rcs, newRollupConfig(rf, "")) + } + return preFunc, rcs, nil } func getRollupFunc(funcName string) newRollupFunc { @@ -489,6 +642,15 @@ func newRollupFuncOneArg(rf rollupFunc) newRollupFunc { } } +func newRollupFuncTwoArgs(rf rollupFunc) newRollupFunc { + return func(args []interface{}) (rollupFunc, error) { + if err := expectRollupArgsNum(args, 2); err != nil { + return nil, err + } + return rf, nil + } +} + func newRollupHoltWinters(args []interface{}) (rollupFunc, error) { if err := expectRollupArgsNum(args, 3); err != nil { return nil, err diff --git a/docs/ExtendedPromQL.md b/docs/ExtendedPromQL.md index 34022a8c1..a8ebf74c9 100644 --- a/docs/ExtendedPromQL.md +++ b/docs/ExtendedPromQL.md @@ -100,3 +100,6 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g Example: `share_gt_over_time(up[24h], 0)` - returns service availability for the last 24 hours. - `tmin_over_time(m[d])` - returns timestamp for the minimum value for `m` over `d` time range. - `tmax_over_time(m[d])` - returns timestamp for the maximum value for `m` over `d` time range. +- `aggr_over_time(("aggr_func1", "aggr_func2", ...), m[d])` - simultaneously calculates all the listed `aggr_func*` for `m` over `d` time range. + `aggr_func*` can contain any functions that accept range vector. For instance, `aggr_over_time(("min_over_time", "max_over_time", "rate"), m[d])` + would calculate `min_over_time`, `max_over_time` and `rate` for `m[d]`. diff --git a/lib/metricsql/rollup.go b/lib/metricsql/rollup.go index 1b7586716..7156f92d7 100644 --- a/lib/metricsql/rollup.go +++ b/lib/metricsql/rollup.go @@ -53,6 +53,7 @@ var rollupFuncs = map[string]bool{ "rollup_delta": true, "rollup_increase": true, "rollup_candlestick": true, + "aggr_over_time": true, } // IsRollupFunc returns whether funcName is known rollup function.