package promql

import (
	"flag"
	"fmt"
	"math"
	"regexp"
	"sort"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/metrics"
	"github.com/VictoriaMetrics/metricsql"
)

// Command-line flags, which tune query evaluation in this file.
var (
	// disableCache turns off start/end alignment in AdjustStartEnd and makes EvalConfig.mayCache return false.
	disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful during data backfilling")
	// maxPointsSubqueryPerTimeseries is used as MaxPointsPerSeries for the inner query of a subquery.
	maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+
		"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")
	// maxMemoryPerQuery is compared with the estimated rollup memory size before the rollup is evaluated.
	maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+
		"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
		"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests")
	// noStaleMarkers isn't read in the visible part of this file.
	noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+
		"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")
)

// The minimum number of points per timeseries for enabling time rounding.
// This improves cache hit ratio for frequently requested queries over // big time ranges. const minTimeseriesPointsForTimeRounding = 50 // ValidateMaxPointsPerSeries validates that the number of points for the given start, end and step do not exceed maxPoints. func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error { if step == 0 { return fmt.Errorf("step can't be equal to zero") } points := (end-start)/step + 1 if points > int64(maxPoints) { return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d", start, end, step, points, maxPoints) } return nil } // AdjustStartEnd adjusts start and end values, so response caching may be enabled. // // See EvalConfig.mayCache for details. func AdjustStartEnd(start, end, step int64) (int64, int64) { if *disableCache { // Do not adjust start and end values when cache is disabled. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563 return start, end } points := (end-start)/step + 1 if points < minTimeseriesPointsForTimeRounding { // Too small number of points for rounding. return start, end } // Round start and end to values divisible by step in order // to enable response caching (see EvalConfig.mayCache). start, end = alignStartEnd(start, end, step) // Make sure that the new number of points is the same as the initial number of points. newPoints := (end-start)/step + 1 for newPoints > points { end -= step newPoints-- } return start, end } func alignStartEnd(start, end, step int64) (int64, int64) { // Round start to the nearest smaller value divisible by step. start -= start % step // Round end to the nearest bigger value divisible by step. 
adjust := end % step if adjust > 0 { end += step - adjust } return start, end } // EvalConfig is the configuration required for query evaluation via Exec type EvalConfig struct { AuthToken *auth.Token Start int64 End int64 Step int64 // MaxSeries is the maximum number of time series, which can be scanned by the query. // Zero means 'no limit' MaxSeries int // MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series. MaxPointsPerSeries int // QuotedRemoteAddr contains quoted remote address. QuotedRemoteAddr string Deadline searchutils.Deadline // Whether the response can be cached. MayCache bool // LookbackDelta is analog to `-query.lookback-delta` from Prometheus. LookbackDelta int64 // How many decimal digits after the point to leave in response. RoundDigits int // EnforcedTagFilterss may contain additional label filters to use in the query. EnforcedTagFilterss [][]storage.TagFilter // Whether to deny partial response. DenyPartialResponse bool // IsPartialResponse is set during query execution and can be used by Exec caller after query execution. IsPartialResponse bool timestamps []int64 timestampsOnce sync.Once } // copyEvalConfig returns src copy. func copyEvalConfig(src *EvalConfig) *EvalConfig { var ec EvalConfig ec.AuthToken = src.AuthToken ec.Start = src.Start ec.End = src.End ec.Step = src.Step ec.MaxSeries = src.MaxSeries ec.MaxPointsPerSeries = src.MaxPointsPerSeries ec.Deadline = src.Deadline ec.MayCache = src.MayCache ec.LookbackDelta = src.LookbackDelta ec.RoundDigits = src.RoundDigits ec.EnforcedTagFilterss = src.EnforcedTagFilterss ec.DenyPartialResponse = src.DenyPartialResponse ec.IsPartialResponse = src.IsPartialResponse // do not copy src.timestamps - they must be generated again. 
return &ec } func (ec *EvalConfig) updateIsPartialResponse(isPartialResponse bool) { if !ec.IsPartialResponse { ec.IsPartialResponse = isPartialResponse } } func (ec *EvalConfig) validate() { if ec.Start > ec.End { logger.Panicf("BUG: start cannot exceed end; got %d vs %d", ec.Start, ec.End) } if ec.Step <= 0 { logger.Panicf("BUG: step must be greater than 0; got %d", ec.Step) } } func (ec *EvalConfig) mayCache() bool { if *disableCache { return false } if !ec.MayCache { return false } if ec.Start%ec.Step != 0 { return false } if ec.End%ec.Step != 0 { return false } return true } func (ec *EvalConfig) timeRangeString() string { start := storage.TimestampToHumanReadableFormat(ec.Start) end := storage.TimestampToHumanReadableFormat(ec.End) return fmt.Sprintf("[%s..%s]", start, end) } func (ec *EvalConfig) getSharedTimestamps() []int64 { ec.timestampsOnce.Do(ec.timestampsInit) return ec.timestamps } func (ec *EvalConfig) timestampsInit() { ec.timestamps = getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries) } func getTimestamps(start, end, step int64, maxPointsPerSeries int) []int64 { // Sanity checks. if step <= 0 { logger.Panicf("BUG: Step must be bigger than 0; got %d", step) } if start > end { logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end) } if err := ValidateMaxPointsPerSeries(start, end, step, maxPointsPerSeries); err != nil { logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err) } // Prepare timestamps. 
points := 1 + (end-start)/step timestamps := make([]int64, points) for i := range timestamps { timestamps[i] = start start += step } return timestamps } func evalExpr(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { if qt.Enabled() { query := string(e.AppendString(nil)) query = bytesutil.LimitStringLen(query, 300) mayCache := ec.mayCache() qt = qt.NewChild("eval: query=%s, timeRange=%s, step=%d, mayCache=%v", query, ec.timeRangeString(), ec.Step, mayCache) } rv, err := evalExprInternal(qt, ec, e) if err != nil { return nil, err } if qt.Enabled() { seriesCount := len(rv) pointsPerSeries := 0 if len(rv) > 0 { pointsPerSeries = len(rv[0].Timestamps) } pointsCount := seriesCount * pointsPerSeries qt.Donef("series=%d, points=%d, pointsPerSeries=%d", seriesCount, pointsCount, pointsPerSeries) } return rv, nil } func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { if me, ok := e.(*metricsql.MetricExpr); ok { re := &metricsql.RollupExpr{ Expr: me, } rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, me.AppendString(nil), err) } return rv, nil } if re, ok := e.(*metricsql.RollupExpr); ok { rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, re.AppendString(nil), err) } return rv, nil } if fe, ok := e.(*metricsql.FuncExpr); ok { nrf := getRollupFunc(fe.Name) if nrf == nil { qtChild := qt.NewChild("transform %s()", fe.Name) rv, err := evalTransformFunc(qtChild, ec, fe) qtChild.Donef("series=%d", len(rv)) return rv, err } args, re, err := evalRollupFuncArgs(qt, ec, fe) if err != nil { return nil, err } rf, err := nrf(args) if err != nil { return nil, err } rv, err := evalRollupFunc(qt, ec, fe.Name, rf, e, re, nil) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err) } 
return rv, nil } if ae, ok := e.(*metricsql.AggrFuncExpr); ok { qtChild := qt.NewChild("aggregate %s()", ae.Name) rv, err := evalAggrFunc(qtChild, ec, ae) qtChild.Donef("series=%d", len(rv)) return rv, err } if be, ok := e.(*metricsql.BinaryOpExpr); ok { qtChild := qt.NewChild("binary op %q", be.Op) rv, err := evalBinaryOp(qtChild, ec, be) qtChild.Donef("series=%d", len(rv)) return rv, err } if ne, ok := e.(*metricsql.NumberExpr); ok { rv := evalNumber(ec, ne.N) return rv, nil } if se, ok := e.(*metricsql.StringExpr); ok { rv := evalString(ec, se.S) return rv, nil } if de, ok := e.(*metricsql.DurationExpr); ok { d := de.Duration(ec.Step) dSec := float64(d) / 1000 rv := evalNumber(ec, dSec) return rv, nil } return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil)) } func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) { tf := getTransformFunc(fe.Name) if tf == nil { return nil, &UserReadableError{ Err: fmt.Errorf(`unknown func %q`, fe.Name), } } var args [][]*timeseries var err error switch fe.Name { case "", "union": args, err = evalExprsInParallel(qt, ec, fe.Args) default: args, err = evalExprsSequentially(qt, ec, fe.Args) } if err != nil { return nil, err } tfa := &transformFuncArg{ ec: ec, fe: fe, args: args, } rv, err := tf(tfa) if err != nil { return nil, &UserReadableError{ Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err), } } return rv, nil } func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFuncExpr) ([]*timeseries, error) { if callbacks := getIncrementalAggrFuncCallbacks(ae.Name); callbacks != nil { fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae) if fe != nil { // There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr. // The optimized path saves RAM for aggregates over big number of time series. 
args, re, err := evalRollupFuncArgs(qt, ec, fe) if err != nil { return nil, err } rf, err := nrf(args) if err != nil { return nil, err } iafc := newIncrementalAggrFuncContext(ae, callbacks) return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc) } } args, err := evalExprsInParallel(qt, ec, ae.Args) if err != nil { return nil, err } af := getAggrFunc(ae.Name) if af == nil { return nil, &UserReadableError{ Err: fmt.Errorf(`unknown func %q`, ae.Name), } } afa := &aggrFuncArg{ ae: ae, args: args, ec: ec, } rv, err := af(afa) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err) } return rv, nil } func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) { bf := getBinaryOpFunc(be.Op) if bf == nil { return nil, fmt.Errorf(`unknown binary op %q`, be.Op) } var err error var tssLeft, tssRight []*timeseries switch strings.ToLower(be.Op) { case "and", "if": // Fetch right-side series at first, since it usually contains // lower number of time series for `and` and `if` operator. // This should produce more specific label filters for the left side of the query. // This, in turn, should reduce the time to select series for the left side of the query. 
tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be) default: tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be) } if err != nil { return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err) } bfa := &binaryOpFuncArg{ be: be, left: tssLeft, right: tssRight, } rv, err := bf(bfa) if err != nil { return nil, fmt.Errorf(`cannot evaluate %q: %w`, be.AppendString(nil), err) } return rv, nil } func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool { switch strings.ToLower(be.Op) { case "or", "default": return false } if isAggrFuncWithoutGrouping(be.Left) || isAggrFuncWithoutGrouping(be.Right) { return false } return true } func isAggrFuncWithoutGrouping(e metricsql.Expr) bool { afe, ok := e.(*metricsql.AggrFuncExpr) if !ok { return false } return len(afe.Modifier.Args) == 0 } func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) { if !canPushdownCommonFilters(be) { // Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters // from exprFirst to exprSecond. 
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886 qt = qt.NewChild("execute left and right sides of %q in parallel", be.Op) defer qt.Done() var wg sync.WaitGroup var tssFirst []*timeseries var errFirst error qtFirst := qt.NewChild("expr1") wg.Add(1) go func() { defer wg.Done() tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst) qtFirst.Done() }() var tssSecond []*timeseries var errSecond error qtSecond := qt.NewChild("expr2") wg.Add(1) go func() { defer wg.Done() tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond) qtSecond.Done() }() wg.Wait() if errFirst != nil { return nil, nil, errFirst } if errSecond != nil { return nil, nil, errSecond } return tssFirst, tssSecond, nil } // Execute binary operation in the following way: // // 1) execute the exprFirst // 2) get common label filters for series returned at step 1 // 3) push down the found common label filters to exprSecond. This filters out unneeded series // during exprSecond exection instead of spending compute resources on extracting and processing these series // before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching // 4) execute the exprSecond with possible additional filters found at step 3 // // Typical use cases: // - Kubernetes-related: show pod creation time with the node name: // // kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info // // Without the optimization `kube_pod_info` would select and spend compute resources // for more time series than needed. The selected time series would be dropped later // when matching time series on the right and left sides of binary operand. // // - Generic alerting queries, which rely on `info` metrics. // See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/ // // - Queries, which get additional labels from `info` metrics. 
// See https://www.robustperception.io/exposing-the-software-version-to-prometheus tssFirst, err := evalExpr(qt, ec, exprFirst) if err != nil { return nil, nil, err } lfs := getCommonLabelFilters(tssFirst) lfs = metricsql.TrimFiltersByGroupModifier(lfs, be) exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs) tssSecond, err := evalExpr(qt, ec, exprSecond) if err != nil { return nil, nil, err } return tssFirst, tssSecond, nil } func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter { m := make(map[string][]string) for _, ts := range tss { for _, tag := range ts.MetricName.Tags { m[string(tag.Key)] = append(m[string(tag.Key)], string(tag.Value)) } } lfs := make([]metricsql.LabelFilter, 0, len(m)) for key, values := range m { if len(values) != len(tss) { // Skip the tag, since it doesn't belong to all the time series. continue } values = getUniqueValues(values) if len(values) > 1000 { // Skip the filter on the given tag, since it needs to enumerate too many unique values. // This may slow down the search for matching time series. continue } lf := metricsql.LabelFilter{ Label: key, } if len(values) == 1 { lf.Value = values[0] } else { sort.Strings(values) lf.Value = joinRegexpValues(values) lf.IsRegexp = true } lfs = append(lfs, lf) } sort.Slice(lfs, func(i, j int) bool { return lfs[i].Label < lfs[j].Label }) return lfs } func getUniqueValues(a []string) []string { m := make(map[string]struct{}, len(a)) results := make([]string, 0, len(a)) for _, s := range a { if _, ok := m[s]; !ok { results = append(results, s) m[s] = struct{}{} } } return results } func joinRegexpValues(a []string) string { var b []byte for i, s := range a { sQuoted := regexp.QuoteMeta(s) b = append(b, sQuoted...) 
if i < len(a)-1 { b = append(b, '|') } } return string(b) } func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) { if len(ae.Args) != 1 { return nil, nil } e := ae.Args[0] // Make sure e contains one of the following: // - metricExpr // - metricExpr[d] // - rollupFunc(metricExpr) // - rollupFunc(metricExpr[d]) if me, ok := e.(*metricsql.MetricExpr); ok { // e = metricExpr if me.IsEmpty() { return nil, nil } fe := &metricsql.FuncExpr{ Name: "default_rollup", Args: []metricsql.Expr{me}, } nrf := getRollupFunc(fe.Name) return fe, nrf } if re, ok := e.(*metricsql.RollupExpr); ok { if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() { return nil, nil } // e = metricExpr[d] fe := &metricsql.FuncExpr{ Name: "default_rollup", Args: []metricsql.Expr{re}, } nrf := getRollupFunc(fe.Name) return fe, nrf } fe, ok := e.(*metricsql.FuncExpr) if !ok { return nil, nil } nrf := getRollupFunc(fe.Name) if nrf == nil { return nil, nil } rollupArgIdx := metricsql.GetRollupArgIdx(fe) if rollupArgIdx >= len(fe.Args) { // Incorrect number of args for rollup func. 
return nil, nil } arg := fe.Args[rollupArgIdx] if me, ok := arg.(*metricsql.MetricExpr); ok { if me.IsEmpty() { return nil, nil } // e = rollupFunc(metricExpr) return &metricsql.FuncExpr{ Name: fe.Name, Args: []metricsql.Expr{me}, }, nrf } if re, ok := arg.(*metricsql.RollupExpr); ok { if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() { return nil, nil } // e = rollupFunc(metricExpr[d]) return fe, nrf } return nil, nil } func evalExprsSequentially(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) { var rvs [][]*timeseries for _, e := range es { rv, err := evalExpr(qt, ec, e) if err != nil { return nil, err } rvs = append(rvs, rv) } return rvs, nil } func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) { if len(es) < 2 { return evalExprsSequentially(qt, ec, es) } rvs := make([][]*timeseries, len(es)) errs := make([]error, len(es)) var wg sync.WaitGroup for i, e := range es { qt.Printf("eval function args in parallel") wg.Add(1) qtChild := qt.NewChild("eval arg %d", i) go func(e metricsql.Expr, i int) { defer func() { qtChild.Done() wg.Done() }() rv, err := evalExpr(qtChild, ec, e) rvs[i] = rv errs[i] = err }(e, i) } wg.Wait() for _, err := range errs { if err != nil { return nil, err } } return rvs, nil } func evalRollupFuncArgs(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) { var re *metricsql.RollupExpr rollupArgIdx := metricsql.GetRollupArgIdx(fe) if len(fe.Args) <= rollupArgIdx { return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil)) } args := make([]interface{}, len(fe.Args)) for i, arg := range fe.Args { if i == rollupArgIdx { re = getRollupExprArg(arg) args[i] = re continue } ts, err := evalExpr(qt, ec, arg) if err != nil { return nil, nil, fmt.Errorf("cannot evaluate arg #%d 
for %q: %w", i+1, fe.AppendString(nil), err) } args[i] = ts } return args, re, nil } func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr { re, ok := arg.(*metricsql.RollupExpr) if !ok { // Wrap non-rollup arg into metricsql.RollupExpr. return &metricsql.RollupExpr{ Expr: arg, } } if !re.ForSubquery() { // Return standard rollup if it doesn't contain subquery. return re } me, ok := re.Expr.(*metricsql.MetricExpr) if !ok { // arg contains subquery. return re } // Convert me[w:step] -> default_rollup(me)[w:step] reNew := *re reNew.Expr = &metricsql.FuncExpr{ Name: "default_rollup", Args: []metricsql.Expr{ &metricsql.RollupExpr{Expr: me}, }, } return &reNew } // expr may contain: // - rollupFunc(m) if iafc is nil // - aggrFunc(rollupFunc(m)) if iafc isn't nil func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { if re.At == nil { return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc) } tssAt, err := evalExpr(qt, ec, re.At) if err != nil { return nil, &UserReadableError{ Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err), } } if len(tssAt) != 1 { return nil, &UserReadableError{ Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)), } } atTimestamp := int64(tssAt[0].Values[0] * 1000) ecNew := copyEvalConfig(ec) ecNew.Start = atTimestamp ecNew.End = atTimestamp tss, err := evalRollupFuncWithoutAt(qt, ecNew, funcName, rf, expr, re, iafc) if err != nil { return nil, err } // expand single-point tss to the original time range. 
timestamps := ec.getSharedTimestamps() for _, ts := range tss { v := ts.Values[0] values := make([]float64, len(timestamps)) for i := range timestamps { values[i] = v } ts.Timestamps = timestamps ts.Values = values } return tss, nil } func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { funcName = strings.ToLower(funcName) ecNew := ec var offset int64 if re.Offset != nil { offset = re.Offset.Duration(ec.Step) ecNew = copyEvalConfig(ecNew) ecNew.Start -= offset ecNew.End -= offset // There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true, // since the time range alignment has been already performed by the caller, // so cache hit rate should be quite good. // See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976 } if funcName == "rollup_candlestick" { // Automatically apply `offset -step` to `rollup_candlestick` function // in order to obtain expected OHLC results. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462 step := ecNew.Step ecNew = copyEvalConfig(ecNew) ecNew.Start += step ecNew.End += step offset -= step } var rvs []*timeseries var err error if me, ok := re.Expr.(*metricsql.MetricExpr); ok { rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window) } else { if iafc != nil { logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil)) } rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re) } if err != nil { return nil, &UserReadableError{ Err: err, } } if funcName == "absent_over_time" { rvs = aggregateAbsentOverTime(ec, re.Expr, rvs) } ec.updateIsPartialResponse(ecNew.IsPartialResponse) if offset != 0 && len(rvs) > 0 { // Make a copy of timestamps, since they may be used in other values. 
srcTimestamps := rvs[0].Timestamps dstTimestamps := append([]int64{}, srcTimestamps...) for i := range dstTimestamps { dstTimestamps[i] += offset } for _, ts := range rvs { ts.Timestamps = dstTimestamps } } return rvs, nil } // aggregateAbsentOverTime collapses tss to a single time series with 1 and nan values. // // Values for returned series are set to nan if at least a single tss series contains nan at that point. // This means that tss contains a series with non-empty results at that point. // This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130 func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries { rvs := getAbsentTimeseries(ec, expr) if len(tss) == 0 { return rvs } for i := range tss[0].Values { for _, ts := range tss { if math.IsNaN(ts.Values[i]) { rvs[0].Values[i] = nan break } } } return rvs } func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) { // TODO: determine whether to use rollupResultCacheV here. qt = qt.NewChild("subquery") defer qt.Done() step := re.Step.Duration(ec.Step) if step == 0 { step = ec.Step } window := re.Window.Duration(ec.Step) ecSQ := copyEvalConfig(ec) ecSQ.Start -= window + maxSilenceInterval + step ecSQ.End += step ecSQ.Step = step ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries if err := ValidateMaxPointsPerSeries(ecSQ.Start, ecSQ.End, ecSQ.Step, ecSQ.MaxPointsPerSeries); err != nil { return nil, fmt.Errorf("%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)", err) } // unconditionally align start and end args to step for subquery as Prometheus does. 
ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step) tssSQ, err := evalExpr(qt, ecSQ, re.Expr) if err != nil { return nil, err } ec.updateIsPartialResponse(ecSQ.IsPartialResponse) if len(tssSQ) == 0 { return nil, nil } sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries) preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps) if err != nil { return nil, err } tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) var tssLock sync.Mutex var samplesScannedTotal uint64 keepMetricNames := getKeepMetricNames(expr) doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) { values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps) preFunc(values, timestamps) for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps) atomic.AddUint64(&samplesScannedTotal, samplesScanned) tssLock.Lock() tss = tsm.AppendTimeseriesTo(tss) tssLock.Unlock() continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps) atomic.AddUint64(&samplesScannedTotal, samplesScanned) tssLock.Lock() tss = append(tss, &ts) tssLock.Unlock() } return values, timestamps }) rowsScannedPerQuery.Update(float64(samplesScannedTotal)) qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal) return tss, nil } var rowsScannedPerQuery = metrics.NewHistogram(`vm_rows_scanned_per_query`) func getKeepMetricNames(expr metricsql.Expr) bool { if ae, ok := expr.(*metricsql.AggrFuncExpr); ok { // Extract rollupFunc(...) from aggrFunc(rollupFunc(...)). 
// This case is possible when optimized aggrFunc calculations are used // such as `sum(rate(...))` if len(ae.Args) != 1 { return false } expr = ae.Args[0] } if fe, ok := expr.(*metricsql.FuncExpr); ok { return fe.KeepMetricNames } return false } func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) { concurrency := cgroup.AvailableCPUs() if concurrency > len(tss) { concurrency = len(tss) } workCh := make(chan *timeseries, concurrency) var wg sync.WaitGroup wg.Add(concurrency) for i := 0; i < concurrency; i++ { go func() { defer wg.Done() var tmpValues []float64 var tmpTimestamps []int64 for ts := range workCh { tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps) } }() } for _, ts := range tss { workCh <- ts } close(workCh) wg.Wait() } func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) { hasNan := false for _, v := range values { if math.IsNaN(v) { hasNan = true } } if !hasNan { // Fast path - no NaNs. dstValues = append(dstValues, values...) dstTimestamps = append(dstTimestamps, timestamps...) return dstValues, dstTimestamps } // Slow path - remove NaNs. 
for i, v := range values { if math.IsNaN(v) { continue } dstValues = append(dstValues, v) dstTimestamps = append(dstTimestamps, timestamps[i]) } return dstValues, dstTimestamps } var ( rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`) rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`) rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`) ) func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) { var rollupMemorySize int64 window := windowExpr.Duration(ec.Step) if qt.Enabled() { qt = qt.NewChild("rollup %s(): timeRange=%s, step=%d, window=%d", funcName, ec.timeRangeString(), ec.Step, window) defer func() { qt.Donef("neededMemoryBytes=%d", rollupMemorySize) }() } if me.IsEmpty() { return evalNumber(ec, nan), nil } // Search for partial results in cache. tssCached, start := rollupResultCacheV.Get(qt, ec, expr, window) if start > ec.End { // The result is fully cached. rollupResultCacheFullHits.Inc() return tssCached, nil } if start > ec.Start { rollupResultCachePartialHits.Inc() } else { rollupResultCacheMiss.Inc() } // Obtain rollup configs before fetching data from db, // so type errors can be caught earlier. sharedTimestamps := getTimestamps(start, ec.End, ec.Step, ec.MaxPointsPerSeries) preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps) if err != nil { return nil, err } // Fetch the remaining part of the result. 
tfs := searchutils.ToTagFilters(me.LabelFilters) tfss := searchutils.JoinTagFilterss([][]storage.TagFilter{tfs}, ec.EnforcedTagFilterss) minTimestamp := start - maxSilenceInterval if window > ec.Step { minTimestamp -= window } else { minTimestamp -= ec.Step } sq := storage.NewSearchQuery(ec.AuthToken.AccountID, ec.AuthToken.ProjectID, minTimestamp, ec.End, tfss, ec.MaxSeries) rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline) if err != nil { return nil, &UserReadableError{ Err: err, } } ec.updateIsPartialResponse(isPartial) rssLen := rss.Len() if rssLen == 0 { rss.Cancel() tss := mergeTimeseries(tssCached, nil, start, ec) return tss, nil } // Verify timeseries fit available memory after the rollup. // Take into account points from tssCached. pointsPerTimeseries := 1 + (ec.End-ec.Start)/ec.Step timeseriesLen := rssLen if iafc != nil { // Incremental aggregates require holding only GOMAXPROCS timeseries in memory. timeseriesLen = cgroup.AvailableCPUs() if iafc.ae.Modifier.Op != "" { if iafc.ae.Limit > 0 { // There is an explicit limit on the number of output time series. timeseriesLen *= iafc.ae.Limit } else { // Increase the number of timeseries for non-empty group list: `aggr() by (something)`, // since each group can have own set of time series in memory. timeseriesLen *= 1000 } } // The maximum number of output time series is limited by rssLen. 
if timeseriesLen > rssLen { timeseriesLen = rssLen } } rollupPoints := mulNoOverflow(pointsPerTimeseries, int64(timeseriesLen*len(rcs))) rollupMemorySize = sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16)) if rollupMemorySize > int64(maxMemoryPerQuery.N) { rss.Cancel() return nil, &UserReadableError{ Err: fmt.Errorf("not enough memory for processing %d data points across %d time series with %d points in each time series "+ "according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+ "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ "increasing -search.maxMemoryPerQuery", rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, maxMemoryPerQuery.N, rollupMemorySize, float64(ec.Step)/1e3), } } rml := getRollupMemoryLimiter() if !rml.Get(uint64(rollupMemorySize)) { rss.Cancel() return nil, &UserReadableError{ Err: fmt.Errorf("not enough memory for processing %d data points across %d time series with %d points in each time series; "+ "total available memory for concurrent requests: %d bytes; "+ "requested memory: %d bytes; "+ "possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+ "switching to node with more RAM; increasing -memory.allowedPercent", rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3), } } defer rml.Put(uint64(rollupMemorySize)) // Evaluate rollup keepMetricNames := getKeepMetricNames(expr) var tss []*timeseries if iafc != nil { tss, err = evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps) } else { tss, err = evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps) } if err != nil { return nil, &UserReadableError{ Err: err, } } tss = mergeTimeseries(tssCached, tss, start, ec) if !isPartial { rollupResultCacheV.Put(qt, 
ec, expr, window, tss) } return tss, nil } var ( rollupMemoryLimiter memoryLimiter rollupMemoryLimiterOnce sync.Once ) func getRollupMemoryLimiter() *memoryLimiter { rollupMemoryLimiterOnce.Do(func() { rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 2 }) return &rollupMemoryLimiter } func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs) defer qt.Done() var samplesScannedTotal uint64 err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error { rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps) ts := getTimeseries() defer putTimeseries(ts) for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) for _, ts := range tsm.m { iafc.updateTimeseries(ts, workerID) } atomic.AddUint64(&samplesScannedTotal, samplesScanned) continue } ts.Reset() samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) atomic.AddUint64(&samplesScannedTotal, samplesScanned) iafc.updateTimeseries(ts, workerID) // ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used. 
ts.Timestamps = nil ts.denyReuse = false } return nil }) if err != nil { return nil, err } tss := iafc.finalizeTimeseries() rowsScannedPerQuery.Update(float64(samplesScannedTotal)) qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal) return tss, nil } func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs) defer qt.Done() tss := make([]*timeseries, 0, rss.Len()*len(rcs)) var tssLock sync.Mutex var samplesScannedTotal uint64 err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error { rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps) for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) atomic.AddUint64(&samplesScannedTotal, samplesScanned) tssLock.Lock() tss = tsm.AppendTimeseriesTo(tss) tssLock.Unlock() continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) atomic.AddUint64(&samplesScannedTotal, samplesScanned) tssLock.Lock() tss = append(tss, &ts) tssLock.Unlock() } return nil }) if err != nil { return nil, err } rowsScannedPerQuery.Update(float64(samplesScannedTotal)) qt.Printf("samplesScanned=%d", samplesScannedTotal) return tss, nil } func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64, sharedTimestamps []int64) uint64 { tsDst.MetricName.CopyFrom(mnSrc) if len(rc.TagValue) > 0 
{ tsDst.MetricName.AddTag("rollup", rc.TagValue) } if !keepMetricNames && !rollupFuncsKeepMetricName[funcName] { tsDst.MetricName.ResetMetricGroup() } var samplesScanned uint64 tsDst.Values, samplesScanned = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc) tsDst.Timestamps = sharedTimestamps tsDst.denyReuse = true return samplesScanned } var bbPool bytesutil.ByteBufferPool func evalNumber(ec *EvalConfig, n float64) []*timeseries { var ts timeseries ts.denyReuse = true ts.MetricName.AccountID = ec.AuthToken.AccountID ts.MetricName.ProjectID = ec.AuthToken.ProjectID timestamps := ec.getSharedTimestamps() values := make([]float64, len(timestamps)) for i := range timestamps { values[i] = n } ts.Values = values ts.Timestamps = timestamps return []*timeseries{&ts} } func evalString(ec *EvalConfig, s string) []*timeseries { rv := evalNumber(ec, nan) rv[0].MetricName.MetricGroup = append(rv[0].MetricName.MetricGroup[:0], s...) return rv } func evalTime(ec *EvalConfig) []*timeseries { rv := evalNumber(ec, nan) timestamps := rv[0].Timestamps values := rv[0].Values for i, ts := range timestamps { values[i] = float64(ts) / 1e3 } return rv } func mulNoOverflow(a, b int64) int64 { if math.MaxInt64/b < a { // Overflow return math.MaxInt64 } return a * b } func sumNoOverflow(a, b int64) int64 { if math.MaxInt64-a < b { // Overflow return math.MaxInt64 } return a + b } func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) { if *noStaleMarkers || funcName == "default_rollup" || funcName == "stale_samples_over_time" { // Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function, // since it uses them for Prometheus-style staleness detection. // Do not drop staleness marks for stale_samples_over_time() function, since it needs // to calculate the number of staleness markers. return values, timestamps } // Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values. 
hasStaleSamples := false for _, v := range values { if decimal.IsStaleNaN(v) { hasStaleSamples = true break } } if !hasStaleSamples { // Fast path: values have no Prometheus staleness marks. return values, timestamps } // Slow path: drop Prometheus staleness marks from values. dstValues := values[:0] dstTimestamps := timestamps[:0] for i, v := range values { if decimal.IsStaleNaN(v) { continue } dstValues = append(dstValues, v) dstTimestamps = append(dstTimestamps, timestamps[i]) } return dstValues, dstTimestamps }