app/vmselect/promql: properly handle aggr(aggr_over_time(...))

This commit is contained in:
Aliaksandr Valialkin 2020-01-10 21:56:59 +02:00
parent 87a106702b
commit adc36d00b7
2 changed files with 44 additions and 0 deletions

View File

@ -4538,6 +4538,41 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2, r3} resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`avg(aggr_over_time(multi-func))`, func(t *testing.T) {
t.Parallel()
q := `avg(aggr_over_time(("min_over_time", "max_over_time"), time()[:10s]))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`avg(aggr_over_time(multi-func)) by (rollup)`, func(t *testing.T) {
t.Parallel()
q := `sort(avg(aggr_over_time(("min_over_time", "max_over_time"), time()[:10s])) by (rollup))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{800, 1000, 1200, 1400, 1600, 1800},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("min_over_time"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("max_over_time"),
}}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`rollup_candlestick()`, func(t *testing.T) { t.Run(`rollup_candlestick()`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `sort(rollup_candlestick(round(rand(0),0.01)[:10s]))` q := `sort(rollup_candlestick(round(rand(0),0.01)[:10s]))`

View File

@ -136,6 +136,15 @@ var rollupFuncsKeepMetricGroup = map[string]bool{
} }
func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) { func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
afe, ok := expr.(*metricsql.AggrFuncExpr)
if ok {
// This is for incremental aggregate function case:
//
// sum(aggr_over_time(...))
//
// See aggr_incremental.go for details.
expr = afe.Args[0]
}
fe, ok := expr.(*metricsql.FuncExpr) fe, ok := expr.(*metricsql.FuncExpr)
if !ok { if !ok {
logger.Panicf("BUG: unexpected expression; want metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil)) logger.Panicf("BUG: unexpected expression; want metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil))