app/vmselect/promql: optimize repeated SLI-like instant queries with lookbehind windows >= 1d

Repeated instant queries with long lookbehind windows, which contain one of the following rollup functions,
are optimized via partial result caching:

- sum_over_time()
- count_over_time()
- avg_over_time()
- increase()
- rate()

The basic idea of the optimization is to calculate

  rf(m[d] @ t)

as

  rf(m[offset] @ t) + rf(m[d] @ (t-offset)) - rf(m[offset] @ (t-d))

where rf(m[d] @ (t-offset)) is the cached query result, which was calculated previously.

The offset may be in the range of up to 1 hour.
This commit is contained in:
Aliaksandr Valialkin 2023-10-31 19:24:18 +01:00
parent c96fc05f3e
commit 41a0fdaf39
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
6 changed files with 614 additions and 134 deletions

View File

@ -79,6 +79,13 @@ type incrementalAggrFuncContext struct {
callbacks *incrementalAggrFuncCallbacks callbacks *incrementalAggrFuncCallbacks
} }
// resetState drops the per-worker aggregation state accumulated in iafc,
// so iafc can be used for another evaluation.
//
// Every per-worker map is replaced with a fresh empty map, which is pre-sized
// to the number of entries the replaced map contained.
func (iafc *incrementalAggrFuncContext) resetState() {
	shards := iafc.byWorkerID
	for workerID := 0; workerID < len(shards); workerID++ {
		sizeHint := len(shards[workerID].m)
		shards[workerID].m = make(map[string]*incrementalAggrContext, sizeHint)
	}
}
func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext { func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext {
return &incrementalAggrFuncContext{ return &incrementalAggrFuncContext{
ae: ae, ae: ae,
@ -154,6 +161,8 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
finalizeAggrFunc(iac) finalizeAggrFunc(iac)
tss = append(tss, iac.ts) tss = append(tss, iac.ts)
} }
// reset iafc state, so it could be re-used
iafc.resetState()
return tss return tss
} }

View File

