package promql

import (
	"flag"
	"fmt"
	"math"
	"regexp"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"unsafe"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/metrics"
	"github.com/VictoriaMetrics/metricsql"
)

var (
	disableCache                   = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful during data backfilling")
	maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+
		"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")
	maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+
		"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
		"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . "+
		"See also -search.logQueryMemoryUsage")
	logQueryMemoryUsage = flagutil.NewBytes("search.logQueryMemoryUsage", 0, "Log queries, which require more memory than specified by this flag. "+
		"This may help detecting and optimizing heavy queries. Query logging is disabled by default. "+
		"See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery")
	noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+
		"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")
)

// The minimum number of points per timeseries for enabling time rounding.
// This improves cache hit ratio for frequently requested queries over
// big time ranges.
const minTimeseriesPointsForTimeRounding = 50

// ValidateMaxPointsPerSeries validates that the number of points for the given start, end and step do not exceed maxPoints.
func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error {
	if step == 0 {
		return fmt.Errorf("step can't be equal to zero")
	}
	points := (end-start)/step + 1
	if points > int64(maxPoints) {
		return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d",
			start, end, step, points, maxPoints)
	}
	return nil
}

// AdjustStartEnd adjusts start and end values, so response caching may be enabled.
//
// See EvalConfig.mayCache for details.
func AdjustStartEnd(start, end, step int64) (int64, int64) {
	if *disableCache {
		// Do not adjust start and end values when cache is disabled.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563
		return start, end
	}
	points := (end-start)/step + 1
	if points < minTimeseriesPointsForTimeRounding {
		// Too small number of points for rounding.
		return start, end
	}

	// Round start and end to values divisible by step in order
	// to enable response caching (see EvalConfig.mayCache).
	start, end = alignStartEnd(start, end, step)

	// Make sure that the new number of points is the same as the initial number of points.
	newPoints := (end-start)/step + 1
	for newPoints > points {
		end -= step
		newPoints--
	}

	return start, end
}

func alignStartEnd(start, end, step int64) (int64, int64) {
	// Round start to the nearest smaller value divisible by step.
	start -= start % step
	// Round end to the nearest bigger value divisible by step.
	adjust := end % step
	if adjust > 0 {
		end += step - adjust
	}
	return start, end
}

// EvalConfig is the configuration required for query evaluation via Exec
type EvalConfig struct {
	Start int64
	End   int64
	Step  int64

	// MaxSeries is the maximum number of time series, which can be scanned by the query.
	// Zero means 'no limit'
	MaxSeries int

	// MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series.
	MaxPointsPerSeries int

	// QuotedRemoteAddr contains quoted remote address.
	QuotedRemoteAddr string

	Deadline searchutils.Deadline

	// Whether the response can be cached.
	MayCache bool

	// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
	LookbackDelta int64

	// How many decimal digits after the point to leave in response.
	RoundDigits int

	// EnforcedTagFilterss may contain additional label filters to use in the query.
	EnforcedTagFilterss [][]storage.TagFilter

	// The callback, which returns the request URI during logging.
	// The request URI isn't stored here because its' construction may take non-trivial amounts of CPU.
	GetRequestURI func() string

	// QueryStats contains various stats for the currently executed query.
	//
	// The caller must initialize the QueryStats if it needs the stats.
	// Otherwise the stats isn't collected.
	QueryStats *QueryStats

	timestamps     []int64
	timestampsOnce sync.Once
}

// copyEvalConfig returns src copy.
func copyEvalConfig(src *EvalConfig) *EvalConfig {
	var ec EvalConfig
	ec.Start = src.Start
	ec.End = src.End
	ec.Step = src.Step
	ec.MaxSeries = src.MaxSeries
	ec.MaxPointsPerSeries = src.MaxPointsPerSeries
	ec.Deadline = src.Deadline
	ec.MayCache = src.MayCache
	ec.LookbackDelta = src.LookbackDelta
	ec.RoundDigits = src.RoundDigits
	ec.EnforcedTagFilterss = src.EnforcedTagFilterss
	ec.GetRequestURI = src.GetRequestURI
	ec.QueryStats = src.QueryStats

	// do not copy src.timestamps - they must be generated again.
	return &ec
}

// QueryStats contains various stats for the query.
type QueryStats struct {
	// SeriesFetched contains the number of series fetched from storage during the query evaluation.
	SeriesFetched int
}

func (qs *QueryStats) addSeriesFetched(n int) {
	if qs != nil {
		qs.SeriesFetched += n
	}
}

func (ec *EvalConfig) validate() {
	if ec.Start > ec.End {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", ec.Start, ec.End)
	}
	if ec.Step <= 0 {
		logger.Panicf("BUG: step must be greater than 0; got %d", ec.Step)
	}
}

