app/vmselect/promql: support for sum(x) by (y) limit N syntax in order to limit the number of output time series after aggregation

This commit is contained in:
Aliaksandr Valialkin 2020-05-12 19:06:54 +03:00
parent 0a134ace63
commit 574289c3fb
10 changed files with 84 additions and 28 deletions

View File

@ -64,7 +64,7 @@ func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc {
if err := expectTransformArgsNum(args, 1); err != nil {
return nil, err
}
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, false)
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, false)
}
}
@ -80,7 +80,7 @@ func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.Modifie
}
}
func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeseries, modifier *metricsql.ModifierExpr, keepOriginal bool) ([]*timeseries, error) {
func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeseries, modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) {
arg := copyTimeseriesMetricNames(argOrig)
// Perform grouping.
@ -92,7 +92,13 @@ func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeserie
if keepOriginal {
ts = argOrig[i]
}
m[string(bb.B)] = append(m[string(bb.B)], ts)
tss := m[string(bb.B)]
if tss == nil && maxSeries > 0 && len(m) >= maxSeries {
// We already reached time series limit after grouping. Skip other time series.
continue
}
tss = append(tss, ts)
m[string(bb.B)] = tss
}
bbPool.Put(bb)
@ -441,7 +447,7 @@ func aggrFuncCountValues(afa *aggrFuncArg) ([]*timeseries, error) {
}
return rvs
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, false)
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, false)
}
func newAggrFuncTopK(isReverse bool) aggrFunc {
@ -468,7 +474,7 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
}
return removeNaNs(tss)
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true)
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
}
@ -512,7 +518,7 @@ func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggr
}
return removeNaNs(tss)
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true)
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
}
@ -618,7 +624,7 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
}
return tss
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true)
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
func aggrFuncQuantile(afa *aggrFuncArg) ([]*timeseries, error) {
@ -631,7 +637,7 @@ func aggrFuncQuantile(afa *aggrFuncArg) ([]*timeseries, error) {
return nil, err
}
afe := newAggrQuantileFunc(phis)
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, false)
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, false)
}
func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
@ -641,7 +647,7 @@ func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
}
phis := evalNumber(afa.ec, 0.5)[0].Values
afe := newAggrQuantileFunc(phis)
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, false)
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, false)
}
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries {

View File

@ -81,6 +81,10 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries, workerI
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
iac := m[string(bb.B)]
if iac == nil {
if iafc.ae.Limit > 0 && len(m) >= iafc.ae.Limit {
// Skip this time series, since the limit on the number of output time series has been already reached.
return
}
tsAggr := &timeseries{
Values: make([]float64, len(ts.Values)),
Timestamps: ts.Timestamps,
@ -106,6 +110,10 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
for k, iac := range m {
iacGlobal := mGlobal[k]
if iacGlobal == nil {
if iafc.ae.Limit > 0 && len(mGlobal) >= iafc.ae.Limit {
// Skip this time series, since the limit on the number of output time series has been already reached.
continue
}
mGlobal[k] = iac
continue
}

View File

@ -663,12 +663,17 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
pointsPerTimeseries := 1 + (ec.End-ec.Start)/ec.Step
timeseriesLen := rssLen
if iafc != nil {
// Incremental aggregates require hold only GOMAXPROCS timeseries in memory.
// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
timeseriesLen = runtime.GOMAXPROCS(-1)
if iafc.ae.Modifier.Op != "" {
// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
// since each group can have own set of time series in memory.
timeseriesLen *= 1000
if iafc.ae.Limit > 0 {
// There is an explicit limit on the number of output time series.
timeseriesLen *= iafc.ae.Limit
} else {
// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
// since each group can have own set of time series in memory.
timeseriesLen *= 1000
}
}
// The maximum number of output time series is limited by rssLen.
if timeseriesLen > rssLen {

View File

@ -3496,6 +3496,21 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`sum(multi-vector) by (known-tag) limit 1`, func(t *testing.T) {
t.Parallel()
q := `sum(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss")) by (foo) limit 1`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10, 10, 10, 10, 10, 10},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum(multi-vector) by (known-tags)`, func(t *testing.T) {
t.Parallel()
q := `sum(label_set(10, "foo", "bar", "baz", "sss", "x", "y") or label_set(time()/100, "baz", "sss", "foo", "bar")) by (foo, baz, foo)`

View File

@ -22,6 +22,8 @@ Feel free [filing a feature request](https://github.com/VictoriaMetrics/Victoria
This functionality can be tried at [an editable Grafana dashboard](http://play-grafana.victoriametrics.com:3000/d/4ome8yJmz/node-exporter-on-victoriametrics-demo).
- [`WITH` templates](https://play.victoriametrics.com/promql/expand-with-exprs). This feature simplifies writing and managing complex queries. Go to [`WITH` templates playground](https://victoriametrics.com/promql/expand-with-exprs) and try it.
- Aggregate functions support optional `limit N` suffix in order to limit the number of output series. For example, `sum(x) by (y) limit 10` limits
the number of output time series after the aggregation to 10. All the other time series are dropped.
- Metric names and metric labels may contain escaped chars. For instance, `foo\-bar{baz\=aa="b"}` is valid expression. It returns time series with name `foo-bar` containing label `baz=aa` with value `b`. Additionally, `\xXX` escape sequence is supported, where `XX` is hexadecimal representation of escaped char.
- `offset`, range duration and step value for range vector may refer to the current step aka `$__interval` value from Grafana.
For instance, `rate(metric[10i] offset 5i)` would return per-second rate over a range covering 10 previous steps with the offset of 5 steps.

2
go.mod
View File

@ -9,7 +9,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.0.1
github.com/VictoriaMetrics/metrics v1.11.2
github.com/VictoriaMetrics/metricsql v0.1.0
github.com/VictoriaMetrics/metricsql v0.2.0
github.com/aws/aws-sdk-go v1.30.25
github.com/cespare/xxhash/v2 v2.1.1
github.com/golang/protobuf v1.4.1 // indirect

4
go.sum
View File

@ -45,8 +45,8 @@ github.com/VictoriaMetrics/fasthttp v1.0.1 h1:I7YdbswTIW63WxoFoUOSNxeOEGB46rdKUL
github.com/VictoriaMetrics/fasthttp v1.0.1/go.mod h1:BqgsieH90PR7x97c89j+eqZDloKkDhAEQTwhLw6jw/4=
github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/tGoxihHvx8=
github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
github.com/VictoriaMetrics/metricsql v0.1.0 h1:IoyG84PCwFY15rNuxpr2nQ+YZBYIhnd7zTiaGo5BNpc=
github.com/VictoriaMetrics/metricsql v0.1.0/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
github.com/VictoriaMetrics/metricsql v0.2.0 h1:tGWPFSCqDRKYLuc99lNmxa0md9uyWHVycbumGLAM4V0=
github.com/VictoriaMetrics/metricsql v0.2.0/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/aws/aws-sdk-go v1.30.25 h1:89NXJkfpjnMEnsxkP8MVX+LDsoiLCSqevraLb5y4Mjk=

View File

@ -1,7 +1,5 @@
[![Build Status](https://github.com/VictoriaMetrics/metricsql/workflows/main/badge.svg)](https://github.com/VictoriaMetrics/metricsql/actions)
[![GoDoc](https://godoc.org/github.com/VictoriaMetrics/metricsql?status.svg)](http://godoc.org/github.com/VictoriaMetrics/metricsql)
[![Go Report](https://goreportcard.com/badge/github.com/VictoriaMetrics/metricsql)](https://goreportcard.com/report/github.com/VictoriaMetrics/metricsql)
[![codecov](https://codecov.io/gh/VictoriaMetrics/metricsql/branch/master/graph/badge.svg)](https://codecov.io/gh/VictoriaMetrics/metricsql)
# metricsql

View File

@ -535,12 +535,10 @@ func (p *parser) parseAggrFuncExpr() (*AggrFuncExpr, error) {
if isIdentPrefix(p.lex.Token) {
goto funcPrefixLabel
}
switch p.lex.Token {
case "(":
if p.lex.Token == "(" {
goto funcArgsLabel
default:
return nil, fmt.Errorf(`AggrFuncExpr: unexpected token %q; want "("`, p.lex.Token)
}
return nil, fmt.Errorf(`AggrFuncExpr: unexpected token %q; want "("`, p.lex.Token)
funcPrefixLabel:
{
@ -550,7 +548,6 @@ funcPrefixLabel:
if err := p.parseModifierExpr(&ae.Modifier); err != nil {
return nil, err
}
goto funcArgsLabel
}
funcArgsLabel:
@ -562,11 +559,25 @@ funcArgsLabel:
ae.Args = args
// Verify whether func suffix exists.
if ae.Modifier.Op != "" || !isAggrFuncModifier(p.lex.Token) {
return &ae, nil
if ae.Modifier.Op == "" && isAggrFuncModifier(p.lex.Token) {
if err := p.parseModifierExpr(&ae.Modifier); err != nil {
return nil, err
}
}
if err := p.parseModifierExpr(&ae.Modifier); err != nil {
return nil, err
// Check for optional limit.
if strings.ToLower(p.lex.Token) == "limit" {
if err := p.lex.Next(); err != nil {
return nil, err
}
limit, err := strconv.Atoi(p.lex.Token)
if err != nil {
return nil, fmt.Errorf("cannot parse limit %q: %s", p.lex.Token, err)
}
if err := p.lex.Next(); err != nil {
return nil, err
}
ae.Limit = limit
}
return &ae, nil
}
@ -640,6 +651,7 @@ func expandWithExpr(was []*withArgExpr, e Expr) (Expr, error) {
Name: t.Name,
Args: args,
Modifier: t.Modifier,
Limit: t.Limit,
}
ae.Modifier.Args = modifierArgs
return ae, nil
@ -1495,6 +1507,12 @@ type AggrFuncExpr struct {
// Modifier is optional modifier such as `by (...)` or `without (...)`.
Modifier ModifierExpr
// Optional limit for the number of output time series.
// This is MetricsQL extension.
//
// Example: `sum(...) by (...) limit 10` would return maximum 10 time series.
Limit int
}
// AppendString appends string representation of ae to dst and returns the result.
@ -1505,6 +1523,10 @@ func (ae *AggrFuncExpr) AppendString(dst []byte) []byte {
dst = append(dst, ' ')
dst = ae.Modifier.AppendString(dst)
}
if ae.Limit > 0 {
dst = append(dst, " limit "...)
dst = strconv.AppendInt(dst, int64(ae.Limit), 10)
}
return dst
}

2
vendor/modules.txt vendored
View File

@ -18,7 +18,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil
github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.11.2
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.1.0
# github.com/VictoriaMetrics/metricsql v0.2.0
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop
# github.com/aws/aws-sdk-go v1.30.25