@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
@ -61,7 +62,7 @@ func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error {
// AdjustStartEnd adjusts start and end values, so response caching may be enabled. // AdjustStartEnd adjusts start and end values, so response caching may be enabled.
// //
// See EvalConfig.mayCache for details. // See EvalConfig.mayCache() for details.
func AdjustStartEnd(start, end, step int64) (int64, int64) { func AdjustStartEnd(start, end, step int64) (int64, int64) {
if *disableCache { if *disableCache {
// Do not adjust start and end values when cache is disabled. // Do not adjust start and end values when cache is disabled.
@ -191,6 +192,11 @@ func (ec *EvalConfig) mayCache() bool {
if !ec.MayCache { if !ec.MayCache {
return false return false
} }
if ec.Start == ec.End {
// There is no need in aligning start and end to step for instant query
// in order to cache its results.
return true
}
if ec.Start%ec.Step != 0 { if ec.Start%ec.Step != 0 {
return false return false
} }
@ -1039,6 +1045,262 @@ func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float6
return dstValues, dstTimestamps return dstValues, dstTimestamps
} }
// minWindowForInstantRollupOptimization is the minimum lookbehind window in milliseconds
// for enabling smart caching of instant rollup function results.
//
// The value equals 24 hours. evalInstantRollup falls back to the plain evaluation
// for lookbehind windows smaller than this value.
const minWindowForInstantRollupOptimization = 24 * 3600 * 1000
// evalInstantRollup evaluates instant rollup where ec.Start == ec.End.
//
// A BUG is reported via logger.Panicf if ec.Start != ec.End.
//
// The rollup is evaluated directly (without the optimization) when:
//
//   - ec.mayCache() returns false;
//   - window is smaller than minWindowForInstantRollupOptimization;
//   - funcName has no optimized path;
//   - an incremental aggregate is used, which isn't supported by the optimized path for funcName.
//
// Otherwise the following optimizations are applied:
//
//   - avg_over_time(m[d]) is evaluated as sum_over_time(m[d]) / count_over_time(m[d]);
//   - rate(m[d]) is evaluated as increase(m[d]) / d, and sum(rate(m[d])) as sum(increase(m[d])) / d;
//   - count_over_time, sum_over_time and increase are calculated from the previously cached
//     instant result plus two rollups over a short lookbehind window.
func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) ([]*timeseries, error) {
	if ec.Start != ec.End {
		logger.Panicf("BUG: evalInstantRollup cannot be called on non-empty time range; got %s", ec.timeRangeString())
	}
	timestamp := ec.Start
	if qt.Enabled() {
		qt = qt.NewChild("instant rollup %s; time=%s, window=%d", expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
		defer qt.Done()
	}
	// evalAt evaluates the rollup at the given timestamp with the given lookbehind window
	// on a copy of ec, bypassing the rollup result cache.
	evalAt := func(qt *querytracer.Tracer, timestamp, window int64) ([]*timeseries, error) {
		ecCopy := copyEvalConfig(ec)
		ecCopy.Start = timestamp
		ecCopy.End = timestamp
		pointsPerSeries := int64(1)
		return evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries)
	}
	// tooBigOffset returns true if the offset reaches the smaller of window/2 and 3600*1000 (one hour in milliseconds).
	tooBigOffset := func(offset int64) bool {
		maxOffset := window / 2
		if maxOffset > 3600*1000 {
			maxOffset = 3600 * 1000
		}
		return offset >= maxOffset
	}
	if !ec.mayCache() {
		qt.Printf("do not apply instant rollup optimization because of disabled cache")
		return evalAt(qt, timestamp, window)
	}
	if window < minWindowForInstantRollupOptimization {
		qt.Printf("do not apply instant rollup optimization because of too small window=%d; must be equal or bigger than %d", window, minWindowForInstantRollupOptimization)
		return evalAt(qt, timestamp, window)
	}
	switch funcName {
	case "avg_over_time":
		if iafc != nil {
			qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name)
			return evalAt(qt, timestamp, window)
		}
		qt.Printf("optimized calculation for instant rollup avg_over_time(m[d]) as (sum_over_time(m[d]) / count_over_time(m[d]))")
		// Shallow copies of the original function expression are renamed,
		// so the original expr isn't modified.
		fe := expr.(*metricsql.FuncExpr)
		feSum := *fe
		feSum.Name = "sum_over_time"
		feCount := *fe
		feCount.Name = "count_over_time"
		be := &metricsql.BinaryOpExpr{
			Op: "/",
			KeepMetricNames: fe.KeepMetricNames,
			Left: &feSum,
			Right: &feCount,
		}
		return evalExpr(qt, ec, be)
	case "rate":
		if iafc != nil {
			if strings.ToLower(iafc.ae.Name) != "sum" {
				qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name)
				return evalAt(qt, timestamp, window)
			}
			qt.Printf("optimized calculation for sum(rate(m[d])) as (sum(increase(m[d])) / d)")
			// expr is the aggregate function expression here; its first arg is the rate() call.
			afe := expr.(*metricsql.AggrFuncExpr)
			fe := afe.Args[0].(*metricsql.FuncExpr)
			feIncrease := *fe
			feIncrease.Name = "increase"
			re := fe.Args[0].(*metricsql.RollupExpr)
			d := re.Window.Duration(ec.Step)
			if d == 0 {
				// Fall back to ec.Step when the duration of the lookbehind window is zero.
				d = ec.Step
			}
			afeIncrease := *afe
			afeIncrease.Args = []metricsql.Expr{&feIncrease}
			be := &metricsql.BinaryOpExpr{
				Op: "/",
				KeepMetricNames: true,
				Left: &afeIncrease,
				Right: &metricsql.NumberExpr{
					N: float64(d) / 1000,
				},
			}
			return evalExpr(qt, ec, be)
		}
		qt.Printf("optimized calculation for instant rollup rate(m[d]) as (increase(m[d]) / d)")
		fe := expr.(*metricsql.FuncExpr)
		feIncrease := *fe
		feIncrease.Name = "increase"
		re := fe.Args[0].(*metricsql.RollupExpr)
		d := re.Window.Duration(ec.Step)
		if d == 0 {
			// Fall back to ec.Step when the duration of the lookbehind window is zero.
			d = ec.Step
		}
		be := &metricsql.BinaryOpExpr{
			Op: "/",
			KeepMetricNames: fe.KeepMetricNames,
			Left: &feIncrease,
			Right: &metricsql.NumberExpr{
				N: float64(d) / 1000,
			},
		}
		return evalExpr(qt, ec, be)
	case "count_over_time", "sum_over_time", "increase":
		if iafc != nil && strings.ToLower(iafc.ae.Name) != "sum" {
			qt.Printf("do not apply instant rollup optimization for non-sum incremental aggregate %s()", iafc.ae.Name)
			return evalAt(qt, timestamp, window)
		}
		// Calculate
		//
		// rf(m[window] @ timestamp)
		//
		// as
		//
		// rf(m[window] @ (timestamp-offset)) + rf(m[offset] @ timestamp) - rf(m[offset] @ (timestamp-window))
		//
		// where
		//
		// - rf is count_over_time, sum_over_time or increase
		// - rf(m[window] @ (timestamp-offset)) is obtained from cache
		// - rf(m[offset] @ timestamp) and rf(m[offset] @ (timestamp-window)) are calculated from the storage
		// These rollups are calculated faster than rf(m[window]) because offset is smaller than window.
		qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
			expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
		defer qtChild.Done()
		// The cache lookup is retried from here after unusable cached values are deleted below.
	again:
		offset := int64(0)
		tssCached := rollupResultCacheV.GetInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss)
		ec.QueryStats.addSeriesFetched(len(tssCached))
		if len(tssCached) == 0 {
			// Cache miss. Re-populate it
			start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds()
			offset = timestamp - start
			if offset < 0 {
				// The requested timestamp is smaller than start,
				// so calculate and cache the rollup at the requested timestamp.
				start = timestamp
				offset = 0
			}
			if tooBigOffset(offset) {
				qtChild.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+
					"for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window)
				return evalAt(qtChild, timestamp, window)
			}
			qtChild.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start))
			tss, err := evalAt(qtChild, start, window)
			if err != nil {
				return nil, err
			}
			rollupResultCacheV.PutInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss, tss)
			tssCached = tss
		} else {
			// Cache hit. The offset is the distance between the requested timestamp
			// and the timestamp of the cached values.
			offset = timestamp - tssCached[0].Timestamps[0]
			if offset < 0 {
				qtChild.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s",
					storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp))
				// Delete the cached values, since they cannot be used for the requested timestamp,
				// and retry the lookup, so the cache could be re-populated.
				rollupResultCacheV.DeleteInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss)
				goto again
			}
			if tooBigOffset(offset) {
				qtChild.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+
					"and the cached values is too big comparing to window=%d", offset, window)
				// Delete the outdated cached values, so the cache could be re-populated with newer values.
				rollupResultCacheV.DeleteInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss)
				goto again
			}
		}
		if offset == 0 {
			qtChild.Printf("return cached values, since they have the requested timestamp=%s", storage.TimestampToHumanReadableFormat(timestamp))
			return tssCached, nil
		}
		// Calculate rf(m[offset] @ timestamp)
		tssStart, err := evalAt(qtChild, timestamp, offset)
		if err != nil {
			return nil, err
		}
		// Calculate rf(m[offset] @ (timestamp - window))
		tssEnd, err := evalAt(qtChild, timestamp-window, offset)
		if err != nil {
			return nil, err
		}
		// Combine the results as tssCached + tssStart - tssEnd
		tss, err := mergeInstantValues(qtChild, tssCached, tssStart, tssEnd)
		if err != nil {
			return nil, fmt.Errorf("cannot merge instant series: %w", err)
		}
		return tss, nil
	default:
		qt.Printf("instant rollup optimization isn't implemented for %s()", funcName)
		return evalAt(qt, timestamp, window)
	}
}
// mergeInstantValues calculates tssCached + tssStart - tssEnd
//
// Series are matched by their marshaled sorted metric names.
// An error is returned if tssCached contains series with identical metric names.
//
// A series from tssStart replaces the cached entry when the entry is missing
// or holds NaN; otherwise its non-NaN value is added to the cached value in place.
// A non-NaN value from tssEnd is subtracted only from an existing non-NaN entry.
func mergeInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, error) {
	qt = qt.NewChild("merge instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
	defer qt.Done()

	assertInstantValues(tssCached)
	assertInstantValues(tssStart)
	assertInstantValues(tssEnd)

	byName := make(map[string]*timeseries, len(tssCached))

	keyBuf := bbPool.Get()
	defer bbPool.Put(keyBuf)

	// Register the cached series.
	for _, tsCached := range tssCached {
		keyBuf.B = marshalMetricNameSorted(keyBuf.B[:0], &tsCached.MetricName)
		if byName[string(keyBuf.B)] != nil {
			return nil, fmt.Errorf("duplicate series found: %s", &tsCached.MetricName)
		}
		byName[string(keyBuf.B)] = tsCached
	}

	// Add values from tssStart.
	for _, tsStart := range tssStart {
		keyBuf.B = marshalMetricNameSorted(keyBuf.B[:0], &tsStart.MetricName)
		acc := byName[string(keyBuf.B)]
		if acc == nil || math.IsNaN(acc.Values[0]) {
			byName[string(keyBuf.B)] = tsStart
			continue
		}
		if v := tsStart.Values[0]; !math.IsNaN(v) {
			acc.Values[0] += v
		}
	}

	// Subtract values from tssEnd.
	for _, tsEnd := range tssEnd {
		keyBuf.B = marshalMetricNameSorted(keyBuf.B[:0], &tsEnd.MetricName)
		acc := byName[string(keyBuf.B)]
		if acc == nil || math.IsNaN(acc.Values[0]) {
			continue
		}
		if v := tsEnd.Values[0]; !math.IsNaN(v) {
			acc.Values[0] -= v
		}
	}

	merged := make([]*timeseries, 0, len(byName))
	for _, ts := range byName {
		merged = append(merged, ts)
	}
	qt.Printf("resulting series=%d", len(merged))
	return merged, nil
}
// assertInstantValues verifies that every series in tss holds exactly one value
// and exactly one timestamp.
//
// A BUG is reported via logger.Panicf on the first violation.
func assertInstantValues(tss []*timeseries) {
	for i := range tss {
		ts := tss[i]
		if n := len(ts.Values); n != 1 {
			logger.Panicf("BUG: instant series must contain a single value; got %d values", n)
		}
		if n := len(ts.Timestamps); n != 1 {
			logger.Panicf("BUG: instant series must contain a single timestamp; got %d timestamps", n)
		}
	}
}
var ( var (
rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`) rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`)
rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`) rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`)
@ -1049,23 +1311,28 @@ var (
func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) { expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) {
var rollupMemorySize int64
window, err := windowExpr.NonNegativeDuration(ec.Step) window, err := windowExpr.NonNegativeDuration(ec.Step)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err) return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
} }
if qt.Enabled() {
qt = qt.NewChild("rollup %s(): timeRange=%s, step=%d, window=%d", funcName, ec.timeRangeString(), ec.Step, window)
defer func() {
qt.Donef("neededMemoryBytes=%d", rollupMemorySize)
}()
}
if me.IsEmpty() { if me.IsEmpty() {
return evalNumber(ec, nan), nil return evalNumber(ec, nan), nil
} }
if ec.Start == ec.End {
rvs, err := evalInstantRollup(qt, ec, funcName, rf, expr, me, iafc, window)
if err != nil {
err = &UserReadableError{
Err: err,
}
return nil, err
}
return rvs, nil
}
// Search for partial results in cache. // Search for partial results in cache.
tssCached, start := rollupResultCacheV.Get(qt, ec, expr, window) tssCached, start := rollupResultCacheV.GetSeries(qt, ec, expr, window)
ec.QueryStats.addSeriesFetched(len(tssCached))
if start > ec.End { if start > ec.End {
// The result is fully cached. // The result is fully cached.
rollupResultCacheFullHits.Inc() rollupResultCacheFullHits.Inc()
@ -1077,10 +1344,41 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
rollupResultCacheMiss.Inc() rollupResultCacheMiss.Inc()
} }
// Obtain rollup configs before fetching data from db, ecCopy := copyEvalConfig(ec)
// so type errors can be caught earlier. ecCopy.Start = start
sharedTimestamps := getTimestamps(start, ec.End, ec.Step, ec.MaxPointsPerSeries) pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps) tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries)
if err != nil {
err = &UserReadableError{
Err: err,
}
return nil, err
}
rvs, err := mergeTimeseries(qt, tssCached, tss, start, ec)
if err != nil {
return nil, fmt.Errorf("cannot merge series: %w", err)
}
if tss != nil {
rollupResultCacheV.PutSeries(qt, ec, expr, window, tss)
}
return rvs, nil
}
// evalRollupFuncNoCache calculates the given rf with the given lookbehind window.
//
// pointsPerSeries is used only for estimating the needed memory for query processing
func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, pointsPerSeries int64) ([]*timeseries, error) {
if qt.Enabled() {
qt = qt.NewChild("rollup %s: timeRange=%s, step=%d, window=%d", expr.AppendString(nil), ec.timeRangeString(), ec.Step, window)
defer qt.Done()
}
if window <= 0 {
return nil, nil
}
// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1088,7 +1386,10 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
// Fetch the remaining part of the result. // Fetch the remaining part of the result.
tfss := searchutils.ToTagFilterss(me.LabelFilterss) tfss := searchutils.ToTagFilterss(me.LabelFilterss)
tfss = searchutils.JoinTagFilterss(tfss, ec.EnforcedTagFilterss) tfss = searchutils.JoinTagFilterss(tfss, ec.EnforcedTagFilterss)
minTimestamp := start - maxSilenceInterval minTimestamp := ec.Start
if needSilenceIntervalForRollupFunc(funcName) {
minTimestamp -= maxSilenceInterval
}
if window > ec.Step { if window > ec.Step {
minTimestamp -= window minTimestamp -= window
} else { } else {
@ -1100,21 +1401,16 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries) sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries)
rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline) rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline)
if err != nil { if err != nil {
return nil, &UserReadableError{ return nil, err
Err: err,
}
} }
rssLen := rss.Len() rssLen := rss.Len()
if rssLen == 0 { if rssLen == 0 {
rss.Cancel() rss.Cancel()
tss := mergeTimeseries(tssCached, nil, start, ec) return nil, nil
return tss, nil
} }
ec.QueryStats.addSeriesFetched(rssLen) ec.QueryStats.addSeriesFetched(rssLen)
// Verify timeseries fit available memory after the rollup. // Verify timeseries fit available memory during rollup calculations.
// Take into account points from tssCached.
pointsPerTimeseries := 1 + (ec.End-ec.Start)/ec.Step
timeseriesLen := rssLen timeseriesLen := rssLen
if iafc != nil { if iafc != nil {
// Incremental aggregates require holding only GOMAXPROCS timeseries in memory. // Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
@ -1134,8 +1430,8 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
timeseriesLen = rssLen timeseriesLen = rssLen
} }
} }
rollupPoints := mulNoOverflow(pointsPerTimeseries, int64(timeseriesLen*len(rcs))) rollupPoints := mulNoOverflow(pointsPerSeries, int64(timeseriesLen*len(rcs)))
rollupMemorySize = sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16)) rollupMemorySize := sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16))
if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory { if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory {
memoryIntensiveQueries.Inc() memoryIntensiveQueries.Inc()
requestURI := ec.GetRequestURI() requestURI := ec.GetRequestURI()
@ -1146,44 +1442,33 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
} }
if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory { if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory {
rss.Cancel() rss.Cancel()
return nil, &UserReadableError{ err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+
Err: fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+ "according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+
"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+ "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ "increasing -search.maxMemoryPerQuery",
"increasing -search.maxMemoryPerQuery", expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3)
expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3), return nil, err
}
} }
rml := getRollupMemoryLimiter() rml := getRollupMemoryLimiter()
if !rml.Get(uint64(rollupMemorySize)) { if !rml.Get(uint64(rollupMemorySize)) {
rss.Cancel() rss.Cancel()
return nil, &UserReadableError{ err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+
Err: fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+ "total available memory for concurrent requests: %d bytes; requested memory: %d bytes; "+
"total available memory for concurrent requests: %d bytes; "+ "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
"requested memory: %d bytes; "+ "switching to node with more RAM; increasing -memory.allowedPercent",
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3)
"switching to node with more RAM; increasing -memory.allowedPercent", return nil, err
expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3),
}
} }
defer rml.Put(uint64(rollupMemorySize)) defer rml.Put(uint64(rollupMemorySize))
qt.Printf("the rollup evaluation needs an estimated %d bytes of RAM for %d series and %d points per series (summary %d points)",
rollupMemorySize, timeseriesLen, pointsPerSeries, rollupPoints)
// Evaluate rollup // Evaluate rollup
keepMetricNames := getKeepMetricNames(expr) keepMetricNames := getKeepMetricNames(expr)
var tss []*timeseries
if iafc != nil { if iafc != nil {
tss, err = evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps) return evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps)
} else {
tss, err = evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)
} }
if err != nil { return evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)
return nil, &UserReadableError{
Err: err,
}
}
tss = mergeTimeseries(tssCached, tss, start, ec)
rollupResultCacheV.Put(qt, ec, expr, window, tss)
return tss, nil
} }
var ( var (
@ -1198,6 +1483,53 @@ func getRollupMemoryLimiter() *memoryLimiter {
return &rollupMemoryLimiter return &rollupMemoryLimiter
} }
// needSilenceIntervalForRollupFunc returns false for the rollup functions listed below,
// which do not rely on the previous sample before the lookbehind window (aka prevValue)
// and therefore do not need the silence interval.
//
// The funcName is matched case-insensitively. True is returned for every other name.
func needSilenceIntervalForRollupFunc(funcName string) bool {
	switch name := strings.ToLower(funcName); name {
	// Presence, counting and share functions.
	case "absent_over_time", "present_over_time", "stale_samples_over_time",
		"count_over_time", "count_eq_over_time", "count_gt_over_time", "count_le_over_time", "count_ne_over_time",
		"share_eq_over_time", "share_gt_over_time", "share_le_over_time":
		return false
	// Aggregates over the sample values inside the window.
	case "avg_over_time", "sum_over_time", "min_over_time", "max_over_time",
		"median_over_time", "mad_over_time", "range_over_time",
		"stddev_over_time", "stdvar_over_time", "zscore_over_time",
		"quantile_over_time", "quantiles_over_time", "histogram_over_time":
		return false
	// First/last sample selectors and timestamp-related functions.
	case "default_rollup", "first_over_time", "last_over_time",
		"tfirst_over_time", "tlast_over_time", "tmin_over_time", "tmax_over_time",
		"timestamp", "timestamp_with_name":
		return false
	// Bounds and predictions.
	case "hoeffding_bound_lower", "hoeffding_bound_upper", "predict_linear":
		return false
	}
	return true
}
func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool,
iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {

View File

@ -202,11 +202,74 @@ func ResetRollupResultCache() {
logger.Infof("rollupResult cache has been cleared") logger.Infof("rollupResult cache has been cleared")
} }
func (rrc *rollupResultCache) Get(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) { func (rrc *rollupResultCache) GetInstantValues(qt *querytracer.Tracer, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter) []*timeseries {
if qt.Enabled() { if qt.Enabled() {
query := string(expr.AppendString(nil)) query := string(expr.AppendString(nil))
query = bytesutil.LimitStringLen(query, 300) query = bytesutil.LimitStringLen(query, 300)
qt = qt.NewChild("rollup cache get: query=%s, timeRange=%s, step=%d, window=%d", query, ec.timeRangeString(), ec.Step, window) qt = qt.NewChild("rollup cache get instant values: query=%s, window=%d, step=%d", query, window, step)
defer qt.Done()
}
// Obtain instant values from the cache
bb := bbPool.Get()
defer bbPool.Put(bb)
bb.B = marshalRollupResultCacheKeyForInstantValues(bb.B[:0], expr, window, step, etfss)
tss, ok := rrc.getSeriesFromCache(qt, bb.B)
if !ok || len(tss) == 0 {
return nil
}
assertInstantValues(tss)
qt.Printf("found %d series for time=%s", len(tss), storage.TimestampToHumanReadableFormat(tss[0].Timestamps[0]))
return tss
}
// PutInstantValues stores the instant series tss in the cache under the key
// built from expr, window, step and etfss.
//
// Nothing is stored if tss is empty. Non-empty tss is verified with assertInstantValues.
// The result of the underlying cache write is intentionally ignored.
func (rrc *rollupResultCache) PutInstantValues(qt *querytracer.Tracer, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter, tss []*timeseries) {
	if qt.Enabled() {
		query := bytesutil.LimitStringLen(string(expr.AppendString(nil)), 300)
		var startStr string
		if len(tss) > 0 {
			startStr = storage.TimestampToHumanReadableFormat(tss[0].Timestamps[0])
		}
		qt = qt.NewChild("rollup cache put instant values: query=%s, window=%d, step=%d, series=%d, time=%s", query, window, step, len(tss), startStr)
		defer qt.Done()
	}

	if len(tss) == 0 {
		qt.Printf("do not cache empty series list")
		return
	}
	assertInstantValues(tss)

	keyBuf := bbPool.Get()
	defer bbPool.Put(keyBuf)

	keyBuf.B = marshalRollupResultCacheKeyForInstantValues(keyBuf.B[:0], expr, window, step, etfss)
	_ = rrc.putSeriesToCache(qt, keyBuf.B, step, tss)
}
// DeleteInstantValues overwrites the cache entry for the key built from expr, window,
// step and etfss with an empty series list.
//
// A BUG is reported via logger.Panicf if the empty series list cannot be stored.
func (rrc *rollupResultCache) DeleteInstantValues(qt *querytracer.Tracer, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter) {
	keyBuf := bbPool.Get()
	defer bbPool.Put(keyBuf)

	keyBuf.B = marshalRollupResultCacheKeyForInstantValues(keyBuf.B[:0], expr, window, step, etfss)
	if stored := rrc.putSeriesToCache(qt, keyBuf.B, step, nil); !stored {
		logger.Panicf("BUG: cannot store zero series to cache")
	}

	if !qt.Enabled() {
		return
	}
	query := bytesutil.LimitStringLen(string(expr.AppendString(nil)), 300)
	qt.Printf("rollup result cache delete instant values: query=%s, window=%d, step=%d", query, window, step)
}
func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) {
if qt.Enabled() {
query := string(expr.AppendString(nil))
query = bytesutil.LimitStringLen(query, 300)
qt = qt.NewChild("rollup cache get series: query=%s, timeRange=%s, window=%d, step=%d", query, ec.timeRangeString(), window, ec.Step)
defer qt.Done() defer qt.Done()
} }
if !ec.mayCache() { if !ec.mayCache() {
@ -218,7 +281,7 @@ func (rrc *rollupResultCache) Get(qt *querytracer.Tracer, ec *EvalConfig, expr m
bb := bbPool.Get() bb := bbPool.Get()
defer bbPool.Put(bb) defer bbPool.Put(bb)
bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss) bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss)
metainfoBuf := rrc.c.Get(nil, bb.B) metainfoBuf := rrc.c.Get(nil, bb.B)
if len(metainfoBuf) == 0 { if len(metainfoBuf) == 0 {
qt.Printf("nothing found") qt.Printf("nothing found")
@ -233,31 +296,17 @@ func (rrc *rollupResultCache) Get(qt *querytracer.Tracer, ec *EvalConfig, expr m
qt.Printf("nothing found on the timeRange") qt.Printf("nothing found on the timeRange")
return nil, ec.Start return nil, ec.Start
} }
var ok bool
bb.B = key.Marshal(bb.B[:0]) bb.B = key.Marshal(bb.B[:0])
compressedResultBuf := resultBufPool.Get() tss, ok = rrc.getSeriesFromCache(qt, bb.B)
defer resultBufPool.Put(compressedResultBuf) if !ok {
compressedResultBuf.B = rrc.c.GetBig(compressedResultBuf.B[:0], bb.B)
if len(compressedResultBuf.B) == 0 {
mi.RemoveKey(key) mi.RemoveKey(key)
metainfoBuf = mi.Marshal(metainfoBuf[:0]) metainfoBuf = mi.Marshal(metainfoBuf[:0])
bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss) bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss)
rrc.c.Set(bb.B, metainfoBuf) rrc.c.Set(bb.B, metainfoBuf)
qt.Printf("missing cache entry")
return nil, ec.Start return nil, ec.Start
} }
// Decompress into newly allocated byte slice, since tss returned from unmarshalTimeseriesFast
// refers to the byte slice, so it cannot be returned to the resultBufPool.
qt.Printf("load compressed entry from cache with size %d bytes", len(compressedResultBuf.B))
resultBuf, err := encoding.DecompressZSTD(nil, compressedResultBuf.B)
if err != nil {
logger.Panicf("BUG: cannot decompress resultBuf from rollupResultCache: %s; it looks like it was improperly saved", err)
}
qt.Printf("unpack the entry into %d bytes", len(resultBuf))
tss, err = unmarshalTimeseriesFast(resultBuf)
if err != nil {
logger.Panicf("BUG: cannot unmarshal timeseries from rollupResultCache: %s; it looks like it was improperly saved", err)
}
qt.Printf("unmarshal %d series", len(tss))
// Extract values for the matching timestamps // Extract values for the matching timestamps
timestamps := tss[0].Timestamps timestamps := tss[0].Timestamps
@ -303,17 +352,21 @@ func (rrc *rollupResultCache) Get(qt *querytracer.Tracer, ec *EvalConfig, expr m
var resultBufPool bytesutil.ByteBufferPool var resultBufPool bytesutil.ByteBufferPool
func (rrc *rollupResultCache) Put(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) { func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) {
if qt.Enabled() { if qt.Enabled() {
query := string(expr.AppendString(nil)) query := string(expr.AppendString(nil))
query = bytesutil.LimitStringLen(query, 300) query = bytesutil.LimitStringLen(query, 300)
qt = qt.NewChild("rollup cache put: query=%s, timeRange=%s, step=%d, window=%d, series=%d", query, ec.timeRangeString(), ec.Step, window, len(tss)) qt = qt.NewChild("rollup cache put series: query=%s, timeRange=%s, step=%d, window=%d, series=%d", query, ec.timeRangeString(), ec.Step, window, len(tss))
defer qt.Done() defer qt.Done()
} }
if len(tss) == 0 || !ec.mayCache() { if !ec.mayCache() {
qt.Printf("do not store series to cache, since it is disabled in the current context") qt.Printf("do not store series to cache, since it is disabled in the current context")
return return
} }
if len(tss) == 0 {
qt.Printf("do not store empty series list")
return
}
// Remove values up to currentTime - step - cacheTimestampOffset, // Remove values up to currentTime - step - cacheTimestampOffset,
// since these values may be added later. // since these values may be added later.
@ -346,7 +399,7 @@ func (rrc *rollupResultCache) Put(qt *querytracer.Tracer, ec *EvalConfig, expr m
metainfoBuf := bbPool.Get() metainfoBuf := bbPool.Get()
defer bbPool.Put(metainfoBuf) defer bbPool.Put(metainfoBuf)
metainfoKey.B = marshalRollupResultCacheKey(metainfoKey.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss) metainfoKey.B = marshalRollupResultCacheKeyForSeries(metainfoKey.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss)
metainfoBuf.B = rrc.c.Get(metainfoBuf.B[:0], metainfoKey.B) metainfoBuf.B = rrc.c.Get(metainfoBuf.B[:0], metainfoKey.B)
var mi rollupResultCacheMetainfo var mi rollupResultCacheMetainfo
if len(metainfoBuf.B) > 0 { if len(metainfoBuf.B) > 0 {
@ -365,31 +418,17 @@ func (rrc *rollupResultCache) Put(qt *querytracer.Tracer, ec *EvalConfig, expr m
return return
} }
maxMarshaledSize := getRollupResultCacheSize() / 4
resultBuf := resultBufPool.Get()
defer resultBufPool.Put(resultBuf)
resultBuf.B = marshalTimeseriesFast(resultBuf.B[:0], tss, maxMarshaledSize, ec.Step)
if len(resultBuf.B) == 0 {
tooBigRollupResults.Inc()
qt.Printf("cannot store series in the cache, since they would occupy more than %d bytes", maxMarshaledSize)
return
}
if qt.Enabled() {
startString := storage.TimestampToHumanReadableFormat(start)
endString := storage.TimestampToHumanReadableFormat(end)
qt.Printf("marshal %d series on a timeRange=[%s..%s] into %d bytes", len(tss), startString, endString, len(resultBuf.B))
}
compressedResultBuf := resultBufPool.Get()
defer resultBufPool.Put(compressedResultBuf)
compressedResultBuf.B = encoding.CompressZSTDLevel(compressedResultBuf.B[:0], resultBuf.B, 1)
qt.Printf("compress %d bytes into %d bytes", len(resultBuf.B), len(compressedResultBuf.B))
var key rollupResultCacheKey var key rollupResultCacheKey
key.prefix = rollupResultCacheKeyPrefix key.prefix = rollupResultCacheKeyPrefix
key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1) key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1)
rollupResultKey := key.Marshal(nil)
rrc.c.SetBig(rollupResultKey, compressedResultBuf.B) bb := bbPool.Get()
qt.Printf("store %d bytes in the cache", len(compressedResultBuf.B)) bb.B = key.Marshal(bb.B[:0])
ok := rrc.putSeriesToCache(qt, bb.B, ec.Step, tss)
bbPool.Put(bb)
if !ok {
return
}
mi.AddKey(key, timestamps[0], timestamps[len(timestamps)-1]) mi.AddKey(key, timestamps[0], timestamps[len(timestamps)-1])
metainfoBuf.B = mi.Marshal(metainfoBuf.B[:0]) metainfoBuf.B = mi.Marshal(metainfoBuf.B[:0])
@ -401,6 +440,52 @@ var (
rollupResultCacheKeySuffix = uint64(time.Now().UnixNano()) rollupResultCacheKeySuffix = uint64(time.Now().UnixNano())
) )
// getSeriesFromCache loads the compressed entry stored under the given key,
// decompresses it and unmarshals it into a list of series.
//
// It returns (nil, false) when the cache holds nothing for the key.
// Otherwise it returns the unmarshaled series and true.
// Failures to decompress or unmarshal a stored entry are reported via logger.Panicf.
func (rrc *rollupResultCache) getSeriesFromCache(qt *querytracer.Tracer, key []byte) ([]*timeseries, bool) {
	cbb := resultBufPool.Get()
	cbb.B = rrc.c.GetBig(cbb.B[:0], key)
	compressedSize := len(cbb.B)
	if compressedSize == 0 {
		qt.Printf("nothing found in the cache")
		resultBufPool.Put(cbb)
		return nil, false
	}
	qt.Printf("load compressed entry from cache with size %d bytes", compressedSize)

	// The series returned by unmarshalTimeseriesFast keep referring to the decompressed bytes,
	// so the destination must be a fresh allocation rather than a pooled buffer.
	data, err := encoding.DecompressZSTD(nil, cbb.B)
	if err != nil {
		logger.Panicf("BUG: cannot decompress resultBuf from rollupResultCache: %s; it looks like it was improperly saved", err)
	}
	// The compressed bytes are not needed after decompression, so the buffer goes back to the pool.
	resultBufPool.Put(cbb)
	qt.Printf("unpack the entry into %d bytes", len(data))

	series, err := unmarshalTimeseriesFast(data)
	if err != nil {
		logger.Panicf("BUG: cannot unmarshal timeseries from rollupResultCache: %s; it looks like it was improperly saved", err)
	}
	qt.Printf("unmarshal %d series", len(series))
	return series, true
}
// putSeriesToCache marshals tss, compresses the result with ZSTD level 1
// and stores it in the cache under the given key.
//
// It returns false without storing anything when marshalTimeseriesFast yields
// an empty result for the size limit of a quarter of getRollupResultCacheSize();
// tooBigRollupResults is incremented in that case. It returns true after the entry is stored.
func (rrc *rollupResultCache) putSeriesToCache(qt *querytracer.Tracer, key []byte, step int64, tss []*timeseries) bool {
	sizeLimit := getRollupResultCacheSize() / 4

	marshaled := resultBufPool.Get()
	defer resultBufPool.Put(marshaled)
	marshaled.B = marshalTimeseriesFast(marshaled.B[:0], tss, sizeLimit, step)
	marshaledSize := len(marshaled.B)
	if marshaledSize == 0 {
		// An empty marshaled result is treated as "does not fit into sizeLimit".
		tooBigRollupResults.Inc()
		qt.Printf("cannot store %d series in the cache, since they would occupy more than %d bytes", len(tss), sizeLimit)
		return false
	}
	qt.Printf("marshal %d series into %d bytes", len(tss), marshaledSize)

	compressed := resultBufPool.Get()
	defer resultBufPool.Put(compressed)
	compressed.B = encoding.CompressZSTDLevel(compressed.B[:0], marshaled.B, 1)
	compressedSize := len(compressed.B)
	qt.Printf("compress %d bytes into %d bytes", marshaledSize, compressedSize)

	rrc.c.SetBig(key, compressed.B)
	qt.Printf("store %d bytes in the cache", compressedSize)
	return true
}
func newRollupResultCacheKeyPrefix() uint64 { func newRollupResultCacheKeyPrefix() uint64 {
var buf [8]byte var buf [8]byte
if _, err := rand.Read(buf[:]); err != nil { if _, err := rand.Read(buf[:]); err != nil {
@ -439,14 +524,36 @@ func mustSaveRollupResultCacheKeyPrefix(path string) {
var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total") var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
// Increment this value every time the format of the cache changes. // Increment this value every time the format of the cache changes.
const rollupResultCacheVersion = 9 const rollupResultCacheVersion = 10
func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte { const (
// rollupResultCacheTypeSeries is the type byte written into cache keys by marshalRollupResultCacheKeyForSeries.
rollupResultCacheTypeSeries = 0
// rollupResultCacheTypeInstantValues is the type byte written into cache keys by marshalRollupResultCacheKeyForInstantValues.
rollupResultCacheTypeInstantValues = 1
)
func marshalRollupResultCacheKeyForSeries(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte {
dst = append(dst, rollupResultCacheVersion) dst = append(dst, rollupResultCacheVersion)
dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix) dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix)
dst = append(dst, rollupResultCacheTypeSeries)
dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, window)
dst = encoding.MarshalInt64(dst, step) dst = encoding.MarshalInt64(dst, step)
dst = marshalTagFiltersForRollupResultCacheKey(dst, etfs)
dst = expr.AppendString(dst) dst = expr.AppendString(dst)
return dst
}
// marshalRollupResultCacheKeyForInstantValues appends the cache key for an instant values entry
// to dst and returns the extended slice.
//
// The key consists of, in order: rollupResultCacheVersion, rollupResultCacheKeyPrefix,
// the rollupResultCacheTypeInstantValues type byte, window, step, the marshaled etfs
// and the string representation of expr.
func marshalRollupResultCacheKeyForInstantValues(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte {
	key := append(dst, rollupResultCacheVersion)
	key = encoding.MarshalUint64(key, rollupResultCacheKeyPrefix)
	key = append(key, rollupResultCacheTypeInstantValues)
	key = encoding.MarshalInt64(encoding.MarshalInt64(key, window), step)
	key = marshalTagFiltersForRollupResultCacheKey(key, etfs)
	return expr.AppendString(key)
}
func marshalTagFiltersForRollupResultCacheKey(dst []byte, etfs [][]storage.TagFilter) []byte {
for i, etf := range etfs { for i, etf := range etfs {
for _, f := range etf { for _, f := range etf {
dst = f.Marshal(dst) dst = f.Marshal(dst)
@ -461,12 +568,15 @@ func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step i
// mergeTimeseries concatenates b with a and returns the result. // mergeTimeseries concatenates b with a and returns the result.
// //
// Preconditions: // Preconditions:
// - a mustn't intersect with b. // - a mustn't intersect with b by timestamps.
// - a timestamps must be smaller than b timestamps. // - a timestamps must be smaller than b timestamps.
// //
// Postconditions: // Postconditions:
// - a and b cannot be used after returning from the call. // - a and b cannot be used after returning from the call.
func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timeseries { func mergeTimeseries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, ec *EvalConfig) ([]*timeseries, error) {
qt = qt.NewChild("merge series len(a)=%d, len(b)=%d", len(a), len(b))
defer qt.Done()
sharedTimestamps := ec.getSharedTimestamps() sharedTimestamps := ec.getSharedTimestamps()
if bStart == ec.Start { if bStart == ec.Start {
// Nothing to merge - b covers all the time range. // Nothing to merge - b covers all the time range.
@ -478,7 +588,7 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese
logger.Panicf("BUG: unexpected number of values in b; got %d; want %d", len(tsB.Values), len(tsB.Timestamps)) logger.Panicf("BUG: unexpected number of values in b; got %d; want %d", len(tsB.Values), len(tsB.Timestamps))
} }
} }
return b return b, nil
} }
m := make(map[string]*timeseries, len(a)) m := make(map[string]*timeseries, len(a))
@ -486,6 +596,9 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese
defer bbPool.Put(bb) defer bbPool.Put(bb)
for _, ts := range a { for _, ts := range a {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
return nil, fmt.Errorf("duplicate series found: %s", &ts.MetricName)
}
m[string(bb.B)] = ts m[string(bb.B)] = ts
} }
@ -536,7 +649,8 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese
} }
rvs = append(rvs, &tmp) rvs = append(rvs, &tmp)
} }
return rvs qt.Printf("resulting series=%d", len(rvs))
return rvs, nil
} }
type rollupResultCacheMetainfo struct { type rollupResultCacheMetainfo struct {

View File

@ -61,7 +61,7 @@ func TestRollupResultCache(t *testing.T) {
// Try obtaining an empty value. // Try obtaining an empty value.
t.Run("empty", func(t *testing.T) { t.Run("empty", func(t *testing.T) {
tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != ec.Start { if newStart != ec.Start {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start) t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start)
} }
@ -79,8 +79,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(nil, ec, fe, window, tss) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != 1400 { if newStart != 1400 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
} }
@ -100,8 +100,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(nil, ec, ae, window, tss) rollupResultCacheV.PutSeries(nil, ec, ae, window, tss)
tss, newStart := rollupResultCacheV.Get(nil, ec, ae, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, ae, window)
if newStart != 1400 { if newStart != 1400 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
} }
@ -123,8 +123,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{333, 0, 1, 2}, Values: []float64{333, 0, 1, 2},
}, },
} }
rollupResultCacheV.Put(nil, ec, fe, window, tss) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != 1000 { if newStart != 1000 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
} }
@ -142,8 +142,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(nil, ec, fe, window, tss) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != 1000 { if newStart != 1000 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
} }
@ -161,8 +161,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(nil, ec, fe, window, tss) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != 1000 { if newStart != 1000 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
} }
@ -180,8 +180,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(nil, ec, fe, window, tss) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != 1000 { if newStart != 1000 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
} }
@ -199,8 +199,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2, 3, 4, 5, 6, 7}, Values: []float64{0, 1, 2, 3, 4, 5, 6, 7},
}, },
} }
rollupResultCacheV.Put(nil, ec, fe, window, tss) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != 2200 { if newStart != 2200 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
} }
@ -222,8 +222,8 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{1, 2, 3, 4, 5, 6}, Values: []float64{1, 2, 3, 4, 5, 6},
}, },
} }
rollupResultCacheV.Put(nil, ec, fe, window, tss) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != 2200 { if newStart != 2200 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
} }
@ -247,8 +247,8 @@ func TestRollupResultCache(t *testing.T) {
} }
tss = append(tss, ts) tss = append(tss, ts)
} }
rollupResultCacheV.Put(nil, ec, fe, window, tss) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss)
tssResult, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tssResult, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != 2200 { if newStart != 2200 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
} }
@ -276,10 +276,10 @@ func TestRollupResultCache(t *testing.T) {
Values: []float64{0, 1, 2}, Values: []float64{0, 1, 2},
}, },
} }
rollupResultCacheV.Put(nil, ec, fe, window, tss1) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss1)
rollupResultCacheV.Put(nil, ec, fe, window, tss2) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss2)
rollupResultCacheV.Put(nil, ec, fe, window, tss3) rollupResultCacheV.PutSeries(nil, ec, fe, window, tss3)
tss, newStart := rollupResultCacheV.Get(nil, ec, fe, window) tss, newStart := rollupResultCacheV.GetSeries(nil, ec, fe, window)
if newStart != 1400 { if newStart != 1400 {
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
} }
@ -311,7 +311,10 @@ func TestMergeTimeseries(t *testing.T) {
Values: []float64{1, 2, 3, 4, 5, 6}, Values: []float64{1, 2, 3, 4, 5, 6},
}, },
} }
tss := mergeTimeseries(a, b, 1000, ec) tss, err := mergeTimeseries(nil, a, b, 1000, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
tssExpected := []*timeseries{ tssExpected := []*timeseries{
{ {
Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000},
@ -328,7 +331,10 @@ func TestMergeTimeseries(t *testing.T) {
Values: []float64{3, 4, 5, 6}, Values: []float64{3, 4, 5, 6},
}, },
} }
tss := mergeTimeseries(a, b, bStart, ec) tss, err := mergeTimeseries(nil, a, b, bStart, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
tssExpected := []*timeseries{ tssExpected := []*timeseries{
{ {
Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000},
@ -345,7 +351,10 @@ func TestMergeTimeseries(t *testing.T) {
}, },
} }
b := []*timeseries{} b := []*timeseries{}
tss := mergeTimeseries(a, b, bStart, ec) tss, err := mergeTimeseries(nil, a, b, bStart, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
tssExpected := []*timeseries{ tssExpected := []*timeseries{
{ {
Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000},
@ -367,7 +376,10 @@ func TestMergeTimeseries(t *testing.T) {
Values: []float64{3, 4, 5, 6}, Values: []float64{3, 4, 5, 6},
}, },
} }
tss := mergeTimeseries(a, b, bStart, ec) tss, err := mergeTimeseries(nil, a, b, bStart, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
tssExpected := []*timeseries{ tssExpected := []*timeseries{
{ {
Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: []int64{1000, 1200, 1400, 1600, 1800, 2000},
@ -391,7 +403,10 @@ func TestMergeTimeseries(t *testing.T) {
}, },
} }
b[0].MetricName.MetricGroup = []byte("foo") b[0].MetricName.MetricGroup = []byte("foo")
tss := mergeTimeseries(a, b, bStart, ec) tss, err := mergeTimeseries(nil, a, b, bStart, ec)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
tssExpected := []*timeseries{ tssExpected := []*timeseries{
{ {
MetricName: storage.MetricName{ MetricName: storage.MetricName{

View File

@ -79,7 +79,10 @@ var timeseriesPool sync.Pool
func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int64) []byte { func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int64) []byte {
if len(tss) == 0 { if len(tss) == 0 {
logger.Panicf("BUG: tss cannot be empty") // marshal zero timeseries and zero timestamps
dst = encoding.MarshalUint64(dst, 0)
dst = encoding.MarshalUint64(dst, 0)
return dst
} }
// timestamps are stored only once for all the tss, since they must be identical // timestamps are stored only once for all the tss, since they must be identical

View File

@ -33,6 +33,13 @@ The sandbox cluster installation is running under the constant load generated by
* SECURITY: upgrade Go builder from Go1.21.1 to Go1.21.3. See [the list of issues addressed in Go1.21.2](https://github.com/golang/go/issues?q=milestone%3AGo1.21.2+label%3ACherryPickApproved) and [the list of issues addressed in Go1.21.3](https://github.com/golang/go/issues?q=milestone%3AGo1.21.3+label%3ACherryPickApproved). * SECURITY: upgrade Go builder from Go1.21.1 to Go1.21.3. See [the list of issues addressed in Go1.21.2](https://github.com/golang/go/issues?q=milestone%3AGo1.21.2+label%3ACherryPickApproved) and [the list of issues addressed in Go1.21.3](https://github.com/golang/go/issues?q=milestone%3AGo1.21.3+label%3ACherryPickApproved).
* FEATURE: `vmselect`: improve performance for repeated [instant queries](https://docs.victoriametrics.com/keyConcepts.html#instant-query) if they contain one of the following [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) with a lookbehind window in square brackets greater than or equal to 1 day:
- [sum_over_time](https://docs.victoriametrics.com/MetricsQL.html#sum_over_time)
- [count_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_over_time)
- [avg_over_time](https://docs.victoriametrics.com/MetricsQL.html#avg_over_time)
- [increase](https://docs.victoriametrics.com/MetricsQL.html#increase)
- [rate](https://docs.victoriametrics.com/MetricsQL.html#rate)
These functions are usually used in SLO/SLI queries such as `avg_over_time(up[30d])` or `sum(rate(http_request_errors_total[3d])) / sum(rate(http_requests_total[3d]))`.
* FEATURE: `vmselect`: improve query performance on systems with big number of CPU cores (`>=32`). Add `-search.maxWorkersPerQuery` command-line flag, which can be used for fine-tuning query performance on systems with big number of CPU cores. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5195). * FEATURE: `vmselect`: improve query performance on systems with big number of CPU cores (`>=32`). Add `-search.maxWorkersPerQuery` command-line flag, which can be used for fine-tuning query performance on systems with big number of CPU cores. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5195).
* FEATURE: `vmselect`: expose `vm_memory_intensive_queries_total` counter metric which gets increased each time `-search.logQueryMemoryUsage` memory limit is exceeded by a query. This metric should help to identify expensive and heavy queries without inspecting the logs. * FEATURE: `vmselect`: expose `vm_memory_intensive_queries_total` counter metric which gets increased each time `-search.logQueryMemoryUsage` memory limit is exceeded by a query. This metric should help to identify expensive and heavy queries without inspecting the logs.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add [drop_empty_series()](https://docs.victoriametrics.com/MetricsQL.html#drop_empty_series) function, which can be used for filtering out empty series before performing additional calculations as shown in [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5071). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add [drop_empty_series()](https://docs.victoriametrics.com/MetricsQL.html#drop_empty_series) function, which can be used for filtering out empty series before performing additional calculations as shown in [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5071).