func (ec *EvalConfig) mayCache() bool {
	if *disableCache {
		return false
	}
	if !ec.MayCache {
		return false
	}
	if ec.Start%ec.Step != 0 {
		return false
	}
	if ec.End%ec.Step != 0 {
		return false
	}
	return true
}

func (ec *EvalConfig) timeRangeString() string {
	start := storage.TimestampToHumanReadableFormat(ec.Start)
	end := storage.TimestampToHumanReadableFormat(ec.End)
	return fmt.Sprintf("[%s..%s]", start, end)
}

func (ec *EvalConfig) getSharedTimestamps() []int64 {
	ec.timestampsOnce.Do(ec.timestampsInit)
	return ec.timestamps
}

func (ec *EvalConfig) timestampsInit() {
	ec.timestamps = getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
}

func getTimestamps(start, end, step int64, maxPointsPerSeries int) []int64 {
	// Sanity checks.
	if step <= 0 {
		logger.Panicf("BUG: Step must be bigger than 0; got %d", step)
	}
	if start > end {
		logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end)
	}
	if err := ValidateMaxPointsPerSeries(start, end, step, maxPointsPerSeries); err != nil {
		logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err)
	}

	// Prepare timestamps.
	points := 1 + (end-start)/step
	timestamps := make([]int64, points)
	for i := range timestamps {
		timestamps[i] = start
		start += step
	}
	return timestamps
}

func evalExpr(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
	if qt.Enabled() {
		query := string(e.AppendString(nil))
		query = bytesutil.LimitStringLen(query, 300)
		mayCache := ec.mayCache()
		qt = qt.NewChild("eval: query=%s, timeRange=%s, step=%d, mayCache=%v", query, ec.timeRangeString(), ec.Step, mayCache)
	}
	rv, err := evalExprInternal(qt, ec, e)
	if err != nil {
		return nil, err
	}
	if qt.Enabled() {
		seriesCount := len(rv)
		pointsPerSeries := 0
		if len(rv) > 0 {
			pointsPerSeries = len(rv[0].Timestamps)
		}
		pointsCount := seriesCount * pointsPerSeries
		qt.Donef("series=%d, points=%d, pointsPerSeries=%d", seriesCount, pointsCount, pointsPerSeries)
	}
	return rv, nil
}

func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
	if me, ok := e.(*metricsql.MetricExpr); ok {
		re := &metricsql.RollupExpr{
			Expr: me,
		}
		rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, me.AppendString(nil), err)
		}
		return rv, nil
	}
	if re, ok := e.(*metricsql.RollupExpr); ok {
		rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, re.AppendString(nil), err)
		}
		return rv, nil
	}
	if fe, ok := e.(*metricsql.FuncExpr); ok {
		nrf := getRollupFunc(fe.Name)
		if nrf == nil {
			qtChild := qt.NewChild("transform %s()", fe.Name)
			rv, err := evalTransformFunc(qtChild, ec, fe)
			qtChild.Donef("series=%d", len(rv))
			return rv, err
		}
		args, re, err := evalRollupFuncArgs(qt, ec, fe)
		if err != nil {
			return nil, err
		}
		rf, err := nrf(args)
		if err != nil {
			return nil, err
		}
		rv, err := evalRollupFunc(qt, ec, fe.Name, rf, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err)
		}
		return rv, nil
	}
	if ae, ok := e.(*metricsql.AggrFuncExpr); ok {
		qtChild := qt.NewChild("aggregate %s()", ae.Name)
		rv, err := evalAggrFunc(qtChild, ec, ae)
		qtChild.Donef("series=%d", len(rv))
		return rv, err
	}
	if be, ok := e.(*metricsql.BinaryOpExpr); ok {
		qtChild := qt.NewChild("binary op %q", be.Op)
		rv, err := evalBinaryOp(qtChild, ec, be)
		qtChild.Donef("series=%d", len(rv))
		return rv, err
	}
	if ne, ok := e.(*metricsql.NumberExpr); ok {
		rv := evalNumber(ec, ne.N)
		return rv, nil
	}
	if se, ok := e.(*metricsql.StringExpr); ok {
		rv := evalString(ec, se.S)
		return rv, nil
	}
	if de, ok := e.(*metricsql.DurationExpr); ok {
		d := de.Duration(ec.Step)
		dSec := float64(d) / 1000
		rv := evalNumber(ec, dSec)
		return rv, nil
	}
	return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
}

func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) {
	tf := getTransformFunc(fe.Name)
	if tf == nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf(`unknown func %q`, fe.Name),
		}
	}
	var args [][]*timeseries
	var err error
	switch fe.Name {
	case "", "union":
		args, err = evalExprsInParallel(qt, ec, fe.Args)
	default:
		args, err = evalExprsSequentially(qt, ec, fe.Args)
	}
	if err != nil {
		return nil, err
	}
	tfa := &transformFuncArg{
		ec:   ec,
		fe:   fe,
		args: args,
	}
	rv, err := tf(tfa)
	if err != nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err),
		}
	}
	return rv, nil
}

