metricsql: support optional 2nd argument for rollup functions (#3841)

* metricsql: support optional 2nd argument for rollup functions

Support optional 2nd argument `min`, `max` or `avg` for rollup functions:
 * rollup
 * rollup_delta
 * rollup_deriv
 * rollup_increase
 * rollup_rate
 * rollup_scrape_interval

 If second argument is passed, then rollup function will return only the selected aggregation type.
 This change can be useful for situations where only one type of rollup calculation is needed.
 For example, `rollup_rate(requests_total[5m], "max")`.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* wip

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2023-02-24 22:47:52 +01:00 committed by GitHub
parent 87aeeec3e8
commit dac21d874b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 205 additions and 24 deletions

View File

@ -7522,6 +7522,22 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2, r3, r4}
f(q, resultExpected)
})
t.Run(`rollup_candlestick(high)`, func(t *testing.T) {
t.Parallel()
q := `rollup_candlestick(alias(round(rand(0),0.01),"foobar")[:10s], "high")`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.9, 0.94, 0.97, 0.93, 0.98, 0.92},
Timestamps: timestampsExpected,
}
r.MetricName.MetricGroup = []byte("foobar")
r.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("high"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`rollup_increase()`, func(t *testing.T) {
t.Parallel()
q := `sort(rollup_increase(time()))`
@ -7555,6 +7571,61 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`rollup_rate()`, func(t *testing.T) {
t.Parallel()
q := `rollup_rate((2000-time())[600s])`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{5, 4, 3, 2, 1, 0},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("avg"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{6, 5, 4, 3, 2, 1},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("max"),
}}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{4, 3, 2, 1, 0, -1},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{{
Key: []byte("rollup"),
Value: []byte("min"),
}}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`rollup_rate(q, "max")`, func(t *testing.T) {
t.Parallel()
q := `rollup_rate((2000-time())[600s], "max")`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{6, 5, 4, 3, 2, 1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`rollup_rate(q, "avg")`, func(t *testing.T) {
t.Parallel()
q := `rollup_rate((2000-time())[600s], "avg")`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{5, 4, 3, 2, 1, 0},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`rollup_scrape_interval()`, func(t *testing.T) {
t.Parallel()
q := `sort_by_label(rollup_scrape_interval(1[5m:10S]), "rollup")`
@ -7654,6 +7725,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`rollup_deriv(q, "max")`, func(t *testing.T) {
t.Parallel()
q := `sort(rollup_deriv(time()[100s:50s], "max"))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`{}`, func(t *testing.T) {
t.Parallel()
q := `{}`
@ -8535,6 +8617,8 @@ func TestExecError(t *testing.T) {
f(`changes_prometheus()`)
f(`delta()`)
f(`delta_prometheus()`)
f(`rollup_candlestick()`)
f(`rollup()`)
// Invalid argument type
f(`median_over_time({}, 2)`)
@ -8621,6 +8705,12 @@ func TestExecError(t *testing.T) {
f(`ru()`)
f(`ru(1)`)
f(`ru(1,3,3)`)
// Invalid rollup tags
f(`rollup_rate(time()[5m], "")`)
f(`rollup_rate(time()[5m], "foo")`)
f(`rollup_rate(time()[5m], "foo", "bar")`)
f(`rollup_candlestick(time(), "foo")`)
}
func testResultsEqual(t *testing.T, result, resultExpected []netstorage.Result) {

View File

@ -68,13 +68,13 @@ var rollupFuncs = map[string]newRollupFunc{
"rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets
"rate_over_sum": newRollupFuncOneArg(rollupRateOverSum),
"resets": newRollupFuncOneArg(rollupResets),
"rollup": newRollupFuncOneArg(rollupFake),
"rollup_candlestick": newRollupFuncOneArg(rollupFake),
"rollup_delta": newRollupFuncOneArg(rollupFake),
"rollup_deriv": newRollupFuncOneArg(rollupFake),
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_scrape_interval": newRollupFuncOneArg(rollupFake),
"rollup": newRollupFuncOneOrTwoArgs(rollupFake),
"rollup_candlestick": newRollupFuncOneOrTwoArgs(rollupFake),
"rollup_delta": newRollupFuncOneOrTwoArgs(rollupFake),
"rollup_deriv": newRollupFuncOneOrTwoArgs(rollupFake),
"rollup_increase": newRollupFuncOneOrTwoArgs(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_rate": newRollupFuncOneOrTwoArgs(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_scrape_interval": newRollupFuncOneOrTwoArgs(rollupFake),
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
"share_gt_over_time": newRollupShareGT,
"share_le_over_time": newRollupShareLE,
@ -282,6 +282,29 @@ func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
return aggrFuncNames, nil
}
func getRollupTag(expr metricsql.Expr) (string, error) {
fe, ok := expr.(*metricsql.FuncExpr)
if !ok {
logger.Panicf("BUG: unexpected expression; want metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil))
}
if len(fe.Args) < 2 {
return "", nil
}
if len(fe.Args) != 2 {
return "", fmt.Errorf("unexpected number of args; got %d; want %d", len(fe.Args), 2)
}
arg := fe.Args[1]
se, ok := arg.(*metricsql.StringExpr)
if !ok {
return "", fmt.Errorf("unexpected rollup tag type: %s; expecting string", arg.AppendString(nil))
}
if se.S == "" {
return "", fmt.Errorf("rollup tag cannot be empty")
}
return se.S, nil
}
func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start, end, step int64, maxPointsPerSeries int,
window, lookbackDelta int64, sharedTimestamps []int64) (
func(values []float64, timestamps []int64), []*rollupConfig, error) {
@ -311,35 +334,69 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
samplesScannedPerCall: samplesScannedPerCall,
}
}
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
dst = append(dst, newRollupConfig(rollupMin, "min"))
dst = append(dst, newRollupConfig(rollupMax, "max"))
dst = append(dst, newRollupConfig(rollupAvg, "avg"))
return dst
appendRollupConfigs := func(dst []*rollupConfig, expr metricsql.Expr) ([]*rollupConfig, error) {
tag, err := getRollupTag(expr)
if err != nil {
return nil, fmt.Errorf("invalid args for %s: %w", expr.AppendString(nil), err)
}
switch tag {
case "min":
dst = append(dst, newRollupConfig(rollupMin, ""))
case "max":
dst = append(dst, newRollupConfig(rollupMax, ""))
case "avg":
dst = append(dst, newRollupConfig(rollupAvg, ""))
case "":
dst = append(dst, newRollupConfig(rollupMin, "min"))
dst = append(dst, newRollupConfig(rollupMax, "max"))
dst = append(dst, newRollupConfig(rollupAvg, "avg"))
default:
return nil, fmt.Errorf("unexpected second arg for %s: %q; want `min`, `max` or `avg`", expr.AppendString(nil), tag)
}
return dst, nil
}
var rcs []*rollupConfig
var err error
switch funcName {
case "rollup":
rcs = appendRollupConfigs(rcs)
rcs, err = appendRollupConfigs(rcs, expr)
case "rollup_rate", "rollup_deriv":
preFuncPrev := preFunc
preFunc = func(values []float64, timestamps []int64) {
preFuncPrev(values, timestamps)
derivValues(values, timestamps)
}
rcs = appendRollupConfigs(rcs)
rcs, err = appendRollupConfigs(rcs, expr)
case "rollup_increase", "rollup_delta":
preFuncPrev := preFunc
preFunc = func(values []float64, timestamps []int64) {
preFuncPrev(values, timestamps)
deltaValues(values)
}
rcs = appendRollupConfigs(rcs)
rcs, err = appendRollupConfigs(rcs, expr)
case "rollup_candlestick":
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
tag, err := getRollupTag(expr)
if err != nil {
return nil, nil, fmt.Errorf("invalid args for %s: %w", expr.AppendString(nil), err)
}
switch tag {
case "open":
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
case "close":
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
case "low":
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
case "high":
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
case "":
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
default:
return nil, nil, fmt.Errorf("unexpected second arg for %s: %q; want `min`, `max` or `avg`", expr.AppendString(nil), tag)
}
case "rollup_scrape_interval":
preFuncPrev := preFunc
preFunc = func(values []float64, timestamps []int64) {
@ -357,7 +414,7 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
values[0] = values[1]
}
}
rcs = appendRollupConfigs(rcs)
rcs, err = appendRollupConfigs(rcs, expr)
case "aggr_over_time":
aggrFuncNames, err := getRollupAggrFuncNames(expr)
if err != nil {
@ -376,6 +433,9 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
default:
rcs = append(rcs, newRollupConfig(rf, ""))
}
if err != nil {
return nil, nil, err
}
return preFunc, rcs, nil
}
@ -845,6 +905,15 @@ func newRollupFuncTwoArgs(rf rollupFunc) newRollupFunc {
}
}
func newRollupFuncOneOrTwoArgs(rf rollupFunc) newRollupFunc {
return func(args []interface{}) (rollupFunc, error) {
if len(args) < 1 || len(args) > 2 {
return nil, fmt.Errorf("unexpected number of args; got %d; want 1...2", len(args))
}
return rf, nil
}
}
func newRollupHoltWinters(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 3); err != nil {
return nil, err

View File

@ -26,6 +26,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): choose the backend with the minimum number of concurrently executed requests [among the configured backends](https://docs.victoriametrics.com/vmauth.html#load-balancing) in a round-robin manner for serving the incoming requests. This allows spreading the load among backends more evenly, while improving the response time.
* FEATURE: [vmalert enterprise](https://docs.victoriametrics.com/vmalert.html): add ability to read alerting and recording rules from S3, GCS or S3-compatible object storage. See [these docs](https://docs.victoriametrics.com/vmalert.html#reading-rules-from-object-storage).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): automatically retry requests to remote storage if up to 5 errors occur during the data migration process. This should help continuing the data migration process on temporary errors. Previously `vmctl` was stopping after the first error. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3600).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): support optional 2nd argument `min`, `max` or `avg` for [rollup](https://docs.victoriametrics.com/MetricsQL.html#rollup), [rollup_delta](https://docs.victoriametrics.com/MetricsQL.html#rollup_delta), [rollup_deriv](https://docs.victoriametrics.com/MetricsQL.html#rollup_deriv), [rollup_increase](https://docs.victoriametrics.com/MetricsQL.html#rollup_increase), [rollup_rate](https://docs.victoriametrics.com/MetricsQL.html#rollup_rate) and [rollup_scrape_interval](https://docs.victoriametrics.com/MetricsQL.html#rollup_scrape_interval) function. If the second argument is passed, then the function returns only the selected aggregation type. This change can be useful for situations where only one type of rollup calculation is needed. For example, `rollup_rate(requests_total[1i], "max")` would return only the max increase rates for `requests_total` metric per each interval between adjacent points on the graph. See [this article](https://valyala.medium.com/why-irate-from-prometheus-doesnt-capture-spikes-45f9896d7832) for details.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): support optional 2nd argument `open`, `low`, `high`, `close` for [rollup_candlestick](https://docs.victoriametrics.com/MetricsQL.html#rollup_candlestick) function. If the second argument is passed, then the function returns only the selected aggregation type.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add [share(q)](https://docs.victoriametrics.com/MetricsQL.html#share) aggregate function.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `mad_over_time(m[d])` function for calculating the [median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) over raw samples on the lookbehind window `d`. See [this feature request](https://github.com/prometheus/prometheus/issues/5514).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_mad(q)` function for calculating the [median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) over points per each time series returned by `q`.
@ -33,6 +35,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_outliers(k, q)` function for dropping outliers located farther than `k*range_mad(q)` from the `range_median(q)`. This should help removing outliers during query time at [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3759).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_zscore(z, q)` function for dropping outliers located farther than `z*range_stddev(q)` from `range_avg(q)`. This should help removing outliers during query time at [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3759).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show `median` instead of `avg` in graph tooltip and line legend, since `median` is more tolerant against spikes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3706).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): small UX improvements for mobile view. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3707) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3848).
* FEATURE: add `-search.logQueryMemoryUsage` command-line flag for logging queries, which need more memory than specified by this command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3553). Thanks to @michal-kralik for the idea and the intial implementation.
* FEATURE: allow setting zero value for `-search.latencyOffset` command-line flag. This may be needed in [some cases](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2061#issuecomment-1299109836). Previously the minimum supported value for `-search.latencyOffset` command-line flag was `1s`.

View File

@ -475,7 +475,7 @@ It is expected that the `series_selector` returns time series of [counter type](
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
This function is supported by PromQL. See also [rate](#rate).
This function is supported by PromQL. See also [rate](#rate) and [rollup_rate](#rollup_rate).
#### lag
@ -588,7 +588,7 @@ It is expected that the `series_selector` returns time series of [counter type](
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
This function is supported by PromQL. See also [irate](#irate).
This function is supported by PromQL. See also [irate](#irate) and [rollup_rate](#rollup_rate).
#### rate_over_sum
@ -615,6 +615,8 @@ This function is supported by PromQL.
on the given lookbehind window `d` and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
These values are calculated individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
#### rollup_candlestick
`rollup_candlestick(series_selector[d])` is a [rollup function](#rollup-functions), which calculates `open`, `high`, `low` and `close` values (aka OHLC)
@ -622,6 +624,8 @@ over raw samples on the given lookbehind window `d` and returns them in time ser
The calculations are performed individually per each time series returned
from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering). This function is useful for financial applications.
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
#### rollup_delta
`rollup_delta(series_selector[d])` is a [rollup function](#rollup-functions), which calculates differences between adjacent raw samples
@ -629,6 +633,8 @@ on the given lookbehind window `d` and returns `min`, `max` and `avg` values for
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
See also [rollup_increase](#rollup_increase).
@ -640,6 +646,8 @@ for adjacent raw samples on the given lookbehind window `d` and returns `min`, `
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
#### rollup_increase
@ -649,6 +657,8 @@ on the given lookbehind window `d` and returns `min`, `max` and `avg` values for
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names. See also [rollup_delta](#rollup_delta).
#### rollup_rate
@ -656,8 +666,15 @@ Metric names are stripped from the resulting rollups. Add [keep_metric_names](#k
`rollup_rate(series_selector[d])` is a [rollup function](#rollup-functions), which calculates per-second change rates for adjacent raw samples
on the given lookbehind window `d` and returns `min`, `max` and `avg` values for the calculated per-second change rates
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
See [this article](https://valyala.medium.com/why-irate-from-prometheus-doesnt-capture-spikes-45f9896d7832) in order to undertand better
when to use `rollup_rate()`.
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
#### rollup_scrape_interval
@ -667,6 +684,8 @@ adjacent raw samples on the given lookbehind window `d` and returns `min`, `max`
and returns them in time series with `rollup="min"`, `rollup="max"` and `rollup="avg"` additional labels.
The calculations are performed individually per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
Optional 2nd argument `min`, `max` or `avg` can be passed to keep only one calculation result and without adding a label.
Metric names are stripped from the resulting rollups. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names. See also [scrape_interval](#scrape_interval).
#### scrape_interval
@ -1587,9 +1606,9 @@ See also [label_lowercase](#label_lowercase).
#### label_value
`label_value(q, "label")` is [label manipulation function](#label-manipulation-functions), which returns numeric values
for the given `label` for every time series returned by `q`.
for the given `label` for every time series returned by `q`.
For example, if `label_value(foo, "bar")` is applied to `foo{bar="1.234"}`, then it will return a time series
For example, if `label_value(foo, "bar")` is applied to `foo{bar="1.234"}`, then it will return a time series
`foo{bar="1.234"}` with `1.234` value. Function will return no data for non-numeric label values.
#### sort_by_label