diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 3461c93f4..6b8019acc 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -3904,6 +3904,22 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r1} f(q, resultExpected) }) + t.Run(`keep_next_value()`, func(t *testing.T) { + t.Parallel() + q := `keep_next_value(label_set(time() < 1300 default time() > 1700, "__name__", "foobar", "x", "y"))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1000, 1200, 1800, 1800, 1800, 2000}, + Timestamps: timestampsExpected, + } + r1.MetricName.MetricGroup = []byte("foobar") + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("x"), + Value: []byte("y"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) t.Run(`distinct_over_time([500s])`, func(t *testing.T) { t.Parallel() q := `distinct_over_time((time() < 1700)[500s])` @@ -5342,6 +5358,7 @@ func TestExecError(t *testing.T) { f(`median()`) f(`median("foo", "bar")`) f(`keep_last_value()`) + f(`keep_next_value()`) f(`distinct_over_time()`) f(`distinct()`) f(`alias()`) diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index 065b521f6..0048ddaa4 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -70,6 +70,7 @@ var transformFuncs = map[string]transformFunc{ "union": transformUnion, "": transformUnion, // empty func is a synonim to union "keep_last_value": transformKeepLastValue, + "keep_next_value": transformKeepNextValue, "start": newTransformFuncZeroArgs(transformStart), "end": newTransformFuncZeroArgs(transformEnd), "step": newTransformFuncZeroArgs(transformStep), @@ -724,13 +725,37 @@ func transformKeepLastValue(tfa *transformFuncArg) ([]*timeseries, error) { if len(values) == 0 { continue } - prevValue := values[0] + lastValue := values[0] for i, v := range values { - if math.IsNaN(v) { - v = prevValue + if !math.IsNaN(v) { + lastValue = v + continue } - values[i] = v - prevValue = v + values[i] = lastValue + } + } + return rvs, nil +} + +func transformKeepNextValue(tfa *transformFuncArg) ([]*timeseries, error) { + args := tfa.args + if err := expectTransformArgsNum(args, 1); err != nil { + return nil, err + } + rvs := args[0] + for _, ts := range rvs { + values := ts.Values + if len(values) == 0 { + continue + } + nextValue := values[len(values)-1] + for i := len(values) - 1; i >= 0; i-- { + v := values[i] + if !math.IsNaN(v) { + nextValue = v + continue + } + values[i] = nextValue } } return rvs, nil diff --git a/docs/ExtendedPromQL.md b/docs/ExtendedPromQL.md index f749b4095..f79574a46 100644 --- a/docs/ExtendedPromQL.md +++ b/docs/ExtendedPromQL.md @@ -70,7 +70,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `median_over_time(m[d])` - calculates median values for `m` over `d` time window. Shorthand to `quantile_over_time(0.5, m[d])`. - `median(q)` - median aggregate. Shorthand to `quantile(0.5, q)`. - `limitk(k, q)` - limits the number of time series returned from `q` to `k`. -- `keep_last_value(q)` - fills missing data (gaps) in `q` with the previous value. +- `keep_last_value(q)` - fills missing data (gaps) in `q` with the previous non-empty value. +- `keep_next_value(q)` - fills missing data (gaps) in `q` with the next non-empty value. - `distinct_over_time(m[d])` - returns distinct number of values for `m` data points over `d` duration. - `distinct(q)` - returns a time series with the number of unique values for each timestamp in `q`. - `sum2_over_time(m[d])` - returns sum of squares for all the `m` values over `d` duration. diff --git a/lib/metricsql/transform.go b/lib/metricsql/transform.go index cb5701705..53c262561 100644 --- a/lib/metricsql/transform.go +++ b/lib/metricsql/transform.go @@ -49,6 +49,7 @@ var transformFuncs = map[string]bool{ "union": true, "": true, // empty func is a synonim to union "keep_last_value": true, + "keep_next_value": true, "start": true, "end": true, "step": true,