func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFuncExpr) ([]*timeseries, error) {
	if callbacks := getIncrementalAggrFuncCallbacks(ae.Name); callbacks != nil {
		fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae)
		if fe != nil {
			// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
			// The optimized path saves RAM for aggregates over big number of time series.
			args, re, err := evalRollupFuncArgs(qt, ec, fe)
			if err != nil {
				return nil, err
			}
			rf, err := nrf(args)
			if err != nil {
				return nil, err
			}
			iafc := newIncrementalAggrFuncContext(ae, callbacks)
			return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc)
		}
	}
	args, err := evalExprsInParallel(qt, ec, ae.Args)
	if err != nil {
		return nil, err
	}
	af := getAggrFunc(ae.Name)
	if af == nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf(`unknown func %q`, ae.Name),
		}
	}
	afa := &aggrFuncArg{
		ae:   ae,
		args: args,
		ec:   ec,
	}
	qtChild := qt.NewChild("eval %s", ae.Name)
	rv, err := af(afa)
	qtChild.Done()
	if err != nil {
		return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err)
	}
	return rv, nil
}

func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) {
	bf := getBinaryOpFunc(be.Op)
	if bf == nil {
		return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
	}
	var err error
	var tssLeft, tssRight []*timeseries
	switch strings.ToLower(be.Op) {
	case "and", "if":
		// Fetch right-side series at first, since it usually contains
		// lower number of time series for `and` and `if` operator.
		// This should produce more specific label filters for the left side of the query.
		// This, in turn, should reduce the time to select series for the left side of the query.
		tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be)
	default:
		tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be)
	}
	if err != nil {
		return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
	}
	bfa := &binaryOpFuncArg{
		be:    be,
		left:  tssLeft,
		right: tssRight,
	}
	rv, err := bf(bfa)
	if err != nil {
		return nil, fmt.Errorf(`cannot evaluate %q: %w`, be.AppendString(nil), err)
	}
	return rv, nil
}

func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool {
	switch strings.ToLower(be.Op) {
	case "or", "default":
		return false
	}
	if isAggrFuncWithoutGrouping(be.Left) || isAggrFuncWithoutGrouping(be.Right) {
		return false
	}
	return true
}

func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
	afe, ok := e.(*metricsql.AggrFuncExpr)
	if !ok {
		return false
	}
	return len(afe.Modifier.Args) == 0
}

func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
	if !canPushdownCommonFilters(be) {
		// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
		// from exprFirst to exprSecond.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
		qt = qt.NewChild("execute left and right sides of %q in parallel", be.Op)
		defer qt.Done()
		var wg sync.WaitGroup

		var tssFirst []*timeseries
		var errFirst error
		qtFirst := qt.NewChild("expr1")
		wg.Add(1)
		go func() {
			defer wg.Done()
			tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst)
			qtFirst.Done()
		}()

		var tssSecond []*timeseries
		var errSecond error
		qtSecond := qt.NewChild("expr2")
		wg.Add(1)
		go func() {
			defer wg.Done()
			tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond)
			qtSecond.Done()
		}()

		wg.Wait()
		if errFirst != nil {
			return nil, nil, errFirst
		}
		if errSecond != nil {
			return nil, nil, errSecond
		}
		return tssFirst, tssSecond, nil
	}

	// Execute binary operation in the following way:
	//
	// 1) execute the exprFirst
	// 2) get common label filters for series returned at step 1
	// 3) push down the found common label filters to exprSecond. This filters out unneeded series
	//    during exprSecond execution instead of spending compute resources on extracting and processing these series
	//    before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
	// 4) execute the exprSecond with possible additional filters found at step 3
	//
	// Typical use cases:
	// - Kubernetes-related: show pod creation time with the node name:
	//
	//     kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
	//
	//   Without the optimization `kube_pod_info` would select and spend compute resources
	//   for more time series than needed. The selected time series would be dropped later
	//   when matching time series on the right and left sides of binary operand.
	//
	// - Generic alerting queries, which rely on `info` metrics.
	//   See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
	//
	// - Queries, which get additional labels from `info` metrics.
	//   See https://www.robustperception.io/exposing-the-software-version-to-prometheus
	tssFirst, err := evalExpr(qt, ec, exprFirst)
	if err != nil {
		return nil, nil, err
	}
	if len(tssFirst) == 0 && strings.ToLower(be.Op) != "or" {
		// Fast path: there is no sense in executing the exprSecond when exprFirst returns an empty result,
		// since the "exprFirst op exprSecond" would return an empty result in any case.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3349
		return nil, nil, nil
	}
	lfs := getCommonLabelFilters(tssFirst)
	lfs = metricsql.TrimFiltersByGroupModifier(lfs, be)
	exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs)
	tssSecond, err := evalExpr(qt, ec, exprSecond)
	if err != nil {
		return nil, nil, err
	}
	return tssFirst, tssSecond, nil
}

