mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-03 17:42:13 +01:00
66c76a4d4d
app/vmselect/promql/eval.go:evalAggrFunc shunts evaluation of AggrFuncExpr over rollupFunc over MetricsExpr to an optimized path. tryGetArgRollupFuncWithMetricExpr() checks whether expression can be shunted, but it mangles the AggrFuncExpr when the aggregation function has more than one argument. This results in queries like `sum(aggr_over_time("avg_over_time",m))` failing with error message 'expecting at least 2 args to "aggr_over_time"; got 1 args' while the analogous query `sum(avg_over_time(m))` executes successfully. This fix removes the unnecessary mangling. Signed-off-by: Anton Tykhyy <atykhyy@gmail.com>
2041 lines
66 KiB
Go
2041 lines
66 KiB
Go
package promql
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"math"
|
|
"regexp"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/VictoriaMetrics/metricsql"
|
|
)
|
|
|
|
// Command-line flags, which tune query evaluation in this package.
var (
	// disableCache turns off response caching when set.
	// It is consulted by AdjustStartEnd and EvalConfig.mayCache.
	disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful during data backfilling")

	// maxPointsSubqueryPerTimeseries is used as the per-series points limit during subquery evaluation.
	maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+
		"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")

	// maxMemoryPerQuery is the per-query memory limit; see the flag description for details.
	maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+
		"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
		"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . "+
		"See also -search.logQueryMemoryUsage")

	// logQueryMemoryUsage is the memory threshold for logging heavy queries; see the flag description for details.
	logQueryMemoryUsage = flagutil.NewBytes("search.logQueryMemoryUsage", 0, "Log query and increment vm_memory_intensive_queries_total metric each time "+
		"the query requires more memory than specified by this flag. "+
		"This may help detecting and optimizing heavy queries. Query logging is disabled by default. "+
		"See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery")

	// noStaleMarkers allows skipping the handling of Prometheus staleness markers; see the flag description for details.
	noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+
		"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")

	// minWindowForInstantRollupOptimization is the minimum lookbehind window, which enables
	// the cache-based optimization for instant queries; see the flag description for details.
	minWindowForInstantRollupOptimization = flagutil.NewDuration("search.minWindowForInstantRollupOptimization", "3h", "Enable cache-based optimization for repeated queries "+
		"to /api/v1/query (aka instant queries), which contain rollup functions with lookbehind window exceeding the given value")
)
|
|
|
|
// minTimeseriesPointsForTimeRounding is the minimum number of points per timeseries
// required for enabling time rounding in AdjustStartEnd.
//
// Rounding improves cache hit ratio for frequently requested queries over big time ranges.
const minTimeseriesPointsForTimeRounding = 50
|
|
|
|
// ValidateMaxPointsPerSeries checks that the time range [start..end] sampled with the given step
// produces at most maxPoints points per series.
//
// It returns an error if step is zero or if the number of points exceeds maxPoints.
// Otherwise nil is returned.
func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error {
	if step == 0 {
		return fmt.Errorf("step can't be equal to zero")
	}
	limit := int64(maxPoints)
	n := (end-start)/step + 1
	if n <= limit {
		return nil
	}
	return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d",
		start, end, step, n, maxPoints)
}
|
|
|
|
// AdjustStartEnd adjusts start and end values, so response caching may be enabled.
|
|
//
|
|
// See EvalConfig.mayCache() for details.
|
|
func AdjustStartEnd(start, end, step int64) (int64, int64) {
|
|
if *disableCache {
|
|
// Do not adjust start and end values when cache is disabled.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563
|
|
return start, end
|
|
}
|
|
points := (end-start)/step + 1
|
|
if points < minTimeseriesPointsForTimeRounding {
|
|
// Too small number of points for rounding.
|
|
return start, end
|
|
}
|
|
|
|
// Round start and end to values divisible by step in order
|
|
// to enable response caching (see EvalConfig.mayCache).
|
|
start, end = alignStartEnd(start, end, step)
|
|
|
|
// Make sure that the new number of points is the same as the initial number of points.
|
|
newPoints := (end-start)/step + 1
|
|
for newPoints > points {
|
|
end -= step
|
|
newPoints--
|
|
}
|
|
|
|
return start, end
|
|
}
|
|
|
|
// alignStartEnd widens the [start..end] range, so both bounds become divisible by step.
//
// start is rounded down to the nearest value divisible by step,
// while end is rounded up to the nearest value divisible by step.
// Values, which are already divisible by step, are left as is.
//
// step is expected to be positive.
func alignStartEnd(start, end, step int64) (int64, int64) {
	// The % operator in Go truncates towards zero, so the remainder is negative
	// for negative operands. Normalize the remainder into the [0..step) range,
	// so negative timestamps are rounded in the proper direction too.
	if r := start % step; r != 0 {
		if r < 0 {
			r += step
		}
		start -= r
	}
	if r := end % step; r != 0 {
		if r < 0 {
			r += step
		}
		end += step - r
	}
	return start, end
}
|
|
|
|
// EvalConfig is the configuration required for query evaluation via Exec
//
// Use copyEvalConfig for obtaining a copy of EvalConfig, since it contains sync.Once,
// which mustn't be copied by value.
type EvalConfig struct {
	// Start, End and Step define the timestamps the query is evaluated at - see getTimestamps.
	// NOTE(review): the values look like milliseconds (see the `* 1000` conversion in evalRollupFunc) - confirm.
	Start int64
	End   int64
	Step  int64

	// MaxSeries is the maximum number of time series, which can be scanned by the query.
	// Zero means 'no limit'
	MaxSeries int

	// MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series.
	MaxPointsPerSeries int

	// QuotedRemoteAddr contains quoted remote address.
	QuotedRemoteAddr string

	// Deadline is the deadline for the query.
	// NOTE(review): its exact semantics is defined in the searchutils package.
	Deadline searchutils.Deadline

	// Whether the response can be cached.
	MayCache bool

	// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
	LookbackDelta int64

	// How many decimal digits after the point to leave in response.
	RoundDigits int

	// EnforcedTagFilterss may contain additional label filters to use in the query.
	EnforcedTagFilterss [][]storage.TagFilter

	// The callback, which returns the request URI during logging.
	// The request URI isn't stored here because its construction may take non-trivial amounts of CPU.
	GetRequestURI func() string

	// QueryStats contains various stats for the currently executed query.
	//
	// The caller must initialize QueryStats, otherwise it isn't collected.
	QueryStats *QueryStats

	// timestamps holds the timestamps generated from Start, End and Step.
	// It is lazily initialized via timestampsOnce on the first getSharedTimestamps call.
	timestamps     []int64
	timestampsOnce sync.Once
}
|
|
|
|
// copyEvalConfig returns src copy.
|
|
func copyEvalConfig(src *EvalConfig) *EvalConfig {
|
|
var ec EvalConfig
|
|
ec.Start = src.Start
|
|
ec.End = src.End
|
|
ec.Step = src.Step
|
|
ec.MaxSeries = src.MaxSeries
|
|
ec.MaxPointsPerSeries = src.MaxPointsPerSeries
|
|
ec.Deadline = src.Deadline
|
|
ec.MayCache = src.MayCache
|
|
ec.LookbackDelta = src.LookbackDelta
|
|
ec.RoundDigits = src.RoundDigits
|
|
ec.EnforcedTagFilterss = src.EnforcedTagFilterss
|
|
ec.GetRequestURI = src.GetRequestURI
|
|
ec.QueryStats = src.QueryStats
|
|
|
|
// do not copy src.timestamps - they must be generated again.
|
|
return &ec
|
|
}
|
|
|
|
// QueryStats holds counters collected during the query execution.
//
// The counters are updated atomically by the methods below.
type QueryStats struct {
	// SeriesFetched is the number of series fetched from storage during the query evaluation.
	SeriesFetched int64
	// ExecutionTimeMsec is the number of milliseconds the query took to execute.
	ExecutionTimeMsec int64
}

// addSeriesFetched adds n to qs.SeriesFetched.
//
// It is a no-op for nil qs.
func (qs *QueryStats) addSeriesFetched(n int) {
	if qs != nil {
		atomic.AddInt64(&qs.SeriesFetched, int64(n))
	}
}

