VictoriaMetrics/app/vmselect/promql/eval.go
Aliaksandr Valialkin 41a0fdaf39
app/vmselect/promql: optimize repeated SLI-like instant queries with lookbehind windows >= 1d
Repeated instant queries with long lookbehind windows, which contain one of the following rollup functions,
are optimized via partial result caching:

- sum_over_time()
- count_over_time()
- avg_over_time()
- increase()
- rate()

The basic idea of optimization is to calculate

  rf(m[d] @ t)

as

  rf(m[offset] @ t) + rf(m[d] @ (t-offset)) - rf(m[offset] @ (t-d))

where rf(m[d] @ (t-offset)) is the cached query result, which was calculated previously.

The offset may be in the range of up to 1 hour.
2023-10-31 19:25:23 +01:00

1744 lines
56 KiB
Go

package promql
import (
"flag"
"fmt"
"math"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
)
// Command-line flags, which control query evaluation.
var (
// disableCache disables response caching when set.
// AdjustStartEnd and EvalConfig.mayCache consult this flag.
disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful during data backfilling")
// maxPointsSubqueryPerTimeseries is used as EvalConfig.MaxPointsPerSeries for subquery evaluation.
maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+
"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")
// maxMemoryPerQuery is the per-query memory limit; see the flag description for details.
maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+
"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . "+
"See also -search.logQueryMemoryUsage")
// logQueryMemoryUsage is the memory threshold for logging memory-intensive queries; see the flag description for details.
logQueryMemoryUsage = flagutil.NewBytes("search.logQueryMemoryUsage", 0, "Log query and increment vm_memory_intensive_queries_total metric each time "+
"the query requires more memory than specified by this flag. "+
"This may help detecting and optimizing heavy queries. Query logging is disabled by default. "+
"See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery")
// noStaleMarkers allows skipping the handling of Prometheus staleness markers; see the flag description for details.
noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+
"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")
)
// minTimeseriesPointsForTimeRounding is the minimum number of points per timeseries
// for enabling time rounding.
//
// This improves cache hit ratio for frequently requested queries over
// big time ranges. AdjustStartEnd returns start and end unchanged
// if the requested time range contains fewer points than this value.
const minTimeseriesPointsForTimeRounding = 50
// ValidateMaxPointsPerSeries returns an error if the time range [start..end]
// sampled with the given step contains more than maxPoints points.
//
// An error is also returned when step equals zero.
func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error {
	if step == 0 {
		return fmt.Errorf("step can't be equal to zero")
	}
	n := 1 + (end-start)/step
	if n <= int64(maxPoints) {
		return nil
	}
	return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d",
		start, end, step, n, maxPoints)
}
// AdjustStartEnd aligns start and end to multiples of step, so the response may be cached.
//
// The values are returned unchanged when response caching is disabled
// or when the time range contains fewer than minTimeseriesPointsForTimeRounding points.
// The number of points in the adjusted time range never exceeds the original number of points.
//
// See EvalConfig.mayCache() for details.
func AdjustStartEnd(start, end, step int64) (int64, int64) {
	if *disableCache {
		// Leave start and end as is when the cache is disabled.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563
		return start, end
	}
	origPoints := (end-start)/step + 1
	if origPoints < minTimeseriesPointsForTimeRounding {
		// The number of points is too small for rounding.
		return start, end
	}

	// Align both bounds to step in order to enable response caching (see EvalConfig.mayCache).
	alignedStart, alignedEnd := alignStartEnd(start, end, step)

	// Drop trailing points until the number of points doesn't exceed the original one.
	for n := (alignedEnd-alignedStart)/step + 1; n > origPoints; n-- {
		alignedEnd -= step
	}
	return alignedStart, alignedEnd
}
// alignStartEnd returns start and end aligned to step.
//
// start has its remainder of division by step subtracted, while end is moved
// to the next multiple of step if its remainder is positive.
func alignStartEnd(start, end, step int64) (int64, int64) {
	alignedStart := start - start%step
	alignedEnd := end
	if rem := end % step; rem > 0 {
		alignedEnd += step - rem
	}
	return alignedStart, alignedEnd
}
// EvalConfig is the configuration required for query evaluation via Exec
type EvalConfig struct {
// Start and End define the time range for the query, while Step is the interval between the returned points.
//
// NOTE(review): the values are presumably in milliseconds, since the `@` modifier value
// is multiplied by 1000 before being assigned to Start and End - confirm against callers.
Start int64
End int64
Step int64
// MaxSeries is the maximum number of time series, which can be scanned by the query.
// Zero means 'no limit'
MaxSeries int
// MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series.
MaxPointsPerSeries int
// QuotedRemoteAddr contains quoted remote address.
QuotedRemoteAddr string
// Deadline is the deadline for the query.
Deadline searchutils.Deadline
// Whether the response can be cached.
MayCache bool
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
// How many decimal digits after the point to leave in response.
RoundDigits int
// EnforcedTagFilterss may contain additional label filters to use in the query.
EnforcedTagFilterss [][]storage.TagFilter
// The callback, which returns the request URI during logging.
// The request URI isn't stored here because its construction may take non-trivial amounts of CPU.
GetRequestURI func() string
// QueryStats contains various stats for the currently executed query.
//
// The caller must initialize the QueryStats if it needs the stats.
// Otherwise the stats isn't collected.
QueryStats *QueryStats
// timestamps holds the timestamps for the [Start..End] time range with the given Step.
// They are lazily generated on the first call to getSharedTimestamps.
timestamps []int64
// timestampsOnce guarantees that timestamps are generated only once.
timestampsOnce sync.Once
}
// copyEvalConfig returns a copy of src.
//
// All the exported fields are copied, including QuotedRemoteAddr, so the copies made
// for `offset`, `@` and subquery evaluation carry the same client address
// as the original config.
//
// The lazily generated timestamps aren't copied, since the caller usually changes
// Start, End or Step on the returned copy, so the timestamps must be generated again.
func copyEvalConfig(src *EvalConfig) *EvalConfig {
	return &EvalConfig{
		Start:               src.Start,
		End:                 src.End,
		Step:                src.Step,
		MaxSeries:           src.MaxSeries,
		MaxPointsPerSeries:  src.MaxPointsPerSeries,
		QuotedRemoteAddr:    src.QuotedRemoteAddr,
		Deadline:            src.Deadline,
		MayCache:            src.MayCache,
		LookbackDelta:       src.LookbackDelta,
		RoundDigits:         src.RoundDigits,
		EnforcedTagFilterss: src.EnforcedTagFilterss,
		GetRequestURI:       src.GetRequestURI,
		QueryStats:          src.QueryStats,

		// do not copy src.timestamps - they must be generated again.
	}
}
// QueryStats holds statistics collected during the query evaluation.
type QueryStats struct {
	// SeriesFetched is the number of series fetched from storage during the query evaluation.
	SeriesFetched int
}