func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
	if len(tss) == 0 {
		return nil
	}
	type valuesCounter struct {
		values map[string]struct{}
		count  int
	}
	m := make(map[string]*valuesCounter, len(tss[0].MetricName.Tags))
	for _, ts := range tss {
		for _, tag := range ts.MetricName.Tags {
			vc, ok := m[string(tag.Key)]
			if !ok {
				k := bytesutil.InternBytes(tag.Key)
				v := bytesutil.InternBytes(tag.Value)
				m[k] = &valuesCounter{
					values: map[string]struct{}{
						v: {},
					},
					count: 1,
				}
				continue
			}
			if len(vc.values) > 100 {
				// Too many unique values found for the given tag.
				// Do not make a filter on such values, since it may slow down
				// search for matching time series.
				continue
			}
			vc.count++
			if _, ok := vc.values[string(tag.Value)]; !ok {
				v := bytesutil.InternBytes(tag.Value)
				vc.values[v] = struct{}{}
			}
		}
	}
	lfs := make([]metricsql.LabelFilter, 0, len(m))
	var values []string
	for k, vc := range m {
		if vc.count != len(tss) {
			// Skip the tag, since it doesn't belong to all the time series.
			continue
		}
		values = values[:0]
		for s := range vc.values {
			values = append(values, s)
		}
		lf := metricsql.LabelFilter{
			Label: k,
		}
		if len(values) == 1 {
			lf.Value = values[0]
		} else {
			sort.Strings(values)
			lf.Value = joinRegexpValues(values)
			lf.IsRegexp = true
		}
		lfs = append(lfs, lf)
	}
	sort.Slice(lfs, func(i, j int) bool {
		return lfs[i].Label < lfs[j].Label
	})
	return lfs
}

func joinRegexpValues(a []string) string {
	var b []byte
	for i, s := range a {
		sQuoted := regexp.QuoteMeta(s)
		b = append(b, sQuoted...)
		if i < len(a)-1 {
			b = append(b, '|')
		}
	}
	return string(b)
}

func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) {
	if len(ae.Args) != 1 {
		return nil, nil
	}
	e := ae.Args[0]
	// Make sure e contains one of the following:
	// - metricExpr
	// - metricExpr[d]
	// - rollupFunc(metricExpr)
	// - rollupFunc(metricExpr[d])

	if me, ok := e.(*metricsql.MetricExpr); ok {
		// e = metricExpr
		if me.IsEmpty() {
			return nil, nil
		}
		fe := &metricsql.FuncExpr{
			Name: "default_rollup",
			Args: []metricsql.Expr{me},
		}
		nrf := getRollupFunc(fe.Name)
		return fe, nrf
	}
	if re, ok := e.(*metricsql.RollupExpr); ok {
		if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
			return nil, nil
		}
		// e = metricExpr[d]
		fe := &metricsql.FuncExpr{
			Name: "default_rollup",
			Args: []metricsql.Expr{re},
		}
		nrf := getRollupFunc(fe.Name)
		return fe, nrf
	}
	fe, ok := e.(*metricsql.FuncExpr)
	if !ok {
		return nil, nil
	}
	nrf := getRollupFunc(fe.Name)
	if nrf == nil {
		return nil, nil
	}
	rollupArgIdx := metricsql.GetRollupArgIdx(fe)
	if rollupArgIdx >= len(fe.Args) {
		// Incorrect number of args for rollup func.
		return nil, nil
	}
	arg := fe.Args[rollupArgIdx]
	if me, ok := arg.(*metricsql.MetricExpr); ok {
		if me.IsEmpty() {
			return nil, nil
		}
		// e = rollupFunc(metricExpr)
		return &metricsql.FuncExpr{
			Name: fe.Name,
			Args: []metricsql.Expr{me},
		}, nrf
	}
	if re, ok := arg.(*metricsql.RollupExpr); ok {
		if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
			return nil, nil
		}
		// e = rollupFunc(metricExpr[d])
		return fe, nrf
	}
	return nil, nil
}

func evalExprsSequentially(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
	var rvs [][]*timeseries
	for _, e := range es {
		rv, err := evalExpr(qt, ec, e)
		if err != nil {
			return nil, err
		}
		rvs = append(rvs, rv)
	}
	return rvs, nil
}