// addExecutionTimeMsec adds the number of milliseconds passed since startTime to qs.ExecutionTimeMsec.
//
// It is a no-op for nil qs.
func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) {
	if qs != nil {
		atomic.AddInt64(&qs.ExecutionTimeMsec, time.Since(startTime).Milliseconds())
	}
}
|
|
|
|
func (ec *EvalConfig) validate() {
|
|
if ec.Start > ec.End {
|
|
logger.Panicf("BUG: start cannot exceed end; got %d vs %d", ec.Start, ec.End)
|
|
}
|
|
if ec.Step <= 0 {
|
|
logger.Panicf("BUG: step must be greater than 0; got %d", ec.Step)
|
|
}
|
|
}
|
|
|
|
func (ec *EvalConfig) mayCache() bool {
|
|
if *disableCache {
|
|
return false
|
|
}
|
|
if !ec.MayCache {
|
|
return false
|
|
}
|
|
if ec.Start == ec.End {
|
|
// There is no need in aligning start and end to step for instant query
|
|
// in order to cache its results.
|
|
return true
|
|
}
|
|
if ec.Start%ec.Step != 0 {
|
|
return false
|
|
}
|
|
if ec.End%ec.Step != 0 {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (ec *EvalConfig) timeRangeString() string {
|
|
start := storage.TimestampToHumanReadableFormat(ec.Start)
|
|
end := storage.TimestampToHumanReadableFormat(ec.End)
|
|
return fmt.Sprintf("[%s..%s]", start, end)
|
|
}
|
|
|
|
func (ec *EvalConfig) getSharedTimestamps() []int64 {
|
|
ec.timestampsOnce.Do(ec.timestampsInit)
|
|
return ec.timestamps
|
|
}
|
|
|
|
func (ec *EvalConfig) timestampsInit() {
|
|
ec.timestamps = getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
|
}
|
|
|
|
func getTimestamps(start, end, step int64, maxPointsPerSeries int) []int64 {
|
|
// Sanity checks.
|
|
if step <= 0 {
|
|
logger.Panicf("BUG: Step must be bigger than 0; got %d", step)
|
|
}
|
|
if start > end {
|
|
logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end)
|
|
}
|
|
if err := ValidateMaxPointsPerSeries(start, end, step, maxPointsPerSeries); err != nil {
|
|
logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err)
|
|
}
|
|
|
|
// Prepare timestamps.
|
|
points := 1 + (end-start)/step
|
|
timestamps := make([]int64, points)
|
|
for i := range timestamps {
|
|
timestamps[i] = start
|
|
start += step
|
|
}
|
|
return timestamps
|
|
}
|
|
|
|
func evalExpr(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
|
|
if qt.Enabled() {
|
|
query := string(e.AppendString(nil))
|
|
query = stringsutil.LimitStringLen(query, 300)
|
|
mayCache := ec.mayCache()
|
|
qt = qt.NewChild("eval: query=%s, timeRange=%s, step=%d, mayCache=%v", query, ec.timeRangeString(), ec.Step, mayCache)
|
|
}
|
|
rv, err := evalExprInternal(qt, ec, e)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if qt.Enabled() {
|
|
seriesCount := len(rv)
|
|
pointsPerSeries := 0
|
|
if len(rv) > 0 {
|
|
pointsPerSeries = len(rv[0].Timestamps)
|
|
}
|
|
pointsCount := seriesCount * pointsPerSeries
|
|
qt.Donef("series=%d, points=%d, pointsPerSeries=%d", seriesCount, pointsCount, pointsPerSeries)
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
|
|
if me, ok := e.(*metricsql.MetricExpr); ok {
|
|
re := &metricsql.RollupExpr{
|
|
Expr: me,
|
|
}
|
|
rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, me.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
if re, ok := e.(*metricsql.RollupExpr); ok {
|
|
rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, re.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
if fe, ok := e.(*metricsql.FuncExpr); ok {
|
|
nrf := getRollupFunc(fe.Name)
|
|
if nrf == nil {
|
|
qtChild := qt.NewChild("transform %s()", fe.Name)
|
|
rv, err := evalTransformFunc(qtChild, ec, fe)
|
|
qtChild.Donef("series=%d", len(rv))
|
|
return rv, err
|
|
}
|
|
args, re, err := evalRollupFuncArgs(qt, ec, fe)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rf, err := nrf(args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rv, err := evalRollupFunc(qt, ec, fe.Name, rf, e, re, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
if ae, ok := e.(*metricsql.AggrFuncExpr); ok {
|
|
qtChild := qt.NewChild("aggregate %s()", ae.Name)
|
|
rv, err := evalAggrFunc(qtChild, ec, ae)
|
|
qtChild.Donef("series=%d", len(rv))
|
|
return rv, err
|
|
}
|
|
if be, ok := e.(*metricsql.BinaryOpExpr); ok {
|
|
qtChild := qt.NewChild("binary op %q", be.Op)
|
|
rv, err := evalBinaryOp(qtChild, ec, be)
|
|
qtChild.Donef("series=%d", len(rv))
|
|
return rv, err
|
|
}
|
|
if ne, ok := e.(*metricsql.NumberExpr); ok {
|
|
rv := evalNumber(ec, ne.N)
|
|
return rv, nil
|
|
}
|
|
if se, ok := e.(*metricsql.StringExpr); ok {
|
|
rv := evalString(ec, se.S)
|
|
return rv, nil
|
|
}
|
|
if de, ok := e.(*metricsql.DurationExpr); ok {
|
|
d := de.Duration(ec.Step)
|
|
dSec := float64(d) / 1000
|
|
rv := evalNumber(ec, dSec)
|
|
return rv, nil
|
|
}
|
|
return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
|
|
}
|
|
|
|
func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) {
|
|
tf := getTransformFunc(fe.Name)
|
|
if tf == nil {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf(`unknown func %q`, fe.Name),
|
|
}
|
|
}
|
|
var args [][]*timeseries
|
|
var err error
|
|
switch fe.Name {
|
|
case "", "union":
|
|
args, err = evalExprsInParallel(qt, ec, fe.Args)
|
|
default:
|
|
args, err = evalExprsSequentially(qt, ec, fe.Args)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tfa := &transformFuncArg{
|
|
ec: ec,
|
|
fe: fe,
|
|
args: args,
|
|
}
|
|
rv, err := tf(tfa)
|
|
if err != nil {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err),
|
|
}
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFuncExpr) ([]*timeseries, error) {
|
|
if callbacks := getIncrementalAggrFuncCallbacks(ae.Name); callbacks != nil {
|
|
fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae)
|
|
if fe != nil {
|
|
// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
|
|
// The optimized path saves RAM for aggregates over big number of time series.
|
|
args, re, err := evalRollupFuncArgs(qt, ec, fe)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rf, err := nrf(args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
iafc := newIncrementalAggrFuncContext(ae, callbacks)
|
|
return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc)
|
|
}
|
|
}
|
|
args, err := evalExprsInParallel(qt, ec, ae.Args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
af := getAggrFunc(ae.Name)
|
|
if af == nil {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf(`unknown func %q`, ae.Name),
|
|
}
|
|
}
|
|
afa := &aggrFuncArg{
|
|
ae: ae,
|
|
args: args,
|
|
ec: ec,
|
|
}
|
|
qtChild := qt.NewChild("eval %s", ae.Name)
|
|
rv, err := af(afa)
|
|
qtChild.Done()
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) {
|
|
bf := getBinaryOpFunc(be.Op)
|
|
if bf == nil {
|
|
return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
|
|
}
|
|
var err error
|
|
var tssLeft, tssRight []*timeseries
|
|
switch strings.ToLower(be.Op) {
|
|
case "and", "if":
|
|
// Fetch right-side series at first, since it usually contains
|
|
// lower number of time series for `and` and `if` operator.
|
|
// This should produce more specific label filters for the left side of the query.
|
|
// This, in turn, should reduce the time to select series for the left side of the query.
|
|
tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be)
|
|
default:
|
|
tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
|
|
}
|
|
bfa := &binaryOpFuncArg{
|
|
be: be,
|
|
left: tssLeft,
|
|
right: tssRight,
|
|
}
|
|
rv, err := bf(bfa)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, be.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool {
|
|
switch strings.ToLower(be.Op) {
|
|
case "or", "default":
|
|
return false
|
|
}
|
|
if isAggrFuncWithoutGrouping(be.Left) || isAggrFuncWithoutGrouping(be.Right) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
|
|
afe, ok := e.(*metricsql.AggrFuncExpr)
|
|
if !ok {
|
|
return false
|
|
}
|
|
return len(afe.Modifier.Args) == 0
|
|
}
|
|
|
|
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
|
|
if !canPushdownCommonFilters(be) {
|
|
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
|
|
// from exprFirst to exprSecond.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
|
|
qt = qt.NewChild("execute left and right sides of %q in parallel", be.Op)
|
|
defer qt.Done()
|
|
var wg sync.WaitGroup
|
|
|
|
var tssFirst []*timeseries
|
|
var errFirst error
|
|
qtFirst := qt.NewChild("expr1")
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst)
|
|
qtFirst.Done()
|
|
}()
|
|
|
|
var tssSecond []*timeseries
|
|
var errSecond error
|
|
qtSecond := qt.NewChild("expr2")
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond)
|
|
qtSecond.Done()
|
|
}()
|
|
|
|
wg.Wait()
|
|
if errFirst != nil {
|
|
return nil, nil, errFirst
|
|
}
|
|
if errSecond != nil {
|
|
return nil, nil, errSecond
|
|
}
|
|
return tssFirst, tssSecond, nil
|
|
}
|
|
|
|
// Execute binary operation in the following way:
|
|
//
|
|
// 1) execute the exprFirst
|
|
// 2) get common label filters for series returned at step 1
|
|
// 3) push down the found common label filters to exprSecond. This filters out unneeded series
|
|
// during exprSecond execution instead of spending compute resources on extracting and processing these series
|
|
// before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
|
|
// 4) execute the exprSecond with possible additional filters found at step 3
|
|
//
|
|
// Typical use cases:
|
|
// - Kubernetes-related: show pod creation time with the node name:
|
|
//
|
|
// kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
|
|
//
|
|
// Without the optimization `kube_pod_info` would select and spend compute resources
|
|
// for more time series than needed. The selected time series would be dropped later
|
|
// when matching time series on the right and left sides of binary operand.
|
|
//
|
|
// - Generic alerting queries, which rely on `info` metrics.
|
|
// See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
|
|
//
|
|
// - Queries, which get additional labels from `info` metrics.
|
|
// See https://www.robustperception.io/exposing-the-software-version-to-prometheus
|
|
tssFirst, err := evalExpr(qt, ec, exprFirst)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if len(tssFirst) == 0 && strings.ToLower(be.Op) != "or" {
|
|
// Fast path: there is no sense in executing the exprSecond when exprFirst returns an empty result,
|
|
// since the "exprFirst op exprSecond" would return an empty result in any case.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3349
|
|
return nil, nil, nil
|
|
}
|
|
lfs := getCommonLabelFilters(tssFirst)
|
|
lfs = metricsql.TrimFiltersByGroupModifier(lfs, be)
|
|
exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs)
|
|
tssSecond, err := evalExpr(qt, ec, exprSecond)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return tssFirst, tssSecond, nil
|
|
}
|
|
|
|
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
|
|
if len(tss) == 0 {
|
|
return nil
|
|
}
|
|
type valuesCounter struct {
|
|
values map[string]struct{}
|
|
count int
|
|
}
|
|
m := make(map[string]*valuesCounter, len(tss[0].MetricName.Tags))
|
|
for _, ts := range tss {
|
|
for _, tag := range ts.MetricName.Tags {
|
|
vc, ok := m[string(tag.Key)]
|
|
if !ok {
|
|
k := string(tag.Key)
|
|
v := string(tag.Value)
|
|
m[k] = &valuesCounter{
|
|
values: map[string]struct{}{
|
|
v: {},
|
|
},
|
|
count: 1,
|
|
}
|
|
continue
|
|
}
|
|
if len(vc.values) > 100 {
|
|
// Too many unique values found for the given tag.
|
|
// Do not make a filter on such values, since it may slow down
|
|
// search for matching time series.
|
|
continue
|
|
}
|
|
vc.count++
|
|
if _, ok := vc.values[string(tag.Value)]; !ok {
|
|
vc.values[string(tag.Value)] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
lfs := make([]metricsql.LabelFilter, 0, len(m))
|
|
var values []string
|
|
for k, vc := range m {
|
|
if vc.count != len(tss) {
|
|
// Skip the tag, since it doesn't belong to all the time series.
|
|
continue
|
|
}
|
|
values = values[:0]
|
|
for s := range vc.values {
|
|
values = append(values, s)
|
|
}
|
|
lf := metricsql.LabelFilter{
|
|
Label: k,
|
|
}
|
|
if len(values) == 1 {
|
|
lf.Value = values[0]
|
|
} else {
|
|
sort.Strings(values)
|
|
lf.Value = joinRegexpValues(values)
|
|
lf.IsRegexp = true
|
|
}
|
|
lfs = append(lfs, lf)
|
|
}
|
|
sort.Slice(lfs, func(i, j int) bool {
|
|
return lfs[i].Label < lfs[j].Label
|
|
})
|
|
return lfs
|
|
}
|
|
|
|
// joinRegexpValues returns a regexp alternation `a1|a2|...|aN`, which matches any string from a.
//
// Regexp meta-characters in the strings are escaped.
func joinRegexpValues(a []string) string {
	quoted := make([]string, len(a))
	for i, s := range a {
		quoted[i] = regexp.QuoteMeta(s)
	}
	return strings.Join(quoted, "|")
}
|
|
|
|
func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) {
|
|
if len(ae.Args) != 1 {
|
|
return nil, nil
|
|
}
|
|
e := ae.Args[0]
|
|
// Make sure e contains one of the following:
|
|
// - metricExpr
|
|
// - metricExpr[d]
|
|
// - rollupFunc(metricExpr)
|
|
// - rollupFunc(metricExpr[d])
|
|
|
|
if me, ok := e.(*metricsql.MetricExpr); ok {
|
|
// e = metricExpr
|
|
if me.IsEmpty() {
|
|
return nil, nil
|
|
}
|
|
fe := &metricsql.FuncExpr{
|
|
Name: "default_rollup",
|
|
Args: []metricsql.Expr{me},
|
|
}
|
|
nrf := getRollupFunc(fe.Name)
|
|
return fe, nrf
|
|
}
|
|
if re, ok := e.(*metricsql.RollupExpr); ok {
|
|
if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
|
|
return nil, nil
|
|
}
|
|
// e = metricExpr[d]
|
|
fe := &metricsql.FuncExpr{
|
|
Name: "default_rollup",
|
|
Args: []metricsql.Expr{re},
|
|
}
|
|
nrf := getRollupFunc(fe.Name)
|
|
return fe, nrf
|
|
}
|
|
fe, ok := e.(*metricsql.FuncExpr)
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
nrf := getRollupFunc(fe.Name)
|
|
if nrf == nil {
|
|
return nil, nil
|
|
}
|
|
rollupArgIdx := metricsql.GetRollupArgIdx(fe)
|
|
if rollupArgIdx >= len(fe.Args) {
|
|
// Incorrect number of args for rollup func.
|
|
return nil, nil
|
|
}
|
|
arg := fe.Args[rollupArgIdx]
|
|
if me, ok := arg.(*metricsql.MetricExpr); ok {
|
|
if me.IsEmpty() {
|
|
return nil, nil
|
|
}
|
|
// e = rollupFunc(metricExpr)
|
|
return fe, nrf
|
|
}
|
|
if re, ok := arg.(*metricsql.RollupExpr); ok {
|
|
if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
|
|
return nil, nil
|
|
}
|
|
// e = rollupFunc(metricExpr[d])
|
|
return fe, nrf
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func evalExprsSequentially(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
|
|
var rvs [][]*timeseries
|
|
for _, e := range es {
|
|
rv, err := evalExpr(qt, ec, e)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rvs = append(rvs, rv)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
|
|
if len(es) < 2 {
|
|
return evalExprsSequentially(qt, ec, es)
|
|
}
|
|
rvs := make([][]*timeseries, len(es))
|
|
errs := make([]error, len(es))
|
|
qt.Printf("eval function args in parallel")
|
|
var wg sync.WaitGroup
|
|
for i, e := range es {
|
|
wg.Add(1)
|
|
qtChild := qt.NewChild("eval arg %d", i)
|
|
go func(e metricsql.Expr, i int) {
|
|
defer func() {
|
|
qtChild.Done()
|
|
wg.Done()
|
|
}()
|
|
rv, err := evalExpr(qtChild, ec, e)
|
|
rvs[i] = rv
|
|
errs[i] = err
|
|
}(e, i)
|
|
}
|
|
wg.Wait()
|
|
for _, err := range errs {
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func evalRollupFuncArgs(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) {
|
|
var re *metricsql.RollupExpr
|
|
rollupArgIdx := metricsql.GetRollupArgIdx(fe)
|
|
if len(fe.Args) <= rollupArgIdx {
|
|
return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil))
|
|
}
|
|
args := make([]interface{}, len(fe.Args))
|
|
for i, arg := range fe.Args {
|
|
if i == rollupArgIdx {
|
|
re = getRollupExprArg(arg)
|
|
args[i] = re
|
|
continue
|
|
}
|
|
ts, err := evalExpr(qt, ec, arg)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("cannot evaluate arg #%d for %q: %w", i+1, fe.AppendString(nil), err)
|
|
}
|
|
args[i] = ts
|
|
}
|
|
return args, re, nil
|
|
}
|
|
|
|
func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
|
|
re, ok := arg.(*metricsql.RollupExpr)
|
|
if !ok {
|
|
// Wrap non-rollup arg into metricsql.RollupExpr.
|
|
return &metricsql.RollupExpr{
|
|
Expr: arg,
|
|
}
|
|
}
|
|
if !re.ForSubquery() {
|
|
// Return standard rollup if it doesn't contain subquery.
|
|
return re
|
|
}
|
|
me, ok := re.Expr.(*metricsql.MetricExpr)
|
|
if !ok {
|
|
// arg contains subquery.
|
|
return re
|
|
}
|
|
// Convert me[w:step] -> default_rollup(me)[w:step]
|
|
reNew := *re
|
|
reNew.Expr = &metricsql.FuncExpr{
|
|
Name: "default_rollup",
|
|
Args: []metricsql.Expr{
|
|
&metricsql.RollupExpr{Expr: me},
|
|
},
|
|
}
|
|
return &reNew
|
|
}
|
|
|
|
// expr may contain:
|
|
// - rollupFunc(m) if iafc is nil
|
|
// - aggrFunc(rollupFunc(m)) if iafc isn't nil
|
|
func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr,
|
|
re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
|
|
if re.At == nil {
|
|
return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc)
|
|
}
|
|
tssAt, err := evalExpr(qt, ec, re.At)
|
|
if err != nil {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err),
|
|
}
|
|
}
|
|
if len(tssAt) != 1 {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)),
|
|
}
|
|
}
|
|
atTimestamp := int64(tssAt[0].Values[0] * 1000)
|
|
ecNew := copyEvalConfig(ec)
|
|
ecNew.Start = atTimestamp
|
|
ecNew.End = atTimestamp
|
|
tss, err := evalRollupFuncWithoutAt(qt, ecNew, funcName, rf, expr, re, iafc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// expand single-point tss to the original time range.
|
|
timestamps := ec.getSharedTimestamps()
|
|
for _, ts := range tss {
|
|
v := ts.Values[0]
|
|
values := make([]float64, len(timestamps))
|
|
for i := range timestamps {
|
|
values[i] = v
|
|
}
|
|
ts.Timestamps = timestamps
|
|
ts.Values = values
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
|
|
expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
|
|
funcName = strings.ToLower(funcName)
|
|
ecNew := ec
|
|
var offset int64
|
|
if re.Offset != nil {
|
|
offset = re.Offset.Duration(ec.Step)
|
|
ecNew = copyEvalConfig(ecNew)
|
|
ecNew.Start -= offset
|
|
ecNew.End -= offset
|
|
// There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true,
|
|
// since the time range alignment has been already performed by the caller,
|
|
// so cache hit rate should be quite good.
|
|
// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
|
|
}
|
|
if funcName == "rollup_candlestick" {
|
|
// Automatically apply `offset -step` to `rollup_candlestick` function
|
|
// in order to obtain expected OHLC results.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
|
|
step := ecNew.Step
|
|
ecNew = copyEvalConfig(ecNew)
|
|
ecNew.Start += step
|
|
ecNew.End += step
|
|
offset -= step
|
|
}
|
|
var rvs []*timeseries
|
|
var err error
|
|
if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
|
|
rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window)
|
|
} else {
|
|
if iafc != nil {
|
|
logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil))
|
|
}
|
|
rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re)
|
|
}
|
|
if err != nil {
|
|
return nil, &UserReadableError{
|
|
Err: err,
|
|
}
|
|
}
|
|
if funcName == "absent_over_time" {
|
|
rvs = aggregateAbsentOverTime(ecNew, re.Expr, rvs)
|
|
}
|
|
if offset != 0 && len(rvs) > 0 {
|
|
// Make a copy of timestamps, since they may be used in other values.
|
|
srcTimestamps := rvs[0].Timestamps
|
|
dstTimestamps := append([]int64{}, srcTimestamps...)
|
|
for i := range dstTimestamps {
|
|
dstTimestamps[i] += offset
|
|
}
|
|
for _, ts := range rvs {
|
|
ts.Timestamps = dstTimestamps
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
// aggregateAbsentOverTime collapses tss to a single time series with 1 and nan values.
|
|
//
|
|
// Values for returned series are set to nan if at least a single tss series contains nan at that point.
|
|
// This means that tss contains a series with non-empty results at that point.
|
|
// This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130
|
|
func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries {
|
|
rvs := getAbsentTimeseries(ec, expr)
|
|
if len(tss) == 0 {
|
|
return rvs
|
|
}
|
|
for i := range tss[0].Values {
|
|
for _, ts := range tss {
|
|
if math.IsNaN(ts.Values[i]) {
|
|
rvs[0].Values[i] = nan
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
|
|
// TODO: determine whether to use rollupResultCacheV here.
|
|
qt = qt.NewChild("subquery")
|
|
defer qt.Done()
|
|
step, err := re.Step.NonNegativeDuration(ec.Step)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse step in square brackets at %s: %w", expr.AppendString(nil), err)
|
|
}
|
|
if step == 0 {
|
|
step = ec.Step
|
|
}
|
|
window, err := re.Window.NonNegativeDuration(ec.Step)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
|
|
}
|
|
|
|
ecSQ := copyEvalConfig(ec)
|
|
ecSQ.Start -= window + step + maxSilenceInterval()
|
|
ecSQ.End += step
|
|
ecSQ.Step = step
|
|
ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
|
|
if err := ValidateMaxPointsPerSeries(ecSQ.Start, ecSQ.End, ecSQ.Step, ecSQ.MaxPointsPerSeries); err != nil {
|
|
return nil, fmt.Errorf("%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)", err)
|
|
}
|
|
// unconditionally align start and end args to step for subquery as Prometheus does.
|
|
ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step)
|
|
tssSQ, err := evalExpr(qt, ecSQ, re.Expr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(tssSQ) == 0 {
|
|
return nil, nil
|
|
}
|
|
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
|
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var samplesScannedTotal uint64
|
|
keepMetricNames := getKeepMetricNames(expr)
|
|
tsw := getTimeseriesByWorkerID()
|
|
seriesByWorkerID := tsw.byWorkerID
|
|
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
|
|
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
|
|
preFunc(values, timestamps)
|
|
for _, rc := range rcs {
|
|
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
|
|
samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
|
|
continue
|
|
}
|
|
var ts timeseries
|
|
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
|
|
}
|
|
return values, timestamps
|
|
})
|
|
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
|
|
for i := range seriesByWorkerID {
|
|
tss = append(tss, seriesByWorkerID[i].tss...)
|
|
}
|
|
putTimeseriesByWorkerID(tsw)
|
|
|
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
|
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
|
|
return tss, nil
|
|
}
|
|
|
|
// rowsScannedPerQuery is the `vm_rows_scanned_per_query` histogram.
//
// evalRollupFuncWithSubquery updates it with the total number of samples scanned by the rollup.
var rowsScannedPerQuery = metrics.NewHistogram(`vm_rows_scanned_per_query`)
|
|
|
|
func getKeepMetricNames(expr metricsql.Expr) bool {
|
|
if ae, ok := expr.(*metricsql.AggrFuncExpr); ok {
|
|
// Extract rollupFunc(...) from aggrFunc(rollupFunc(...)).
|
|
// This case is possible when optimized aggrFunc calculations are used
|
|
// such as `sum(rate(...))`
|
|
if len(ae.Args) != 1 {
|
|
return false
|
|
}
|
|
expr = ae.Args[0]
|
|
}
|
|
if fe, ok := expr.(*metricsql.FuncExpr); ok {
|
|
return fe.KeepMetricNames
|
|
}
|
|
return false
|
|
}
|
|
|
|
func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64)) {
|
|
workers := netstorage.MaxWorkers()
|
|
if workers > len(tss) {
|
|
workers = len(tss)
|
|
}
|
|
seriesPerWorker := (len(tss) + workers - 1) / workers
|
|
workChs := make([]chan *timeseries, workers)
|
|
for i := range workChs {
|
|
workChs[i] = make(chan *timeseries, seriesPerWorker)
|
|
}
|
|
for i, ts := range tss {
|
|
idx := i % len(workChs)
|
|
workChs[idx] <- ts
|
|
}
|
|
for _, workCh := range workChs {
|
|
close(workCh)
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(workers)
|
|
for i := 0; i < workers; i++ {
|
|
go func(workerID uint) {
|
|
defer wg.Done()
|
|
var tmpValues []float64
|
|
var tmpTimestamps []int64
|
|
for ts := range workChs[workerID] {
|
|
tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, workerID)
|
|
}
|
|
}(uint(i))
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// removeNanValues appends non-NaN values together with the corresponding timestamps
// to dstValues and dstTimestamps and returns the resulting slices.
//
// timestamps must contain at least len(values) items, since timestamps[i] is read
// for every non-NaN values[i].
func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) {
	hasNan := false
	for _, v := range values {
		if math.IsNaN(v) {
			hasNan = true
			// A single NaN is enough for selecting the slow path - there is no need to scan the rest.
			break
		}
	}
	if !hasNan {
		// Fast path - no NaNs.
		dstValues = append(dstValues, values...)
		dstTimestamps = append(dstTimestamps, timestamps...)
		return dstValues, dstTimestamps
	}

	// Slow path - remove NaNs.
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}
|
|
|
|
// evalInstantRollup evaluates instant rollup where ec.Start == ec.End.
|
|
func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
|
|
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) ([]*timeseries, error) {
|
|
if ec.Start != ec.End {
|
|
logger.Panicf("BUG: evalInstantRollup cannot be called on non-empty time range; got %s", ec.timeRangeString())
|
|
}
|
|
timestamp := ec.Start
|
|
if qt.Enabled() {
|
|
qt = qt.NewChild("instant rollup %s; time=%s, window=%d", expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
|
|
defer qt.Done()
|
|
}
|
|
|
|
evalAt := func(qt *querytracer.Tracer, timestamp, window int64) ([]*timeseries, error) {
|
|
ecCopy := copyEvalConfig(ec)
|
|
ecCopy.Start = timestamp
|
|
ecCopy.End = timestamp
|
|
pointsPerSeries := int64(1)
|
|
return evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries)
|
|
}
|
|
tooBigOffset := func(offset int64) bool {
|
|
maxOffset := window / 2
|
|
if maxOffset > 1800*1000 {
|
|
maxOffset = 1800 * 1000
|
|
}
|
|
return offset >= maxOffset
|
|
}
|
|
deleteCachedSeries := func(qt *querytracer.Tracer) {
|
|
rollupResultCacheV.DeleteInstantValues(qt, expr, window, ec.Step, ec.EnforcedTagFilterss)
|
|
}
|
|
getCachedSeries := func(qt *querytracer.Tracer) ([]*timeseries, int64, error) {
|
|
again:
|
|
offset := int64(0)
|
|
tssCached := rollupResultCacheV.GetInstantValues(qt, expr, window, ec.Step, ec.EnforcedTagFilterss)
|
|
ec.QueryStats.addSeriesFetched(len(tssCached))
|
|
if len(tssCached) == 0 {
|
|
// Cache miss. Re-populate the missing data.
|
|
start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds()
|
|
offset = timestamp - start
|
|
if offset < 0 {
|
|
start = timestamp
|
|
offset = 0
|
|
}
|
|
if tooBigOffset(offset) {
|
|
qt.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+
|
|
"for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window)
|
|
tss, err := evalAt(qt, timestamp, window)
|
|
return tss, 0, err
|
|
}
|
|
qt.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start))
|
|
tss, err := evalAt(qt, start, window)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
if hasDuplicateSeries(tss) {
|
|
qt.Printf("cannot apply instant rollup optimization because the result contains duplicate series")
|
|
tss, err := evalAt(qt, timestamp, window)
|
|
return tss, 0, err
|
|
}
|
|
rollupResultCacheV.PutInstantValues(qt, expr, window, ec.Step, ec.EnforcedTagFilterss, tss)
|
|
return tss, offset, nil
|
|
}
|
|
// Cache hit. Verify whether it is OK to use the cached data.
|
|
offset = timestamp - tssCached[0].Timestamps[0]
|
|
if offset < 0 {
|
|
qt.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s",
|
|
storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp))
|
|
// Delete the outdated cached values, so the cache could be re-populated with newer values.
|
|
deleteCachedSeries(qt)
|
|
goto again
|
|
}
|
|
if tooBigOffset(offset) {
|
|
qt.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+
|
|
"and the cached values is too big comparing to window=%d", offset, window)
|
|
// Delete the outdated cached values, so the cache could be re-populated with newer values.
|
|
deleteCachedSeries(qt)
|
|
goto again
|
|
}
|
|
return tssCached, offset, nil
|
|
}
|
|
|
|
if !ec.mayCache() {
|
|
qt.Printf("do not apply instant rollup optimization because of disabled cache")
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
if window < minWindowForInstantRollupOptimization.Milliseconds() {
|
|
qt.Printf("do not apply instant rollup optimization because of too small window=%d; must be equal or bigger than %d",
|
|
window, minWindowForInstantRollupOptimization.Milliseconds())
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
switch funcName {
|
|
case "avg_over_time":
|
|
if iafc != nil {
|
|
qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name)
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
qt.Printf("optimized calculation for instant rollup avg_over_time(m[d]) as (sum_over_time(m[d]) / count_over_time(m[d]))")
|
|
fe := expr.(*metricsql.FuncExpr)
|
|
feSum := *fe
|
|
feSum.Name = "sum_over_time"
|
|
feCount := *fe
|
|
feCount.Name = "count_over_time"
|
|
be := &metricsql.BinaryOpExpr{
|
|
Op: "/",
|
|
KeepMetricNames: fe.KeepMetricNames,
|
|
Left: &feSum,
|
|
Right: &feCount,
|
|
}
|
|
return evalExpr(qt, ec, be)
|
|
case "rate":
|
|
if iafc != nil {
|
|
if strings.ToLower(iafc.ae.Name) != "sum" {
|
|
qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name)
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
qt.Printf("optimized calculation for sum(rate(m[d])) as (sum(increase(m[d])) / d)")
|
|
afe := expr.(*metricsql.AggrFuncExpr)
|
|
fe := afe.Args[0].(*metricsql.FuncExpr)
|
|
feIncrease := *fe
|
|
feIncrease.Name = "increase"
|
|
re := fe.Args[0].(*metricsql.RollupExpr)
|
|
d := re.Window.Duration(ec.Step)
|
|
if d == 0 {
|
|
d = ec.Step
|
|
}
|
|
afeIncrease := *afe
|
|
afeIncrease.Args = []metricsql.Expr{&feIncrease}
|
|
be := &metricsql.BinaryOpExpr{
|
|
Op: "/",
|
|
KeepMetricNames: true,
|
|
Left: &afeIncrease,
|
|
Right: &metricsql.NumberExpr{
|
|
N: float64(d) / 1000,
|
|
},
|
|
}
|
|
return evalExpr(qt, ec, be)
|
|
}
|
|
qt.Printf("optimized calculation for instant rollup rate(m[d]) as (increase(m[d]) / d)")
|
|
fe := expr.(*metricsql.FuncExpr)
|
|
feIncrease := *fe
|
|
feIncrease.Name = "increase"
|
|
re := fe.Args[0].(*metricsql.RollupExpr)
|
|
d := re.Window.Duration(ec.Step)
|
|
if d == 0 {
|
|
d = ec.Step
|
|
}
|
|
be := &metricsql.BinaryOpExpr{
|
|
Op: "/",
|
|
KeepMetricNames: fe.KeepMetricNames,
|
|
Left: &feIncrease,
|
|
Right: &metricsql.NumberExpr{
|
|
N: float64(d) / 1000,
|
|
},
|
|
}
|
|
return evalExpr(qt, ec, be)
|
|
case "max_over_time":
|
|
if iafc != nil {
|
|
if strings.ToLower(iafc.ae.Name) != "max" {
|
|
qt.Printf("do not apply instant rollup optimization for non-max incremental aggregate %s()", iafc.ae.Name)
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
}
|
|
|
|
// Calculate
|
|
//
|
|
// max_over_time(m[window] @ timestamp)
|
|
//
|
|
// as the maximum of
|
|
//
|
|
// - max_over_time(m[window] @ (timestamp-offset))
|
|
// - max_over_time(m[offset] @ timestamp)
|
|
//
|
|
// if max_over_time(m[offset] @ (timestamp-window)) < max_over_time(m[window] @ (timestamp-offset))
|
|
// otherwise do not apply the optimization
|
|
//
|
|
// where
|
|
//
|
|
// - max_over_time(m[window] @ (timestamp-offset)) is obtained from cache
|
|
// - max_over_time(m[offset] @ timestamp) and max_over_time(m[offset] @ (timestamp-window)) are calculated from the storage
|
|
// These rollups are calculated faster than max_over_time(m[window]) because offset is smaller than window.
|
|
qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
|
|
expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
|
|
defer qtChild.Done()
|
|
|
|
tssCached, offset, err := getCachedSeries(qtChild)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if offset == 0 {
|
|
return tssCached, nil
|
|
}
|
|
// Calculate max_over_time(m[offset] @ timestamp)
|
|
tssStart, err := evalAt(qtChild, timestamp, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if hasDuplicateSeries(tssStart) {
|
|
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
|
|
return evalAt(qtChild, timestamp, window)
|
|
}
|
|
// Calculate max_over_time(m[offset] @ (timestamp - window))
|
|
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if hasDuplicateSeries(tssEnd) {
|
|
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
|
|
return evalAt(qtChild, timestamp, window)
|
|
}
|
|
// Calculate the result
|
|
tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd)
|
|
if !ok {
|
|
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
|
|
deleteCachedSeries(qtChild)
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
return tss, nil
|
|
case "min_over_time":
|
|
if iafc != nil {
|
|
if strings.ToLower(iafc.ae.Name) != "min" {
|
|
qt.Printf("do not apply instant rollup optimization for non-min incremental aggregate %s()", iafc.ae.Name)
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
}
|
|
|
|
// Calculate
|
|
//
|
|
// min_over_time(m[window] @ timestamp)
|
|
//
|
|
// as the minimum of
|
|
//
|
|
// - min_over_time(m[window] @ (timestamp-offset))
|
|
// - min_over_time(m[offset] @ timestamp)
|
|
//
|
|
// if min_over_time(m[offset] @ (timestamp-window)) > min_over_time(m[window] @ (timestamp-offset))
|
|
// otherwise do not apply the optimization
|
|
//
|
|
// where
|
|
//
|
|
// - min_over_time(m[window] @ (timestamp-offset)) is obtained from cache
|
|
// - min_over_time(m[offset] @ timestamp) and min_over_time(m[offset] @ (timestamp-window)) are calculated from the storage
|
|
// These rollups are calculated faster than min_over_time(m[window]) because offset is smaller than window.
|
|
qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
|
|
expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
|
|
defer qtChild.Done()
|
|
|
|
tssCached, offset, err := getCachedSeries(qtChild)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if offset == 0 {
|
|
return tssCached, nil
|
|
}
|
|
// Calculate min_over_time(m[offset] @ timestamp)
|
|
tssStart, err := evalAt(qtChild, timestamp, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if hasDuplicateSeries(tssStart) {
|
|
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
|
|
return evalAt(qtChild, timestamp, window)
|
|
}
|
|
// Calculate min_over_time(m[offset] @ (timestamp - window))
|
|
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if hasDuplicateSeries(tssEnd) {
|
|
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
|
|
return evalAt(qtChild, timestamp, window)
|
|
}
|
|
// Calculate the result
|
|
tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd)
|
|
if !ok {
|
|
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
|
|
deleteCachedSeries(qtChild)
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
return tss, nil
|
|
case
|
|
"count_eq_over_time",
|
|
"count_gt_over_time",
|
|
"count_le_over_time",
|
|
"count_ne_over_time",
|
|
"count_over_time",
|
|
"increase",
|
|
"increase_pure",
|
|
"sum_over_time":
|
|
if iafc != nil && strings.ToLower(iafc.ae.Name) != "sum" {
|
|
qt.Printf("do not apply instant rollup optimization for non-sum incremental aggregate %s()", iafc.ae.Name)
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
|
|
// Calculate
|
|
//
|
|
// rf(m[window] @ timestamp)
|
|
//
|
|
// as
|
|
//
|
|
// rf(m[window] @ (timestamp-offset)) + rf(m[offset] @ timestamp) - rf(m[offset] @ (timestamp-window))
|
|
//
|
|
// where
|
|
//
|
|
// - rf is count_over_time, sum_over_time or increase
|
|
// - rf(m[window] @ (timestamp-offset)) is obtained from cache
|
|
// - rf(m[offset] @ timestamp) and rf(m[offset] @ (timestamp-window)) are calculated from the storage
|
|
// These rollups are calculated faster than rf(m[window]) because offset is smaller than window.
|
|
qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
|
|
expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
|
|
defer qtChild.Done()
|
|
|
|
tssCached, offset, err := getCachedSeries(qtChild)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if offset == 0 {
|
|
return tssCached, nil
|
|
}
|
|
// Calculate rf(m[offset] @ timestamp)
|
|
tssStart, err := evalAt(qtChild, timestamp, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if hasDuplicateSeries(tssStart) {
|
|
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
|
|
return evalAt(qtChild, timestamp, window)
|
|
}
|
|
// Calculate rf(m[offset] @ (timestamp - window))
|
|
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if hasDuplicateSeries(tssEnd) {
|
|
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
|
|
return evalAt(qtChild, timestamp, window)
|
|
}
|
|
// Calculate the result
|
|
tss := getSumInstantValues(qtChild, tssCached, tssStart, tssEnd)
|
|
return tss, nil
|
|
default:
|
|
qt.Printf("instant rollup optimization isn't implemented for %s()", funcName)
|
|
return evalAt(qt, timestamp, window)
|
|
}
|
|
}
|
|
|
|
func hasDuplicateSeries(tss []*timeseries) bool {
|
|
if len(tss) <= 1 {
|
|
return false
|
|
}
|
|
|
|
m := make(map[string]struct{}, len(tss))
|
|
bb := bbPool.Get()
|
|
defer bbPool.Put(bb)
|
|
|
|
for _, ts := range tss {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
if _, ok := m[string(bb.B)]; ok {
|
|
return true
|
|
}
|
|
m[string(bb.B)] = struct{}{}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) {
|
|
qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
|
|
defer qt.Done()
|
|
|
|
getMin := func(a, b float64) float64 {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMin)
|
|
qt.Printf("resulting series=%d; ok=%v", len(tss), ok)
|
|
return tss, ok
|
|
}
|
|
|
|
func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) {
|
|
qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
|
|
defer qt.Done()
|
|
|
|
getMax := func(a, b float64) float64 {
|
|
if a > b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMax)
|
|
qt.Printf("resulting series=%d", len(tss))
|
|
return tss, ok
|
|
}
|
|
|
|
func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, f func(a, b float64) float64) ([]*timeseries, bool) {
|
|
assertInstantValues(tssCached)
|
|
assertInstantValues(tssStart)
|
|
assertInstantValues(tssEnd)
|
|
|
|
bb := bbPool.Get()
|
|
defer bbPool.Put(bb)
|
|
|
|
m := make(map[string]*timeseries, len(tssCached))
|
|
for _, ts := range tssCached {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
if _, ok := m[string(bb.B)]; ok {
|
|
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
|
|
}
|
|
m[string(bb.B)] = ts
|
|
}
|
|
|
|
mStart := make(map[string]*timeseries, len(tssStart))
|
|
for _, ts := range tssStart {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
if _, ok := mStart[string(bb.B)]; ok {
|
|
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
|
|
}
|
|
mStart[string(bb.B)] = ts
|
|
tsCached := m[string(bb.B)]
|
|
if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
|
|
if !math.IsNaN(ts.Values[0]) {
|
|
tsCached.Values[0] = f(ts.Values[0], tsCached.Values[0])
|
|
}
|
|
} else {
|
|
m[string(bb.B)] = ts
|
|
}
|
|
}
|
|
|
|
for _, ts := range tssEnd {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
tsCached := m[string(bb.B)]
|
|
if tsCached != nil && !math.IsNaN(tsCached.Values[0]) && !math.IsNaN(ts.Values[0]) {
|
|
if ts.Values[0] == f(ts.Values[0], tsCached.Values[0]) {
|
|
tsStart := mStart[string(bb.B)]
|
|
if tsStart == nil || math.IsNaN(tsStart.Values[0]) || tsStart.Values[0] != f(ts.Values[0], tsStart.Values[0]) {
|
|
return nil, false
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, ts := range m {
|
|
rvs = append(rvs, ts)
|
|
}
|
|
return rvs, true
|
|
}
|
|
|
|
// getSumInstantValues calculates tssCached + tssStart - tssEnd
|
|
func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) []*timeseries {
|
|
qt = qt.NewChild("calculate the sum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
|
|
defer qt.Done()
|
|
|
|
assertInstantValues(tssCached)
|
|
assertInstantValues(tssStart)
|
|
assertInstantValues(tssEnd)
|
|
|
|
m := make(map[string]*timeseries, len(tssCached))
|
|
bb := bbPool.Get()
|
|
defer bbPool.Put(bb)
|
|
|
|
for _, ts := range tssCached {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
if _, ok := m[string(bb.B)]; ok {
|
|
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
|
|
}
|
|
m[string(bb.B)] = ts
|
|
}
|
|
|
|
for _, ts := range tssStart {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
tsCached := m[string(bb.B)]
|
|
if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
|
|
if !math.IsNaN(ts.Values[0]) {
|
|
tsCached.Values[0] += ts.Values[0]
|
|
}
|
|
} else {
|
|
m[string(bb.B)] = ts
|
|
}
|
|
}
|
|
|
|
for _, ts := range tssEnd {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
tsCached := m[string(bb.B)]
|
|
if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
|
|
if !math.IsNaN(ts.Values[0]) {
|
|
tsCached.Values[0] -= ts.Values[0]
|
|
}
|
|
}
|
|
}
|
|
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, ts := range m {
|
|
rvs = append(rvs, ts)
|
|
}
|
|
qt.Printf("resulting series=%d", len(rvs))
|
|
return rvs
|
|
}
|
|
|
|
func assertInstantValues(tss []*timeseries) {
|
|
for _, ts := range tss {
|
|
if len(ts.Values) != 1 {
|
|
logger.Panicf("BUG: instant series must contain a single value; got %d values", len(ts.Values))
|
|
}
|
|
if len(ts.Timestamps) != 1 {
|
|
logger.Panicf("BUG: instant series must contain a single timestamp; got %d timestamps", len(ts.Timestamps))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Counters updated during rollup evaluation.
var (
|
|
// rollupResultCacheFullHits is incremented by evalRollupFuncWithMetricExpr
// when the cached series cover the whole requested time range.
rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`)
|
|
// rollupResultCachePartialHits is incremented by evalRollupFuncWithMetricExpr
// when the cached series cover only the beginning of the requested time range.
rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`)
|
|
// rollupResultCacheMiss is incremented by evalRollupFuncWithMetricExpr
// when the cache has nothing usable for the requested time range.
rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
|
|
|
|
// memoryIntensiveQueries is incremented by evalRollupFuncNoCache when the estimated memory
// needed for the rollup exceeds the positive -search.logQueryMemoryUsage value.
memoryIntensiveQueries = metrics.NewCounter(`vm_memory_intensive_queries_total`)
|
|
)
|
|
|
|
func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
|
|
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) {
|
|
window, err := windowExpr.NonNegativeDuration(ec.Step)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
|
|
}
|
|
if me.IsEmpty() {
|
|
return evalNumber(ec, nan), nil
|
|
}
|
|
|
|
if ec.Start == ec.End {
|
|
rvs, err := evalInstantRollup(qt, ec, funcName, rf, expr, me, iafc, window)
|
|
if err != nil {
|
|
err = &UserReadableError{
|
|
Err: err,
|
|
}
|
|
return nil, err
|
|
}
|
|
return rvs, nil
|
|
}
|
|
pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step
|
|
evalWithConfig := func(ec *EvalConfig) ([]*timeseries, error) {
|
|
tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries)
|
|
if err != nil {
|
|
err = &UserReadableError{
|
|
Err: err,
|
|
}
|
|
return nil, err
|
|
}
|
|
return tss, nil
|
|
}
|
|
if !ec.mayCache() {
|
|
qt.Printf("do not fetch series from cache, since it is disabled in the current context")
|
|
return evalWithConfig(ec)
|
|
}
|
|
|
|
// Search for cached results.
|
|
tssCached, start := rollupResultCacheV.GetSeries(qt, ec, expr, window)
|
|
ec.QueryStats.addSeriesFetched(len(tssCached))
|
|
if start > ec.End {
|
|
qt.Printf("the result is fully cached")
|
|
rollupResultCacheFullHits.Inc()
|
|
return tssCached, nil
|
|
}
|
|
if start > ec.Start {
|
|
qt.Printf("partial cache hit")
|
|
rollupResultCachePartialHits.Inc()
|
|
} else {
|
|
qt.Printf("cache miss")
|
|
rollupResultCacheMiss.Inc()
|
|
}
|
|
|
|
// Fetch missing results, which aren't cached yet.
|
|
ecNew := ec
|
|
if start != ec.Start {
|
|
ecNew = copyEvalConfig(ec)
|
|
ecNew.Start = start
|
|
}
|
|
tss, err := evalWithConfig(ecNew)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Merge cached results with the fetched additional results.
|
|
rvs, ok := mergeSeries(qt, tssCached, tss, start, ec)
|
|
if !ok {
|
|
// Cannot merge series - fall back to non-cached querying.
|
|
qt.Printf("fall back to non-caching querying")
|
|
rvs, err = evalWithConfig(ec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
rollupResultCacheV.PutSeries(qt, ec, expr, window, rvs)
|
|
return rvs, nil
|
|
}
|
|
|
|
// evalRollupFuncNoCache calculates the given rf with the given lookbehind window.
|
|
//
|
|
// pointsPerSeries is used only for estimating the needed memory for query processing
|
|
func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
|
|
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, pointsPerSeries int64) ([]*timeseries, error) {
|
|
if qt.Enabled() {
|
|
qt = qt.NewChild("rollup %s: timeRange=%s, step=%d, window=%d", expr.AppendString(nil), ec.timeRangeString(), ec.Step, window)
|
|
defer qt.Done()
|
|
}
|
|
if window < 0 {
|
|
return nil, nil
|
|
}
|
|
// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
|
|
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
|
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Fetch the result.
|
|
tfss := searchutils.ToTagFilterss(me.LabelFilterss)
|
|
tfss = searchutils.JoinTagFilterss(tfss, ec.EnforcedTagFilterss)
|
|
minTimestamp := ec.Start
|
|
if needSilenceIntervalForRollupFunc(funcName) {
|
|
minTimestamp -= maxSilenceInterval()
|
|
}
|
|
if window > ec.Step {
|
|
minTimestamp -= window
|
|
} else {
|
|
minTimestamp -= ec.Step
|
|
}
|
|
if minTimestamp < 0 {
|
|
minTimestamp = 0
|
|
}
|
|
sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries)
|
|
rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rssLen := rss.Len()
|
|
if rssLen == 0 {
|
|
rss.Cancel()
|
|
return nil, nil
|
|
}
|
|
ec.QueryStats.addSeriesFetched(rssLen)
|
|
|
|
// Verify timeseries fit available memory during rollup calculations.
|
|
timeseriesLen := rssLen
|
|
if iafc != nil {
|
|
// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
|
|
timeseriesLen = cgroup.AvailableCPUs()
|
|
if iafc.ae.Modifier.Op != "" {
|
|
if iafc.ae.Limit > 0 {
|
|
// There is an explicit limit on the number of output time series.
|
|
timeseriesLen *= iafc.ae.Limit
|
|
} else {
|
|
// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
|
|
// since each group can have own set of time series in memory.
|
|
timeseriesLen *= 1000
|
|
}
|
|
}
|
|
// The maximum number of output time series is limited by rssLen.
|
|
if timeseriesLen > rssLen {
|
|
timeseriesLen = rssLen
|
|
}
|
|
}
|
|
rollupPoints := mulNoOverflow(pointsPerSeries, int64(timeseriesLen*len(rcs)))
|
|
rollupMemorySize := sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16))
|
|
if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory {
|
|
memoryIntensiveQueries.Inc()
|
|
requestURI := ec.GetRequestURI()
|
|
logger.Warnf("remoteAddr=%s, requestURI=%s: the %s requires %d bytes of memory for processing; "+
|
|
"logging this query, since it exceeds the -search.logQueryMemoryUsage=%d; "+
|
|
"the query selects %d time series and generates %d points across all the time series; try reducing the number of selected time series",
|
|
ec.QuotedRemoteAddr, requestURI, expr.AppendString(nil), rollupMemorySize, maxMemory, timeseriesLen*len(rcs), rollupPoints)
|
|
}
|
|
if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory {
|
|
rss.Cancel()
|
|
err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+
|
|
"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+
|
|
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
|
|
"increasing -search.maxMemoryPerQuery",
|
|
expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3)
|
|
return nil, err
|
|
}
|
|
rml := getRollupMemoryLimiter()
|
|
if !rml.Get(uint64(rollupMemorySize)) {
|
|
rss.Cancel()
|
|
err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+
|
|
"total available memory for concurrent requests: %d bytes; requested memory: %d bytes; "+
|
|
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
|
|
"switching to node with more RAM; increasing -memory.allowedPercent",
|
|
expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3)
|
|
return nil, err
|
|
}
|
|
defer rml.Put(uint64(rollupMemorySize))
|
|
qt.Printf("the rollup evaluation needs an estimated %d bytes of RAM for %d series and %d points per series (summary %d points)",
|
|
rollupMemorySize, timeseriesLen, pointsPerSeries, rollupPoints)
|
|
|
|
// Evaluate rollup
|
|
keepMetricNames := getKeepMetricNames(expr)
|
|
if iafc != nil {
|
|
return evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps)
|
|
}
|
|
return evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)
|
|
}
|
|
|
|
// State behind getRollupMemoryLimiter.
var (
|
|
// rollupMemoryLimiter is the limiter returned by getRollupMemoryLimiter.
// Its MaxSize is set on the first getRollupMemoryLimiter call.
rollupMemoryLimiter memoryLimiter
|
|
// rollupMemoryLimiterOnce guards the one-time initialization of rollupMemoryLimiter.MaxSize.
rollupMemoryLimiterOnce sync.Once
|
|
)
|
|
|
|
func getRollupMemoryLimiter() *memoryLimiter {
|
|
rollupMemoryLimiterOnce.Do(func() {
|
|
rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 4
|
|
})
|
|
return &rollupMemoryLimiter
|
|
}
|
|
|
|
func maxSilenceInterval() int64 {
|
|
d := minStalenessInterval.Milliseconds()
|
|
if d <= 0 {
|
|
d = 5 * 60 * 1000
|
|
}
|
|
return d
|
|
}
|
|
|
|
// needSilenceIntervalForRollupFunc returns true if the rollup function with the given name
// needs the silence interval before the lookbehind window.
//
// funcName is matched case-insensitively. True is returned for names missing in the list below.
func needSilenceIntervalForRollupFunc(funcName string) bool {
	switch strings.ToLower(funcName) {
	case
		// All the rollup functions, which do not rely on the previous sample
		// before the lookbehind window (aka prevValue and realPrevValue), do not need silence interval.
		"absent_over_time", "avg_over_time",
		"count_eq_over_time", "count_gt_over_time", "count_le_over_time", "count_ne_over_time", "count_over_time",
		"first_over_time",
		"histogram_over_time", "hoeffding_bound_lower", "hoeffding_bound_upper",
		"last_over_time",
		"mad_over_time", "max_over_time", "median_over_time", "min_over_time",
		"predict_linear", "present_over_time",
		"quantile_over_time", "quantiles_over_time",
		"range_over_time",
		"share_gt_over_time", "share_le_over_time", "share_eq_over_time",
		"stale_samples_over_time", "stddev_over_time", "stdvar_over_time", "sum_over_time",
		"tfirst_over_time", "timestamp", "timestamp_with_name", "tlast_over_time", "tmax_over_time", "tmin_over_time",
		"zscore_over_time":
		return false
	}
	// Every other function needs the silence interval. This includes default_rollup,
	// which implicitly relies on the previous samples in order to fill gaps.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5388
	return true
}
|
|
|
|
func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool,
|
|
iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
|
|
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
|
qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs)
|
|
defer qt.Done()
|
|
var samplesScannedTotal uint64
|
|
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
|
|
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
|
|
preFunc(rs.Values, rs.Timestamps)
|
|
ts := getTimeseries()
|
|
defer putTimeseries(ts)
|
|
for _, rc := range rcs {
|
|
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
|
|
samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
|
|
for _, ts := range tsm.m {
|
|
iafc.updateTimeseries(ts, workerID)
|
|
}
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
continue
|
|
}
|
|
ts.Reset()
|
|
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
iafc.updateTimeseries(ts, workerID)
|
|
|
|
// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
|
|
ts.Timestamps = nil
|
|
ts.denyReuse = false
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tss := iafc.finalizeTimeseries()
|
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
|
qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal)
|
|
return tss, nil
|
|
}
|
|
|
|
func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
|
|
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
|
qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
|
|
defer qt.Done()
|
|
|
|
var samplesScannedTotal uint64
|
|
tsw := getTimeseriesByWorkerID()
|
|
seriesByWorkerID := tsw.byWorkerID
|
|
seriesLen := rss.Len()
|
|
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
|
|
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
|
|
preFunc(rs.Values, rs.Timestamps)
|
|
for _, rc := range rcs {
|
|
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
|
|
samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
|
|
continue
|
|
}
|
|
var ts timeseries
|
|
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tss := make([]*timeseries, 0, seriesLen*len(rcs))
|
|
for i := range seriesByWorkerID {
|
|
tss = append(tss, seriesByWorkerID[i].tss...)
|
|
}
|
|
putTimeseriesByWorkerID(tsw)
|
|
|
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
|
qt.Printf("samplesScanned=%d", samplesScannedTotal)
|
|
return tss, nil
|
|
}
|
|
|
|
func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName,
|
|
valuesSrc []float64, timestampsSrc []int64, sharedTimestamps []int64) uint64 {
|
|
tsDst.MetricName.CopyFrom(mnSrc)
|
|
if len(rc.TagValue) > 0 {
|
|
tsDst.MetricName.AddTag("rollup", rc.TagValue)
|
|
}
|
|
if !keepMetricNames && !rollupFuncsKeepMetricName[funcName] {
|
|
tsDst.MetricName.ResetMetricGroup()
|
|
}
|
|
var samplesScanned uint64
|
|
tsDst.Values, samplesScanned = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
|
|
tsDst.Timestamps = sharedTimestamps
|
|
tsDst.denyReuse = true
|
|
return samplesScanned
|
|
}
|
|
|
|
type timeseriesWithPadding struct {
|
|
tss []*timeseries
|
|
|
|
// The padding prevents false sharing on widespread platforms with
|
|
// 128 mod (cache line size) = 0 .
|
|
_ [128 - unsafe.Sizeof([]*timeseries{})%128]byte
|
|
}
|
|
|
|
type timeseriesByWorkerID struct {
|
|
byWorkerID []timeseriesWithPadding
|
|
}
|
|
|
|
func (tsw *timeseriesByWorkerID) reset() {
|
|
byWorkerID := tsw.byWorkerID
|
|
for i := range byWorkerID {
|
|
byWorkerID[i].tss = nil
|
|
}
|
|
}
|
|
|
|
func getTimeseriesByWorkerID() *timeseriesByWorkerID {
|
|
v := timeseriesByWorkerIDPool.Get()
|
|
if v == nil {
|
|
return ×eriesByWorkerID{
|
|
byWorkerID: make([]timeseriesWithPadding, netstorage.MaxWorkers()),
|
|
}
|
|
}
|
|
return v.(*timeseriesByWorkerID)
|
|
}
|
|
|
|
func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) {
|
|
tsw.reset()
|
|
timeseriesByWorkerIDPool.Put(tsw)
|
|
}
|
|
|
|
// timeseriesByWorkerIDPool holds *timeseriesByWorkerID values for re-use.
// It is accessed via getTimeseriesByWorkerID and putTimeseriesByWorkerID.
var timeseriesByWorkerIDPool sync.Pool
|
|
|
|
// bbPool is a package-level pool of byte buffers (bytesutil.ByteBufferPool).
// NOTE(review): its users are outside this part of the file.
var bbPool bytesutil.ByteBufferPool
|
|
|
|
func evalNumber(ec *EvalConfig, n float64) []*timeseries {
|
|
var ts timeseries
|
|
ts.denyReuse = true
|
|
timestamps := ec.getSharedTimestamps()
|
|
values := make([]float64, len(timestamps))
|
|
for i := range timestamps {
|
|
values[i] = n
|
|
}
|
|
ts.Values = values
|
|
ts.Timestamps = timestamps
|
|
return []*timeseries{&ts}
|
|
}
|
|
|
|
func evalString(ec *EvalConfig, s string) []*timeseries {
|
|
rv := evalNumber(ec, nan)
|
|
rv[0].MetricName.MetricGroup = append(rv[0].MetricName.MetricGroup[:0], s...)
|
|
return rv
|
|
}
|
|
|
|
func evalTime(ec *EvalConfig) []*timeseries {
|
|
rv := evalNumber(ec, nan)
|
|
timestamps := rv[0].Timestamps
|
|
values := rv[0].Values
|
|
for i, ts := range timestamps {
|
|
values[i] = float64(ts) / 1e3
|
|
}
|
|
return rv
|
|
}
|
|
|
|
// mulNoOverflow returns a*b, saturating the result instead of wrapping around on overflow.
//
// The result is clamped to math.MaxInt64 when the exact product is too big
// and to math.MinInt64 when it is too small. A zero operand yields 0.
func mulNoOverflow(a, b int64) int64 {
	if a == 0 || b == 0 {
		// Handle zero explicitly, so the division-based check below never divides by zero.
		return 0
	}
	c := a * b
	// The product wrapped around if dividing it back doesn't restore a.
	// MinInt64 * -1 must be checked separately, since MinInt64 / -1 wraps back to MinInt64.
	if c/b != a || (a == math.MinInt64 && b == -1) {
		if (a < 0) != (b < 0) {
			// Operands have different signs, so the exact product is negative.
			return math.MinInt64
		}
		return math.MaxInt64
	}
	return c
}
|
|
|
|
// sumNoOverflow returns a+b, saturating the result instead of wrapping around on overflow.
//
// The result is clamped to math.MaxInt64 when the exact sum is too big
// and to math.MinInt64 when it is too small.
func sumNoOverflow(a, b int64) int64 {
	switch {
	case b > 0 && a > math.MaxInt64-b:
		// Positive overflow. math.MaxInt64-b cannot wrap, since b is positive.
		return math.MaxInt64
	case b < 0 && a < math.MinInt64-b:
		// Negative overflow. math.MinInt64-b cannot wrap, since b is negative.
		return math.MinInt64
	}
	return a + b
}
|
|
|
|
func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) {
|
|
if *noStaleMarkers || funcName == "default_rollup" || funcName == "stale_samples_over_time" {
|
|
// Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function,
|
|
// since it uses them for Prometheus-style staleness detection.
|
|
// Do not drop staleness marks for stale_samples_over_time() function, since it needs
|
|
// to calculate the number of staleness markers.
|
|
return values, timestamps
|
|
}
|
|
// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
|
|
hasStaleSamples := false
|
|
for _, v := range values {
|
|
if decimal.IsStaleNaN(v) {
|
|
hasStaleSamples = true
|
|
break
|
|
}
|
|
}
|
|
if !hasStaleSamples {
|
|
// Fast path: values have no Prometheus staleness marks.
|
|
return values, timestamps
|
|
}
|
|
// Slow path: drop Prometheus staleness marks from values.
|
|
dstValues := values[:0]
|
|
dstTimestamps := timestamps[:0]
|
|
for i, v := range values {
|
|
if decimal.IsStaleNaN(v) {
|
|
continue
|
|
}
|
|
dstValues = append(dstValues, v)
|
|
dstTimestamps = append(dstTimestamps, timestamps[i])
|
|
}
|
|
return dstValues, dstTimestamps
|
|
}
|