mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
app/vmselect/promql: fix results caching for multi-arg rollup functions such as quantile_over_time
Previosly only a single arg was taken into account, so caching didn't work properly for multi-arg rollup funcs.
This commit is contained in:
parent
3d0c7b095a
commit
9a1f6848ca
@ -164,14 +164,14 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
|
|||||||
re := &metricsql.RollupExpr{
|
re := &metricsql.RollupExpr{
|
||||||
Expr: me,
|
Expr: me,
|
||||||
}
|
}
|
||||||
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil)
|
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, e, re, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(`cannot evaluate %q: %s`, me.AppendString(nil), err)
|
return nil, fmt.Errorf(`cannot evaluate %q: %s`, me.AppendString(nil), err)
|
||||||
}
|
}
|
||||||
return rv, nil
|
return rv, nil
|
||||||
}
|
}
|
||||||
if re, ok := e.(*metricsql.RollupExpr); ok {
|
if re, ok := e.(*metricsql.RollupExpr); ok {
|
||||||
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil)
|
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, e, re, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(`cannot evaluate %q: %s`, re.AppendString(nil), err)
|
return nil, fmt.Errorf(`cannot evaluate %q: %s`, re.AppendString(nil), err)
|
||||||
}
|
}
|
||||||
@ -207,7 +207,7 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
rv, err := evalRollupFunc(ec, fe.Name, rf, re, nil)
|
rv, err := evalRollupFunc(ec, fe.Name, rf, e, re, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(`cannot evaluate %q: %s`, fe.AppendString(nil), err)
|
return nil, fmt.Errorf(`cannot evaluate %q: %s`, fe.AppendString(nil), err)
|
||||||
}
|
}
|
||||||
@ -228,7 +228,7 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
iafc := newIncrementalAggrFuncContext(ae, callbacks)
|
iafc := newIncrementalAggrFuncContext(ae, callbacks)
|
||||||
return evalRollupFunc(ec, fe.Name, rf, re, iafc)
|
return evalRollupFunc(ec, fe.Name, rf, e, re, iafc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
args, err := evalExprs(ec, ae.Args)
|
args, err := evalExprs(ec, ae.Args)
|
||||||
@ -409,7 +409,7 @@ func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
|
|||||||
return &reNew
|
return &reNew
|
||||||
}
|
}
|
||||||
|
|
||||||
func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
|
func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
|
||||||
ecNew := ec
|
ecNew := ec
|
||||||
var offset int64
|
var offset int64
|
||||||
if len(re.Offset) > 0 {
|
if len(re.Offset) > 0 {
|
||||||
@ -431,7 +431,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.Ro
|
|||||||
var rvs []*timeseries
|
var rvs []*timeseries
|
||||||
var err error
|
var err error
|
||||||
if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
|
if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
|
||||||
rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, me, iafc, re.Window)
|
rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, expr, me, iafc, re.Window)
|
||||||
} else {
|
} else {
|
||||||
if iafc != nil {
|
if iafc != nil {
|
||||||
logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil))
|
logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil))
|
||||||
@ -456,7 +456,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.Ro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr) ([]*timeseries, error) {
|
func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr) ([]*timeseries, error) {
|
||||||
// Do not use rollupResultCacheV here, since it works only with metricsql.MetricExpr.
|
// TODO: determine whether to use rollupResultCacheV here.
|
||||||
var step int64
|
var step int64
|
||||||
if len(re.Step) > 0 {
|
if len(re.Step) > 0 {
|
||||||
var err error
|
var err error
|
||||||
@ -564,7 +564,8 @@ var (
|
|||||||
rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
|
rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
|
||||||
)
|
)
|
||||||
|
|
||||||
func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) {
|
func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
|
||||||
|
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) {
|
||||||
if me.IsEmpty() {
|
if me.IsEmpty() {
|
||||||
return evalNumber(ec, nan), nil
|
return evalNumber(ec, nan), nil
|
||||||
}
|
}
|
||||||
@ -578,7 +579,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Search for partial results in cache.
|
// Search for partial results in cache.
|
||||||
tssCached, start := rollupResultCacheV.Get(name, ec, me, iafc, window)
|
tssCached, start := rollupResultCacheV.Get(ec, expr, window)
|
||||||
if start > ec.End {
|
if start > ec.End {
|
||||||
// The result is fully cached.
|
// The result is fully cached.
|
||||||
rollupResultCacheFullHits.Inc()
|
rollupResultCacheFullHits.Inc()
|
||||||
@ -657,7 +658,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
|
|||||||
}
|
}
|
||||||
tss = mergeTimeseries(tssCached, tss, start, ec)
|
tss = mergeTimeseries(tssCached, tss, start, ec)
|
||||||
if !isPartial {
|
if !isPartial {
|
||||||
rollupResultCacheV.Put(name, ec, me, iafc, window, tss)
|
rollupResultCacheV.Put(ec, expr, window, tss)
|
||||||
}
|
}
|
||||||
return tss, nil
|
return tss, nil
|
||||||
}
|
}
|
||||||
|
@ -134,7 +134,7 @@ func ResetRollupResultCache() {
|
|||||||
rollupResultCacheV.c.Reset()
|
rollupResultCacheV.c.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) (tss []*timeseries, newStart int64) {
|
func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) {
|
||||||
if *disableCache || !ec.mayCache() {
|
if *disableCache || !ec.mayCache() {
|
||||||
return nil, ec.Start
|
return nil, ec.Start
|
||||||
}
|
}
|
||||||
@ -143,7 +143,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql
|
|||||||
bb := bbPool.Get()
|
bb := bbPool.Get()
|
||||||
defer bbPool.Put(bb)
|
defer bbPool.Put(bb)
|
||||||
|
|
||||||
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, ec.AuthToken, me, iafc, window, ec.Step)
|
bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step)
|
||||||
metainfoBuf := rrc.c.Get(nil, bb.B)
|
metainfoBuf := rrc.c.Get(nil, bb.B)
|
||||||
if len(metainfoBuf) == 0 {
|
if len(metainfoBuf) == 0 {
|
||||||
return nil, ec.Start
|
return nil, ec.Start
|
||||||
@ -163,7 +163,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql
|
|||||||
if len(compressedResultBuf.B) == 0 {
|
if len(compressedResultBuf.B) == 0 {
|
||||||
mi.RemoveKey(key)
|
mi.RemoveKey(key)
|
||||||
metainfoBuf = mi.Marshal(metainfoBuf[:0])
|
metainfoBuf = mi.Marshal(metainfoBuf[:0])
|
||||||
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, ec.AuthToken, me, iafc, window, ec.Step)
|
bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step)
|
||||||
rrc.c.Set(bb.B, metainfoBuf)
|
rrc.c.Set(bb.B, metainfoBuf)
|
||||||
return nil, ec.Start
|
return nil, ec.Start
|
||||||
}
|
}
|
||||||
@ -215,7 +215,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricsql
|
|||||||
|
|
||||||
var resultBufPool bytesutil.ByteBufferPool
|
var resultBufPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64, tss []*timeseries) {
|
func (rrc *rollupResultCache) Put(ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) {
|
||||||
if *disableCache || len(tss) == 0 || !ec.mayCache() {
|
if *disableCache || len(tss) == 0 || !ec.mayCache() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -266,7 +266,7 @@ func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricsql
|
|||||||
bb.B = key.Marshal(bb.B[:0])
|
bb.B = key.Marshal(bb.B[:0])
|
||||||
rrc.c.SetBig(bb.B, compressedResultBuf.B)
|
rrc.c.SetBig(bb.B, compressedResultBuf.B)
|
||||||
|
|
||||||
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, ec.AuthToken, me, iafc, window, ec.Step)
|
bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step)
|
||||||
metainfoBuf := rrc.c.Get(nil, bb.B)
|
metainfoBuf := rrc.c.Get(nil, bb.B)
|
||||||
var mi rollupResultCacheMetainfo
|
var mi rollupResultCacheMetainfo
|
||||||
if len(metainfoBuf) > 0 {
|
if len(metainfoBuf) > 0 {
|
||||||
@ -294,26 +294,15 @@ var (
|
|||||||
var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
|
var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
|
||||||
|
|
||||||
// Increment this value every time the format of the cache changes.
|
// Increment this value every time the format of the cache changes.
|
||||||
const rollupResultCacheVersion = 6
|
const rollupResultCacheVersion = 7
|
||||||
|
|
||||||
func marshalRollupResultCacheKey(dst []byte, funcName string, at *auth.Token, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, step int64) []byte {
|
func marshalRollupResultCacheKey(dst []byte, at *auth.Token, expr metricsql.Expr, window, step int64) []byte {
|
||||||
dst = append(dst, rollupResultCacheVersion)
|
dst = append(dst, rollupResultCacheVersion)
|
||||||
if iafc == nil {
|
|
||||||
dst = append(dst, 0)
|
|
||||||
} else {
|
|
||||||
dst = append(dst, 1)
|
|
||||||
dst = iafc.ae.AppendString(dst)
|
|
||||||
}
|
|
||||||
dst = encoding.MarshalUint32(dst, at.AccountID)
|
dst = encoding.MarshalUint32(dst, at.AccountID)
|
||||||
dst = encoding.MarshalUint32(dst, at.ProjectID)
|
dst = encoding.MarshalUint32(dst, at.ProjectID)
|
||||||
dst = encoding.MarshalUint64(dst, uint64(len(funcName)))
|
|
||||||
dst = append(dst, funcName...)
|
|
||||||
dst = encoding.MarshalInt64(dst, window)
|
dst = encoding.MarshalInt64(dst, window)
|
||||||
dst = encoding.MarshalInt64(dst, step)
|
dst = encoding.MarshalInt64(dst, step)
|
||||||
tfs := toTagFilters(me.LabelFilters)
|
dst = expr.AppendString(dst)
|
||||||
for i := range tfs {
|
|
||||||
dst = tfs[i].Marshal(dst)
|
|
||||||
}
|
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,6 @@ import (
|
|||||||
|
|
||||||
func TestRollupResultCache(t *testing.T) {
|
func TestRollupResultCache(t *testing.T) {
|
||||||
ResetRollupResultCache()
|
ResetRollupResultCache()
|
||||||
funcName := "foo"
|
|
||||||
window := int64(456)
|
window := int64(456)
|
||||||
ec := &EvalConfig{
|
ec := &EvalConfig{
|
||||||
Start: 1000,
|
Start: 1000,
|
||||||
@ -30,15 +29,18 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Value: "xxx",
|
Value: "xxx",
|
||||||
}},
|
}},
|
||||||
}
|
}
|
||||||
iafc := &incrementalAggrFuncContext{
|
fe := &metricsql.FuncExpr{
|
||||||
ae: &metricsql.AggrFuncExpr{
|
Name: "foo",
|
||||||
|
Args: []metricsql.Expr{me},
|
||||||
|
}
|
||||||
|
ae := &metricsql.AggrFuncExpr{
|
||||||
Name: "foobar",
|
Name: "foobar",
|
||||||
},
|
Args: []metricsql.Expr{fe},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try obtaining an empty value.
|
// Try obtaining an empty value.
|
||||||
t.Run("empty", func(t *testing.T) {
|
t.Run("empty", func(t *testing.T) {
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tss, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != ec.Start {
|
if newStart != ec.Start {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start)
|
||||||
}
|
}
|
||||||
@ -48,7 +50,7 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Store timeseries overlapping with start
|
// Store timeseries overlapping with start
|
||||||
t.Run("start-overlap-no-iafc", func(t *testing.T) {
|
t.Run("start-overlap-no-ae", func(t *testing.T) {
|
||||||
ResetRollupResultCache()
|
ResetRollupResultCache()
|
||||||
tss := []*timeseries{
|
tss := []*timeseries{
|
||||||
{
|
{
|
||||||
@ -56,8 +58,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Values: []float64{0, 1, 2},
|
Values: []float64{0, 1, 2},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
rollupResultCacheV.Put(ec, fe, window, tss)
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tss, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != 1400 {
|
if newStart != 1400 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
|
||||||
}
|
}
|
||||||
@ -69,7 +71,7 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
}
|
}
|
||||||
testTimeseriesEqual(t, tss, tssExpected)
|
testTimeseriesEqual(t, tss, tssExpected)
|
||||||
})
|
})
|
||||||
t.Run("start-overlap-with-iafc", func(t *testing.T) {
|
t.Run("start-overlap-with-ae", func(t *testing.T) {
|
||||||
ResetRollupResultCache()
|
ResetRollupResultCache()
|
||||||
tss := []*timeseries{
|
tss := []*timeseries{
|
||||||
{
|
{
|
||||||
@ -77,8 +79,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Values: []float64{0, 1, 2},
|
Values: []float64{0, 1, 2},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, iafc, window, tss)
|
rollupResultCacheV.Put(ec, ae, window, tss)
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, iafc, window)
|
tss, newStart := rollupResultCacheV.Get(ec, ae, window)
|
||||||
if newStart != 1400 {
|
if newStart != 1400 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
|
||||||
}
|
}
|
||||||
@ -100,8 +102,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Values: []float64{333, 0, 1, 2},
|
Values: []float64{333, 0, 1, 2},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
rollupResultCacheV.Put(ec, fe, window, tss)
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tss, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != 1000 {
|
if newStart != 1000 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
||||||
}
|
}
|
||||||
@ -119,8 +121,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Values: []float64{0, 1, 2},
|
Values: []float64{0, 1, 2},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
rollupResultCacheV.Put(ec, fe, window, tss)
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tss, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != 1000 {
|
if newStart != 1000 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
||||||
}
|
}
|
||||||
@ -138,8 +140,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Values: []float64{0, 1, 2},
|
Values: []float64{0, 1, 2},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
rollupResultCacheV.Put(ec, fe, window, tss)
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tss, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != 1000 {
|
if newStart != 1000 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
||||||
}
|
}
|
||||||
@ -157,8 +159,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Values: []float64{0, 1, 2},
|
Values: []float64{0, 1, 2},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
rollupResultCacheV.Put(ec, fe, window, tss)
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tss, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != 1000 {
|
if newStart != 1000 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
||||||
}
|
}
|
||||||
@ -176,8 +178,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Values: []float64{0, 1, 2, 3, 4, 5, 6, 7},
|
Values: []float64{0, 1, 2, 3, 4, 5, 6, 7},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
rollupResultCacheV.Put(ec, fe, window, tss)
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tss, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != 2200 {
|
if newStart != 2200 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
|
||||||
}
|
}
|
||||||
@ -199,8 +201,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Values: []float64{1, 2, 3, 4, 5, 6},
|
Values: []float64{1, 2, 3, 4, 5, 6},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
rollupResultCacheV.Put(ec, fe, window, tss)
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tss, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != 2200 {
|
if newStart != 2200 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
|
||||||
}
|
}
|
||||||
@ -224,8 +226,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
}
|
}
|
||||||
tss = append(tss, ts)
|
tss = append(tss, ts)
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
rollupResultCacheV.Put(ec, fe, window, tss)
|
||||||
tssResult, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tssResult, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != 2200 {
|
if newStart != 2200 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
|
||||||
}
|
}
|
||||||
@ -253,10 +255,10 @@ func TestRollupResultCache(t *testing.T) {
|
|||||||
Values: []float64{0, 1, 2},
|
Values: []float64{0, 1, 2},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss1)
|
rollupResultCacheV.Put(ec, fe, window, tss1)
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss2)
|
rollupResultCacheV.Put(ec, fe, window, tss2)
|
||||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss3)
|
rollupResultCacheV.Put(ec, fe, window, tss3)
|
||||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
tss, newStart := rollupResultCacheV.Get(ec, fe, window)
|
||||||
if newStart != 1400 {
|
if newStart != 1400 {
|
||||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
|
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user