func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
	if len(es) < 2 {
		return evalExprsSequentially(qt, ec, es)
	}
	rvs := make([][]*timeseries, len(es))
	errs := make([]error, len(es))
	qt.Printf("eval function args in parallel")
	var wg sync.WaitGroup
	for i, e := range es {
		wg.Add(1)
		qtChild := qt.NewChild("eval arg %d", i)
		go func(e metricsql.Expr, i int) {
			defer func() {
				qtChild.Done()
				wg.Done()
			}()
			rv, err := evalExpr(qtChild, ec, e)
			rvs[i] = rv
			errs[i] = err
		}(e, i)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return rvs, nil
}

func evalRollupFuncArgs(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) {
	var re *metricsql.RollupExpr
	rollupArgIdx := metricsql.GetRollupArgIdx(fe)
	if len(fe.Args) <= rollupArgIdx {
		return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil))
	}
	args := make([]interface{}, len(fe.Args))
	for i, arg := range fe.Args {
		if i == rollupArgIdx {
			re = getRollupExprArg(arg)
			args[i] = re
			continue
		}
		ts, err := evalExpr(qt, ec, arg)
		if err != nil {
			return nil, nil, fmt.Errorf("cannot evaluate arg #%d for %q: %w", i+1, fe.AppendString(nil), err)
		}
		args[i] = ts
	}
	return args, re, nil
}

func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
	re, ok := arg.(*metricsql.RollupExpr)
	if !ok {
		// Wrap non-rollup arg into metricsql.RollupExpr.
		return &metricsql.RollupExpr{
			Expr: arg,
		}
	}
	if !re.ForSubquery() {
		// Return standard rollup if it doesn't contain subquery.
		return re
	}
	me, ok := re.Expr.(*metricsql.MetricExpr)
	if !ok {
		// arg contains subquery.
		return re
	}
	// Convert me[w:step] -> default_rollup(me)[w:step]
	reNew := *re
	reNew.Expr = &metricsql.FuncExpr{
		Name: "default_rollup",
		Args: []metricsql.Expr{
			&metricsql.RollupExpr{Expr: me},
		},
	}
	return &reNew
}

// expr may contain:
// - rollupFunc(m) if iafc is nil
// - aggrFunc(rollupFunc(m)) if iafc isn't nil
func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr,
	re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
	if re.At == nil {
		return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc)
	}
	tssAt, err := evalExpr(qt, ec, re.At)
	if err != nil {
		return nil, &UserReadableError{
			Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err),
		}
	}
	if len(tssAt) != 1 {
		return nil, &UserReadableError{
			Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)),
		}
	}
	atTimestamp := int64(tssAt[0].Values[0] * 1000)
	ecNew := copyEvalConfig(ec)
	ecNew.Start = atTimestamp
	ecNew.End = atTimestamp
	tss, err := evalRollupFuncWithoutAt(qt, ecNew, funcName, rf, expr, re, iafc)
	if err != nil {
		return nil, err
	}
	// expand single-point tss to the original time range.
	timestamps := ec.getSharedTimestamps()
	for _, ts := range tss {
		v := ts.Values[0]
		values := make([]float64, len(timestamps))
		for i := range timestamps {
			values[i] = v
		}
		ts.Timestamps = timestamps
		ts.Values = values
	}
	return tss, nil
}

func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
	funcName = strings.ToLower(funcName)
	ecNew := ec
	var offset int64
	if re.Offset != nil {
		offset = re.Offset.Duration(ec.Step)
		ecNew = copyEvalConfig(ecNew)
		ecNew.Start -= offset
		ecNew.End -= offset
		// There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true,
		// since the time range alignment has been already performed by the caller,
		// so cache hit rate should be quite good.
		// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
	}
	if funcName == "rollup_candlestick" {
		// Automatically apply `offset -step` to `rollup_candlestick` function
		// in order to obtain expected OHLC results.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
		step := ecNew.Step
		ecNew = copyEvalConfig(ecNew)
		ecNew.Start += step
		ecNew.End += step
		offset -= step
	}
	var rvs []*timeseries
	var err error
	if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
		rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window)
	} else {
		if iafc != nil {
			logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil))
		}
		rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re)
	}
	if err != nil {
		return nil, &UserReadableError{
			Err: err,
		}
	}
	if funcName == "absent_over_time" {
		rvs = aggregateAbsentOverTime(ec, re.Expr, rvs)
	}
	if offset != 0 && len(rvs) > 0 {
		// Make a copy of timestamps, since they may be used in other values.
		srcTimestamps := rvs[0].Timestamps
		dstTimestamps := append([]int64{}, srcTimestamps...)
		for i := range dstTimestamps {
			dstTimestamps[i] += offset
		}
		for _, ts := range rvs {
			ts.Timestamps = dstTimestamps
		}
	}
	return rvs, nil
}

// aggregateAbsentOverTime collapses tss to a single time series with 1 and nan values.
//
// Values for returned series are set to nan if at least a single tss series contains nan at that point.
// This means that tss contains a series with non-empty results at that point.
// This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130
func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries {
	rvs := getAbsentTimeseries(ec, expr)
	if len(tss) == 0 {
		return rvs
	}
	for i := range tss[0].Values {
		for _, ts := range tss {
			if math.IsNaN(ts.Values[i]) {
				rvs[0].Values[i] = nan
				break
			}
		}
	}
	return rvs
}

