diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index d715e7fd2..7cdb76cf1 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -158,14 +158,14 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { re := &metricsql.RollupExpr{ Expr: me, } - rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil) + rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %s`, me.AppendString(nil), err) } return rv, nil } if re, ok := e.(*metricsql.RollupExpr); ok { - rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil) + rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %s`, re.AppendString(nil), err) } @@ -201,7 +201,7 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { if err != nil { return nil, err } - rv, err := evalRollupFunc(ec, fe.Name, rf, re, nil) + rv, err := evalRollupFunc(ec, fe.Name, rf, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %s`, fe.AppendString(nil), err) } @@ -222,7 +222,7 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { return nil, err } iafc := newIncrementalAggrFuncContext(ae, callbacks) - return evalRollupFunc(ec, fe.Name, rf, re, iafc) + return evalRollupFunc(ec, fe.Name, rf, e, re, iafc) } } args, err := evalExprs(ec, ae.Args) @@ -403,7 +403,7 @@ func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr { return &reNew } -func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { +func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { ecNew := ec var offset int64 if len(re.Offset) > 0 { @@ -425,7 +425,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.Ro var rvs []*timeseries var err error if me, ok := re.Expr.(*metricsql.MetricExpr); ok { - rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, me, iafc, re.Window) + rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, expr, me, iafc, re.Window) } else { if iafc != nil { logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil)) @@ -450,7 +450,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.Ro } func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr) ([]*timeseries, error) { - // Do not use rollupResultCacheV here, since it works only with metricsql.MetricExpr. + // TODO: determine whether to use rollupResultCacheV here. var step int64 if len(re.Step) > 0 { var err error @@ -558,7 +558,8 @@ var ( rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`) ) -func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) { +func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, + expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) { if me.IsEmpty() { return evalNumber(ec, nan), nil } @@ -572,7 +573,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me } // Search for partial results in cache. - tssCached, start := rollupResultCacheV.Get(name, ec, me, iafc, window) + tssCached, start := rollupResultCacheV.Get(ec, expr, window) if start > ec.End { // The result is fully cached. rollupResultCacheFullHits.Inc() @@ -645,8 +646,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me return nil, err } tss = mergeTimeseries(tssCached, tss, start, ec) - rollupResultCacheV.Put(name, ec, me, iafc, window, tss) - + rollupResultCacheV.Put(ec, expr, window, tss) return tss, nil } diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index 2090ed4e5..abe54565b 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -129,7 +129,7 @@ func ResetRollupResultCache() { rollupResultCacheV.c.Reset() } -func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) (tss []*timeseries, newStart int64) { +func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) { if *disableCache || !ec.mayCache() { return nil, ec.Start } @@ -138,7 +138,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql bb := bbPool.Get() defer bbPool.Put(bb) - bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step) + bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step) metainfoBuf := rrc.c.Get(nil, bb.B) if len(metainfoBuf) == 0 { return nil, ec.Start @@ -158,7 +158,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql if len(compressedResultBuf.B) == 0 { mi.RemoveKey(key) metainfoBuf = mi.Marshal(metainfoBuf[:0]) - bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step) + bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step) rrc.c.Set(bb.B, metainfoBuf) return nil, ec.Start } @@ -210,7 +210,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql var resultBufPool bytesutil.ByteBufferPool -func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64, tss []*timeseries) { +func (rrc *rollupResultCache) Put(ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) { if *disableCache || len(tss) == 0 || !ec.mayCache() { return } @@ -261,7 +261,7 @@ func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricsql bb.B = key.Marshal(bb.B[:0]) rrc.c.SetBig(bb.B, compressedResultBuf.B) - bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step) + bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step) metainfoBuf := rrc.c.Get(nil, bb.B) var mi rollupResultCacheMetainfo if len(metainfoBuf) > 0 { @@ -289,24 +289,13 @@ var ( var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total") // Increment this value every time the format of the cache changes. -const rollupResultCacheVersion = 6 +const rollupResultCacheVersion = 7 -func marshalRollupResultCacheKey(dst []byte, funcName string, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, step int64) []byte { +func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step int64) []byte { dst = append(dst, rollupResultCacheVersion) - if iafc == nil { - dst = append(dst, 0) - } else { - dst = append(dst, 1) - dst = iafc.ae.AppendString(dst) - } - dst = encoding.MarshalUint64(dst, uint64(len(funcName))) - dst = append(dst, funcName...) dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, step) - tfs := toTagFilters(me.LabelFilters) - for i := range tfs { - dst = tfs[i].Marshal(dst) - } + dst = expr.AppendString(dst) return dst } diff --git a/app/vmselect/promql/rollup_result_cache_test.go b/app/vmselect/promql/rollup_result_cache_test.go index 3ab7eb2aa..0ca594132 100644 --- a/app/vmselect/promql/rollup_result_cache_test.go +++ b/app/vmselect/promql/rollup_result_cache_test.go @@ -9,7 +9,6 @@ import ( func TestRollupResultCache(t *testing.T) { ResetRollupResultCache() - funcName := "foo" window := int64(456) ec := &EvalConfig{ Start: 1000, @@ -24,15 +23,18 @@ func TestRollupResultCache(t *testing.T) { Value: "xxx", }}, } - iafc := &incrementalAggrFuncContext{ - ae: &metricsql.AggrFuncExpr{ - Name: "foobar", - }, + fe := &metricsql.FuncExpr{ + Name: "foo", + Args: []metricsql.Expr{me}, + } + ae := &metricsql.AggrFuncExpr{ + Name: "foobar", + Args: []metricsql.Expr{fe}, } // Try obtaining an empty value. t.Run("empty", func(t *testing.T) { - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != ec.Start { t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start) } @@ -42,7 +44,7 @@ func TestRollupResultCache(t *testing.T) { }) // Store timeseries overlapping with start - t.Run("start-overlap-no-iafc", func(t *testing.T) { + t.Run("start-overlap-no-ae", func(t *testing.T) { ResetRollupResultCache() tss := []*timeseries{ { @@ -50,8 +52,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1400 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) } @@ -63,7 +65,7 @@ func TestRollupResultCache(t *testing.T) { } testTimeseriesEqual(t, tss, tssExpected) }) - t.Run("start-overlap-with-iafc", func(t *testing.T) { + t.Run("start-overlap-with-ae", func(t *testing.T) { ResetRollupResultCache() tss := []*timeseries{ { @@ -71,8 +73,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, iafc, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, iafc, window) + rollupResultCacheV.Put(ec, ae, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, ae, window) if newStart != 1400 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) } @@ -94,8 +96,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{333, 0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -113,8 +115,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -132,8 +134,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -151,8 +153,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -170,8 +172,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2, 3, 4, 5, 6, 7}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 2200 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) } @@ -193,8 +195,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{1, 2, 3, 4, 5, 6}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 2200 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) } @@ -218,8 +220,8 @@ func TestRollupResultCache(t *testing.T) { } tss = append(tss, ts) } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tssResult, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tssResult, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 2200 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) } @@ -247,10 +249,10 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss1) - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss2) - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss3) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss1) + rollupResultCacheV.Put(ec, fe, window, tss2) + rollupResultCacheV.Put(ec, fe, window, tss3) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1400 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) }