VictoriaMetrics/app/vmselect/promql/eval.go
Aliaksandr Valialkin 024e2f18da
app/vmselect/promql: evaluate union() args in parallel in order to increase query performance
Note that the parallel execution of `union()` args may take more memory and CPU time
than the sequential execution if args contain heavy queries, which may load all the available CPU,
disk and memory resources and vmselect and vmstorage levels.
2022-09-02 21:01:04 +03:00

1286 lines
40 KiB
Go

package promql
import (
"flag"
"fmt"
"math"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
)
// Command-line flags that tune query evaluation.
var (
// disableCache turns off response caching; it is consulted by AdjustStartEnd and EvalConfig.mayCache.
disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful during data backfilling")
// maxPointsSubqueryPerTimeseries is used as the per-series points limit when evaluating subqueries.
maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+
"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")
// noStaleMarkers allows skipping the handling of Prometheus staleness markers (see the flag description).
noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")
)
// minTimeseriesPointsForTimeRounding is the minimum number of points per timeseries
// required for AdjustStartEnd to round start and end to multiples of step.
//
// The rounding improves cache hit ratio for frequently requested queries over
// big time ranges.
const minTimeseriesPointsForTimeRounding = 50
// ValidateMaxPointsPerSeries checks that evaluating the [start..end] range with the given step
// yields at most maxPoints points per series.
//
// It returns an error if step is zero or if the limit is exceeded; otherwise it returns nil.
func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error {
	if step == 0 {
		return fmt.Errorf("step can't be equal to zero")
	}
	if n := (end-start)/step + 1; n > int64(maxPoints) {
		return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d (see -search.maxPoints* command-line flags)",
			start, end, step, n, maxPoints)
	}
	return nil
}
// AdjustStartEnd rounds start and end to multiples of step, so response caching may be enabled.
//
// The values are returned unchanged when caching is disabled or when the range
// contains fewer than minTimeseriesPointsForTimeRounding points.
// The number of points in the adjusted range never exceeds the number of points in the original range.
//
// See EvalConfig.mayCache for details.
func AdjustStartEnd(start, end, step int64) (int64, int64) {
	if *disableCache {
		// There is no sense in rounding when the cache is disabled.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563
		return start, end
	}
	origPoints := (end-start)/step + 1
	if origPoints < minTimeseriesPointsForTimeRounding {
		// The range is too short for rounding.
		return start, end
	}
	// Align both bounds to step in order to enable response caching (see EvalConfig.mayCache).
	alignedStart, alignedEnd := alignStartEnd(start, end, step)
	// Alignment may add points; trim the end so the points count matches the original one.
	if extra := (alignedEnd-alignedStart)/step + 1 - origPoints; extra > 0 {
		alignedEnd -= extra * step
	}
	return alignedStart, alignedEnd
}
// alignStartEnd aligns start and end to step.
//
// For non-negative inputs start is rounded down and end is rounded up
// to the nearest value divisible by step.
func alignStartEnd(start, end, step int64) (int64, int64) {
	alignedStart := start - start%step
	alignedEnd := end
	if rem := end % step; rem > 0 {
		alignedEnd = end + (step - rem)
	}
	return alignedStart, alignedEnd
}
// EvalConfig is the configuration required for query evaluation via Exec
type EvalConfig struct {
// AuthToken identifies the tenant; its AccountID and ProjectID are passed to the storage search query.
AuthToken *auth.Token
// Start and End are the bounds of the evaluated time range; validate() requires Start <= End.
// NOTE(review): timestamps appear to be in milliseconds (Step is divided by 1e3 when reported in seconds) - confirm.
Start int64
End int64
// Step is the interval between adjacent points; validate() requires it to be positive.
Step int64
// MaxSeries is the maximum number of time series, which can be scanned by the query.
// Zero means 'no limit'
MaxSeries int
// MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series.
MaxPointsPerSeries int
// QuotedRemoteAddr contains quoted remote address.
QuotedRemoteAddr string
// Deadline is passed to netstorage when fetching data for the query.
Deadline searchutils.Deadline
// Whether the response can be cached.
MayCache bool
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
// How many decimal digits after the point to leave in response.
RoundDigits int
// EnforcedTagFilterss may contain additional label filters to use in the query.
EnforcedTagFilterss [][]storage.TagFilter
// Whether to deny partial response.
DenyPartialResponse bool
// IsPartialResponse is set during query execution and can be used by Exec caller after query execution.
IsPartialResponse bool
// timestamps holds the timestamps for the [Start..End] range with the given Step.
// It is populated on the first getSharedTimestamps call and isn't copied by copyEvalConfig.
timestamps []int64
// timestampsOnce guards the one-time initialization of timestamps.
timestampsOnce sync.Once
}
// copyEvalConfig returns a copy of src, which can be modified independently of src.
//
// All the public configuration fields are copied, including QuotedRemoteAddr.
// The lazily generated timestamps are intentionally left empty in the copy,
// since callers usually change Start, End or Step on the copy,
// so the timestamps must be generated again.
func copyEvalConfig(src *EvalConfig) *EvalConfig {
	return &EvalConfig{
		AuthToken:           src.AuthToken,
		Start:               src.Start,
		End:                 src.End,
		Step:                src.Step,
		MaxSeries:           src.MaxSeries,
		MaxPointsPerSeries:  src.MaxPointsPerSeries,
		QuotedRemoteAddr:    src.QuotedRemoteAddr,
		Deadline:            src.Deadline,
		MayCache:            src.MayCache,
		LookbackDelta:       src.LookbackDelta,
		RoundDigits:         src.RoundDigits,
		EnforcedTagFilterss: src.EnforcedTagFilterss,
		DenyPartialResponse: src.DenyPartialResponse,
		IsPartialResponse:   src.IsPartialResponse,
		// do not copy src.timestamps - they must be generated again.
	}
}
// updateIsPartialResponse records isPartialResponse in ec.
//
// The flag is sticky: once ec.IsPartialResponse is true, further calls leave it unchanged.
func (ec *EvalConfig) updateIsPartialResponse(isPartialResponse bool) {
	if ec.IsPartialResponse {
		return
	}
	ec.IsPartialResponse = isPartialResponse
}
// validate performs sanity checks on the time range and step stored in ec.
//
// Violations are reported via logger.Panicf, since they indicate a bug in the caller.
func (ec *EvalConfig) validate() {
	if start, end := ec.Start, ec.End; start > end {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", start, end)
	}
	if step := ec.Step; step <= 0 {
		logger.Panicf("BUG: step must be greater than 0; got %d", step)
	}
}
// mayCache reports whether results for ec may be stored in the response cache.
//
// Caching requires that it isn't disabled via -search.disableCache, that ec.MayCache is set
// and that both ec.Start and ec.End are divisible by ec.Step.
func (ec *EvalConfig) mayCache() bool {
	if *disableCache || !ec.MayCache {
		return false
	}
	return ec.Start%ec.Step == 0 && ec.End%ec.Step == 0
}
// timeRangeString returns the [Start..End] time range of ec in human-readable form.
func (ec *EvalConfig) timeRangeString() string {
	return fmt.Sprintf("[%s..%s]",
		storage.TimestampToHumanReadableFormat(ec.Start),
		storage.TimestampToHumanReadableFormat(ec.End))
}
// getSharedTimestamps returns the timestamps for ec, generating them on the first call.
//
// The same slice is returned on every call, so callers share its contents.
func (ec *EvalConfig) getSharedTimestamps() []int64 {
	ec.timestampsOnce.Do(func() {
		ec.timestampsInit()
	})
	return ec.timestamps
}
// timestampsInit fills ec.timestamps for the time range and step stored in ec.
func (ec *EvalConfig) timestampsInit() {
	start, end, step := ec.Start, ec.End, ec.Step
	ec.timestamps = getTimestamps(start, end, step, ec.MaxPointsPerSeries)
}
// getTimestamps returns timestamps start, start+step, ... covering the [start..end] range.
//
// The args must be validated by the caller: a non-positive step, start exceeding end
// or a points count above maxPointsPerSeries is reported via logger.Panicf.
func getTimestamps(start, end, step int64, maxPointsPerSeries int) []int64 {
	// Sanity checks.
	if step <= 0 {
		logger.Panicf("BUG: Step must be bigger than 0; got %d", step)
	}
	if start > end {
		logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end)
	}
	if err := ValidateMaxPointsPerSeries(start, end, step, maxPointsPerSeries); err != nil {
		logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err)
	}

	// Generate the timestamps.
	n := 1 + (end-start)/step
	timestamps := make([]int64, n)
	ts := start
	for i := range timestamps {
		timestamps[i] = ts
		ts += step
	}
	return timestamps
}
// evalExpr evaluates e for the given ec and returns the resulting time series.
//
// When tracing is enabled, the evaluation is wrapped into a child trace,
// which records the query, the time range and the size of the result.
func evalExpr(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
	if qt.Enabled() {
		query := bytesutil.LimitStringLen(string(e.AppendString(nil)), 300)
		mayCache := ec.mayCache()
		qt = qt.NewChild("eval: query=%s, timeRange=%s, step=%d, mayCache=%v", query, ec.timeRangeString(), ec.Step, mayCache)
	}
	rv, err := evalExprInternal(qt, ec, e)
	if err != nil {
		return nil, err
	}
	if !qt.Enabled() {
		return rv, nil
	}
	// The points count per series is taken from the first series.
	pointsPerSeries := 0
	if len(rv) > 0 {
		pointsPerSeries = len(rv[0].Timestamps)
	}
	qt.Donef("series=%d, points=%d, pointsPerSeries=%d", len(rv), len(rv)*pointsPerSeries, pointsPerSeries)
	return rv, nil
}
// evalExprInternal dispatches the evaluation of e according to its concrete type.
//
// Metric selectors and rollup expressions are evaluated via default_rollup.
// Function calls are evaluated either as rollup functions or as transform functions.
// An error is returned for unsupported expression types.
func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
	switch t := e.(type) {
	case *metricsql.MetricExpr:
		re := &metricsql.RollupExpr{
			Expr: t,
		}
		rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, t.AppendString(nil), err)
		}
		return rv, nil
	case *metricsql.RollupExpr:
		rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, t, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, t.AppendString(nil), err)
		}
		return rv, nil
	case *metricsql.FuncExpr:
		nrf := getRollupFunc(t.Name)
		if nrf == nil {
			// Not a rollup function - evaluate it as a transform function.
			qtChild := qt.NewChild("transform %s()", t.Name)
			rv, err := evalTransformFunc(qtChild, ec, t)
			qtChild.Donef("series=%d", len(rv))
			return rv, err
		}
		args, re, err := evalRollupFuncArgs(qt, ec, t)
		if err != nil {
			return nil, err
		}
		rf, err := nrf(args)
		if err != nil {
			return nil, err
		}
		rv, err := evalRollupFunc(qt, ec, t.Name, rf, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, t.AppendString(nil), err)
		}
		return rv, nil
	case *metricsql.AggrFuncExpr:
		qtChild := qt.NewChild("aggregate %s()", t.Name)
		rv, err := evalAggrFunc(qtChild, ec, t)
		qtChild.Donef("series=%d", len(rv))
		return rv, err
	case *metricsql.BinaryOpExpr:
		qtChild := qt.NewChild("binary op %q", t.Op)
		rv, err := evalBinaryOp(qtChild, ec, t)
		qtChild.Donef("series=%d", len(rv))
		return rv, err
	case *metricsql.NumberExpr:
		return evalNumber(ec, t.N), nil
	case *metricsql.StringExpr:
		return evalString(ec, t.S), nil
	case *metricsql.DurationExpr:
		// The duration is divided by 1000 before being returned as a number.
		d := t.Duration(ec.Step)
		dSec := float64(d) / 1000
		return evalNumber(ec, dSec), nil
	}
	return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
}
// evalTransformFunc evaluates the transform function call fe.
//
// Args of `union()` (including its unnamed form) are evaluated in parallel,
// while args of the remaining functions are evaluated sequentially.
// Errors for unknown functions and errors returned by the function itself
// are wrapped into UserReadableError.
func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) {
	tf := getTransformFunc(fe.Name)
	if tf == nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf(`unknown func %q`, fe.Name),
		}
	}
	evalArgs := evalExprsSequentially
	if fe.Name == "" || fe.Name == "union" {
		evalArgs = evalExprsInParallel
	}
	args, err := evalArgs(qt, ec, fe.Args)
	if err != nil {
		return nil, err
	}
	rv, err := tf(&transformFuncArg{
		ec:   ec,
		fe:   fe,
		args: args,
	})
	if err != nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err),
		}
	}
	return rv, nil
}
// evalAggrFunc evaluates the aggregate function call ae.
//
// If ae supports incremental aggregation and its arg is a rollup function over a metric selector,
// then the aggregate is calculated incrementally while evaluating the rollup.
// Otherwise the args are evaluated in parallel and passed to the aggregate function.
func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFuncExpr) ([]*timeseries, error) {
	callbacks := getIncrementalAggrFuncCallbacks(ae.Name)
	if callbacks != nil {
		if fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae); fe != nil {
			// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
			// The optimized path saves RAM for aggregates over big number of time series.
			args, re, err := evalRollupFuncArgs(qt, ec, fe)
			if err != nil {
				return nil, err
			}
			rf, err := nrf(args)
			if err != nil {
				return nil, err
			}
			iafc := newIncrementalAggrFuncContext(ae, callbacks)
			return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc)
		}
	}

	// Generic path: evaluate all the args and then aggregate the results.
	args, err := evalExprsInParallel(qt, ec, ae.Args)
	if err != nil {
		return nil, err
	}
	af := getAggrFunc(ae.Name)
	if af == nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf(`unknown func %q`, ae.Name),
		}
	}
	rv, err := af(&aggrFuncArg{
		ae:   ae,
		args: args,
		ec:   ec,
	})
	if err != nil {
		return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err)
	}
	return rv, nil
}
// evalBinaryOp evaluates both sides of the binary operation be and applies the operation to the results.
func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) {
	bf := getBinaryOpFunc(be.Op)
	if bf == nil {
		return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
	}
	var (
		tssLeft, tssRight []*timeseries
		err               error
	)
	op := strings.ToLower(be.Op)
	if op == "and" || op == "if" {
		// Fetch right-side series at first, since it usually contains
		// lower number of time series for `and` and `if` operator.
		// This should produce more specific label filters for the left side of the query.
		// This, in turn, should reduce the time to select series for the left side of the query.
		tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be)
	} else {
		tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be)
	}
	if err != nil {
		return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
	}
	rv, err := bf(&binaryOpFuncArg{
		be:    be,
		left:  tssLeft,
		right: tssRight,
	})
	if err != nil {
		return nil, fmt.Errorf(`cannot evaluate %q: %w`, be.AppendString(nil), err)
	}
	return rv, nil
}
// canPushdownCommonFilters reports whether common label filters found on one side of be
// may be pushed down to the other side.
//
// The pushdown is disabled for `or` and `default` operations and when any side
// is an aggregate function without grouping.
func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool {
	if op := strings.ToLower(be.Op); op == "or" || op == "default" {
		return false
	}
	return !isAggrFuncWithoutGrouping(be.Left) && !isAggrFuncWithoutGrouping(be.Right)
}
// isAggrFuncWithoutGrouping reports whether e is an aggregate function call with empty modifier args.
func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
	if afe, ok := e.(*metricsql.AggrFuncExpr); ok {
		return len(afe.Modifier.Args) == 0
	}
	return false
}
// execBinaryOpArgs evaluates exprFirst and exprSecond for the binary operation be
// and returns their results in the same order.
//
// If common label filters cannot be pushed down for be, then both expressions are evaluated in parallel.
// Otherwise exprFirst is evaluated at first, and common label filters extracted from its results
// are pushed down to exprSecond before it is evaluated.
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
if !canPushdownCommonFilters(be) {
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
// from exprFirst to exprSecond.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
//
// NOTE(review): both goroutines receive the same ec, while ec.updateIsPartialResponse
// contains no synchronization - confirm whether concurrent updates are possible here.
qt = qt.NewChild("execute left and right sides of %q in parallel", be.Op)
defer qt.Done()
var wg sync.WaitGroup
var tssFirst []*timeseries
var errFirst error
qtFirst := qt.NewChild("expr1")
wg.Add(1)
go func() {
defer wg.Done()
tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst)
qtFirst.Done()
}()
var tssSecond []*timeseries
var errSecond error
qtSecond := qt.NewChild("expr2")
wg.Add(1)
go func() {
defer wg.Done()
tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond)
qtSecond.Done()
}()
// The results are read only after both goroutines are finished.
wg.Wait()
// The error from exprFirst takes precedence over the error from exprSecond.
if errFirst != nil {
return nil, nil, errFirst
}
if errSecond != nil {
return nil, nil, errSecond
}
return tssFirst, tssSecond, nil
}
// Execute binary operation in the following way:
//
// 1) execute the exprFirst
// 2) get common label filters for series returned at step 1
// 3) push down the found common label filters to exprSecond. This filters out unneeded series
// during exprSecond execution instead of spending compute resources on extracting and processing these series
// before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
// 4) execute the exprSecond with possible additional filters found at step 3
//
// Typical use cases:
// - Kubernetes-related: show pod creation time with the node name:
//
// kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
//
// Without the optimization `kube_pod_info` would select and spend compute resources
// for more time series than needed. The selected time series would be dropped later
// when matching time series on the right and left sides of binary operand.
//
// - Generic alerting queries, which rely on `info` metrics.
// See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
//
// - Queries, which get additional labels from `info` metrics.
// See https://www.robustperception.io/exposing-the-software-version-to-prometheus
tssFirst, err := evalExpr(qt, ec, exprFirst)
if err != nil {
return nil, nil, err
}
// Leave only the filters allowed by the grouping modifier of be before pushing them down.
lfs := getCommonLabelFilters(tssFirst)
lfs = metricsql.TrimFiltersByGroupModifier(lfs, be)
exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs)
tssSecond, err := evalExpr(qt, ec, exprSecond)
if err != nil {
return nil, nil, err
}
return tssFirst, tssSecond, nil
}
// getCommonLabelFilters returns label filters, which match all the time series in tss.
//
// A filter is generated for every label present in every series.
// The filter matches the single value when all the series share it;
// otherwise it is a regexp matching any of the unique values.
// Labels with more than 1000 unique values are skipped.
// The returned filters are sorted by label name.
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
	valuesByLabel := make(map[string][]string)
	for _, ts := range tss {
		for _, tag := range ts.MetricName.Tags {
			label := string(tag.Key)
			valuesByLabel[label] = append(valuesByLabel[label], string(tag.Value))
		}
	}
	lfs := make([]metricsql.LabelFilter, 0, len(valuesByLabel))
	for label, all := range valuesByLabel {
		if len(all) != len(tss) {
			// Skip the tag, since it doesn't belong to all the time series.
			continue
		}
		uniq := getUniqueValues(all)
		switch n := len(uniq); {
		case n > 1000:
			// Skip the filter on the given tag, since it needs to enumerate too many unique values.
			// This may slow down the search for matching time series.
			continue
		case n == 1:
			lfs = append(lfs, metricsql.LabelFilter{
				Label: label,
				Value: uniq[0],
			})
		default:
			sort.Strings(uniq)
			lfs = append(lfs, metricsql.LabelFilter{
				Label:    label,
				Value:    joinRegexpValues(uniq),
				IsRegexp: true,
			})
		}
	}
	// Map iteration order is random, so sort the filters for deterministic output.
	sort.Slice(lfs, func(i, j int) bool {
		return lfs[i].Label < lfs[j].Label
	})
	return lfs
}
// getUniqueValues returns the distinct strings from a in the order of their first occurrence.
func getUniqueValues(a []string) []string {
	seen := make(map[string]struct{}, len(a))
	uniq := make([]string, 0, len(a))
	for i := range a {
		v := a[i]
		if _, dup := seen[v]; dup {
			continue
		}
		seen[v] = struct{}{}
		uniq = append(uniq, v)
	}
	return uniq
}
// joinRegexpValues returns a regexp alternation matching any of the literal strings in a.
//
// Every string is escaped with regexp.QuoteMeta before being joined with `|`.
func joinRegexpValues(a []string) string {
	quoted := make([]string, len(a))
	for i, s := range a {
		quoted[i] = regexp.QuoteMeta(s)
	}
	return strings.Join(quoted, "|")
}
// tryGetArgRollupFuncWithMetricExpr returns the rollup function call (together with its constructor)
// for the single arg of ae if the arg has one of the following forms:
//
//   - metricExpr
//   - metricExpr[d]
//   - rollupFunc(metricExpr)
//   - rollupFunc(metricExpr[d])
//
// It returns (nil, nil) if the arg has another form, if the metric selector is empty
// or if the rollup is performed over a subquery.
//
// For rollupFunc(...) forms the original function call is returned as is, with all its args.
// Non-rollup args (for instance, the first arg of a two-arg rollup function) must be preserved,
// since evalRollupFuncArgs rejects function calls with fewer args than the rollup arg index requires.
func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) {
	if len(ae.Args) != 1 {
		return nil, nil
	}
	switch e := ae.Args[0].(type) {
	case *metricsql.MetricExpr:
		// e = metricExpr
		if e.IsEmpty() {
			return nil, nil
		}
		fe := &metricsql.FuncExpr{
			Name: "default_rollup",
			Args: []metricsql.Expr{e},
		}
		return fe, getRollupFunc(fe.Name)
	case *metricsql.RollupExpr:
		if me, ok := e.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || e.ForSubquery() {
			return nil, nil
		}
		// e = metricExpr[d]
		fe := &metricsql.FuncExpr{
			Name: "default_rollup",
			Args: []metricsql.Expr{e},
		}
		return fe, getRollupFunc(fe.Name)
	case *metricsql.FuncExpr:
		nrf := getRollupFunc(e.Name)
		if nrf == nil {
			return nil, nil
		}
		rollupArgIdx := metricsql.GetRollupArgIdx(e)
		if rollupArgIdx >= len(e.Args) {
			// Incorrect number of args for rollup func.
			return nil, nil
		}
		switch arg := e.Args[rollupArgIdx].(type) {
		case *metricsql.MetricExpr:
			if arg.IsEmpty() {
				return nil, nil
			}
			// e = rollupFunc(metricExpr)
			//
			// Return the original call instead of rebuilding it from the metric selector alone,
			// so the remaining args are kept. The bare selector is wrapped into a rollup expression
			// later by getRollupExprArg.
			return e, nrf
		case *metricsql.RollupExpr:
			if me, ok := arg.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || arg.ForSubquery() {
				return nil, nil
			}
			// e = rollupFunc(metricExpr[d])
			return e, nrf
		}
	}
	return nil, nil
}
// evalExprsSequentially evaluates es one by one and returns the results in the order of es.
//
// The evaluation stops at the first error, which is returned to the caller.
func evalExprsSequentially(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
	var rvs [][]*timeseries
	for i := range es {
		rv, err := evalExpr(qt, ec, es[i])
		if err != nil {
			return nil, err
		}
		rvs = append(rvs, rv)
	}
	return rvs, nil
}
// evalExprsInParallel evaluates es concurrently and returns the results in the order of es.
//
// Fewer than two expressions are evaluated sequentially, since there is nothing to parallelize.
// All the expressions are evaluated to completion; the error for the expression
// with the lowest index is returned if any evaluation fails.
func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
	if len(es) < 2 {
		return evalExprsSequentially(qt, ec, es)
	}
	rvs := make([][]*timeseries, len(es))
	errs := make([]error, len(es))
	// Log the message once per call instead of once per arg.
	qt.Printf("eval function args in parallel")
	var wg sync.WaitGroup
	for i, e := range es {
		wg.Add(1)
		qtChild := qt.NewChild("eval arg %d", i)
		// e and i are passed as args, so every goroutine works with its own copies.
		go func(e metricsql.Expr, i int) {
			defer func() {
				qtChild.Done()
				wg.Done()
			}()
			// Every goroutine writes to its own slots, so no locking is needed.
			rvs[i], errs[i] = evalExpr(qtChild, ec, e)
		}(e, i)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return rvs, nil
}
// evalRollupFuncArgs prepares args for the rollup function call fe.
//
// The rollup arg is converted to *metricsql.RollupExpr without being evaluated,
// while the remaining args are evaluated into time series.
// The returned args have the same order as fe.Args; the rollup arg is additionally returned separately.
func evalRollupFuncArgs(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) {
	rollupArgIdx := metricsql.GetRollupArgIdx(fe)
	if len(fe.Args) <= rollupArgIdx {
		return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil))
	}
	var re *metricsql.RollupExpr
	args := make([]interface{}, len(fe.Args))
	for i, arg := range fe.Args {
		if i != rollupArgIdx {
			ts, err := evalExpr(qt, ec, arg)
			if err != nil {
				return nil, nil, fmt.Errorf("cannot evaluate arg #%d for %q: %w", i+1, fe.AppendString(nil), err)
			}
			args[i] = ts
			continue
		}
		re = getRollupExprArg(arg)
		args[i] = re
	}
	return args, re, nil
}
// getRollupExprArg converts arg to *metricsql.RollupExpr suitable for rollup evaluation.
//
// A non-rollup arg is wrapped into a rollup expression. A rollup expression is returned as is
// unless it is a subquery over a bare metric selector; in this case the selector
// is wrapped into default_rollup() in a copy of the expression.
func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
	re, isRollup := arg.(*metricsql.RollupExpr)
	switch {
	case !isRollup:
		// Wrap non-rollup arg into metricsql.RollupExpr.
		return &metricsql.RollupExpr{
			Expr: arg,
		}
	case !re.ForSubquery():
		// Return standard rollup if it doesn't contain subquery.
		return re
	}
	me, isMetric := re.Expr.(*metricsql.MetricExpr)
	if !isMetric {
		// arg contains subquery.
		return re
	}
	// Convert me[w:step] -> default_rollup(me)[w:step]
	wrapped := *re
	wrapped.Expr = &metricsql.FuncExpr{
		Name: "default_rollup",
		Args: []metricsql.Expr{
			&metricsql.RollupExpr{Expr: me},
		},
	}
	return &wrapped
}
// evalRollupFunc evaluates the rollup function rf named funcName over re, taking into account the `@` modifier.
//
// expr may contain:
//   - rollupFunc(m) if iafc is nil
//   - aggrFunc(rollupFunc(m)) if iafc isn't nil
//
// If re contains the `@` modifier, the rollup is evaluated at the single timestamp returned by the modifier,
// and the obtained value is then expanded to every timestamp of the original time range.
func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr,
	re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
	if re.At == nil {
		return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc)
	}
	tssAt, err := evalExpr(qt, ec, re.At)
	if err != nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err),
		}
	}
	if len(tssAt) != 1 {
		return nil, &UserReadableError{
			Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)),
		}
	}
	// The value of the `@` expression is multiplied by 1000 in order to obtain the evaluation timestamp.
	atTimestamp := int64(tssAt[0].Values[0] * 1000)
	ecNew := copyEvalConfig(ec)
	ecNew.Start = atTimestamp
	ecNew.End = atTimestamp
	tss, err := evalRollupFuncWithoutAt(qt, ecNew, funcName, rf, expr, re, iafc)
	if err != nil {
		return nil, err
	}
	// The rollup has been evaluated on the copied config,
	// so propagate the partial response flag to the original config.
	ec.updateIsPartialResponse(ecNew.IsPartialResponse)
	// expand single-point tss to the original time range.
	timestamps := ec.getSharedTimestamps()
	for _, ts := range tss {
		v := ts.Values[0]
		values := make([]float64, len(timestamps))
		for i := range timestamps {
			values[i] = v
		}
		ts.Timestamps = timestamps
		ts.Values = values
	}
	return tss, nil
}
// evalRollupFuncWithoutAt evaluates the rollup function rf named funcName over re, ignoring the `@` modifier.
//
// The `offset` modifier is applied by evaluating the rollup on a copy of ec with the shifted time range
// and then shifting timestamps of the results back.
// Errors from the underlying evaluation are wrapped into UserReadableError.
func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
funcName = strings.ToLower(funcName)
// ecNew aliases ec until a modified time range is needed; ec itself is never modified for that.
ecNew := ec
var offset int64
if re.Offset != nil {
offset = re.Offset.Duration(ec.Step)
ecNew = copyEvalConfig(ecNew)
ecNew.Start -= offset
ecNew.End -= offset
// There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true,
// since the time range alignment has been already performed by the caller,
// so cache hit rate should be quite good.
// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
}
if funcName == "rollup_candlestick" {
// Automatically apply `offset -step` to `rollup_candlestick` function
// in order to obtain expected OHLC results.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
step := ecNew.Step
ecNew = copyEvalConfig(ecNew)
ecNew.Start += step
ecNew.End += step
offset -= step
}
var rvs []*timeseries
var err error
// A rollup over a metric selector fetches data from storage; anything else is evaluated as a subquery.
if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window)
} else {
if iafc != nil {
logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil))
}
rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re)
}
if err != nil {
return nil, &UserReadableError{
Err: err,
}
}
if funcName == "absent_over_time" {
// NOTE(review): ec is passed here instead of ecNew, while the timestamps of rvs are shifted by offset below -
// confirm this produces the expected timestamps when `offset` is set.
rvs = aggregateAbsentOverTime(ec, re.Expr, rvs)
}
// Propagate the partial response flag from the (possibly copied) config to the original one.
ec.updateIsPartialResponse(ecNew.IsPartialResponse)
if offset != 0 && len(rvs) > 0 {
// Make a copy of timestamps, since they may be used in other values.
// The timestamps of the first series are used for all the returned series.
srcTimestamps := rvs[0].Timestamps
dstTimestamps := append([]int64{}, srcTimestamps...)
for i := range dstTimestamps {
dstTimestamps[i] += offset
}
for _, ts := range rvs {
ts.Timestamps = dstTimestamps
}
}
return rvs, nil
}
// aggregateAbsentOverTime collapses tss to a single time series with 1 and nan values.
//
// The value at the given point of the returned series is set to nan if at least a single series in tss
// contains nan at that point, which means that tss contains a series with non-empty results at that point.
// The series generated for expr is returned as is when tss is empty.
//
// This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130
func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries {
	rvs := getAbsentTimeseries(ec, expr)
	if len(tss) == 0 {
		return rvs
	}
	for i := range tss[0].Values {
		for j := range tss {
			if !math.IsNaN(tss[j].Values[i]) {
				continue
			}
			rvs[0].Values[i] = nan
			break
		}
	}
	return rvs
}
// evalRollupFuncWithSubquery evaluates the rollup function rf named funcName over the subquery re.
//
// The inner expression is evaluated at first on an extended time range with the subquery step.
// Then the rollup is calculated in parallel over every series returned by the inner expression.
// It returns nil results if the inner expression returns no series.
func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
// TODO: determine whether to use rollupResultCacheV here.
qt = qt.NewChild("subquery")
defer qt.Done()
// Fall back to the outer step if the subquery step resolves to zero.
step := re.Step.Duration(ec.Step)
if step == 0 {
step = ec.Step
}
window := re.Window.Duration(ec.Step)
// Extend the time range of the inner query, so the rollup has enough data at the range borders.
ecSQ := copyEvalConfig(ec)
ecSQ.Start -= window + maxSilenceInterval + step
ecSQ.End += step
ecSQ.Step = step
ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
// NOTE(review): the limit is validated before alignStartEnd() widens the range below,
// while getTimestamps() re-validates the aligned range - confirm the aligned range cannot exceed the limit.
if err := ValidateMaxPointsPerSeries(ecSQ.Start, ecSQ.End, ecSQ.Step, ecSQ.MaxPointsPerSeries); err != nil {
return nil, err
}
// unconditionally align start and end args to step for subquery as Prometheus does.
ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step)
tssSQ, err := evalExpr(qt, ecSQ, re.Expr)
if err != nil {
return nil, err
}
ec.updateIsPartialResponse(ecSQ.IsPartialResponse)
if len(tssSQ) == 0 {
return nil, nil
}
// The rollup results are calculated for the outer time range and step.
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
if err != nil {
return nil, err
}
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
// tssLock protects tss, which is appended from the concurrently executed callback below.
var tssLock sync.Mutex
// samplesScannedTotal is updated atomically from the callback below.
var samplesScannedTotal uint64
keepMetricNames := getKeepMetricNames(expr)
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) {
// values and timestamps are scratch buffers passed back in by doParallel; they are reused across series.
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
preFunc(values, timestamps)
for _, rc := range rcs {
// A non-nil timeseries map means the rollup may produce multiple series per input series.
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
tssLock.Lock()
tss = tsm.AppendTimeseriesTo(tss)
tssLock.Unlock()
continue
}
var ts timeseries
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
tssLock.Lock()
tss = append(tss, &ts)
tssLock.Unlock()
}
return values, timestamps
})
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
return tss, nil
}
// rowsScannedPerQuery is a histogram, which is updated with the number of samples scanned during rollup calculations.
var rowsScannedPerQuery = metrics.NewHistogram(`vm_rows_scanned_per_query`)
// getKeepMetricNames returns the KeepMetricNames setting for expr.
//
// expr may be either a function call or an aggregate over a single function call.
// false is returned in all the other cases.
func getKeepMetricNames(expr metricsql.Expr) bool {
	target := expr
	if ae, ok := expr.(*metricsql.AggrFuncExpr); ok {
		// Extract rollupFunc(...) from aggrFunc(rollupFunc(...)).
		// This case is possible when optimized aggrFunc calculations are used
		// such as `sum(rate(...))`
		if len(ae.Args) != 1 {
			return false
		}
		target = ae.Args[0]
	}
	fe, ok := target.(*metricsql.FuncExpr)
	return ok && fe.KeepMetricNames
}
// doParallel calls f for every series in tss from concurrently running workers and waits until all the calls return.
//
// The number of workers is limited by both the number of available CPUs and len(tss).
// The slices returned by f are passed to the next f call made by the same worker,
// so f can reuse them as scratch buffers.
func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) {
	workers := cgroup.AvailableCPUs()
	if n := len(tss); workers > n {
		workers = n
	}
	workCh := make(chan *timeseries, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			var (
				values     []float64
				timestamps []int64
			)
			for ts := range workCh {
				values, timestamps = f(ts, values, timestamps)
			}
		}()
	}
	for i := range tss {
		workCh <- tss[i]
	}
	// Closing the channel stops the workers after they process the remaining series.
	close(workCh)
	wg.Wait()
}
// removeNanValues appends non-NaN values together with the corresponding timestamps
// to dstValues and dstTimestamps and returns the extended slices.
//
// values and timestamps aren't modified. The entries in timestamps must correspond to the entries in values.
func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) {
	hasNan := false
	for _, v := range values {
		if math.IsNaN(v) {
			hasNan = true
			// A single NaN is enough for switching to the slow path, so stop scanning.
			break
		}
	}
	if !hasNan {
		// Fast path - no NaNs.
		dstValues = append(dstValues, values...)
		dstTimestamps = append(dstTimestamps, timestamps...)
		return dstValues, dstTimestamps
	}

	// Slow path - remove NaNs.
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}
// Counters for rollup result cache lookups performed by evalRollupFuncWithMetricExpr.
var (
// rollupResultCacheFullHits counts lookups, which are fully served from the cache.
rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`)
// rollupResultCachePartialHits counts lookups, where only a part of the time range is cached.
rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`)
// rollupResultCacheMiss counts lookups, where nothing is cached for the requested time range.
rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
)
// evalRollupFuncWithMetricExpr evaluates the rollup function rf named funcName over the metric selector me.
//
// The already cached part of the result is taken from the rollup result cache,
// while the remaining part is calculated from the data fetched via netstorage.
// The merged result is put back into the cache unless the fetched data is partial.
// If iafc isn't nil, the aggregate is calculated incrementally during the rollup.
func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) {
// rollupMemorySize is declared before the deferred trace call, so the trace sees its final value.
var rollupMemorySize int64
window := windowExpr.Duration(ec.Step)
if qt.Enabled() {
qt = qt.NewChild("rollup %s(): timeRange=%s, step=%d, window=%d", funcName, ec.timeRangeString(), ec.Step, window)
defer func() {
qt.Donef("neededMemoryBytes=%d", rollupMemorySize)
}()
}
if me.IsEmpty() {
return evalNumber(ec, nan), nil
}
// Search for partial results in cache.
// start is the first timestamp, which must be calculated.
tssCached, start := rollupResultCacheV.Get(qt, ec, expr, window)
if start > ec.End {
// The result is fully cached.
rollupResultCacheFullHits.Inc()
return tssCached, nil
}
if start > ec.Start {
rollupResultCachePartialHits.Inc()
} else {
rollupResultCacheMiss.Inc()
}
// Obtain rollup configs before fetching data from db,
// so type errors can be caught earlier.
sharedTimestamps := getTimestamps(start, ec.End, ec.Step, ec.MaxPointsPerSeries)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
if err != nil {
return nil, err
}
// Fetch the remaining part of the result.
tfs := searchutils.ToTagFilters(me.LabelFilters)
tfss := searchutils.JoinTagFilterss([][]storage.TagFilter{tfs}, ec.EnforcedTagFilterss)
// Extend the fetched time range to the past by maxSilenceInterval plus the bigger of window and step.
minTimestamp := start - maxSilenceInterval
if window > ec.Step {
minTimestamp -= window
} else {
minTimestamp -= ec.Step
}
sq := storage.NewSearchQuery(ec.AuthToken.AccountID, ec.AuthToken.ProjectID, minTimestamp, ec.End, tfss, ec.MaxSeries)
rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline)
if err != nil {
return nil, &UserReadableError{
Err: err,
}
}
ec.updateIsPartialResponse(isPartial)
rssLen := rss.Len()
if rssLen == 0 {
// Nothing has been found - return the cached part only.
rss.Cancel()
tss := mergeTimeseries(tssCached, nil, start, ec)
return tss, nil
}
// Verify timeseries fit available memory after the rollup.
// Take into account points from tssCached.
pointsPerTimeseries := 1 + (ec.End-ec.Start)/ec.Step
timeseriesLen := rssLen
if iafc != nil {
// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
timeseriesLen = cgroup.AvailableCPUs()
if iafc.ae.Modifier.Op != "" {
if iafc.ae.Limit > 0 {
// There is an explicit limit on the number of output time series.
timeseriesLen *= iafc.ae.Limit
} else {
// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
// since each group can have own set of time series in memory.
timeseriesLen *= 1000
}
}
// The maximum number of output time series is limited by rssLen.
if timeseriesLen > rssLen {
timeseriesLen = rssLen
}
}
// The memory estimation uses 16 bytes per rollup point.
rollupPoints := mulNoOverflow(pointsPerTimeseries, int64(timeseriesLen*len(rcs)))
rollupMemorySize = mulNoOverflow(rollupPoints, 16)
rml := getRollupMemoryLimiter()
if !rml.Get(uint64(rollupMemorySize)) {
rss.Cancel()
return nil, &UserReadableError{
Err: fmt.Errorf("not enough memory for processing %d data points across %d time series with %d points in each time series; "+
"total available memory for concurrent requests: %d bytes; "+
"requested memory: %d bytes; "+
"possible solutions are: reducing the number of matching time series; switching to node with more RAM; "+
"increasing -memory.allowedPercent; increasing `step` query arg (%gs)",
rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3),
}
}
// Return the reserved memory to the limiter when the function returns.
defer rml.Put(uint64(rollupMemorySize))
// Evaluate rollup
keepMetricNames := getKeepMetricNames(expr)
var tss []*timeseries
if iafc != nil {
tss, err = evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps)
} else {
tss, err = evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)
}
if err != nil {
return nil, &UserReadableError{
Err: err,
}
}
tss = mergeTimeseries(tssCached, tss, start, ec)
// Partial results aren't cached.
if !isPartial {
rollupResultCacheV.Put(qt, ec, expr, window, tss)
}
return tss, nil
}
// Process-wide state for limiting the memory reserved by concurrently
// executed rollups. It must be accessed via getRollupMemoryLimiter().
var (
// rollupMemoryLimiter tracks memory reservations made for rollup results.
// Its MaxSize is set lazily on the first getRollupMemoryLimiter() call.
rollupMemoryLimiter memoryLimiter
// rollupMemoryLimiterOnce guards the one-time initialization of rollupMemoryLimiter.MaxSize.
rollupMemoryLimiterOnce sync.Once
)
// getRollupMemoryLimiter returns the shared memory limiter for rollup results.
//
// On the first call the limiter capacity is set to a half of the value
// reported by memory.Allowed(). Subsequent calls return the same limiter
// without touching its capacity.
func getRollupMemoryLimiter() *memoryLimiter {
	rollupMemoryLimiterOnce.Do(func() {
		allowedBytes := uint64(memory.Allowed())
		rollupMemoryLimiter.MaxSize = allowedBytes / 2
	})
	rml := &rollupMemoryLimiter
	return rml
}
// evalRollupWithIncrementalAggregate calculates the rollups described by rcs
// for every series in rss and feeds each rollup result into iafc instead of
// collecting all the per-series results.
//
// The series are processed via rss.RunParallel. For every series staleness
// markers are dropped according to dropStaleNaNs, then preFunc is applied to
// the raw samples before the rollup configs are evaluated.
//
// It returns the series produced by iafc.finalizeTimeseries(), or the error
// returned by rss.RunParallel.
func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool,
	iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
	qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs)
	defer qt.Done()

	// The total is updated from the RunParallel callback, so it is modified atomically.
	var samplesScannedTotal uint64
	processSeries := func(rs *netstorage.Result, workerID uint) error {
		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
		preFunc(rs.Values, rs.Timestamps)

		// A single scratch series is re-used across all the rollup configs of the current series.
		scratch := getTimeseries()
		defer putTimeseries(scratch)

		for _, rc := range rcs {
			tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName)
			if tsm == nil {
				// Regular rollup, which produces a single output series.
				scratch.Reset()
				scanned := doRollupForTimeseries(funcName, keepMetricNames, rc, scratch, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
				atomic.AddUint64(&samplesScannedTotal, scanned)
				iafc.updateTimeseries(scratch, workerID)
				// doRollupForTimeseries points scratch.Timestamps to sharedTimestamps
				// and denies the re-use. Undo this, so scratch can be re-used.
				scratch.Timestamps = nil
				scratch.denyReuse = false
				continue
			}
			// The rollup emits multiple series, which are collected in tsm.
			scanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
			for _, mapped := range tsm.m {
				iafc.updateTimeseries(mapped, workerID)
			}
			atomic.AddUint64(&samplesScannedTotal, scanned)
		}
		return nil
	}
	if err := rss.RunParallel(qt, processSeries); err != nil {
		return nil, err
	}

	aggregated := iafc.finalizeTimeseries()
	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
	qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(aggregated), samplesScannedTotal)
	return aggregated, nil
}
// evalRollupNoIncrementalAggregate calculates the rollups described by rcs
// for every series in rss and returns all the resulting series.
//
// The series are processed via rss.RunParallel. For every series staleness
// markers are dropped according to dropStaleNaNs, then preFunc is applied to
// the raw samples before the rollup configs are evaluated.
//
// The order of the returned series depends on the order in which the
// callback invocations append their results.
func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
	qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
	defer qt.Done()

	var (
		// results collects the output series; it is protected by resultsLock.
		results     = make([]*timeseries, 0, rss.Len()*len(rcs))
		resultsLock sync.Mutex
		// samplesScannedTotal is updated atomically from the RunParallel callback.
		samplesScannedTotal uint64
	)
	processSeries := func(rs *netstorage.Result, workerID uint) error {
		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
		preFunc(rs.Values, rs.Timestamps)
		for _, rc := range rcs {
			tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName)
			if tsm == nil {
				// Regular rollup, which produces a single output series.
				out := new(timeseries)
				scanned := doRollupForTimeseries(funcName, keepMetricNames, rc, out, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
				atomic.AddUint64(&samplesScannedTotal, scanned)
				resultsLock.Lock()
				results = append(results, out)
				resultsLock.Unlock()
				continue
			}
			// The rollup emits multiple series, which are collected in tsm.
			scanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
			atomic.AddUint64(&samplesScannedTotal, scanned)
			resultsLock.Lock()
			results = tsm.AppendTimeseriesTo(results)
			resultsLock.Unlock()
		}
		return nil
	}
	if err := rss.RunParallel(qt, processSeries); err != nil {
		return nil, err
	}

	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
	qt.Printf("samplesScanned=%d", samplesScannedTotal)
	return results, nil
}
// doRollupForTimeseries calculates the rollup rc over the given source samples
// and stores the result in tsDst.
//
// tsDst.MetricName becomes a copy of mnSrc with the additional `rollup` tag
// if rc.TagValue is non-empty. The metric group is reset unless
// keepMetricNames is set or funcName is registered in rollupFuncsKeepMetricName.
//
// tsDst.Values is overwritten with the rollup result, while tsDst.Timestamps
// is pointed to sharedTimestamps without copying. Because of this tsDst is
// marked with denyReuse.
//
// It returns the number of scanned samples reported by rc.Do.
func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName,
	valuesSrc []float64, timestampsSrc []int64, sharedTimestamps []int64) uint64 {
	mn := &tsDst.MetricName
	mn.CopyFrom(mnSrc)
	if len(rc.TagValue) != 0 {
		mn.AddTag("rollup", rc.TagValue)
	}
	if !(keepMetricNames || rollupFuncsKeepMetricName[funcName]) {
		mn.ResetMetricGroup()
	}

	// Re-use the existing tsDst.Values buffer for the rollup output.
	values, samplesScanned := rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
	tsDst.Values = values
	tsDst.Timestamps = sharedTimestamps
	tsDst.denyReuse = true
	return samplesScanned
}
// bbPool is a package-level pool of byte buffers.
// NOTE(review): its users are located outside this part of the file.
var bbPool bytesutil.ByteBufferPool
// evalNumber returns a single time series, which has the value n
// at every shared timestamp of ec.
//
// The series carries AccountID and ProjectID from ec.AuthToken.
// It is marked with denyReuse, since its timestamps are shared.
func evalNumber(ec *EvalConfig, n float64) []*timeseries {
	ts := &timeseries{denyReuse: true}
	ts.MetricName.AccountID = ec.AuthToken.AccountID
	ts.MetricName.ProjectID = ec.AuthToken.ProjectID

	ts.Timestamps = ec.getSharedTimestamps()
	ts.Values = make([]float64, len(ts.Timestamps))
	for i := range ts.Values {
		ts.Values[i] = n
	}
	return []*timeseries{ts}
}
// evalString returns a single time series with NaN values, which carries
// the string s in the metric group of its metric name.
func evalString(ec *EvalConfig, s string) []*timeseries {
	tss := evalNumber(ec, nan)
	mn := &tss[0].MetricName
	mn.MetricGroup = append(mn.MetricGroup[:0], s...)
	return tss
}
// evalTime returns a single time series, where every value equals
// its own timestamp divided by 1e3.
func evalTime(ec *EvalConfig) []*timeseries {
	tss := evalNumber(ec, nan)
	series := tss[0]
	for i := range series.Timestamps {
		series.Values[i] = float64(series.Timestamps[i]) / 1e3
	}
	return tss
}
// mulNoOverflow returns a*b, saturating at math.MaxInt64 instead of
// wrapping around when the product doesn't fit int64.
//
// Both operands are expected to be non-negative, since the function is used
// for sizes and counts. If either operand is zero, then zero is returned.
func mulNoOverflow(a, b int64) int64 {
	if a == 0 || b == 0 {
		// The product is zero. Return early, since the overflow check below
		// divides by b and would panic with division by zero for b == 0.
		return 0
	}
	if math.MaxInt64/b < a {
		// Overflow
		return math.MaxInt64
	}
	return a * b
}
// dropStaleNaNs removes Prometheus staleness markers (aka stale NaNs)
// from values together with the corresponding timestamps.
//
// The input is returned as is when -search.noStaleMarkers is set,
// when funcName is default_rollup or stale_samples_over_time,
// or when values contain no staleness markers.
//
// Otherwise the samples are filtered in place, so the returned slices
// share the backing arrays with values and timestamps.
func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) {
	switch {
	case *noStaleMarkers, funcName == "default_rollup", funcName == "stale_samples_over_time":
		// default_rollup() relies on staleness markers for Prometheus-style staleness detection,
		// while stale_samples_over_time() has to count them, so they must be preserved.
		return values, timestamps
	}

	// Other rollup functions must not see staleness markers.
	// Check whether there is anything to drop before rewriting the slices.
	staleFound := false
	for i := range values {
		if decimal.IsStaleNaN(values[i]) {
			staleFound = true
			break
		}
	}
	if !staleFound {
		// Fast path: there are no staleness markers.
		return values, timestamps
	}

	// Slow path: compact the remaining samples to the beginning of the input slices.
	keptValues, keptTimestamps := values[:0], timestamps[:0]
	for i := range values {
		v := values[i]
		if !decimal.IsStaleNaN(v) {
			keptValues = append(keptValues, v)
			keptTimestamps = append(keptTimestamps, timestamps[i])
		}
	}
	return keptValues, keptTimestamps
}