func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
	// TODO: determine whether to use rollupResultCacheV here.
	qt = qt.NewChild("subquery")
	defer qt.Done()
	step, err := re.Step.NonNegativeDuration(ec.Step)
	if err != nil {
		return nil, fmt.Errorf("cannot parse step in square brackets at %s: %w", expr.AppendString(nil), err)
	}
	if step == 0 {
		step = ec.Step
	}
	window, err := re.Window.NonNegativeDuration(ec.Step)
	if err != nil {
		return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
	}

	ecSQ := copyEvalConfig(ec)
	ecSQ.Start -= window + maxSilenceInterval + step
	ecSQ.End += step
	ecSQ.Step = step
	ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
	if err := ValidateMaxPointsPerSeries(ecSQ.Start, ecSQ.End, ecSQ.Step, ecSQ.MaxPointsPerSeries); err != nil {
		return nil, fmt.Errorf("%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)", err)
	}
	// unconditionally align start and end args to step for subquery as Prometheus does.
	ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step)
	tssSQ, err := evalExpr(qt, ecSQ, re.Expr)
	if err != nil {
		return nil, err
	}
	if len(tssSQ) == 0 {
		return nil, nil
	}
	sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
	preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
	if err != nil {
		return nil, err
	}

	var samplesScannedTotal uint64
	keepMetricNames := getKeepMetricNames(expr)
	tsw := getTimeseriesByWorkerID()
	seriesByWorkerID := tsw.byWorkerID
	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
		values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
		preFunc(values, timestamps)
		for _, rc := range rcs {
			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
				samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
				atomic.AddUint64(&samplesScannedTotal, samplesScanned)
				seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
				continue
			}
			var ts timeseries
			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
			atomic.AddUint64(&samplesScannedTotal, samplesScanned)
			seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
		}
		return values, timestamps
	})
	tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
	for i := range seriesByWorkerID {
		tss = append(tss, seriesByWorkerID[i].tss...)
	}
	putTimeseriesByWorkerID(tsw)

	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
	qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
	return tss, nil
}

var rowsScannedPerQuery = metrics.NewHistogram(`vm_rows_scanned_per_query`)

func getKeepMetricNames(expr metricsql.Expr) bool {
	if ae, ok := expr.(*metricsql.AggrFuncExpr); ok {
		// Extract rollupFunc(...) from aggrFunc(rollupFunc(...)).
		// This case is possible when optimized aggrFunc calculations are used
		// such as `sum(rate(...))`
		if len(ae.Args) != 1 {
			return false
		}
		expr = ae.Args[0]
	}
	if fe, ok := expr.(*metricsql.FuncExpr); ok {
		return fe.KeepMetricNames
	}
	return false
}

func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64)) {
	workers := netstorage.MaxWorkers()
	if workers > len(tss) {
		workers = len(tss)
	}
	seriesPerWorker := (len(tss) + workers - 1) / workers
	workChs := make([]chan *timeseries, workers)
	for i := range workChs {
		workChs[i] = make(chan *timeseries, seriesPerWorker)
	}
	for i, ts := range tss {
		idx := i % len(workChs)
		workChs[idx] <- ts
	}
	for _, workCh := range workChs {
		close(workCh)
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(workerID uint) {
			defer wg.Done()
			var tmpValues []float64
			var tmpTimestamps []int64
			for ts := range workChs[workerID] {
				tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, workerID)
			}
		}(uint(i))
	}
	wg.Wait()
}

func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) {
	hasNan := false
	for _, v := range values {
		if math.IsNaN(v) {
			hasNan = true
		}
	}
	if !hasNan {
		// Fast path - no NaNs.
		dstValues = append(dstValues, values...)
		dstTimestamps = append(dstTimestamps, timestamps...)
		return dstValues, dstTimestamps
	}

	// Slow path - remove NaNs.
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}

var (
	rollupResultCacheFullHits    = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`)
	rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`)
	rollupResultCacheMiss        = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
)

