diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 6430b0ab0e..5e822b9468 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -164,14 +164,14 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { re := &metricsql.RollupExpr{ Expr: me, } - rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil) + rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %s`, me.AppendString(nil), err) } return rv, nil } if re, ok := e.(*metricsql.RollupExpr); ok { - rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil) + rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %s`, re.AppendString(nil), err) } @@ -207,7 +207,7 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { if err != nil { return nil, err } - rv, err := evalRollupFunc(ec, fe.Name, rf, re, nil) + rv, err := evalRollupFunc(ec, fe.Name, rf, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %s`, fe.AppendString(nil), err) } @@ -228,7 +228,7 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { return nil, err } iafc := newIncrementalAggrFuncContext(ae, callbacks) - return evalRollupFunc(ec, fe.Name, rf, re, iafc) + return evalRollupFunc(ec, fe.Name, rf, e, re, iafc) } } args, err := evalExprs(ec, ae.Args) @@ -409,7 +409,7 @@ func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr { return &reNew } -func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { +func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { ecNew := ec var offset int64 if len(re.Offset) > 0 { @@ -431,7 +431,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.Ro var rvs []*timeseries var err error if me, ok := re.Expr.(*metricsql.MetricExpr); ok { - rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, me, iafc, re.Window) + rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, expr, me, iafc, re.Window) } else { if iafc != nil { logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil)) @@ -456,7 +456,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.Ro } func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr) ([]*timeseries, error) { - // Do not use rollupResultCacheV here, since it works only with metricsql.MetricExpr. + // TODO: determine whether to use rollupResultCacheV here. var step int64 if len(re.Step) > 0 { var err error @@ -564,7 +564,8 @@ var ( rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`) ) -func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) { +func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, + expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) { if me.IsEmpty() { return evalNumber(ec, nan), nil } @@ -578,7 +579,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me } // Search for partial results in cache. - tssCached, start := rollupResultCacheV.Get(name, ec, me, iafc, window) + tssCached, start := rollupResultCacheV.Get(ec, expr, window) if start > ec.End { // The result is fully cached. rollupResultCacheFullHits.Inc() @@ -657,7 +658,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me } tss = mergeTimeseries(tssCached, tss, start, ec) if !isPartial { - rollupResultCacheV.Put(name, ec, me, iafc, window, tss) + rollupResultCacheV.Put(ec, expr, window, tss) } return tss, nil } diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index ea8caf7e93..41fb23d113 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -134,7 +134,7 @@ func ResetRollupResultCache() { rollupResultCacheV.c.Reset() } -func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) (tss []*timeseries, newStart int64) { +func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) { if *disableCache || !ec.mayCache() { return nil, ec.Start } @@ -143,7 +143,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql bb := bbPool.Get() defer bbPool.Put(bb) - bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, ec.AuthToken, me, iafc, window, ec.Step) + bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step) metainfoBuf := rrc.c.Get(nil, bb.B) if len(metainfoBuf) == 0 { return nil, ec.Start @@ -163,7 +163,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql if len(compressedResultBuf.B) == 0 { mi.RemoveKey(key) metainfoBuf = mi.Marshal(metainfoBuf[:0]) - bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, ec.AuthToken, me, iafc, window, ec.Step) + bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step) rrc.c.Set(bb.B, metainfoBuf) return nil, ec.Start } @@ -215,7 +215,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql var resultBufPool bytesutil.ByteBufferPool -func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64, tss []*timeseries) { +func (rrc *rollupResultCache) Put(ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) { if *disableCache || len(tss) == 0 || !ec.mayCache() { return } @@ -266,7 +266,7 @@ func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricsql bb.B = key.Marshal(bb.B[:0]) rrc.c.SetBig(bb.B, compressedResultBuf.B) - bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, ec.AuthToken, me, iafc, window, ec.Step) + bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step) metainfoBuf := rrc.c.Get(nil, bb.B) var mi rollupResultCacheMetainfo if len(metainfoBuf) > 0 { @@ -294,26 +294,15 @@ var ( var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total") // Increment this value every time the format of the cache changes. -const rollupResultCacheVersion = 6 +const rollupResultCacheVersion = 7 -func marshalRollupResultCacheKey(dst []byte, funcName string, at *auth.Token, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, step int64) []byte { +func marshalRollupResultCacheKey(dst []byte, at *auth.Token, expr metricsql.Expr, window, step int64) []byte { dst = append(dst, rollupResultCacheVersion) - if iafc == nil { - dst = append(dst, 0) - } else { - dst = append(dst, 1) - dst = iafc.ae.AppendString(dst) - } dst = encoding.MarshalUint32(dst, at.AccountID) dst = encoding.MarshalUint32(dst, at.ProjectID) - dst = encoding.MarshalUint64(dst, uint64(len(funcName))) - dst = append(dst, funcName...) dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, step) - tfs := toTagFilters(me.LabelFilters) - for i := range tfs { - dst = tfs[i].Marshal(dst) - } + dst = expr.AppendString(dst) return dst } diff --git a/app/vmselect/promql/rollup_result_cache_test.go b/app/vmselect/promql/rollup_result_cache_test.go index c52be77d5a..20b7cff103 100644 --- a/app/vmselect/promql/rollup_result_cache_test.go +++ b/app/vmselect/promql/rollup_result_cache_test.go @@ -10,7 +10,6 @@ import ( func TestRollupResultCache(t *testing.T) { ResetRollupResultCache() - funcName := "foo" window := int64(456) ec := &EvalConfig{ Start: 1000, @@ -30,15 +29,18 @@ func TestRollupResultCache(t *testing.T) { Value: "xxx", }}, } - iafc := &incrementalAggrFuncContext{ - ae: &metricsql.AggrFuncExpr{ - Name: "foobar", - }, + fe := &metricsql.FuncExpr{ + Name: "foo", + Args: []metricsql.Expr{me}, + } + ae := &metricsql.AggrFuncExpr{ + Name: "foobar", + Args: []metricsql.Expr{fe}, } // Try obtaining an empty value. t.Run("empty", func(t *testing.T) { - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != ec.Start { t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start) } @@ -48,7 +50,7 @@ func TestRollupResultCache(t *testing.T) { }) // Store timeseries overlapping with start - t.Run("start-overlap-no-iafc", func(t *testing.T) { + t.Run("start-overlap-no-ae", func(t *testing.T) { ResetRollupResultCache() tss := []*timeseries{ { @@ -56,8 +58,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1400 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) } @@ -69,7 +71,7 @@ func TestRollupResultCache(t *testing.T) { } testTimeseriesEqual(t, tss, tssExpected) }) - t.Run("start-overlap-with-iafc", func(t *testing.T) { + t.Run("start-overlap-with-ae", func(t *testing.T) { ResetRollupResultCache() tss := []*timeseries{ { @@ -77,8 +79,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, iafc, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, iafc, window) + rollupResultCacheV.Put(ec, ae, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, ae, window) if newStart != 1400 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) } @@ -100,8 +102,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{333, 0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -119,8 +121,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -138,8 +140,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -157,8 +159,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1000 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) } @@ -176,8 +178,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2, 3, 4, 5, 6, 7}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 2200 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) } @@ -199,8 +201,8 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{1, 2, 3, 4, 5, 6}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 2200 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) } @@ -224,8 +226,8 @@ func TestRollupResultCache(t *testing.T) { } tss = append(tss, ts) } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss) - tssResult, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss) + tssResult, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 2200 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) } @@ -253,10 +255,10 @@ func TestRollupResultCache(t *testing.T) { Values: []float64{0, 1, 2}, }, } - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss1) - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss2) - rollupResultCacheV.Put(funcName, ec, me, nil, window, tss3) - tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window) + rollupResultCacheV.Put(ec, fe, window, tss1) + rollupResultCacheV.Put(ec, fe, window, tss2) + rollupResultCacheV.Put(ec, fe, window, tss3) + tss, newStart := rollupResultCacheV.Get(ec, fe, window) if newStart != 1400 { t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) }