// addSeriesFetched adds n to qs.SeriesFetched.
//
// It is a no-op for nil qs, so callers, which don't need the stats, may leave QueryStats unset.
func (qs *QueryStats) addSeriesFetched(n int) {
	if qs == nil {
		return
	}
	qs.SeriesFetched += n
}
// validate panics if ec contains an invalid time range or a non-positive step.
func (ec *EvalConfig) validate() {
	start, end, step := ec.Start, ec.End, ec.Step
	if end < start {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", start, end)
	}
	if step < 1 {
		logger.Panicf("BUG: step must be greater than 0; got %d", step)
	}
}
// mayCache reports whether the results for ec may be cached.
//
// Caching requires that it isn't disabled via -search.disableCache, that ec.MayCache is set
// and that either the query is instant (Start == End) or both Start and End are multiples of Step.
func (ec *EvalConfig) mayCache() bool {
	if *disableCache || !ec.MayCache {
		return false
	}
	if ec.Start == ec.End {
		// Instant query results may be cached without aligning start and end to step.
		return true
	}
	return ec.Start%ec.Step == 0 && ec.End%ec.Step == 0
}
// timeRangeString returns the human-readable representation of the [Start..End] time range.
func (ec *EvalConfig) timeRangeString() string {
	return fmt.Sprintf("[%s..%s]",
		storage.TimestampToHumanReadableFormat(ec.Start),
		storage.TimestampToHumanReadableFormat(ec.End))
}
// getSharedTimestamps returns the timestamps for ec, generating them on the first call.
//
// The returned slice is shared between callers.
func (ec *EvalConfig) getSharedTimestamps() []int64 {
	once := &ec.timestampsOnce
	once.Do(ec.timestampsInit)
	return ec.timestamps
}
// timestampsInit generates ec.timestamps from the Start, End and Step of ec.
func (ec *EvalConfig) timestampsInit() {
	start, end, step := ec.Start, ec.End, ec.Step
	ec.timestamps = getTimestamps(start, end, step, ec.MaxPointsPerSeries)
}
// getTimestamps returns timestamps on the time range [start..end] with the given step.
//
// It panics if step isn't positive, if start exceeds end or if the number of generated points
// exceeds maxPointsPerSeries, since all of this must be validated by the caller.
func getTimestamps(start, end, step int64, maxPointsPerSeries int) []int64 {
	// Sanity checks.
	if step <= 0 {
		logger.Panicf("BUG: Step must be bigger than 0; got %d", step)
	}
	if start > end {
		logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end)
	}
	if err := ValidateMaxPointsPerSeries(start, end, step, maxPointsPerSeries); err != nil {
		logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err)
	}

	// Generate timestamps.
	n := (end-start)/step + 1
	timestamps := make([]int64, n)
	for i, t := 0, start; i < len(timestamps); i, t = i+1, t+step {
		timestamps[i] = t
	}
	return timestamps
}
// evalExpr evaluates e on the given ec and returns the resulting time series.
//
// If query tracing is enabled, then the evaluation is recorded in a child tracer
// together with the number of returned series and points.
func evalExpr(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
	if qt.Enabled() {
		query := bytesutil.LimitStringLen(string(e.AppendString(nil)), 300)
		mayCache := ec.mayCache()
		qt = qt.NewChild("eval: query=%s, timeRange=%s, step=%d, mayCache=%v", query, ec.timeRangeString(), ec.Step, mayCache)
	}
	rv, err := evalExprInternal(qt, ec, e)
	if err != nil {
		return nil, err
	}
	if !qt.Enabled() {
		return rv, nil
	}
	seriesCount := len(rv)
	pointsPerSeries := 0
	if seriesCount > 0 {
		pointsPerSeries = len(rv[0].Timestamps)
	}
	qt.Donef("series=%d, points=%d, pointsPerSeries=%d", seriesCount, seriesCount*pointsPerSeries, pointsPerSeries)
	return rv, nil
}
// evalExprInternal dispatches the evaluation of e according to its type.
//
// Metric selectors and rollup expressions are evaluated via default_rollup().
// Function calls are evaluated either as rollup functions or as transform functions.
// Numbers, strings and durations are converted to constant series.
func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
	switch t := e.(type) {
	case *metricsql.MetricExpr:
		re := &metricsql.RollupExpr{
			Expr: t,
		}
		tss, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, t.AppendString(nil), err)
		}
		return tss, nil
	case *metricsql.RollupExpr:
		tss, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, t, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, t.AppendString(nil), err)
		}
		return tss, nil
	case *metricsql.FuncExpr:
		nrf := getRollupFunc(t.Name)
		if nrf == nil {
			// t isn't a rollup function, so evaluate it as a transform function.
			qtChild := qt.NewChild("transform %s()", t.Name)
			tss, err := evalTransformFunc(qtChild, ec, t)
			qtChild.Donef("series=%d", len(tss))
			return tss, err
		}
		args, re, err := evalRollupFuncArgs(qt, ec, t)
		if err != nil {
			return nil, err
		}
		rf, err := nrf(args)
		if err != nil {
			return nil, err
		}
		tss, err := evalRollupFunc(qt, ec, t.Name, rf, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, t.AppendString(nil), err)
		}
		return tss, nil
	case *metricsql.AggrFuncExpr:
		qtChild := qt.NewChild("aggregate %s()", t.Name)
		tss, err := evalAggrFunc(qtChild, ec, t)
		qtChild.Donef("series=%d", len(tss))
		return tss, err
	case *metricsql.BinaryOpExpr:
		qtChild := qt.NewChild("binary op %q", t.Op)
		tss, err := evalBinaryOp(qtChild, ec, t)
		qtChild.Donef("series=%d", len(tss))
		return tss, err
	case *metricsql.NumberExpr:
		return evalNumber(ec, t.N), nil
	case *metricsql.StringExpr:
		return evalString(ec, t.S), nil
	case *metricsql.DurationExpr:
		// The duration is divided by 1000 before being returned as a number.
		dSec := float64(t.Duration(ec.Step)) / 1000
		return evalNumber(ec, dSec), nil
	}
	return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
}
// evalTransformFunc evaluates the transform function fe.
//
// Args for union() and for the function with an empty name are evaluated in parallel,
// while args for the remaining functions are evaluated sequentially.
func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) {
	tf := getTransformFunc(fe.Name)
	if tf == nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf(`unknown func %q`, fe.Name),
		}
	}

	evalArgs := evalExprsSequentially
	if fe.Name == "" || fe.Name == "union" {
		evalArgs = evalExprsInParallel
	}
	args, err := evalArgs(qt, ec, fe.Args)
	if err != nil {
		return nil, err
	}

	rv, err := tf(&transformFuncArg{
		ec:   ec,
		fe:   fe,
		args: args,
	})
	if err != nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err),
		}
	}
	return rv, nil
}
// evalAggrFunc evaluates the aggregate function ae.
//
// If ae supports incremental aggregation and its arg is a rollup function over a metric selector,
// then the aggregate is calculated incrementally during the rollup.
// Otherwise args are evaluated in parallel and then passed to the aggregate function.
func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFuncExpr) ([]*timeseries, error) {
	callbacks := getIncrementalAggrFuncCallbacks(ae.Name)
	if callbacks != nil {
		if fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae); fe != nil {
			// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
			// The optimized path saves RAM for aggregates over big number of time series.
			rollupArgs, re, err := evalRollupFuncArgs(qt, ec, fe)
			if err != nil {
				return nil, err
			}
			rf, err := nrf(rollupArgs)
			if err != nil {
				return nil, err
			}
			iafc := newIncrementalAggrFuncContext(ae, callbacks)
			return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc)
		}
	}

	// Generic path: evaluate args and pass them to the aggregate function.
	args, err := evalExprsInParallel(qt, ec, ae.Args)
	if err != nil {
		return nil, err
	}
	af := getAggrFunc(ae.Name)
	if af == nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf(`unknown func %q`, ae.Name),
		}
	}
	qtChild := qt.NewChild("eval %s", ae.Name)
	rv, err := af(&aggrFuncArg{
		ae:   ae,
		args: args,
		ec:   ec,
	})
	qtChild.Done()
	if err != nil {
		return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err)
	}
	return rv, nil
}
// evalBinaryOp evaluates both sides of the binary operation be and applies the operation to the results.
func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) {
	bf := getBinaryOpFunc(be.Op)
	if bf == nil {
		return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
	}

	var tssLeft, tssRight []*timeseries
	var err error
	if op := strings.ToLower(be.Op); op == "and" || op == "if" {
		// Fetch right-side series at first, since it usually contains
		// lower number of time series for `and` and `if` operator.
		// This should produce more specific label filters for the left side of the query.
		// This, in turn, should reduce the time to select series for the left side of the query.
		tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be)
	} else {
		tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be)
	}
	if err != nil {
		return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
	}

	rv, err := bf(&binaryOpFuncArg{
		be:    be,
		left:  tssLeft,
		right: tssRight,
	})
	if err != nil {
		return nil, fmt.Errorf(`cannot evaluate %q: %w`, be.AppendString(nil), err)
	}
	return rv, nil
}
// canPushdownCommonFilters reports whether common label filters from one side of be
// may be pushed down to the other side.
//
// This isn't possible for `or` and `default` operations and for the case
// when any side of be is an aggregate function without grouping.
func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool {
	op := strings.ToLower(be.Op)
	if op == "or" || op == "default" {
		return false
	}
	return !isAggrFuncWithoutGrouping(be.Left) && !isAggrFuncWithoutGrouping(be.Right)
}
// isAggrFuncWithoutGrouping reports whether e is an aggregate function with empty modifier args.
func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
	if afe, ok := e.(*metricsql.AggrFuncExpr); ok {
		return len(afe.Modifier.Args) == 0
	}
	return false
}
// execBinaryOpArgs evaluates exprFirst and exprSecond, which are the sides of the binary operation be,
// and returns their results in the same order.
//
// If common label filters cannot be pushed down for be, then both expressions are evaluated in parallel.
// Otherwise exprFirst is evaluated at first, and the label filters common to all its series
// are pushed down to exprSecond before evaluating it.
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
	if !canPushdownCommonFilters(be) {
		// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
		// from exprFirst to exprSecond.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
		qt = qt.NewChild("execute left and right sides of %q in parallel", be.Op)
		defer qt.Done()

		var wg sync.WaitGroup

		var tssFirst []*timeseries
		var errFirst error
		qtFirst := qt.NewChild("expr1")
		wg.Add(1)
		go func() {
			defer wg.Done()
			tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst)
			qtFirst.Done()
		}()

		var tssSecond []*timeseries
		var errSecond error
		qtSecond := qt.NewChild("expr2")
		wg.Add(1)
		go func() {
			defer wg.Done()
			tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond)
			qtSecond.Done()
		}()

		wg.Wait()
		if errFirst != nil {
			return nil, nil, errFirst
		}
		if errSecond != nil {
			return nil, nil, errSecond
		}
		return tssFirst, tssSecond, nil
	}

	// Execute binary operation in the following way:
	//
	// 1) execute the exprFirst
	// 2) get common label filters for series returned at step 1
	// 3) push down the found common label filters to exprSecond. This filters out unneeded series
	//    during exprSecond execution instead of spending compute resources on extracting and processing these series
	//    before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
	// 4) execute the exprSecond with possible additional filters found at step 3
	//
	// Typical use cases:
	// - Kubernetes-related: show pod creation time with the node name:
	//
	//     kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
	//
	//   Without the optimization `kube_pod_info` would select and spend compute resources
	//   for more time series than needed. The selected time series would be dropped later
	//   when matching time series on the right and left sides of binary operand.
	//
	// - Generic alerting queries, which rely on `info` metrics.
	//   See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
	//
	// - Queries, which get additional labels from `info` metrics.
	//   See https://www.robustperception.io/exposing-the-software-version-to-prometheus
	tssFirst, err := evalExpr(qt, ec, exprFirst)
	if err != nil {
		return nil, nil, err
	}
	if len(tssFirst) == 0 && strings.ToLower(be.Op) != "or" {
		// Fast path: there is no sense in executing the exprSecond when exprFirst returns an empty result,
		// since the "exprFirst op exprSecond" would return an empty result in any case.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3349
		return nil, nil, nil
	}
	lfs := getCommonLabelFilters(tssFirst)
	lfs = metricsql.TrimFiltersByGroupModifier(lfs, be)
	exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs)
	tssSecond, err := evalExpr(qt, ec, exprSecond)
	if err != nil {
		return nil, nil, err
	}
	return tssFirst, tssSecond, nil
}
// getCommonLabelFilters returns label filters for the tags, which are present in every series from tss.
//
// A filter is an exact match if the tag has a single value across tss.
// Otherwise it is a regexp matching any of the seen values.
// The returned filters are sorted by label name. Nil is returned for empty tss.
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
if len(tss) == 0 {
return nil
}
// valuesCounter tracks unique values seen for a tag and the number of series, which were counted for this tag.
type valuesCounter struct {
values map[string]struct{}
count int
}
m := make(map[string]*valuesCounter, len(tss[0].MetricName.Tags))
for _, ts := range tss {
for _, tag := range ts.MetricName.Tags {
vc, ok := m[string(tag.Key)]
if !ok {
k := string(tag.Key)
v := string(tag.Value)
m[k] = &valuesCounter{
values: map[string]struct{}{
v: {},
},
count: 1,
}
continue
}
if len(vc.values) > 100 {
// Too many unique values found for the given tag.
// Do not make a filter on such values, since it may slow down
// search for matching time series.
//
// The counter isn't incremented here, so it stops matching len(tss)
// if more series contain this tag, and the tag is skipped below.
continue
}
vc.count++
if _, ok := vc.values[string(tag.Value)]; !ok {
vc.values[string(tag.Value)] = struct{}{}
}
}
}
lfs := make([]metricsql.LabelFilter, 0, len(m))
// values is reused across tags in order to reduce memory allocations.
var values []string
for k, vc := range m {
if vc.count != len(tss) {
// Skip the tag, since it doesn't belong to all the time series.
continue
}
values = values[:0]
for s := range vc.values {
values = append(values, s)
}
lf := metricsql.LabelFilter{
Label: k,
}
if len(values) == 1 {
lf.Value = values[0]
} else {
// Sort values, so the generated regexp doesn't depend on the map iteration order.
sort.Strings(values)
lf.Value = joinRegexpValues(values)
lf.IsRegexp = true
}
lfs = append(lfs, lf)
}
// Sort filters by label name, so the result doesn't depend on the map iteration order.
sort.Slice(lfs, func(i, j int) bool {
return lfs[i].Label < lfs[j].Label
})
return lfs
}
// joinRegexpValues returns a regexp alternation, which matches any of the literal strings from a.
//
// Every string is escaped with regexp.QuoteMeta before being joined with `|`.
// An empty string is returned for empty a.
func joinRegexpValues(a []string) string {
	var buf []byte
	for i := range a {
		if i > 0 {
			buf = append(buf, '|')
		}
		buf = append(buf, regexp.QuoteMeta(a[i])...)
	}
	return string(buf)
}
// tryGetArgRollupFuncWithMetricExpr returns the rollup function expression and its constructor
// if the single arg of ae is a rollup over a non-empty metric selector.
//
// (nil, nil) is returned otherwise, i.e. when ae has a number of args other than one,
// or when the arg isn't one of the following:
//   - metricExpr
//   - metricExpr[d]
//   - rollupFunc(metricExpr)
//   - rollupFunc(metricExpr[d])
//
// Bare metricExpr and metricExpr[d] are wrapped into default_rollup().
// Rollup function calls are returned as is, so the additional args,
// such as phi at quantile_over_time(phi, m), are preserved.
func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) {
	if len(ae.Args) != 1 {
		return nil, nil
	}
	e := ae.Args[0]
	// Make sure e contains one of the following:
	// - metricExpr
	// - metricExpr[d]
	// - rollupFunc(metricExpr)
	// - rollupFunc(metricExpr[d])

	if me, ok := e.(*metricsql.MetricExpr); ok {
		// e = metricExpr
		if me.IsEmpty() {
			return nil, nil
		}
		fe := &metricsql.FuncExpr{
			Name: "default_rollup",
			Args: []metricsql.Expr{me},
		}
		nrf := getRollupFunc(fe.Name)
		return fe, nrf
	}
	if re, ok := e.(*metricsql.RollupExpr); ok {
		if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
			return nil, nil
		}
		// e = metricExpr[d]
		fe := &metricsql.FuncExpr{
			Name: "default_rollup",
			Args: []metricsql.Expr{re},
		}
		nrf := getRollupFunc(fe.Name)
		return fe, nrf
	}
	fe, ok := e.(*metricsql.FuncExpr)
	if !ok {
		return nil, nil
	}
	nrf := getRollupFunc(fe.Name)
	if nrf == nil {
		return nil, nil
	}
	rollupArgIdx := metricsql.GetRollupArgIdx(fe)
	if rollupArgIdx >= len(fe.Args) {
		// Incorrect number of args for rollup func.
		return nil, nil
	}
	arg := fe.Args[rollupArgIdx]
	if me, ok := arg.(*metricsql.MetricExpr); ok {
		if me.IsEmpty() {
			return nil, nil
		}
		// e = rollupFunc(metricExpr)
		//
		// Return fe as is instead of rebuilding it with the metricExpr as the only arg.
		// Rebuilding would drop the remaining args and would move the rollup arg to position 0,
		// so evalRollupFuncArgs would fail or would evaluate incorrect args for multi-arg rollup functions.
		// The bare metricExpr is wrapped into metricsql.RollupExpr later by getRollupExprArg.
		return fe, nrf
	}
	if re, ok := arg.(*metricsql.RollupExpr); ok {
		if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
			return nil, nil
		}
		// e = rollupFunc(metricExpr[d])
		return fe, nrf
	}
	return nil, nil
}
// evalExprsSequentially evaluates es one by one and returns the results in the order of es.
//
// The evaluation stops at the first error.
func evalExprsSequentially(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
	var results [][]*timeseries
	for i := range es {
		tss, err := evalExpr(qt, ec, es[i])
		if err != nil {
			return nil, err
		}
		results = append(results, tss)
	}
	return results, nil
}
// evalExprsInParallel evaluates es concurrently and returns the results in the order of es.
//
// The evaluation falls back to evalExprsSequentially if es contains less than two expressions.
// The first error in the order of es is returned if the evaluation of any expression fails.
func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
	n := len(es)
	if n < 2 {
		return evalExprsSequentially(qt, ec, es)
	}

	results := make([][]*timeseries, n)
	errs := make([]error, n)
	qt.Printf("eval function args in parallel")
	var wg sync.WaitGroup
	for idx, expr := range es {
		wg.Add(1)
		qtArg := qt.NewChild("eval arg %d", idx)
		go func(qtArg *querytracer.Tracer, expr metricsql.Expr, idx int) {
			// Deferred calls run in reverse order: the tracer is finished before the wait group is released.
			defer wg.Done()
			defer qtArg.Done()
			results[idx], errs[idx] = evalExpr(qtArg, ec, expr)
		}(qtArg, expr, idx)
	}
	wg.Wait()

	for i := range errs {
		if errs[i] != nil {
			return nil, errs[i]
		}
	}
	return results, nil
}
// evalRollupFuncArgs evaluates args for the rollup function fe.
//
// The arg at the rollup position is converted to *metricsql.RollupExpr without evaluation,
// while the remaining args are evaluated into time series.
// The function returns the args in the order of fe.Args together with the rollup expression.
func evalRollupFuncArgs(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) {
	rollupArgIdx := metricsql.GetRollupArgIdx(fe)
	if rollupArgIdx >= len(fe.Args) {
		return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil))
	}

	var re *metricsql.RollupExpr
	args := make([]interface{}, len(fe.Args))
	for i := range fe.Args {
		arg := fe.Args[i]
		if i != rollupArgIdx {
			ts, err := evalExpr(qt, ec, arg)
			if err != nil {
				return nil, nil, fmt.Errorf("cannot evaluate arg #%d for %q: %w", i+1, fe.AppendString(nil), err)
			}
			args[i] = ts
			continue
		}
		re = getRollupExprArg(arg)
		args[i] = re
	}
	return args, re, nil
}
// getRollupExprArg converts arg to *metricsql.RollupExpr.
//
// Non-rollup arg is wrapped into metricsql.RollupExpr.
// A subquery over a metric selector `me[w:step]` is converted to `default_rollup(me)[w:step]`.
// The remaining rollup expressions are returned as is.
func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
	re, isRollup := arg.(*metricsql.RollupExpr)
	switch {
	case !isRollup:
		// Wrap non-rollup arg into metricsql.RollupExpr.
		return &metricsql.RollupExpr{
			Expr: arg,
		}
	case !re.ForSubquery():
		// Standard rollup without subquery.
		return re
	}

	me, isMetric := re.Expr.(*metricsql.MetricExpr)
	if !isMetric {
		// arg contains subquery.
		return re
	}

	// Convert me[w:step] -> default_rollup(me)[w:step]
	wrapped := &metricsql.FuncExpr{
		Name: "default_rollup",
		Args: []metricsql.Expr{
			&metricsql.RollupExpr{Expr: me},
		},
	}
	reNew := *re
	reNew.Expr = wrapped
	return &reNew
}
// evalRollupFunc evaluates the rollup function rf with the given funcName over re.
//
// expr may contain:
// - rollupFunc(m) if iafc is nil
// - aggrFunc(rollupFunc(m)) if iafc isn't nil
//
// If re contains `@` modifier, then the rollup is evaluated at the single timestamp
// returned by the modifier, and the result is expanded to the time range of ec.
func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr,
	re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
	if re.At == nil {
		return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc)
	}

	tssAt, err := evalExpr(qt, ec, re.At)
	if err != nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err),
		}
	}
	if n := len(tssAt); n != 1 {
		return nil, &UserReadableError{
			Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", n),
		}
	}

	// Evaluate the rollup at the single timestamp obtained from the `@` modifier.
	atTimestamp := int64(tssAt[0].Values[0] * 1000)
	ecAt := copyEvalConfig(ec)
	ecAt.Start, ecAt.End = atTimestamp, atTimestamp
	tss, err := evalRollupFuncWithoutAt(qt, ecAt, funcName, rf, expr, re, iafc)
	if err != nil {
		return nil, err
	}

	// expand single-point tss to the original time range.
	sharedTimestamps := ec.getSharedTimestamps()
	for _, ts := range tss {
		v := ts.Values[0]
		expanded := make([]float64, len(sharedTimestamps))
		for i := range expanded {
			expanded[i] = v
		}
		ts.Timestamps = sharedTimestamps
		ts.Values = expanded
	}
	return tss, nil
}
// evalRollupFuncWithoutAt evaluates the rollup function rf with the given funcName over re,
// while ignoring the `@` modifier of re.
//
// The `offset` modifier is applied by evaluating the rollup on the time range shifted by the offset
// and then shifting timestamps of the result back, so they match the time range of ec.
// The rollup_candlestick function is additionally evaluated with `offset -step`.
//
// expr has the same meaning as at evalRollupFunc. iafc must be nil if re contains a subquery.
// Evaluation errors are returned wrapped into UserReadableError.
func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
	funcName = strings.ToLower(funcName)
	ecNew := ec
	var offset int64
	if re.Offset != nil {
		offset = re.Offset.Duration(ec.Step)
		ecNew = copyEvalConfig(ecNew)
		ecNew.Start -= offset
		ecNew.End -= offset
		// There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true,
		// since the time range alignment has been already performed by the caller,
		// so cache hit rate should be quite good.
		// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
	}
	if funcName == "rollup_candlestick" {
		// Automatically apply `offset -step` to `rollup_candlestick` function
		// in order to obtain expected OHLC results.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
		step := ecNew.Step
		ecNew = copyEvalConfig(ecNew)
		ecNew.Start += step
		ecNew.End += step
		offset -= step
	}
	var rvs []*timeseries
	var err error
	if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
		rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window)
	} else {
		if iafc != nil {
			logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil))
		}
		rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re)
	}
	if err != nil {
		return nil, &UserReadableError{
			Err: err,
		}
	}
	if funcName == "absent_over_time" {
		// Use ecNew instead of ec here, so the generated series is built for the same shifted time range
		// as the rollup results it replaces. Otherwise the offset shift below would move
		// its timestamps outside the requested time range.
		rvs = aggregateAbsentOverTime(ecNew, re.Expr, rvs)
	}
	if offset != 0 && len(rvs) > 0 {
		// Make a copy of timestamps, since they may be used in other values.
		srcTimestamps := rvs[0].Timestamps
		dstTimestamps := append([]int64{}, srcTimestamps...)
		for i := range dstTimestamps {
			dstTimestamps[i] += offset
		}
		for _, ts := range rvs {
			ts.Timestamps = dstTimestamps
		}
	}
	return rvs, nil
}
// aggregateAbsentOverTime collapses tss to the series returned by getAbsentTimeseries for ec and expr.
//
// The value of the returned series at the given point is set to nan if at least a single series
// from tss contains nan at this point. This means that tss contains a series with non-empty results at that point.
// The series from getAbsentTimeseries are returned unchanged for empty tss.
// This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130
func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries {
	rvs := getAbsentTimeseries(ec, expr)
	if len(tss) == 0 {
		return rvs
	}
	// hasNaNAt reports whether any series from tss contains nan at the point i.
	hasNaNAt := func(i int) bool {
		for _, ts := range tss {
			if math.IsNaN(ts.Values[i]) {
				return true
			}
		}
		return false
	}
	dst := rvs[0]
	for i := range tss[0].Values {
		if hasNaNAt(i) {
			dst.Values[i] = nan
		}
	}
	return rvs
}
// evalRollupFuncWithSubquery evaluates the rollup function rf with the given funcName over the subquery re.
//
// The subquery is evaluated at first on the time range extended by the lookbehind window,
// and then rf is applied in parallel to every series returned by the subquery.
// Nil result is returned if the subquery returns no series.
func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
// TODO: determine whether to use rollupResultCacheV here.
qt = qt.NewChild("subquery")
defer qt.Done()
step, err := re.Step.NonNegativeDuration(ec.Step)
if err != nil {
return nil, fmt.Errorf("cannot parse step in square brackets at %s: %w", expr.AppendString(nil), err)
}
if step == 0 {
// Fall back to the outer step if the subquery step is missing.
step = ec.Step
}
window, err := re.Window.NonNegativeDuration(ec.Step)
if err != nil {
return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
}
// Prepare the config for the subquery: the time range is extended to the past
// by window + maxSilenceInterval + step and to the future by step.
ecSQ := copyEvalConfig(ec)
ecSQ.Start -= window + maxSilenceInterval + step
ecSQ.End += step
ecSQ.Step = step
ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
if err := ValidateMaxPointsPerSeries(ecSQ.Start, ecSQ.End, ecSQ.Step, ecSQ.MaxPointsPerSeries); err != nil {
return nil, fmt.Errorf("%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)", err)
}
// unconditionally align start and end args to step for subquery as Prometheus does.
ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step)
tssSQ, err := evalExpr(qt, ecSQ, re.Expr)
if err != nil {
return nil, err
}
if len(tssSQ) == 0 {
return nil, nil
}
// The rollup results are calculated at timestamps of the outer query.
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
if err != nil {
return nil, err
}
// samplesScannedTotal is updated atomically, since it is shared among concurrently running workers.
var samplesScannedTotal uint64
keepMetricNames := getKeepMetricNames(expr)
// Every worker appends results to its own slot at seriesByWorkerID, which is indexed by workerID.
tsw := getTimeseriesByWorkerID()
seriesByWorkerID := tsw.byWorkerID
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
// values and timestamps are scratch buffers, which are re-used among calls on the same worker.
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
preFunc(values, timestamps)
for _, rc := range rcs {
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
continue
}
var ts timeseries
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
}
return values, timestamps
})
// Merge per-worker results into a single slice.
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
for i := range seriesByWorkerID {
tss = append(tss, seriesByWorkerID[i].tss...)
}
putTimeseriesByWorkerID(tsw)
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
return tss, nil
}
// rowsScannedPerQuery is the `vm_rows_scanned_per_query` histogram.
// It is updated with the total number of samples scanned during rollup calculations.
var rowsScannedPerQuery = metrics.NewHistogram(`vm_rows_scanned_per_query`)
// getKeepMetricNames reports whether expr is a function call with KeepMetricNames set.
//
// If expr is an aggregate function with a single arg, then the arg is inspected instead of expr.
// False is returned for an aggregate function with any other number of args.
func getKeepMetricNames(expr metricsql.Expr) bool {
	if ae, isAggr := expr.(*metricsql.AggrFuncExpr); isAggr {
		// Extract rollupFunc(...) from aggrFunc(rollupFunc(...)).
		// This case is possible when optimized aggrFunc calculations are used
		// such as `sum(rate(...))`
		if len(ae.Args) != 1 {
			return false
		}
		expr = ae.Args[0]
	}
	fe, isFunc := expr.(*metricsql.FuncExpr)
	return isFunc && fe.KeepMetricNames
}
// doParallel calls f for every series from tss on concurrently running workers.
//
// The number of workers is limited by netstorage.MaxWorkers() and by the number of series.
// Series are distributed among workers in a round-robin fashion.
//
// f receives the series, scratch buffers for values and timestamps and the id of the worker it runs on.
// The buffers returned from f are passed to the next call of f on the same worker,
// so f may re-use them in order to reduce memory allocations.
//
// doParallel returns after all the calls to f are finished. It does nothing for empty tss.
func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64)) {
	if len(tss) == 0 {
		// Nothing to do. This also prevents from division by zero below,
		// since the number of workers would be zero for empty tss.
		return
	}
	workers := netstorage.MaxWorkers()
	if workers > len(tss) {
		workers = len(tss)
	}

	// Every channel has enough capacity for holding all the series assigned to the worker,
	// so the series can be distributed before starting the workers.
	seriesPerWorker := (len(tss) + workers - 1) / workers
	workChs := make([]chan *timeseries, workers)
	for i := range workChs {
		workChs[i] = make(chan *timeseries, seriesPerWorker)
	}
	for i, ts := range tss {
		idx := i % len(workChs)
		workChs[idx] <- ts
	}
	for _, workCh := range workChs {
		close(workCh)
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(workerID uint) {
			defer wg.Done()
			var tmpValues []float64
			var tmpTimestamps []int64
			for ts := range workChs[workerID] {
				tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, workerID)
			}
		}(uint(i))
	}
	wg.Wait()
}
// removeNanValues appends non-NaN values together with the corresponding timestamps
// to dstValues and dstTimestamps and returns the resulting slices.
//
// timestamps must contain at least len(values) items.
func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) {
	hasNan := false
	for _, v := range values {
		if math.IsNaN(v) {
			hasNan = true
			// There is no need in scanning the remaining values.
			break
		}
	}
	if !hasNan {
		// Fast path - no NaNs.
		dstValues = append(dstValues, values...)
		dstTimestamps = append(dstTimestamps, timestamps...)
		return dstValues, dstTimestamps
	}

	// Slow path - remove NaNs.
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}
// minWindowForInstantRollupOptimization is the minimum lookbehind window in milliseconds
// for enabling smart caching of instant rollup function results.
//
// The value equals 24 hours.
const minWindowForInstantRollupOptimization = 24 * 3600 * 1000
// evalInstantRollup evaluates instant rollup where ec.Start == ec.End.
//
// The rollup is calculated directly via evalRollupFuncNoCache when the cache cannot be used
// for ec, when window is smaller than minWindowForInstantRollupOptimization or when funcName
// has no optimized path.
//
// Otherwise the following optimizations are applied:
//
//   - avg_over_time(m[d]) is rewritten into sum_over_time(m[d]) / count_over_time(m[d])
//   - rate(m[d]) is rewritten into increase(m[d]) / d
//   - count_over_time, sum_over_time and increase are calculated from the previously cached
//     result plus two rollups over small lookbehind windows
//
// It panics if ec.Start != ec.End.
func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) ([]*timeseries, error) {
	if ec.Start != ec.End {
		logger.Panicf("BUG: evalInstantRollup cannot be called on non-empty time range; got %s", ec.timeRangeString())
	}
	timestamp := ec.Start
	if qt.Enabled() {
		qt = qt.NewChild("instant rollup %s; time=%s, window=%d", expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
		defer qt.Done()
	}

	// evalAt calculates the rollup at the given timestamp with the given lookbehind window
	// bypassing the cache. It works on a copy of ec, so ec itself isn't modified.
	evalAt := func(qt *querytracer.Tracer, timestamp, window int64) ([]*timeseries, error) {
		ecCopy := copyEvalConfig(ec)
		ecCopy.Start = timestamp
		ecCopy.End = timestamp
		pointsPerSeries := int64(1)
		return evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries)
	}
	// tooBigOffset returns true if offset reaches min(window/2, 1 hour).
	tooBigOffset := func(offset int64) bool {
		maxOffset := window / 2
		if maxOffset > 3600*1000 {
			maxOffset = 3600 * 1000
		}
		return offset >= maxOffset
	}

	if !ec.mayCache() {
		qt.Printf("do not apply instant rollup optimization because of disabled cache")
		return evalAt(qt, timestamp, window)
	}
	if window < minWindowForInstantRollupOptimization {
		qt.Printf("do not apply instant rollup optimization because of too small window=%d; must be equal or bigger than %d", window, minWindowForInstantRollupOptimization)
		return evalAt(qt, timestamp, window)
	}
	switch funcName {
	case "avg_over_time":
		if iafc != nil {
			qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name)
			return evalAt(qt, timestamp, window)
		}
		qt.Printf("optimized calculation for instant rollup avg_over_time(m[d]) as (sum_over_time(m[d]) / count_over_time(m[d]))")
		// Shallow copies of fe are used, so the original expression isn't modified.
		fe := expr.(*metricsql.FuncExpr)
		feSum := *fe
		feSum.Name = "sum_over_time"
		feCount := *fe
		feCount.Name = "count_over_time"
		be := &metricsql.BinaryOpExpr{
			Op: "/",
			KeepMetricNames: fe.KeepMetricNames,
			Left: &feSum,
			Right: &feCount,
		}
		return evalExpr(qt, ec, be)
	case "rate":
		if iafc != nil {
			if strings.ToLower(iafc.ae.Name) != "sum" {
				qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name)
				return evalAt(qt, timestamp, window)
			}
			qt.Printf("optimized calculation for sum(rate(m[d])) as (sum(increase(m[d])) / d)")
			afe := expr.(*metricsql.AggrFuncExpr)
			fe := afe.Args[0].(*metricsql.FuncExpr)
			feIncrease := *fe
			feIncrease.Name = "increase"
			re := fe.Args[0].(*metricsql.RollupExpr)
			d := re.Window.Duration(ec.Step)
			if d == 0 {
				// Fall back to step for zero window duration.
				d = ec.Step
			}
			afeIncrease := *afe
			afeIncrease.Args = []metricsql.Expr{&feIncrease}
			be := &metricsql.BinaryOpExpr{
				Op: "/",
				KeepMetricNames: true,
				Left: &afeIncrease,
				Right: &metricsql.NumberExpr{
					// d is in milliseconds, so it is converted to seconds.
					N: float64(d) / 1000,
				},
			}
			return evalExpr(qt, ec, be)
		}
		qt.Printf("optimized calculation for instant rollup rate(m[d]) as (increase(m[d]) / d)")
		fe := expr.(*metricsql.FuncExpr)
		feIncrease := *fe
		feIncrease.Name = "increase"
		re := fe.Args[0].(*metricsql.RollupExpr)
		d := re.Window.Duration(ec.Step)
		if d == 0 {
			// Fall back to step for zero window duration.
			d = ec.Step
		}
		be := &metricsql.BinaryOpExpr{
			Op: "/",
			KeepMetricNames: fe.KeepMetricNames,
			Left: &feIncrease,
			Right: &metricsql.NumberExpr{
				// d is in milliseconds, so it is converted to seconds.
				N: float64(d) / 1000,
			},
		}
		return evalExpr(qt, ec, be)
	case "count_over_time", "sum_over_time", "increase":
		if iafc != nil && strings.ToLower(iafc.ae.Name) != "sum" {
			qt.Printf("do not apply instant rollup optimization for non-sum incremental aggregate %s()", iafc.ae.Name)
			return evalAt(qt, timestamp, window)
		}

		// Calculate
		//
		// rf(m[window] @ timestamp)
		//
		// as
		//
		// rf(m[window] @ (timestamp-offset)) + rf(m[offset] @ timestamp) - rf(m[offset] @ (timestamp-window))
		//
		// where
		//
		// - rf is count_over_time, sum_over_time or increase
		// - rf(m[window] @ (timestamp-offset)) is obtained from cache
		// - rf(m[offset] @ timestamp) and rf(m[offset] @ (timestamp-window)) are calculated from the storage
		// These rollups are calculated faster than rf(m[window]) because offset is smaller than window.
		qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
			expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
		defer qtChild.Done()

	again:
		// offset is the distance in milliseconds between the requested timestamp
		// and the timestamp of the cached (or freshly calculated) values.
		offset := int64(0)
		tssCached := rollupResultCacheV.GetInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss)
		ec.QueryStats.addSeriesFetched(len(tssCached))
		if len(tssCached) == 0 {
			// Cache miss. Re-populate it
			start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds()
			offset = timestamp - start
			if offset < 0 {
				// The requested timestamp is older than the current time minus cacheTimestampOffset,
				// so calculate and cache the values at the requested timestamp.
				start = timestamp
				offset = 0
			}
			if tooBigOffset(offset) {
				qtChild.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+
					"for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window)
				return evalAt(qtChild, timestamp, window)
			}
			qtChild.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start))
			tss, err := evalAt(qtChild, start, window)
			if err != nil {
				return nil, err
			}
			rollupResultCacheV.PutInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss, tss)
			tssCached = tss
		} else {
			// Cache hit. The timestamp of the first cached series is used for calculating the offset.
			offset = timestamp - tssCached[0].Timestamps[0]
			if offset < 0 {
				qtChild.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s",
					storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp))
				// Delete the outdated cached values, so the cache could be re-populated with newer values.
				rollupResultCacheV.DeleteInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss)
				goto again
			}
			if tooBigOffset(offset) {
				qtChild.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+
					"and the cached values is too big comparing to window=%d", offset, window)
				// Delete the outdated cached values, so the cache could be re-populated with newer values.
				rollupResultCacheV.DeleteInstantValues(qtChild, expr, window, ec.Step, ec.EnforcedTagFilterss)
				goto again
			}
		}
		if offset == 0 {
			qtChild.Printf("return cached values, since they have the requested timestamp=%s", storage.TimestampToHumanReadableFormat(timestamp))
			return tssCached, nil
		}
		// Calculate rf(m[offset] @ timestamp)
		tssStart, err := evalAt(qtChild, timestamp, offset)
		if err != nil {
			return nil, err
		}
		// Calculate rf(m[offset] @ (timestamp - window))
		tssEnd, err := evalAt(qtChild, timestamp-window, offset)
		if err != nil {
			return nil, err
		}
		// The result is tssCached + tssStart - tssEnd.
		tss, err := mergeInstantValues(qtChild, tssCached, tssStart, tssEnd)
		if err != nil {
			return nil, fmt.Errorf("cannot merge instant series: %w", err)
		}
		return tss, nil
	default:
		qt.Printf("instant rollup optimization isn't implemented for %s()", funcName)
		return evalAt(qt, timestamp, window)
	}
}
// mergeInstantValues returns tssCached + tssStart - tssEnd calculated per series.
//
// Series are matched by their sorted marshaled metric names. Every input series
// must contain exactly one value and one timestamp.
//
// The values of tssCached series are updated in place. A series from tssStart is put
// into the result as is when tssCached has no matching series or when the matching series holds NaN.
// NaN values in tssStart and tssEnd are ignored.
//
// An error is returned if tssCached contains series with duplicate metric names.
func mergeInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, error) {
	qt = qt.NewChild("merge instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
	defer qt.Done()

	for _, tss := range [][]*timeseries{tssCached, tssStart, tssEnd} {
		assertInstantValues(tss)
	}

	keyBuf := bbPool.Get()
	defer bbPool.Put(keyBuf)

	// Index the cached series by metric name.
	byName := make(map[string]*timeseries, len(tssCached))
	for _, ts := range tssCached {
		keyBuf.B = marshalMetricNameSorted(keyBuf.B[:0], &ts.MetricName)
		if byName[string(keyBuf.B)] != nil {
			return nil, fmt.Errorf("duplicate series found: %s", &ts.MetricName)
		}
		byName[string(keyBuf.B)] = ts
	}

	// Add tssStart values.
	for _, ts := range tssStart {
		keyBuf.B = marshalMetricNameSorted(keyBuf.B[:0], &ts.MetricName)
		dst := byName[string(keyBuf.B)]
		if dst == nil || math.IsNaN(dst.Values[0]) {
			byName[string(keyBuf.B)] = ts
			continue
		}
		if v := ts.Values[0]; !math.IsNaN(v) {
			dst.Values[0] += v
		}
	}

	// Subtract tssEnd values.
	for _, ts := range tssEnd {
		keyBuf.B = marshalMetricNameSorted(keyBuf.B[:0], &ts.MetricName)
		dst := byName[string(keyBuf.B)]
		if dst == nil || math.IsNaN(dst.Values[0]) {
			continue
		}
		if v := ts.Values[0]; !math.IsNaN(v) {
			dst.Values[0] -= v
		}
	}

	result := make([]*timeseries, 0, len(byName))
	for _, ts := range byName {
		result = append(result, ts)
	}
	qt.Printf("resulting series=%d", len(result))
	return result, nil
}
// assertInstantValues panics if any series in tss doesn't hold exactly one value and one timestamp.
func assertInstantValues(tss []*timeseries) {
	for i := range tss {
		if n := len(tss[i].Values); n != 1 {
			logger.Panicf("BUG: instant series must contain a single value; got %d values", n)
		}
		if n := len(tss[i].Timestamps); n != 1 {
			logger.Panicf("BUG: instant series must contain a single timestamp; got %d timestamps", n)
		}
	}
}
var (
	// rollupResultCacheFullHits counts range rollups, which were completely served from the rollup result cache.
	rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`)
	// rollupResultCachePartialHits counts range rollups, which were partially served from the rollup result cache.
	rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`)
	// rollupResultCacheMiss counts range rollups, for which the rollup result cache contained no usable data.
	rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
	// memoryIntensiveQueries counts rollups with the estimated memory usage exceeding -search.logQueryMemoryUsage.
	memoryIntensiveQueries = metrics.NewCounter(`vm_memory_intensive_queries_total`)
)
// evalRollupFuncWithMetricExpr evaluates the rollup function rf with the given funcName
// over the series selected by me, using the lookbehind window from windowExpr.
//
// Instant queries (ec.Start == ec.End) are delegated to evalInstantRollup.
// Range queries consult the rollup result cache first, so only the part of the time range,
// which is missing in the cache, is calculated. The calculated part is merged with the cached part
// and is stored in the cache.
//
// Errors from the rollup calculation are wrapped into UserReadableError.
func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) {
	window, err := windowExpr.NonNegativeDuration(ec.Step)
	if err != nil {
		return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
	}
	if me.IsEmpty() {
		return evalNumber(ec, nan), nil
	}

	if ec.Start == ec.End {
		rvs, err := evalInstantRollup(qt, ec, funcName, rf, expr, me, iafc, window)
		if err != nil {
			err = &UserReadableError{
				Err: err,
			}
			return nil, err
		}
		return rvs, nil
	}

	// Search for partial results in cache.
	tssCached, start := rollupResultCacheV.GetSeries(qt, ec, expr, window)
	ec.QueryStats.addSeriesFetched(len(tssCached))
	if start > ec.End {
		// The result is fully cached.
		rollupResultCacheFullHits.Inc()
		return tssCached, nil
	}
	if start > ec.Start {
		rollupResultCachePartialHits.Inc()
	} else {
		rollupResultCacheMiss.Inc()
	}

	// Calculate only the part of the time range, which is missing in the cache: [start ... ec.End].
	// It is important to pass ecCopy instead of ec to evalRollupFuncNoCache. Otherwise the whole
	// time range is re-calculated and the result doesn't start at the `start` expected by mergeTimeseries.
	ecCopy := copyEvalConfig(ec)
	ecCopy.Start = start
	// pointsPerSeries is used only for memory usage estimation; it is calculated for the whole requested time range.
	pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step
	tss, err := evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries)
	if err != nil {
		err = &UserReadableError{
			Err: err,
		}
		return nil, err
	}
	rvs, err := mergeTimeseries(qt, tssCached, tss, start, ec)
	if err != nil {
		return nil, fmt.Errorf("cannot merge series: %w", err)
	}
	if tss != nil {
		rollupResultCacheV.PutSeries(qt, ec, expr, window, tss)
	}
	return rvs, nil
}
// evalRollupFuncNoCache calculates the given rf with the given lookbehind window.
//
// The series matching me and ec.EnforcedTagFilterss are fetched from the storage
// on the time range needed for calculating the rollup on [ec.Start ... ec.End].
//
// pointsPerSeries is used only for estimating the needed memory for query processing
//
// (nil, nil) is returned for negative window and when no series match the query.
// An error is returned if the estimated memory usage exceeds -search.maxMemoryPerQuery
// or the memory available for concurrently executed rollups.
func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, pointsPerSeries int64) ([]*timeseries, error) {
	if qt.Enabled() {
		qt = qt.NewChild("rollup %s: timeRange=%s, step=%d, window=%d", expr.AppendString(nil), ec.timeRangeString(), ec.Step, window)
		defer qt.Done()
	}
	// Only negative windows are invalid. Zero window must be processed, since the code below
	// and the callers fall back to ec.Step when the window is zero.
	if window < 0 {
		return nil, nil
	}
	// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
	sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
	preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
	if err != nil {
		return nil, err
	}

	// Fetch the remaining part of the result.
	tfss := searchutils.ToTagFilterss(me.LabelFilterss)
	tfss = searchutils.JoinTagFilterss(tfss, ec.EnforcedTagFilterss)
	// Extend the fetched time range to the past, so the lookbehind window
	// for the first point at ec.Start is covered by the fetched data.
	minTimestamp := ec.Start
	if needSilenceIntervalForRollupFunc(funcName) {
		minTimestamp -= maxSilenceInterval
	}
	if window > ec.Step {
		minTimestamp -= window
	} else {
		minTimestamp -= ec.Step
	}
	if minTimestamp < 0 {
		minTimestamp = 0
	}
	sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries)
	rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline)
	if err != nil {
		return nil, err
	}
	rssLen := rss.Len()
	if rssLen == 0 {
		rss.Cancel()
		return nil, nil
	}
	ec.QueryStats.addSeriesFetched(rssLen)

	// Verify timeseries fit available memory during rollup calculations.
	timeseriesLen := rssLen
	if iafc != nil {
		// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
		timeseriesLen = cgroup.AvailableCPUs()
		if iafc.ae.Modifier.Op != "" {
			if iafc.ae.Limit > 0 {
				// There is an explicit limit on the number of output time series.
				timeseriesLen *= iafc.ae.Limit
			} else {
				// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
				// since each group can have own set of time series in memory.
				timeseriesLen *= 1000
			}
		}
		// The maximum number of output time series is limited by rssLen.
		if timeseriesLen > rssLen {
			timeseriesLen = rssLen
		}
	}
	// The estimate accounts 1000 bytes per fetched series plus 16 bytes per output point.
	rollupPoints := mulNoOverflow(pointsPerSeries, int64(timeseriesLen*len(rcs)))
	rollupMemorySize := sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16))
	if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory {
		memoryIntensiveQueries.Inc()
		requestURI := ec.GetRequestURI()
		logger.Warnf("remoteAddr=%s, requestURI=%s: the %s requires %d bytes of memory for processing; "+
			"logging this query, since it exceeds the -search.logQueryMemoryUsage=%d; "+
			"the query selects %d time series and generates %d points across all the time series; try reducing the number of selected time series",
			ec.QuotedRemoteAddr, requestURI, expr.AppendString(nil), rollupMemorySize, maxMemory, timeseriesLen*len(rcs), rollupPoints)
	}
	if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory {
		rss.Cancel()
		err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+
			"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+
			"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
			"increasing -search.maxMemoryPerQuery",
			expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3)
		return nil, err
	}
	rml := getRollupMemoryLimiter()
	if !rml.Get(uint64(rollupMemorySize)) {
		rss.Cancel()
		err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+
			"total available memory for concurrent requests: %d bytes; requested memory: %d bytes; "+
			"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
			"switching to node with more RAM; increasing -memory.allowedPercent",
			expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3)
		return nil, err
	}
	// Return the reserved memory to the limiter after the rollup is calculated.
	defer rml.Put(uint64(rollupMemorySize))
	qt.Printf("the rollup evaluation needs an estimated %d bytes of RAM for %d series and %d points per series (summary %d points)",
		rollupMemorySize, timeseriesLen, pointsPerSeries, rollupPoints)

	// Evaluate rollup
	keepMetricNames := getKeepMetricNames(expr)
	if iafc != nil {
		return evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps)
	}
	return evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)
}
var (
	// rollupMemoryLimiter limits the memory reserved by rollup calculations.
	// It must be obtained via getRollupMemoryLimiter, which initializes it on the first call.
	rollupMemoryLimiter memoryLimiter
	// rollupMemoryLimiterOnce guards the one-time initialization of rollupMemoryLimiter.
	rollupMemoryLimiterOnce sync.Once
)
// getRollupMemoryLimiter returns the shared memory limiter for rollup calculations.
//
// The limiter is initialized on the first call with the maximum size
// equal to a quarter of memory.Allowed().
func getRollupMemoryLimiter() *memoryLimiter {
	rollupMemoryLimiterOnce.Do(func() {
		allowedBytes := uint64(memory.Allowed())
		rollupMemoryLimiter.MaxSize = allowedBytes / 4
	})
	return &rollupMemoryLimiter
}
// rollupFuncsWithoutSilenceInterval contains lowercase names of rollup functions,
// which do not rely on the previous sample before the lookbehind window (aka prevValue).
var rollupFuncsWithoutSilenceInterval = map[string]struct{}{
	"absent_over_time":        {},
	"avg_over_time":           {},
	"count_eq_over_time":      {},
	"count_gt_over_time":      {},
	"count_le_over_time":      {},
	"count_ne_over_time":      {},
	"count_over_time":         {},
	"default_rollup":          {},
	"first_over_time":         {},
	"histogram_over_time":     {},
	"hoeffding_bound_lower":   {},
	"hoeffding_bound_upper":   {},
	"last_over_time":          {},
	"mad_over_time":           {},
	"max_over_time":           {},
	"median_over_time":        {},
	"min_over_time":           {},
	"predict_linear":          {},
	"present_over_time":       {},
	"quantile_over_time":      {},
	"quantiles_over_time":     {},
	"range_over_time":         {},
	"share_gt_over_time":      {},
	"share_le_over_time":      {},
	"share_eq_over_time":      {},
	"stale_samples_over_time": {},
	"stddev_over_time":        {},
	"stdvar_over_time":        {},
	"sum_over_time":           {},
	"tfirst_over_time":        {},
	"timestamp":               {},
	"timestamp_with_name":     {},
	"tlast_over_time":         {},
	"tmax_over_time":          {},
	"tmin_over_time":          {},
	"zscore_over_time":        {},
}