func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) {
	var rollupMemorySize int64
	window, err := windowExpr.NonNegativeDuration(ec.Step)
	if err != nil {
		return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
	}
	if qt.Enabled() {
		qt = qt.NewChild("rollup %s(): timeRange=%s, step=%d, window=%d", funcName, ec.timeRangeString(), ec.Step, window)
		defer func() {
			qt.Donef("neededMemoryBytes=%d", rollupMemorySize)
		}()
	}
	if me.IsEmpty() {
		return evalNumber(ec, nan), nil
	}

	// Search for partial results in cache.
	tssCached, start := rollupResultCacheV.Get(qt, ec, expr, window)
	if start > ec.End {
		// The result is fully cached.
		rollupResultCacheFullHits.Inc()
		return tssCached, nil
	}
	if start > ec.Start {
		rollupResultCachePartialHits.Inc()
	} else {
		rollupResultCacheMiss.Inc()
	}

	// Obtain rollup configs before fetching data from db,
	// so type errors can be caught earlier.
	sharedTimestamps := getTimestamps(start, ec.End, ec.Step, ec.MaxPointsPerSeries)
	preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
	if err != nil {
		return nil, err
	}

	// Fetch the remaining part of the result.
	tfss := searchutils.ToTagFilterss(me.LabelFilterss)
	tfss = searchutils.JoinTagFilterss(tfss, ec.EnforcedTagFilterss)
	minTimestamp := start - maxSilenceInterval
	if window > ec.Step {
		minTimestamp -= window
	} else {
		minTimestamp -= ec.Step
	}
	if minTimestamp < 0 {
		minTimestamp = 0
	}
	sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries)
	rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline)
	if err != nil {
		return nil, &UserReadableError{
			Err: err,
		}
	}
	rssLen := rss.Len()
	if rssLen == 0 {
		rss.Cancel()
		tss := mergeTimeseries(tssCached, nil, start, ec)
		return tss, nil
	}
	ec.QueryStats.addSeriesFetched(rssLen)

	// Verify timeseries fit available memory after the rollup.
	// Take into account points from tssCached.
	pointsPerTimeseries := 1 + (ec.End-ec.Start)/ec.Step
	timeseriesLen := rssLen
	if iafc != nil {
		// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
		timeseriesLen = cgroup.AvailableCPUs()
		if iafc.ae.Modifier.Op != "" {
			if iafc.ae.Limit > 0 {
				// There is an explicit limit on the number of output time series.
				timeseriesLen *= iafc.ae.Limit
			} else {
				// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
				// since each group can have own set of time series in memory.
				timeseriesLen *= 1000
			}
		}
		// The maximum number of output time series is limited by rssLen.
		if timeseriesLen > rssLen {
			timeseriesLen = rssLen
		}
	}
	rollupPoints := mulNoOverflow(pointsPerTimeseries, int64(timeseriesLen*len(rcs)))
	rollupMemorySize = sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16))
	if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory {
		requestURI := ec.GetRequestURI()
		logger.Warnf("remoteAddr=%s, requestURI=%s: the %s requires %d bytes of memory for processing; "+
			"logging this query, since it exceeds the -search.logQueryMemoryUsage=%d; "+
			"the query selects %d time series and generates %d points across all the time series; try reducing the number of selected time series",
			ec.QuotedRemoteAddr, requestURI, expr.AppendString(nil), rollupMemorySize, maxMemory, timeseriesLen*len(rcs), rollupPoints)
	}
	if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory {
		rss.Cancel()
		return nil, &UserReadableError{
			Err: fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+
				"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+
				"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
				"increasing -search.maxMemoryPerQuery",
				expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3),
		}
	}
	rml := getRollupMemoryLimiter()
	if !rml.Get(uint64(rollupMemorySize)) {
		rss.Cancel()
		return nil, &UserReadableError{
			Err: fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+
				"total available memory for concurrent requests: %d bytes; "+
				"requested memory: %d bytes; "+
				"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
				"switching to node with more RAM; increasing -memory.allowedPercent",
				expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3),
		}
	}
	defer rml.Put(uint64(rollupMemorySize))

	// Evaluate rollup
	keepMetricNames := getKeepMetricNames(expr)
	var tss []*timeseries
	if iafc != nil {
		tss, err = evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps)
	} else {
		tss, err = evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)
	}
	if err != nil {
		return nil, &UserReadableError{
			Err: err,
		}
	}
	tss = mergeTimeseries(tssCached, tss, start, ec)
	rollupResultCacheV.Put(qt, ec, expr, window, tss)
	return tss, nil
}

var (
	rollupMemoryLimiter     memoryLimiter
	rollupMemoryLimiterOnce sync.Once
)

func getRollupMemoryLimiter() *memoryLimiter {
	rollupMemoryLimiterOnce.Do(func() {
		rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 4
	})
	return &rollupMemoryLimiter
}

func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool,
	iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
	qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs)
	defer qt.Done()
	var samplesScannedTotal uint64
	err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
		preFunc(rs.Values, rs.Timestamps)
		ts := getTimeseries()
		defer putTimeseries(ts)
		for _, rc := range rcs {
			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
				samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
				for _, ts := range tsm.m {
					iafc.updateTimeseries(ts, workerID)
				}
				atomic.AddUint64(&samplesScannedTotal, samplesScanned)
				continue
			}
			ts.Reset()
			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
			atomic.AddUint64(&samplesScannedTotal, samplesScanned)
			iafc.updateTimeseries(ts, workerID)

			// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
			ts.Timestamps = nil
			ts.denyReuse = false
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	tss := iafc.finalizeTimeseries()
	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
	qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal)
	return tss, nil
}

