From 26dc21cf64b80c8ab73dae4552f7da07ddd3432e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 25 Sep 2019 20:36:51 +0300 Subject: [PATCH] app/vmselect/promql: add `increases_over_time` and `decreases_over_time` functions `increases_over_time(q[d])` returns the number of `q` increases during the given duration `d`. `decreases_over_time(q[d])` returns the number of `q` decreases during the given duration `d`. --- app/vmselect/promql/exec_test.go | 22 +++++++++++ app/vmselect/promql/rollup.go | 63 +++++++++++++++++++++++------- app/vmselect/promql/rollup_test.go | 2 + 3 files changed, 72 insertions(+), 15 deletions(-) diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 7288ee858..6dfeed585 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -2671,6 +2671,28 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`increases_over_time`, func(t *testing.T) { + t.Parallel() + q := `increases_over_time(rand(0)[200s:10s])` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{11, 9, 9, 12, 9, 8}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`decreases_over_time`, func(t *testing.T) { + t.Parallel() + q := `decreases_over_time(rand(0)[200s:10s])` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{9, 11, 11, 8, 11, 12}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`limitk(-1)`, func(t *testing.T) { t.Parallel() q := `limitk(-1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))` diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index de14eb0e7..d5467c8bc 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -38,21 +38,23 @@ var rollupFuncs = map[string]newRollupFunc{ "stdvar_over_time": newRollupFuncOneArg(rollupStdvar), // Additional rollup funcs. - "sum2_over_time": newRollupFuncOneArg(rollupSum2), - "geomean_over_time": newRollupFuncOneArg(rollupGeomean), - "first_over_time": newRollupFuncOneArg(rollupFirst), - "last_over_time": newRollupFuncOneArg(rollupLast), - "distinct_over_time": newRollupFuncOneArg(rollupDistinct), - "integrate": newRollupFuncOneArg(rollupIntegrate), - "ideriv": newRollupFuncOneArg(rollupIderiv), - "lifetime": newRollupFuncOneArg(rollupLifetime), - "scrape_interval": newRollupFuncOneArg(rollupScrapeInterval), - "rollup": newRollupFuncOneArg(rollupFake), - "rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets - "rollup_deriv": newRollupFuncOneArg(rollupFake), - "rollup_delta": newRollupFuncOneArg(rollupFake), - "rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets - "rollup_candlestick": newRollupFuncOneArg(rollupFake), + "sum2_over_time": newRollupFuncOneArg(rollupSum2), + "geomean_over_time": newRollupFuncOneArg(rollupGeomean), + "first_over_time": newRollupFuncOneArg(rollupFirst), + "last_over_time": newRollupFuncOneArg(rollupLast), + "distinct_over_time": newRollupFuncOneArg(rollupDistinct), + "increases_over_time": newRollupFuncOneArg(rollupIncreases), + "decreases_over_time": newRollupFuncOneArg(rollupDecreases), + "integrate": newRollupFuncOneArg(rollupIntegrate), + "ideriv": newRollupFuncOneArg(rollupIderiv), + "lifetime": newRollupFuncOneArg(rollupLifetime), + "scrape_interval": newRollupFuncOneArg(rollupScrapeInterval), + "rollup": newRollupFuncOneArg(rollupFake), + "rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets + "rollup_deriv": newRollupFuncOneArg(rollupFake), + "rollup_delta": newRollupFuncOneArg(rollupFake), + "rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets + "rollup_candlestick": newRollupFuncOneArg(rollupFake), } var rollupFuncsMayAdjustWindow = map[string]bool{ @@ -820,6 +822,37 @@ func rollupChanges(rfa *rollupFuncArg) float64 { return float64(n) } +func rollupIncreases(rfa *rollupFuncArg) float64 { + // There is no need in handling NaNs here, since they must be cleaned up + // before calling rollup funcs. + values := rfa.values + if len(values) == 0 { + if math.IsNaN(rfa.prevValue) { + return nan + } + return 0 + } + prevValue := rfa.prevValue + if math.IsNaN(prevValue) { + prevValue = values[0] + values = values[1:] + } + if len(values) == 0 { + return 0 + } + n := 0 + for _, v := range values { + if v > prevValue { + n++ + } + prevValue = v + } + return float64(n) +} + +// `decreases_over_time` logic is the same as `resets` logic. +var rollupDecreases = rollupResets + func rollupResets(rfa *rollupFuncArg) float64 { // There is no need in handling NaNs here, since they must be cleaned up // before calling rollup funcs. diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index 90c6c58dc..5dff302b8 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -294,6 +294,8 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) { f("integrate", 61.0275) f("distinct_over_time", 8) f("ideriv", 0) + f("decreases_over_time", 5) + f("increases_over_time", 5) } func TestRollupNewRollupFuncError(t *testing.T) {