// needSilenceIntervalForRollupFunc returns true if the rollup function with the given funcName
// needs the silence interval in front of the lookbehind window.
//
// The silence interval isn't needed for functions listed in rollupFuncsWithoutSilenceInterval.
// funcName is matched case-insensitively.
func needSilenceIntervalForRollupFunc(funcName string) bool {
	_, skip := rollupFuncsWithoutSilenceInterval[strings.ToLower(funcName)]
	return !skip
}
// evalRollupWithIncrementalAggregate calculates the rollup funcName for every series in rss
// and feeds the results into the incremental aggregate iafc as soon as they are calculated.
//
// preFunc is applied to the raw samples of every series before the rollup.
// Every config from rcs is applied to every series.
// The aggregated series returned by iafc.finalizeTimeseries() are returned to the caller.
func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool,
	iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
	qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs)
	defer qt.Done()
	// samplesScannedTotal is updated atomically from the callback passed to rss.RunParallel.
	var samplesScannedTotal uint64
	err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
		preFunc(rs.Values, rs.Timestamps)
		// ts is a scratch series re-used across all the rollup configs for the given rs.
		ts := getTimeseries()
		defer putTimeseries(ts)
		for _, rc := range rcs {
			// A non-nil timeseries map means the rollup may generate multiple series per input series.
			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
				samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
				for _, ts := range tsm.m {
					iafc.updateTimeseries(ts, workerID)
				}
				atomic.AddUint64(&samplesScannedTotal, samplesScanned)
				continue
			}
			ts.Reset()
			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
			atomic.AddUint64(&samplesScannedTotal, samplesScanned)
			iafc.updateTimeseries(ts, workerID)

			// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
			ts.Timestamps = nil
			ts.denyReuse = false
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	tss := iafc.finalizeTimeseries()
	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
	qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal)
	return tss, nil
}
// evalRollupNoIncrementalAggregate calculates the rollup funcName for every series in rss
// and returns the resulting series.
//
// preFunc is applied to the raw samples of every series before the rollup.
// Every config from rcs is applied to every series.
// The results are collected into per-worker lists, which are concatenated at the end.
func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
	qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
	defer qt.Done()

	// samplesScannedTotal is updated atomically from the callback passed to rss.RunParallel.
	var samplesScannedTotal uint64
	tsw := getTimeseriesByWorkerID()
	seriesByWorkerID := tsw.byWorkerID
	// Remember the number of series before RunParallel is called; it is used for sizing the result below.
	seriesLen := rss.Len()
	err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
		preFunc(rs.Values, rs.Timestamps)
		for _, rc := range rcs {
			// A non-nil timeseries map means the rollup may generate multiple series per input series.
			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
				samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
				atomic.AddUint64(&samplesScannedTotal, samplesScanned)
				seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
				continue
			}
			// A new timeseries is created per each result, since the pointer to it is stored in the per-worker list.
			var ts timeseries
			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
			atomic.AddUint64(&samplesScannedTotal, samplesScanned)
			seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	// Concatenate per-worker results into a single list.
	tss := make([]*timeseries, 0, seriesLen*len(rcs))
	for i := range seriesByWorkerID {
		tss = append(tss, seriesByWorkerID[i].tss...)
	}
	putTimeseriesByWorkerID(tsw)

	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
	qt.Printf("samplesScanned=%d", samplesScannedTotal)
	return tss, nil
}
// doRollupForTimeseries applies rc to the samples from valuesSrc and timestampsSrc
// and stores the result in tsDst. It returns the number of scanned samples reported by rc.Do.
//
// The metric name of tsDst is copied from mnSrc. The "rollup" tag is added to it if rc.TagValue
// is non-empty. The metric group is reset unless keepMetricNames is set
// or funcName is registered in rollupFuncsKeepMetricName.
//
// tsDst.Timestamps is set to sharedTimestamps without copying, so tsDst is marked with denyReuse.
func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName,
	valuesSrc []float64, timestampsSrc []int64, sharedTimestamps []int64) uint64 {
	dstName := &tsDst.MetricName
	dstName.CopyFrom(mnSrc)
	if len(rc.TagValue) > 0 {
		dstName.AddTag("rollup", rc.TagValue)
	}
	if dropName := !(keepMetricNames || rollupFuncsKeepMetricName[funcName]); dropName {
		dstName.ResetMetricGroup()
	}

	values, samplesScanned := rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
	tsDst.Values = values
	tsDst.Timestamps = sharedTimestamps
	tsDst.denyReuse = true
	return samplesScanned
}
// timeseriesWithPadding holds the list of series collected by a single worker.
//
// The struct is padded, so adjacent items in a slice do not share cache lines.
type timeseriesWithPadding struct {
	// tss is the list of series collected by the worker.
	tss []*timeseries

	// The padding prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	_ [128 - unsafe.Sizeof([]*timeseries{})%128]byte
}
// timeseriesByWorkerID holds per-worker lists of series.
//
// It is obtained via getTimeseriesByWorkerID and is returned to the pool via putTimeseriesByWorkerID.
type timeseriesByWorkerID struct {
	// byWorkerID is indexed by workerID.
	byWorkerID []timeseriesWithPadding
}
// reset drops references to the collected series in all the per-worker lists,
// while keeping the byWorkerID slice itself for further re-use.
func (tsw *timeseriesByWorkerID) reset() {
	for i := range tsw.byWorkerID {
		tsw.byWorkerID[i].tss = nil
	}
}
// getTimeseriesByWorkerID returns timeseriesByWorkerID from the pool.
//
// A new one with netstorage.MaxWorkers() per-worker lists is created if the pool is empty.
// Return it to the pool via putTimeseriesByWorkerID when it is no longer needed.
func getTimeseriesByWorkerID() *timeseriesByWorkerID {
	if v := timeseriesByWorkerIDPool.Get(); v != nil {
		return v.(*timeseriesByWorkerID)
	}
	tsw := &timeseriesByWorkerID{}
	tsw.byWorkerID = make([]timeseriesWithPadding, netstorage.MaxWorkers())
	return tsw
}
// putTimeseriesByWorkerID resets tsw and returns it to the pool.
//
// tsw mustn't be used after the call.
func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) {
	// Drop references to the collected series before pooling tsw.
	tsw.reset()
	timeseriesByWorkerIDPool.Put(tsw)
}
// timeseriesByWorkerIDPool is the pool of *timeseriesByWorkerID objects
// used by getTimeseriesByWorkerID and putTimeseriesByWorkerID.
var timeseriesByWorkerIDPool sync.Pool

