app/vmselect/promql: fix results caching for multi-arg rollup functions such as quantile_over_time

Previosly only a single arg was taken into account, so caching didn't work properly for multi-arg rollup funcs.
This commit is contained in:
Aliaksandr Valialkin 2020-01-03 20:42:51 +02:00
parent a774120460
commit 3e09d38f29
3 changed files with 51 additions and 60 deletions

View File

@ -158,14 +158,14 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
re := &metricsql.RollupExpr{ re := &metricsql.RollupExpr{
Expr: me, Expr: me,
} }
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil) rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, e, re, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf(`cannot evaluate %q: %s`, me.AppendString(nil), err) return nil, fmt.Errorf(`cannot evaluate %q: %s`, me.AppendString(nil), err)
} }
return rv, nil return rv, nil
} }
if re, ok := e.(*metricsql.RollupExpr); ok { if re, ok := e.(*metricsql.RollupExpr); ok {
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil) rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, e, re, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf(`cannot evaluate %q: %s`, re.AppendString(nil), err) return nil, fmt.Errorf(`cannot evaluate %q: %s`, re.AppendString(nil), err)
} }
@ -201,7 +201,7 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
rv, err := evalRollupFunc(ec, fe.Name, rf, re, nil) rv, err := evalRollupFunc(ec, fe.Name, rf, e, re, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf(`cannot evaluate %q: %s`, fe.AppendString(nil), err) return nil, fmt.Errorf(`cannot evaluate %q: %s`, fe.AppendString(nil), err)
} }
@ -222,7 +222,7 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
return nil, err return nil, err
} }
iafc := newIncrementalAggrFuncContext(ae, callbacks) iafc := newIncrementalAggrFuncContext(ae, callbacks)
return evalRollupFunc(ec, fe.Name, rf, re, iafc) return evalRollupFunc(ec, fe.Name, rf, e, re, iafc)
} }
} }
args, err := evalExprs(ec, ae.Args) args, err := evalExprs(ec, ae.Args)
@ -403,7 +403,7 @@ func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
return &reNew return &reNew
} }
func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
ecNew := ec ecNew := ec
var offset int64 var offset int64
if len(re.Offset) > 0 { if len(re.Offset) > 0 {
@ -425,7 +425,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.Ro
var rvs []*timeseries var rvs []*timeseries
var err error var err error
if me, ok := re.Expr.(*metricsql.MetricExpr); ok { if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, me, iafc, re.Window) rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, expr, me, iafc, re.Window)
} else { } else {
if iafc != nil { if iafc != nil {
logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil)) logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil))
@ -450,7 +450,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.Ro
} }
func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr) ([]*timeseries, error) { func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr) ([]*timeseries, error) {
// Do not use rollupResultCacheV here, since it works only with metricsql.MetricExpr. // TODO: determine whether to use rollupResultCacheV here.
var step int64 var step int64
if len(re.Step) > 0 { if len(re.Step) > 0 {
var err error var err error
@ -558,7 +558,8 @@ var (
rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`) rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
) )
func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) { func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) {
if me.IsEmpty() { if me.IsEmpty() {
return evalNumber(ec, nan), nil return evalNumber(ec, nan), nil
} }
@ -572,7 +573,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
} }
// Search for partial results in cache. // Search for partial results in cache.
tssCached, start := rollupResultCacheV.Get(name, ec, me, iafc, window) tssCached, start := rollupResultCacheV.Get(ec, expr, window)
if start > ec.End { if start > ec.End {
// The result is fully cached. // The result is fully cached.
rollupResultCacheFullHits.Inc() rollupResultCacheFullHits.Inc()
@ -645,8 +646,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
return nil, err return nil, err
} }
tss = mergeTimeseries(tssCached, tss, start, ec) tss = mergeTimeseries(tssCached, tss, start, ec)
rollupResultCacheV.Put(name, ec, me, iafc, window, tss) rollupResultCacheV.Put(ec, expr, window, tss)
return tss, nil return tss, nil
} }

View File

@ -129,7 +129,7 @@ func ResetRollupResultCache() {
rollupResultCacheV.c.Reset() rollupResultCacheV.c.Reset()
} }
func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) (tss []*timeseries, newStart int64) { func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) {
if *disableCache || !ec.mayCache() { if *disableCache || !ec.mayCache() {
return nil, ec.Start return nil, ec.Start
} }
@ -138,7 +138,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql
bb := bbPool.Get() bb := bbPool.Get()
defer bbPool.Put(bb) defer bbPool.Put(bb)
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step)
metainfoBuf := rrc.c.Get(nil, bb.B) metainfoBuf := rrc.c.Get(nil, bb.B)
if len(metainfoBuf) == 0 { if len(metainfoBuf) == 0 {
return nil, ec.Start return nil, ec.Start
@ -158,7 +158,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql
if len(compressedResultBuf.B) == 0 { if len(compressedResultBuf.B) == 0 {
mi.RemoveKey(key) mi.RemoveKey(key)
metainfoBuf = mi.Marshal(metainfoBuf[:0]) metainfoBuf = mi.Marshal(metainfoBuf[:0])
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step)
rrc.c.Set(bb.B, metainfoBuf) rrc.c.Set(bb.B, metainfoBuf)
return nil, ec.Start return nil, ec.Start
} }
@ -210,7 +210,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql
var resultBufPool bytesutil.ByteBufferPool var resultBufPool bytesutil.ByteBufferPool
func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64, tss []*timeseries) { func (rrc *rollupResultCache) Put(ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) {
if *disableCache || len(tss) == 0 || !ec.mayCache() { if *disableCache || len(tss) == 0 || !ec.mayCache() {
return return
} }
@ -261,7 +261,7 @@ func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricsql
bb.B = key.Marshal(bb.B[:0]) bb.B = key.Marshal(bb.B[:0])
rrc.c.SetBig(bb.B, compressedResultBuf.B) rrc.c.SetBig(bb.B, compressedResultBuf.B)
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step)
metainfoBuf := rrc.c.Get(nil, bb.B) metainfoBuf := rrc.c.Get(nil, bb.B)
var mi rollupResultCacheMetainfo var mi rollupResultCacheMetainfo
if len(metainfoBuf) > 0 { if len(metainfoBuf) > 0 {
@ -289,24 +289,13 @@ var (
var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total") var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
// Increment this value every time the format of the cache changes. // Increment this value every time the format of the cache changes.
const rollupResultCacheVersion = 6 const rollupResultCacheVersion = 7
func marshalRollupResultCacheKey(dst []byte, funcName string, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, step int64) []byte { func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step int64) []byte {
dst = append(dst, rollupResultCacheVersion) dst = append(dst, rollupResultCacheVersion)
if iafc == nil {
dst = append(dst, 0)
} else {
dst = append(dst, 1)
dst = iafc.ae.AppendString(dst)
}
dst = encoding.MarshalUint64(dst, uint64(len(funcName)))
dst = append(dst, funcName...)
dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, window)
dst = encoding.MarshalInt64(dst, step) dst = encoding.MarshalInt64(dst, step)
tfs := toTagFilters(me.LabelFilters) dst = expr.AppendString(dst)
for i := range tfs {
dst = tfs[i].Marshal(dst)
}
return dst return dst
} }

View File

@ -9,7 +9,6 @@ import (
func TestRollupResultCache(t *testing.T) { func TestRollupResultCache(t *testing.T) {
ResetRollupResultCache() ResetRollupResultCache()
funcName := "foo"
window := int64(456) window := int64(456)
ec := &EvalConfig{ ec := &EvalConfig{
Start: 1000, Start: 1000,
@ -24,15 +23,18 @@ func TestRollupResultCache(t *testing.T) {
Value: "xxx", Value: "xxx",
}}, }},
} }
iafc := &incrementalAggrFuncContext{ fe := &metricsql.FuncExpr{
ae: &metricsql.AggrFuncExpr{ Name: "foo",
Name: "foobar", Args: []metricsql.Expr{me},
}, }
ae := &metricsql.AggrFuncExpr{
Name: "foobar",
Args: []metricsql.Expr{fe},
} }
// Try obtaining an empty value. // Try obtaining an empty value.
t.Run("empty", func(t *testing.T) { t.Run("empty", func(t *testing.T) {
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tss, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != ec.Start { if newStart != ec.Start {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start) t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start)
} }
@ -42,7 +44,7 @@ func TestRollupResultCache(t *testing.T) {
}) })
// Store timeseries overlapping with start // Store timeseries overlapping with start
t.Run("start-overlap-no-iafc", func(t *testing.T) { t.Run("start-overlap-no-ae", func(t *testing.T) {
ResetRollupResultCache() ResetRollupResultCache()
tss := []*timeseries{ tss := []*timeseries{
{ {
@ -50,8 +52,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) rollupResultCacheV.Put(ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tss, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != 1400 { if newStart != 1400 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
} }
@ -63,7 +65,7 @@ func TestRollupResultCache(t *testing.T) {
} }
testTimeseriesEqual(t, tss, tssExpected) testTimeseriesEqual(t, tss, tssExpected)
}) })
t.Run("start-overlap-with-iafc", func(t *testing.T) { t.Run("start-overlap-with-ae", func(t *testing.T) {
ResetRollupResultCache() ResetRollupResultCache()
tss := []*timeseries{ tss := []*timeseries{
{ {
@ -71,8 +73,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(funcName, ec, me, iafc, window, tss) rollupResultCacheV.Put(ec, ae, window, tss)
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, iafc, window) tss, newStart := rollupResultCacheV.Get(ec, ae, window)
if newStart != 1400 { if newStart != 1400 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
} }
@ -94,8 +96,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{333, 0, 1, 2}, Values: []float64{333, 0, 1, 2},
}, },
} }
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) rollupResultCacheV.Put(ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tss, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != 1000 { if newStart != 1000 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
} }
@ -113,8 +115,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) rollupResultCacheV.Put(ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tss, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != 1000 { if newStart != 1000 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
} }
@ -132,8 +134,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) rollupResultCacheV.Put(ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tss, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != 1000 { if newStart != 1000 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
} }
@ -151,8 +153,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) rollupResultCacheV.Put(ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tss, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != 1000 { if newStart != 1000 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
} }
@ -170,8 +172,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2, 3, 4, 5, 6, 7}, Values: []float64{0, 1, 2, 3, 4, 5, 6, 7},
}, },
} }
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) rollupResultCacheV.Put(ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tss, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != 2200 { if newStart != 2200 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
} }
@ -193,8 +195,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{1, 2, 3, 4, 5, 6}, Values: []float64{1, 2, 3, 4, 5, 6},
}, },
} }
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) rollupResultCacheV.Put(ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tss, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != 2200 { if newStart != 2200 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
} }
@ -218,8 +220,8 @@ func TestRollupResultCache(t *testing.T) {
} }
tss = append(tss, ts) tss = append(tss, ts)
} }
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) rollupResultCacheV.Put(ec, fe, window, tss)
tssResult, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tssResult, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != 2200 { if newStart != 2200 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
} }
@ -247,10 +249,10 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss1) rollupResultCacheV.Put(ec, fe, window, tss1)
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss2) rollupResultCacheV.Put(ec, fe, window, tss2)
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss3) rollupResultCacheV.Put(ec, fe, window, tss3)
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) tss, newStart := rollupResultCacheV.Get(ec, fe, window)
if newStart != 1400 { if newStart != 1400 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
} }