func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
	qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
	defer qt.Done()

	var samplesScannedTotal uint64
	tsw := getTimeseriesByWorkerID()
	seriesByWorkerID := tsw.byWorkerID
	seriesLen := rss.Len()
	err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
		preFunc(rs.Values, rs.Timestamps)
		for _, rc := range rcs {
			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
				samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
				atomic.AddUint64(&samplesScannedTotal, samplesScanned)
				seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
				continue
			}
			var ts timeseries
			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
			atomic.AddUint64(&samplesScannedTotal, samplesScanned)
			seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	tss := make([]*timeseries, 0, seriesLen*len(rcs))
	for i := range seriesByWorkerID {
		tss = append(tss, seriesByWorkerID[i].tss...)
	}
	putTimeseriesByWorkerID(tsw)

	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
	qt.Printf("samplesScanned=%d", samplesScannedTotal)
	return tss, nil
}

func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName,
	valuesSrc []float64, timestampsSrc []int64, sharedTimestamps []int64) uint64 {
	tsDst.MetricName.CopyFrom(mnSrc)
	if len(rc.TagValue) > 0 {
		tsDst.MetricName.AddTag("rollup", rc.TagValue)
	}
	if !keepMetricNames && !rollupFuncsKeepMetricName[funcName] {
		tsDst.MetricName.ResetMetricGroup()
	}
	var samplesScanned uint64
	tsDst.Values, samplesScanned = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
	tsDst.Timestamps = sharedTimestamps
	tsDst.denyReuse = true
	return samplesScanned
}

type timeseriesWithPadding struct {
	tss []*timeseries

	// The padding prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	_ [128 - unsafe.Sizeof([]*timeseries{})%128]byte
}

type timeseriesByWorkerID struct {
	byWorkerID []timeseriesWithPadding
}

func (tsw *timeseriesByWorkerID) reset() {
	byWorkerID := tsw.byWorkerID
	for i := range byWorkerID {
		byWorkerID[i].tss = nil
	}
}

func getTimeseriesByWorkerID() *timeseriesByWorkerID {
	v := timeseriesByWorkerIDPool.Get()
	if v == nil {
		return &timeseriesByWorkerID{
			byWorkerID: make([]timeseriesWithPadding, netstorage.MaxWorkers()),
		}
	}
	return v.(*timeseriesByWorkerID)
}

func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) {
	tsw.reset()
	timeseriesByWorkerIDPool.Put(tsw)
}

var timeseriesByWorkerIDPool sync.Pool

var bbPool bytesutil.ByteBufferPool

func evalNumber(ec *EvalConfig, n float64) []*timeseries {
	var ts timeseries
	ts.denyReuse = true
	timestamps := ec.getSharedTimestamps()
	values := make([]float64, len(timestamps))
	for i := range timestamps {
		values[i] = n
	}
	ts.Values = values
	ts.Timestamps = timestamps
	return []*timeseries{&ts}
}

func evalString(ec *EvalConfig, s string) []*timeseries {
	rv := evalNumber(ec, nan)
	rv[0].MetricName.MetricGroup = append(rv[0].MetricName.MetricGroup[:0], s...)
	return rv
}

func evalTime(ec *EvalConfig) []*timeseries {
	rv := evalNumber(ec, nan)
	timestamps := rv[0].Timestamps
	values := rv[0].Values
	for i, ts := range timestamps {
		values[i] = float64(ts) / 1e3
	}
	return rv
}

func mulNoOverflow(a, b int64) int64 {
	if math.MaxInt64/b < a {
		// Overflow
		return math.MaxInt64
	}
	return a * b
}

func sumNoOverflow(a, b int64) int64 {
	if math.MaxInt64-a < b {
		// Overflow
		return math.MaxInt64
	}
	return a + b
}

func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) {
	if *noStaleMarkers || funcName == "default_rollup" || funcName == "stale_samples_over_time" {
		// Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function,
		// since it uses them for Prometheus-style staleness detection.
		// Do not drop staleness marks for stale_samples_over_time() function, since it needs
		// to calculate the number of staleness markers.
		return values, timestamps
	}
	// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
	hasStaleSamples := false
	for _, v := range values {
		if decimal.IsStaleNaN(v) {
			hasStaleSamples = true
			break
		}
	}
	if !hasStaleSamples {
		// Fast path: values have no Prometheus staleness marks.
		return values, timestamps
	}
	// Slow path: drop Prometheus staleness marks from values.
	dstValues := values[:0]
	dstTimestamps := timestamps[:0]
	for i, v := range values {
		if decimal.IsStaleNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}