From 2f58f37f076b277797eb7965b22c7156e6f67558 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 1 Nov 2019 12:21:12 +0200 Subject: [PATCH] app/vmselect/promql: add `lag(q[d])` function, which returns the lag between the current timestamp and the timstamp for the last data point in `q` --- app/vmselect/promql/exec_test.go | 13 +++++++++++++ app/vmselect/promql/rollup.go | 20 ++++++++++++++++++-- app/vmselect/promql/rollup_test.go | 14 ++++++++++++++ 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 20e4139cbe..bdcfd1b794 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -3710,6 +3710,17 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`lag()`, func(t *testing.T) { + t.Parallel() + q := `lag(time()[60s:17s])` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{14, 10, 6, 2, 15, 11}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`()`, func(t *testing.T) { t.Parallel() q := `()` @@ -4175,6 +4186,8 @@ func TestExecError(t *testing.T) { f(`alias()`) f(`alias(1)`) f(`alias(1, "foo", "bar")`) + f(`lifetime()`) + f(`lag()`) // Invalid argument type f(`median_over_time({}, 2)`) diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 0754ed4a82..144001435b 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -48,6 +48,7 @@ var rollupFuncs = map[string]newRollupFunc{ "integrate": newRollupFuncOneArg(rollupIntegrate), "ideriv": newRollupFuncOneArg(rollupIderiv), "lifetime": newRollupFuncOneArg(rollupLifetime), + "lag": newRollupFuncOneArg(rollupLag), "scrape_interval": newRollupFuncOneArg(rollupScrapeInterval), "rollup": newRollupFuncOneArg(rollupFake), "rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets @@ -113,8 +114,9 @@ type rollupFuncArg struct { values []float64 timestamps []int64 - idx int - step int64 + currTimestamp int64 + idx int + step int64 } func (rfa *rollupFuncArg) reset() { @@ -122,6 +124,7 @@ func (rfa *rollupFuncArg) reset() { rfa.prevTimestamp = 0 rfa.values = nil rfa.timestamps = nil + rfa.currTimestamp = 0 rfa.idx = 0 rfa.step = 0 } @@ -226,6 +229,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i rfa.values = values[i:j] rfa.timestamps = timestamps[i:j] + rfa.currTimestamp = tEnd value := rc.Func(rfa) rfa.idx++ dstValues = append(dstValues, value) @@ -799,6 +803,18 @@ func rollupLifetime(rfa *rollupFuncArg) float64 { return float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) * 1e-3 } +func rollupLag(rfa *rollupFuncArg) float64 { + // Calculate the duration between the current timestamp and the last data point. + timestamps := rfa.timestamps + if len(timestamps) == 0 { + if math.IsNaN(rfa.prevValue) { + return nan + } + return float64(rfa.currTimestamp-rfa.prevTimestamp) * 1e-3 + } + return float64(rfa.currTimestamp-timestamps[len(timestamps)-1]) * 1e-3 +} + func rollupScrapeInterval(rfa *rollupFuncArg) float64 { // Calculate the average interval between data points. timestamps := rfa.timestamps diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index e207657cc8..a203412f08 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -632,6 +632,20 @@ func TestRollupFuncsNoWindow(t *testing.T) { timestampsExpected := []int64{10, 50, 90, 130} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) + t.Run("lag", func(t *testing.T) { + rc := rollupConfig{ + Func: rollupLag, + Start: 0, + End: 160, + Step: 40, + Window: 0, + } + rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) + values := rc.Do(nil, testValues, testTimestamps) + valuesExpected := []float64{nan, 0.004, 0, 0, 0.03} + timestampsExpected := []int64{0, 40, 80, 120, 160} + testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) + }) t.Run("lifetime_1", func(t *testing.T) { rc := rollupConfig{ Func: rollupLifetime,