// bbPool is the pool of byte buffers; mergeInstantValues uses it for marshaling metric names.
var bbPool bytesutil.ByteBufferPool
// evalNumber returns a single series with n at every timestamp from ec.getSharedTimestamps().
//
// The returned series references the shared timestamps without copying them,
// so it is marked with denyReuse.
func evalNumber(ec *EvalConfig, n float64) []*timeseries {
	timestamps := ec.getSharedTimestamps()
	values := make([]float64, len(timestamps))
	for i := range values {
		values[i] = n
	}
	ts := &timeseries{
		Values:     values,
		Timestamps: timestamps,
		denyReuse:  true,
	}
	return []*timeseries{ts}
}
// evalString returns a single series filled with NaN values,
// which carries s in the metric group of its metric name.
func evalString(ec *EvalConfig, s string) []*timeseries {
	tss := evalNumber(ec, nan)
	mn := &tss[0].MetricName
	mn.MetricGroup = append(mn.MetricGroup[:0], s...)
	return tss
}
// evalTime returns a single series, where every value equals
// to the corresponding timestamp divided by 1e3.
func evalTime(ec *EvalConfig) []*timeseries {
	tss := evalNumber(ec, nan)
	ts := tss[0]
	for i := range ts.Timestamps {
		ts.Values[i] = float64(ts.Timestamps[i]) / 1e3
	}
	return tss
}
// mulNoOverflow returns a*b clamped to math.MaxInt64 on overflow.
//
// Both a and b are expected to be non-negative.
func mulNoOverflow(a, b int64) int64 {
	if b == 0 {
		// The product is zero. The check also prevents division by zero below.
		return 0
	}
	if math.MaxInt64/b < a {
		// Overflow
		return math.MaxInt64
	}
	return a * b
}
// sumNoOverflow returns a+b clamped to math.MaxInt64 on overflow.
func sumNoOverflow(a, b int64) int64 {
	if headroom := math.MaxInt64 - a; b > headroom {
		// a+b doesn't fit int64.
		return math.MaxInt64
	}
	return a + b
}
// dropStaleNaNs removes Prometheus staleness marks (aka stale NaNs) from values
// together with the corresponding timestamps and returns the resulting slices.
//
// The filtering is performed in place, so the returned slices share memory with the passed ones.
//
// The samples are returned as is if noStaleMarkers is set, as well as for default_rollup()
// and stale_samples_over_time() functions: the former uses staleness marks for Prometheus-style
// staleness detection, while the latter needs to count them.
func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) {
	switch {
	case *noStaleMarkers, funcName == "default_rollup", funcName == "stale_samples_over_time":
		return values, timestamps
	}

	// Check whether there is anything to drop, so non-default rollup functions don't hit NaN values.
	staleFound := false
	for i := range values {
		if decimal.IsStaleNaN(values[i]) {
			staleFound = true
			break
		}
	}
	if !staleFound {
		// Fast path: there are no Prometheus staleness marks among the values.
		return values, timestamps
	}

	// Slow path: keep only the samples, which aren't Prometheus staleness marks.
	keptValues := values[:0]
	keptTimestamps := timestamps[:0]
	for i, v := range values {
		if !decimal.IsStaleNaN(v) {
			keptValues = append(keptValues, v)
			keptTimestamps = append(keptTimestamps, timestamps[i])
		}
	}
	return keptValues, keptTimestamps
}