app/vmselect/promql: duration handling improvements in MetricsQL queries

- Support durations anywhere in MetricsQL queries. E.g. sum_over_time(m[1h])/1h is equivalent to sum_over_time(m[1h])/3600
- Support durations without suffix. E.g. rate(m[300]) is equivalent to rate(m[5m])
This commit is contained in:
Aliaksandr Valialkin 2021-07-12 17:16:38 +03:00
parent 51cd19d2e3
commit 9add9d86a6
12 changed files with 176 additions and 128 deletions

View File

@ -26,7 +26,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
"github.com/valyala/fastjson/fastfloat"
"github.com/valyala/quicktemplate"
)
@ -1060,15 +1059,9 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if err != nil {
return err
}
if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" {
window, err := parsePositiveDuration(windowStr, step)
if err != nil {
return fmt.Errorf("cannot parse window: %w", err)
}
offset, err := parseDuration(offsetStr, step)
if err != nil {
return fmt.Errorf("cannot parse offset: %w", err)
}
if childQuery, windowExpr, offsetExpr := promql.IsMetricSelectorWithRollup(query); childQuery != "" {
window := windowExpr.Duration(step)
offset := offsetExpr.Duration(step)
start -= offset
end := start
start = end - window
@ -1083,22 +1076,13 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
queryDuration.UpdateDuration(startTime)
return nil
}
if childQuery, windowStr, stepStr, offsetStr := promql.IsRollup(query); childQuery != "" {
newStep, err := parsePositiveDuration(stepStr, step)
if err != nil {
return fmt.Errorf("cannot parse step: %w", err)
}
if childQuery, windowExpr, stepExpr, offsetExpr := promql.IsRollup(query); childQuery != "" {
newStep := stepExpr.Duration(step)
if newStep > 0 {
step = newStep
}
window, err := parsePositiveDuration(windowStr, step)
if err != nil {
return fmt.Errorf("cannot parse window: %w", err)
}
offset, err := parseDuration(offsetStr, step)
if err != nil {
return fmt.Errorf("cannot parse offset: %w", err)
}
window := windowExpr.Duration(step)
offset := offsetExpr.Duration(step)
start -= offset
end := start
start = end - window
@ -1158,20 +1142,6 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
var queryDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query"}`)
func parseDuration(s string, step int64) (int64, error) {
if len(s) == 0 {
return 0, nil
}
return metricsql.DurationValue(s, step)
}
func parsePositiveDuration(s string, step int64) (int64, error) {
if len(s) == 0 {
return 0, nil
}
return metricsql.PositiveDurationValue(s, step)
}
// QueryRangeHandler processes /api/v1/query_range request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries

View File

@ -353,6 +353,12 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
rv := evalString(ec, se.S)
return rv, nil
}
if de, ok := e.(*metricsql.DurationExpr); ok {
d := de.Duration(ec.Step)
dSec := float64(d) / 1000
rv := evalNumber(ec, dSec)
return rv, nil
}
return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
}
@ -490,12 +496,8 @@ func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
ecNew := ec
var offset int64
if len(re.Offset) > 0 {
var err error
offset, err = metricsql.DurationValue(re.Offset, ec.Step)
if err != nil {
return nil, err
}
if re.Offset != nil {
offset = re.Offset.Duration(ec.Step)
ecNew = newEvalConfig(ecNew)
ecNew.Start -= offset
ecNew.End -= offset
@ -544,24 +546,11 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E
func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
// TODO: determine whether to use rollupResultCacheV here.
var step int64
if len(re.Step) > 0 {
var err error
step, err = metricsql.PositiveDurationValue(re.Step, ec.Step)
if err != nil {
return nil, err
}
} else {
step := re.Step.Duration(ec.Step)
if step == 0 {
step = ec.Step
}
var window int64
if len(re.Window) > 0 {
var err error
window, err = metricsql.PositiveDurationValue(re.Window, ec.Step)
if err != nil {
return nil, err
}
}
window := re.Window.Duration(ec.Step)
ecSQ := newEvalConfig(ec)
ecSQ.Start -= window + maxSilenceInterval + step
@ -671,18 +660,11 @@ var (
)
func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) {
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) {
if me.IsEmpty() {
return evalNumber(ec, nan), nil
}
var window int64
if len(windowStr) > 0 {
var err error
window, err = metricsql.PositiveDurationValue(windowStr, ec.Step)
if err != nil {
return nil, err
}
}
window := windowExpr.Duration(ec.Step)
// Search for partial results in cache.
tssCached, start := rollupResultCacheV.Get(ec, expr, window)

View File

@ -242,6 +242,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time()[:100] offset 0", func(t *testing.T) {
t.Parallel()
q := `time()[:100] offset 0`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time() offset 1h40s0ms", func(t *testing.T) {
t.Parallel()
q := `time() offset 1h40s0ms`
@ -253,6 +264,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time() offset 3640", func(t *testing.T) {
t.Parallel()
q := `time() offset 3640`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{-2800, -2600, -2400, -2200, -2000, -1800},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time() offset -1h40s0ms", func(t *testing.T) {
t.Parallel()
q := `time() offset -1h40s0ms`
@ -371,6 +393,28 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run("1h", func(t *testing.T) {
t.Parallel()
q := `1h`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{3600, 3600, 3600, 3600, 3600, 3600},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("sum_over_time(time()[1h]) / 1h", func(t *testing.T) {
t.Parallel()
q := `sum_over_time(time()[1h]) / 1h`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{-3.5, -2.5, -1.5, -0.5, 0.5, 1.5},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time()[:100s] offset 100s", func(t *testing.T) {
t.Parallel()
q := `time()[:100s] offset 100s`
@ -393,6 +437,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time()[300:100] offset 100", func(t *testing.T) {
t.Parallel()
q := `time()[300:100] offset 100`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time()[1.5i:0.5i] offset 0.5i", func(t *testing.T) {
t.Parallel()
q := `time()[1.5i:0.5i] offset 0.5i`

View File

@ -10,13 +10,13 @@ import (
// IsRollup verifies whether s is a rollup with non-empty window.
//
// It returns the wrapped query with the corresponding window, step and offset.
func IsRollup(s string) (childQuery string, window, step, offset string) {
func IsRollup(s string) (childQuery string, window, step, offset *metricsql.DurationExpr) {
expr, err := parsePromQLWithCache(s)
if err != nil {
return
}
re, ok := expr.(*metricsql.RollupExpr)
if !ok || len(re.Window) == 0 {
if !ok || re.Window == nil {
return
}
wrappedQuery := re.Expr.AppendString(nil)
@ -27,13 +27,13 @@ func IsRollup(s string) (childQuery string, window, step, offset string) {
// wrapped into rollup.
//
// It returns the wrapped query with the corresponding window with offset.
func IsMetricSelectorWithRollup(s string) (childQuery string, window, offset string) {
func IsMetricSelectorWithRollup(s string) (childQuery string, window, offset *metricsql.DurationExpr) {
expr, err := parsePromQLWithCache(s)
if err != nil {
return
}
re, ok := expr.(*metricsql.RollupExpr)
if !ok || len(re.Window) == 0 || len(re.Step) > 0 {
if !ok || re.Window == nil || re.Step != nil {
return
}
me, ok := re.Expr.(*metricsql.MetricExpr)

View File

@ -13,6 +13,8 @@ sort: 15
* FEATURE: vmselect: embed [vmui](https://github.com/VictoriaMetrics/vmui) into a single-node VictoriaMetrics and into `vmselect` component of cluster version. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1413). The web interface is available at the following paths:
* `/vmui/` for a single-node VictoriaMetrics
* `/select/<accountID>/vmui/` for `vmselect` at cluster version of VictoriaMetrics
* FEATURE: support durations anywhere in [MetricsQL queries](https://docs.victoriametrics.com/MetricsQL.html). For example, `sum_over_time(m[1h]) / 1h` is a valid query, which is equivalent to `sum_over_time(m[1h]) / 3600`.
* FEATURE: support durations without suffxies in [MetricsQL queries](https://docs.victoriametrics.com/MetricsQL.html). For example, `rate(m[3600])` is a valid query, which is equivalent to `rate(m[1h])`.
* BUGFIX: vmagent: remove `{ %space %}` typo in `/targets` output. The typo has been introduced in v1.62.0. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1408).
* BUGFIX: vmagent: fix CSS styles on `/targets` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1422).

View File

@ -44,6 +44,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `offset` may be put anywere in the query. For instance, `sum(foo) offset 24h`.
- `offset` may be negative. For example, `q offset -1h`.
- [Range duration](https://prometheus.io/docs/prometheus/latest/querying/basics/#range-vector-selectors) and [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be fractional. For instance, `rate(node_network_receive_bytes_total[1.5m] offset 0.5d)`.
- The duration suffix is optional. The duration is in seconds if the suffix is missing. For example, `rate(m[300] offset 1800)` is equivalent to `rate(m[5m]) offset 30m`.
- The duration can be placed anywhere in the query. For example, `sum_over_time(m[1h]) / 1h` is equivalent to `sum_over_time(m[1h]) / 3600`.
- `default` binary operator. `q1 default q2` fills gaps in `q1` with the corresponding values from `q2`.
- Most aggregate functions accept arbitrary number of args. For example, `avg(q1, q2, q3)` would return the average values for every point across `q1`, `q2` and `q3`.
- `histogram_quantile` accepts optional third arg - `boundsLabel`. In this case it returns `lower` and `upper` bounds for the estimated percentile. See [this issue for details](https://github.com/prometheus/prometheus/issues/5706).

2
go.mod
View File

@ -9,7 +9,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.0.16
github.com/VictoriaMetrics/metrics v1.17.3
github.com/VictoriaMetrics/metricsql v0.15.0
github.com/VictoriaMetrics/metricsql v0.16.0
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/aws/aws-sdk-go v1.39.1
github.com/cespare/xxhash/v2 v2.1.1

4
go.sum
View File

@ -105,8 +105,8 @@ github.com/VictoriaMetrics/fasthttp v1.0.16/go.mod h1:s9o5H4T58Kt4CTrdyJp4RorBKC
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metrics v1.17.3 h1:QPUakR6JRy8BhL2C2kOgYKLuoPDwtJQ+7iKIZSjt1A4=
github.com/VictoriaMetrics/metrics v1.17.3/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metricsql v0.15.0 h1:7tveqhA0xLumXhokcmgxiUn90VlKqiVtYZQ3p6y9Fu4=
github.com/VictoriaMetrics/metricsql v0.15.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VictoriaMetrics/metricsql v0.16.0 h1:YzrMnGUs6Y6f5LxsH8eSAoik98aEzlc1TiYgOONgr3Q=
github.com/VictoriaMetrics/metricsql v0.16.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=

View File

@ -445,7 +445,12 @@ func DurationValue(s string, step int64) (int64, error) {
if len(s) == 0 {
return 0, fmt.Errorf("duration cannot be empty")
}
var d float64
// Try parsing floating-point duration
d, err := strconv.ParseFloat(s, 64)
if err == nil {
// Convert the duration to milliseconds.
return int64(d * 1000), nil
}
isMinus := false
for len(s) > 0 {
n := scanSingleDuration(s, true)

View File

@ -423,12 +423,15 @@ func (p *parser) parseSingleExpr() (Expr, error) {
}
func (p *parser) parseSingleExprWithoutRollupSuffix() (Expr, error) {
if isPositiveNumberPrefix(p.lex.Token) || isInfOrNaN(p.lex.Token) {
return p.parsePositiveNumberExpr()
if isPositiveDuration(p.lex.Token) {
return p.parsePositiveDuration()
}
if isStringPrefix(p.lex.Token) {
return p.parseStringExpr()
}
if isPositiveNumberPrefix(p.lex.Token) || isInfOrNaN(p.lex.Token) {
return p.parsePositiveNumberExpr()
}
if isIdentPrefix(p.lex.Token) {
return p.parseIdentExpr()
}
@ -1221,29 +1224,29 @@ func (lfe *labelFilterExpr) toLabelFilter() (*LabelFilter, error) {
return &lf, nil
}
func (p *parser) parseWindowAndStep() (string, string, bool, error) {
func (p *parser) parseWindowAndStep() (*DurationExpr, *DurationExpr, bool, error) {
if p.lex.Token != "[" {
return "", "", false, fmt.Errorf(`windowAndStep: unexpected token %q; want "["`, p.lex.Token)
return nil, nil, false, fmt.Errorf(`windowAndStep: unexpected token %q; want "["`, p.lex.Token)
}
err := p.lex.Next()
if err != nil {
return "", "", false, err
return nil, nil, false, err
}
var window string
var window *DurationExpr
if !strings.HasPrefix(p.lex.Token, ":") {
window, err = p.parsePositiveDuration()
if err != nil {
return "", "", false, err
return nil, nil, false, err
}
}
var step string
var step *DurationExpr
inheritStep := false
if strings.HasPrefix(p.lex.Token, ":") {
// Parse step
p.lex.Token = p.lex.Token[1:]
if p.lex.Token == "" {
if err := p.lex.Next(); err != nil {
return "", "", false, err
return nil, nil, false, err
}
if p.lex.Token == "]" {
inheritStep = true
@ -1252,63 +1255,94 @@ func (p *parser) parseWindowAndStep() (string, string, bool, error) {
if p.lex.Token != "]" {
step, err = p.parsePositiveDuration()
if err != nil {
return "", "", false, err
return nil, nil, false, err
}
}
}
if p.lex.Token != "]" {
return "", "", false, fmt.Errorf(`windowAndStep: unexpected token %q; want "]"`, p.lex.Token)
return nil, nil, false, fmt.Errorf(`windowAndStep: unexpected token %q; want "]"`, p.lex.Token)
}
if err := p.lex.Next(); err != nil {
return "", "", false, err
return nil, nil, false, err
}
return window, step, inheritStep, nil
}
func (p *parser) parseOffset() (string, error) {
func (p *parser) parseOffset() (*DurationExpr, error) {
if !isOffset(p.lex.Token) {
return "", fmt.Errorf(`offset: unexpected token %q; want "offset"`, p.lex.Token)
return nil, fmt.Errorf(`offset: unexpected token %q; want "offset"`, p.lex.Token)
}
if err := p.lex.Next(); err != nil {
return "", err
return nil, err
}
d, err := p.parseDuration()
de, err := p.parseDuration()
if err != nil {
return "", err
return nil, err
}
return d, nil
return de, nil
}
func (p *parser) parseDuration() (string, error) {
isNegative := false
if p.lex.Token == "-" {
isNegative = true
func (p *parser) parseDuration() (*DurationExpr, error) {
isNegative := p.lex.Token == "-"
if isNegative {
if err := p.lex.Next(); err != nil {
return "", err
return nil, err
}
}
if !isPositiveDuration(p.lex.Token) {
return "", fmt.Errorf(`duration: unexpected token %q; want "duration"`, p.lex.Token)
}
d := p.lex.Token
if err := p.lex.Next(); err != nil {
return "", err
de, err := p.parsePositiveDuration()
if err != nil {
return nil, err
}
if isNegative {
d = "-" + d
de.s = "-" + de.s
}
return d, nil
return de, nil
}
func (p *parser) parsePositiveDuration() (string, error) {
d, err := p.parseDuration()
func (p *parser) parsePositiveDuration() (*DurationExpr, error) {
s := p.lex.Token
if isPositiveDuration(s) {
if err := p.lex.Next(); err != nil {
return nil, err
}
} else {
if !isPositiveNumberPrefix(s) {
return nil, fmt.Errorf(`duration: unexpected token %q; want "duration"`, s)
}
// Verify the duration in seconds without explicit suffix.
if _, err := p.parsePositiveNumberExpr(); err != nil {
return nil, fmt.Errorf(`duration: parse error: %s`, err)
}
}
de := &DurationExpr{
s: s,
}
return de, nil
}
// DurationExpr contains the duration
type DurationExpr struct {
s string
}
// AppendString appends string representation of de to dst and returns the result.
func (de *DurationExpr) AppendString(dst []byte) []byte {
if de == nil {
return dst
}
return append(dst, de.s...)
}
// Duration returns the duration from de in milliseconds.
func (de *DurationExpr) Duration(step int64) int64 {
if de == nil {
return 0
}
d, err := DurationValue(de.s, step)
if err != nil {
return "", err
panic(fmt.Errorf("BUG: cannot parse duration %q: %s", de.s, err))
}
if strings.HasPrefix(d, "-") {
return "", fmt.Errorf("positiveDuration: expecting positive duration; got %q", d)
}
return d, nil
return d
}
// parseIdentExpr parses expressions starting with `ident` token.
@ -1628,17 +1662,17 @@ type RollupExpr struct {
// Window contains optional window value from square brackets
//
// For example, `http_requests_total[5m]` will have Window value `5m`.
Window string
Window *DurationExpr
// Offset contains optional value from `offset` part.
//
// For example, `foobar{baz="aa"} offset 5m` will have Offset value `5m`.
Offset string
Offset *DurationExpr
// Step contains optional step value from square brackets.
//
// For example, `foobar[1h:3m]` will have Step value '3m'.
Step string
Step *DurationExpr
// If set to true, then `foo[1h:]` would print the same
// instead of `foo[1h]`.
@ -1647,7 +1681,7 @@ type RollupExpr struct {
// ForSubquery returns true if re represents subquery.
func (re *RollupExpr) ForSubquery() bool {
return len(re.Step) > 0 || re.InheritStep
return re.Step != nil || re.InheritStep
}
// AppendString appends string representation of re to dst and returns the result.
@ -1671,22 +1705,20 @@ func (re *RollupExpr) AppendString(dst []byte) []byte {
if needParens {
dst = append(dst, ')')
}
if len(re.Window) > 0 || re.InheritStep || len(re.Step) > 0 {
if re.Window != nil || re.InheritStep || re.Step != nil {
dst = append(dst, '[')
if len(re.Window) > 0 {
dst = append(dst, re.Window...)
}
if len(re.Step) > 0 {
dst = re.Window.AppendString(dst)
if re.Step != nil {
dst = append(dst, ':')
dst = append(dst, re.Step...)
dst = re.Step.AppendString(dst)
} else if re.InheritStep {
dst = append(dst, ':')
}
dst = append(dst, ']')
}
if len(re.Offset) > 0 {
if re.Offset != nil {
dst = append(dst, " offset "...)
dst = append(dst, re.Offset...)
dst = re.Offset.AppendString(dst)
}
return dst
}

View File

@ -52,7 +52,7 @@ var transformFuncs = map[string]bool{
"label_match": true,
"label_mismatch": true,
"union": true,
"": true, // empty func is a synonim to union
"": true, // empty func is a synonym to union
"keep_last_value": true,
"keep_next_value": true,
"interpolate": true,

2
vendor/modules.txt vendored
View File

@ -21,7 +21,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.17.3
## explicit
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.15.0
# github.com/VictoriaMetrics/metricsql v0.16.0
## explicit
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop