mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-12 13:32:25 +01:00
b8da90b893
This is a follow-up for 04a05f161c
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3203
1285 lines
40 KiB
Go
1285 lines
40 KiB
Go
package promql
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"math"
|
|
"regexp"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/VictoriaMetrics/metricsql"
|
|
)
|
|
|
|
var (
|
|
disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful during data backfilling")
|
|
maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+
|
|
"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")
|
|
maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+
|
|
"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
|
|
"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests")
|
|
noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+
|
|
"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")
|
|
)
|
|
|
|
// minTimeseriesPointsForTimeRounding is the smallest number of points per
// time series at which AdjustStartEnd rounds the time range to step boundaries.
//
// Rounding improves cache hit ratio for frequently requested queries
// over big time ranges.
const minTimeseriesPointsForTimeRounding = 50
|
|
|
|
// ValidateMaxPointsPerSeries returns an error when the [start..end] range sampled
// with the given step produces more than maxPoints points.
//
// A zero step is reported as an error too, since the number of points
// cannot be calculated for it.
func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error {
	if step == 0 {
		return fmt.Errorf("step can't be equal to zero")
	}
	if n := 1 + (end-start)/step; n > int64(maxPoints) {
		return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d",
			start, end, step, n, maxPoints)
	}
	return nil
}
|
|
|
|
// AdjustStartEnd adjusts start and end values, so response caching may be enabled.
|
|
//
|
|
// See EvalConfig.mayCache for details.
|
|
func AdjustStartEnd(start, end, step int64) (int64, int64) {
|
|
if *disableCache {
|
|
// Do not adjust start and end values when cache is disabled.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563
|
|
return start, end
|
|
}
|
|
points := (end-start)/step + 1
|
|
if points < minTimeseriesPointsForTimeRounding {
|
|
// Too small number of points for rounding.
|
|
return start, end
|
|
}
|
|
|
|
// Round start and end to values divisible by step in order
|
|
// to enable response caching (see EvalConfig.mayCache).
|
|
start, end = alignStartEnd(start, end, step)
|
|
|
|
// Make sure that the new number of points is the same as the initial number of points.
|
|
newPoints := (end-start)/step + 1
|
|
for newPoints > points {
|
|
end -= step
|
|
newPoints--
|
|
}
|
|
|
|
return start, end
|
|
}
|
|
|
|
// alignStartEnd widens the [start..end] range to the nearest step boundaries.
//
// start is rounded down to the nearest value divisible by step,
// while end is rounded up to the nearest value divisible by step.
// Values already divisible by step are left untouched.
//
// step must be positive.
func alignStartEnd(start, end, step int64) (int64, int64) {
	// Go's % operator truncates towards zero, so the remainder is negative
	// for negative operands. Normalize it into the [0..step) range in order
	// to round negative values in the proper direction.
	if rem := start % step; rem != 0 {
		if rem < 0 {
			rem += step
		}
		start -= rem
	}
	if rem := end % step; rem != 0 {
		if rem < 0 {
			rem += step
		}
		end += step - rem
	}
	return start, end
}
|
|
|
|
// EvalConfig is the configuration required for query evaluation via Exec
|
|
type EvalConfig struct {
|
|
Start int64
|
|
End int64
|
|
Step int64
|
|
|
|
// MaxSeries is the maximum number of time series, which can be scanned by the query.
|
|
// Zero means 'no limit'
|
|
MaxSeries int
|
|
|
|
// MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series.
|
|
MaxPointsPerSeries int
|
|
|
|
// QuotedRemoteAddr contains quoted remote address.
|
|
QuotedRemoteAddr string
|
|
|
|
Deadline searchutils.Deadline
|
|
|
|
// Whether the response can be cached.
|
|
MayCache bool
|
|
|
|
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
|
|
LookbackDelta int64
|
|
|
|
// How many decimal digits after the point to leave in response.
|
|
RoundDigits int
|
|
|
|
// EnforcedTagFilterss may contain additional label filters to use in the query.
|
|
EnforcedTagFilterss [][]storage.TagFilter
|
|
|
|
timestamps []int64
|
|
timestampsOnce sync.Once
|
|
}
|
|
|
|
// copyEvalConfig returns src copy.
|
|
func copyEvalConfig(src *EvalConfig) *EvalConfig {
|
|
var ec EvalConfig
|
|
ec.Start = src.Start
|
|
ec.End = src.End
|
|
ec.Step = src.Step
|
|
ec.MaxSeries = src.MaxSeries
|
|
ec.MaxPointsPerSeries = src.MaxPointsPerSeries
|
|
ec.Deadline = src.Deadline
|
|
ec.MayCache = src.MayCache
|
|
ec.LookbackDelta = src.LookbackDelta
|
|
ec.RoundDigits = src.RoundDigits
|
|
ec.EnforcedTagFilterss = src.EnforcedTagFilterss
|
|
|
|
// do not copy src.timestamps - they must be generated again.
|
|
return &ec
|
|
}
|
|
|
|
func (ec *EvalConfig) validate() {
|
|
if ec.Start > ec.End {
|
|
logger.Panicf("BUG: start cannot exceed end; got %d vs %d", ec.Start, ec.End)
|
|
}
|
|
if ec.Step <= 0 {
|
|
logger.Panicf("BUG: step must be greater than 0; got %d", ec.Step)
|
|
}
|
|
}
|
|
|
|
func (ec *EvalConfig) mayCache() bool {
|
|
if *disableCache {
|
|
return false
|
|
}
|
|
if !ec.MayCache {
|
|
return false
|
|
}
|
|
if ec.Start%ec.Step != 0 {
|
|
return false
|
|
}
|
|
if ec.End%ec.Step != 0 {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (ec *EvalConfig) timeRangeString() string {
|
|
start := storage.TimestampToHumanReadableFormat(ec.Start)
|
|
end := storage.TimestampToHumanReadableFormat(ec.End)
|
|
return fmt.Sprintf("[%s..%s]", start, end)
|
|
}
|
|
|
|
func (ec *EvalConfig) getSharedTimestamps() []int64 {
|
|
ec.timestampsOnce.Do(ec.timestampsInit)
|
|
return ec.timestamps
|
|
}
|
|
|
|
func (ec *EvalConfig) timestampsInit() {
|
|
ec.timestamps = getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
|
}
|
|
|
|
func getTimestamps(start, end, step int64, maxPointsPerSeries int) []int64 {
|
|
// Sanity checks.
|
|
if step <= 0 {
|
|
logger.Panicf("BUG: Step must be bigger than 0; got %d", step)
|
|
}
|
|
if start > end {
|
|
logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end)
|
|
}
|
|
if err := ValidateMaxPointsPerSeries(start, end, step, maxPointsPerSeries); err != nil {
|
|
logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err)
|
|
}
|
|
|
|
// Prepare timestamps.
|
|
points := 1 + (end-start)/step
|
|
timestamps := make([]int64, points)
|
|
for i := range timestamps {
|
|
timestamps[i] = start
|
|
start += step
|
|
}
|
|
return timestamps
|
|
}
|
|
|
|
func evalExpr(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
|
|
if qt.Enabled() {
|
|
query := string(e.AppendString(nil))
|
|
query = bytesutil.LimitStringLen(query, 300)
|
|
mayCache := ec.mayCache()
|
|
qt = qt.NewChild("eval: query=%s, timeRange=%s, step=%d, mayCache=%v", query, ec.timeRangeString(), ec.Step, mayCache)
|
|
}
|
|
rv, err := evalExprInternal(qt, ec, e)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if qt.Enabled() {
|
|
seriesCount := len(rv)
|
|
pointsPerSeries := 0
|
|
if len(rv) > 0 {
|
|
pointsPerSeries = len(rv[0].Timestamps)
|
|
}
|
|
pointsCount := seriesCount * pointsPerSeries
|
|
qt.Donef("series=%d, points=%d, pointsPerSeries=%d", seriesCount, pointsCount, pointsPerSeries)
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
|
|
if me, ok := e.(*metricsql.MetricExpr); ok {
|
|
re := &metricsql.RollupExpr{
|
|
Expr: me,
|
|
}
|
|
rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, me.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
if re, ok := e.(*metricsql.RollupExpr); ok {
|
|
rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, re.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
if fe, ok := e.(*metricsql.FuncExpr); ok {
|
|
nrf := getRollupFunc(fe.Name)
|
|
if nrf == nil {
|
|
qtChild := qt.NewChild("transform %s()", fe.Name)
|
|
rv, err := evalTransformFunc(qtChild, ec, fe)
|
|
qtChild.Donef("series=%d", len(rv))
|
|
return rv, err
|
|
}
|
|
args, re, err := evalRollupFuncArgs(qt, ec, fe)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rf, err := nrf(args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rv, err := evalRollupFunc(qt, ec, fe.Name, rf, e, re, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
if ae, ok := e.(*metricsql.AggrFuncExpr); ok {
|
|
qtChild := qt.NewChild("aggregate %s()", ae.Name)
|
|
rv, err := evalAggrFunc(qtChild, ec, ae)
|
|
qtChild.Donef("series=%d", len(rv))
|
|
return rv, err
|
|
}
|
|
if be, ok := e.(*metricsql.BinaryOpExpr); ok {
|
|
qtChild := qt.NewChild("binary op %q", be.Op)
|
|
rv, err := evalBinaryOp(qtChild, ec, be)
|
|
qtChild.Donef("series=%d", len(rv))
|
|
return rv, err
|
|
}
|
|
if ne, ok := e.(*metricsql.NumberExpr); ok {
|
|
rv := evalNumber(ec, ne.N)
|
|
return rv, nil
|
|
}
|
|
if se, ok := e.(*metricsql.StringExpr); ok {
|
|
rv := evalString(ec, se.S)
|
|
return rv, nil
|
|
}
|
|
if de, ok := e.(*metricsql.DurationExpr); ok {
|
|
d := de.Duration(ec.Step)
|
|
dSec := float64(d) / 1000
|
|
rv := evalNumber(ec, dSec)
|
|
return rv, nil
|
|
}
|
|
return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
|
|
}
|
|
|
|
func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) {
|
|
tf := getTransformFunc(fe.Name)
|
|
if tf == nil {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf(`unknown func %q`, fe.Name),
|
|
}
|
|
}
|
|
var args [][]*timeseries
|
|
var err error
|
|
switch fe.Name {
|
|
case "", "union":
|
|
args, err = evalExprsInParallel(qt, ec, fe.Args)
|
|
default:
|
|
args, err = evalExprsSequentially(qt, ec, fe.Args)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tfa := &transformFuncArg{
|
|
ec: ec,
|
|
fe: fe,
|
|
args: args,
|
|
}
|
|
rv, err := tf(tfa)
|
|
if err != nil {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err),
|
|
}
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFuncExpr) ([]*timeseries, error) {
|
|
if callbacks := getIncrementalAggrFuncCallbacks(ae.Name); callbacks != nil {
|
|
fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae)
|
|
if fe != nil {
|
|
// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
|
|
// The optimized path saves RAM for aggregates over big number of time series.
|
|
args, re, err := evalRollupFuncArgs(qt, ec, fe)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rf, err := nrf(args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
iafc := newIncrementalAggrFuncContext(ae, callbacks)
|
|
return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc)
|
|
}
|
|
}
|
|
args, err := evalExprsInParallel(qt, ec, ae.Args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
af := getAggrFunc(ae.Name)
|
|
if af == nil {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf(`unknown func %q`, ae.Name),
|
|
}
|
|
}
|
|
afa := &aggrFuncArg{
|
|
ae: ae,
|
|
args: args,
|
|
ec: ec,
|
|
}
|
|
rv, err := af(afa)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) {
|
|
bf := getBinaryOpFunc(be.Op)
|
|
if bf == nil {
|
|
return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
|
|
}
|
|
var err error
|
|
var tssLeft, tssRight []*timeseries
|
|
switch strings.ToLower(be.Op) {
|
|
case "and", "if":
|
|
// Fetch right-side series at first, since it usually contains
|
|
// lower number of time series for `and` and `if` operator.
|
|
// This should produce more specific label filters for the left side of the query.
|
|
// This, in turn, should reduce the time to select series for the left side of the query.
|
|
tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be)
|
|
default:
|
|
tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
|
|
}
|
|
bfa := &binaryOpFuncArg{
|
|
be: be,
|
|
left: tssLeft,
|
|
right: tssRight,
|
|
}
|
|
rv, err := bf(bfa)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot evaluate %q: %w`, be.AppendString(nil), err)
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool {
|
|
switch strings.ToLower(be.Op) {
|
|
case "or", "default":
|
|
return false
|
|
}
|
|
if isAggrFuncWithoutGrouping(be.Left) || isAggrFuncWithoutGrouping(be.Right) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
|
|
afe, ok := e.(*metricsql.AggrFuncExpr)
|
|
if !ok {
|
|
return false
|
|
}
|
|
return len(afe.Modifier.Args) == 0
|
|
}
|
|
|
|
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
|
|
if !canPushdownCommonFilters(be) {
|
|
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
|
|
// from exprFirst to exprSecond.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
|
|
qt = qt.NewChild("execute left and right sides of %q in parallel", be.Op)
|
|
defer qt.Done()
|
|
var wg sync.WaitGroup
|
|
|
|
var tssFirst []*timeseries
|
|
var errFirst error
|
|
qtFirst := qt.NewChild("expr1")
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst)
|
|
qtFirst.Done()
|
|
}()
|
|
|
|
var tssSecond []*timeseries
|
|
var errSecond error
|
|
qtSecond := qt.NewChild("expr2")
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond)
|
|
qtSecond.Done()
|
|
}()
|
|
|
|
wg.Wait()
|
|
if errFirst != nil {
|
|
return nil, nil, errFirst
|
|
}
|
|
if errSecond != nil {
|
|
return nil, nil, errSecond
|
|
}
|
|
return tssFirst, tssSecond, nil
|
|
}
|
|
|
|
// Execute binary operation in the following way:
|
|
//
|
|
// 1) execute the exprFirst
|
|
// 2) get common label filters for series returned at step 1
|
|
// 3) push down the found common label filters to exprSecond. This filters out unneeded series
|
|
// during exprSecond exection instead of spending compute resources on extracting and processing these series
|
|
// before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
|
|
// 4) execute the exprSecond with possible additional filters found at step 3
|
|
//
|
|
// Typical use cases:
|
|
// - Kubernetes-related: show pod creation time with the node name:
|
|
//
|
|
// kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
|
|
//
|
|
// Without the optimization `kube_pod_info` would select and spend compute resources
|
|
// for more time series than needed. The selected time series would be dropped later
|
|
// when matching time series on the right and left sides of binary operand.
|
|
//
|
|
// - Generic alerting queries, which rely on `info` metrics.
|
|
// See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
|
|
//
|
|
// - Queries, which get additional labels from `info` metrics.
|
|
// See https://www.robustperception.io/exposing-the-software-version-to-prometheus
|
|
tssFirst, err := evalExpr(qt, ec, exprFirst)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
lfs := getCommonLabelFilters(tssFirst)
|
|
lfs = metricsql.TrimFiltersByGroupModifier(lfs, be)
|
|
exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs)
|
|
tssSecond, err := evalExpr(qt, ec, exprSecond)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return tssFirst, tssSecond, nil
|
|
}
|
|
|
|
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
|
|
m := make(map[string][]string)
|
|
for _, ts := range tss {
|
|
for _, tag := range ts.MetricName.Tags {
|
|
m[string(tag.Key)] = append(m[string(tag.Key)], string(tag.Value))
|
|
}
|
|
}
|
|
lfs := make([]metricsql.LabelFilter, 0, len(m))
|
|
for key, values := range m {
|
|
if len(values) != len(tss) {
|
|
// Skip the tag, since it doesn't belong to all the time series.
|
|
continue
|
|
}
|
|
values = getUniqueValues(values)
|
|
if len(values) > 1000 {
|
|
// Skip the filter on the given tag, since it needs to enumerate too many unique values.
|
|
// This may slow down the search for matching time series.
|
|
continue
|
|
}
|
|
lf := metricsql.LabelFilter{
|
|
Label: key,
|
|
}
|
|
if len(values) == 1 {
|
|
lf.Value = values[0]
|
|
} else {
|
|
sort.Strings(values)
|
|
lf.Value = joinRegexpValues(values)
|
|
lf.IsRegexp = true
|
|
}
|
|
lfs = append(lfs, lf)
|
|
}
|
|
sort.Slice(lfs, func(i, j int) bool {
|
|
return lfs[i].Label < lfs[j].Label
|
|
})
|
|
return lfs
|
|
}
|
|
|
|
// getUniqueValues returns unique values from a in the order of their first occurrence.
//
// The returned slice is newly allocated; a isn't modified.
func getUniqueValues(a []string) []string {
	seen := make(map[string]struct{}, len(a))
	uniq := make([]string, 0, len(a))
	for i := range a {
		v := a[i]
		if _, dup := seen[v]; dup {
			continue
		}
		seen[v] = struct{}{}
		uniq = append(uniq, v)
	}
	return uniq
}
|
|
|
|
// joinRegexpValues returns a regexp alternation (`v1|v2|...`) matching any value from a.
//
// Every value is escaped with regexp.QuoteMeta, so it is matched literally.
// An empty string is returned for empty a.
func joinRegexpValues(a []string) string {
	var buf []byte
	for idx := range a {
		if idx > 0 {
			buf = append(buf, '|')
		}
		buf = append(buf, regexp.QuoteMeta(a[idx])...)
	}
	return string(buf)
}
|
|
|
|
func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) {
|
|
if len(ae.Args) != 1 {
|
|
return nil, nil
|
|
}
|
|
e := ae.Args[0]
|
|
// Make sure e contains one of the following:
|
|
// - metricExpr
|
|
// - metricExpr[d]
|
|
// - rollupFunc(metricExpr)
|
|
// - rollupFunc(metricExpr[d])
|
|
|
|
if me, ok := e.(*metricsql.MetricExpr); ok {
|
|
// e = metricExpr
|
|
if me.IsEmpty() {
|
|
return nil, nil
|
|
}
|
|
fe := &metricsql.FuncExpr{
|
|
Name: "default_rollup",
|
|
Args: []metricsql.Expr{me},
|
|
}
|
|
nrf := getRollupFunc(fe.Name)
|
|
return fe, nrf
|
|
}
|
|
if re, ok := e.(*metricsql.RollupExpr); ok {
|
|
if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
|
|
return nil, nil
|
|
}
|
|
// e = metricExpr[d]
|
|
fe := &metricsql.FuncExpr{
|
|
Name: "default_rollup",
|
|
Args: []metricsql.Expr{re},
|
|
}
|
|
nrf := getRollupFunc(fe.Name)
|
|
return fe, nrf
|
|
}
|
|
fe, ok := e.(*metricsql.FuncExpr)
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
nrf := getRollupFunc(fe.Name)
|
|
if nrf == nil {
|
|
return nil, nil
|
|
}
|
|
rollupArgIdx := metricsql.GetRollupArgIdx(fe)
|
|
if rollupArgIdx >= len(fe.Args) {
|
|
// Incorrect number of args for rollup func.
|
|
return nil, nil
|
|
}
|
|
arg := fe.Args[rollupArgIdx]
|
|
if me, ok := arg.(*metricsql.MetricExpr); ok {
|
|
if me.IsEmpty() {
|
|
return nil, nil
|
|
}
|
|
// e = rollupFunc(metricExpr)
|
|
return &metricsql.FuncExpr{
|
|
Name: fe.Name,
|
|
Args: []metricsql.Expr{me},
|
|
}, nrf
|
|
}
|
|
if re, ok := arg.(*metricsql.RollupExpr); ok {
|
|
if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
|
|
return nil, nil
|
|
}
|
|
// e = rollupFunc(metricExpr[d])
|
|
return fe, nrf
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func evalExprsSequentially(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
|
|
var rvs [][]*timeseries
|
|
for _, e := range es {
|
|
rv, err := evalExpr(qt, ec, e)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rvs = append(rvs, rv)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
|
|
if len(es) < 2 {
|
|
return evalExprsSequentially(qt, ec, es)
|
|
}
|
|
rvs := make([][]*timeseries, len(es))
|
|
errs := make([]error, len(es))
|
|
var wg sync.WaitGroup
|
|
for i, e := range es {
|
|
qt.Printf("eval function args in parallel")
|
|
wg.Add(1)
|
|
qtChild := qt.NewChild("eval arg %d", i)
|
|
go func(e metricsql.Expr, i int) {
|
|
defer func() {
|
|
qtChild.Done()
|
|
wg.Done()
|
|
}()
|
|
rv, err := evalExpr(qtChild, ec, e)
|
|
rvs[i] = rv
|
|
errs[i] = err
|
|
}(e, i)
|
|
}
|
|
wg.Wait()
|
|
for _, err := range errs {
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func evalRollupFuncArgs(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) {
|
|
var re *metricsql.RollupExpr
|
|
rollupArgIdx := metricsql.GetRollupArgIdx(fe)
|
|
if len(fe.Args) <= rollupArgIdx {
|
|
return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil))
|
|
}
|
|
args := make([]interface{}, len(fe.Args))
|
|
for i, arg := range fe.Args {
|
|
if i == rollupArgIdx {
|
|
re = getRollupExprArg(arg)
|
|
args[i] = re
|
|
continue
|
|
}
|
|
ts, err := evalExpr(qt, ec, arg)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("cannot evaluate arg #%d for %q: %w", i+1, fe.AppendString(nil), err)
|
|
}
|
|
args[i] = ts
|
|
}
|
|
return args, re, nil
|
|
}
|
|
|
|
func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
|
|
re, ok := arg.(*metricsql.RollupExpr)
|
|
if !ok {
|
|
// Wrap non-rollup arg into metricsql.RollupExpr.
|
|
return &metricsql.RollupExpr{
|
|
Expr: arg,
|
|
}
|
|
}
|
|
if !re.ForSubquery() {
|
|
// Return standard rollup if it doesn't contain subquery.
|
|
return re
|
|
}
|
|
me, ok := re.Expr.(*metricsql.MetricExpr)
|
|
if !ok {
|
|
// arg contains subquery.
|
|
return re
|
|
}
|
|
// Convert me[w:step] -> default_rollup(me)[w:step]
|
|
reNew := *re
|
|
reNew.Expr = &metricsql.FuncExpr{
|
|
Name: "default_rollup",
|
|
Args: []metricsql.Expr{
|
|
&metricsql.RollupExpr{Expr: me},
|
|
},
|
|
}
|
|
return &reNew
|
|
}
|
|
|
|
// expr may contain:
|
|
// - rollupFunc(m) if iafc is nil
|
|
// - aggrFunc(rollupFunc(m)) if iafc isn't nil
|
|
func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr,
|
|
re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
|
|
if re.At == nil {
|
|
return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc)
|
|
}
|
|
tssAt, err := evalExpr(qt, ec, re.At)
|
|
if err != nil {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err),
|
|
}
|
|
}
|
|
if len(tssAt) != 1 {
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)),
|
|
}
|
|
}
|
|
atTimestamp := int64(tssAt[0].Values[0] * 1000)
|
|
ecNew := copyEvalConfig(ec)
|
|
ecNew.Start = atTimestamp
|
|
ecNew.End = atTimestamp
|
|
tss, err := evalRollupFuncWithoutAt(qt, ecNew, funcName, rf, expr, re, iafc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// expand single-point tss to the original time range.
|
|
timestamps := ec.getSharedTimestamps()
|
|
for _, ts := range tss {
|
|
v := ts.Values[0]
|
|
values := make([]float64, len(timestamps))
|
|
for i := range timestamps {
|
|
values[i] = v
|
|
}
|
|
ts.Timestamps = timestamps
|
|
ts.Values = values
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
|
|
expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
|
|
funcName = strings.ToLower(funcName)
|
|
ecNew := ec
|
|
var offset int64
|
|
if re.Offset != nil {
|
|
offset = re.Offset.Duration(ec.Step)
|
|
ecNew = copyEvalConfig(ecNew)
|
|
ecNew.Start -= offset
|
|
ecNew.End -= offset
|
|
// There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true,
|
|
// since the time range alignment has been already performed by the caller,
|
|
// so cache hit rate should be quite good.
|
|
// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
|
|
}
|
|
if funcName == "rollup_candlestick" {
|
|
// Automatically apply `offset -step` to `rollup_candlestick` function
|
|
// in order to obtain expected OHLC results.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
|
|
step := ecNew.Step
|
|
ecNew = copyEvalConfig(ecNew)
|
|
ecNew.Start += step
|
|
ecNew.End += step
|
|
offset -= step
|
|
}
|
|
var rvs []*timeseries
|
|
var err error
|
|
if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
|
|
rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window)
|
|
} else {
|
|
if iafc != nil {
|
|
logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil))
|
|
}
|
|
rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re)
|
|
}
|
|
if err != nil {
|
|
return nil, &UserReadableError{
|
|
Err: err,
|
|
}
|
|
}
|
|
if funcName == "absent_over_time" {
|
|
rvs = aggregateAbsentOverTime(ec, re.Expr, rvs)
|
|
}
|
|
if offset != 0 && len(rvs) > 0 {
|
|
// Make a copy of timestamps, since they may be used in other values.
|
|
srcTimestamps := rvs[0].Timestamps
|
|
dstTimestamps := append([]int64{}, srcTimestamps...)
|
|
for i := range dstTimestamps {
|
|
dstTimestamps[i] += offset
|
|
}
|
|
for _, ts := range rvs {
|
|
ts.Timestamps = dstTimestamps
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
// aggregateAbsentOverTime collapses tss to a single time series with 1 and nan values.
|
|
//
|
|
// Values for returned series are set to nan if at least a single tss series contains nan at that point.
|
|
// This means that tss contains a series with non-empty results at that point.
|
|
// This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130
|
|
func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries {
|
|
rvs := getAbsentTimeseries(ec, expr)
|
|
if len(tss) == 0 {
|
|
return rvs
|
|
}
|
|
for i := range tss[0].Values {
|
|
for _, ts := range tss {
|
|
if math.IsNaN(ts.Values[i]) {
|
|
rvs[0].Values[i] = nan
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
|
|
// TODO: determine whether to use rollupResultCacheV here.
|
|
qt = qt.NewChild("subquery")
|
|
defer qt.Done()
|
|
step := re.Step.Duration(ec.Step)
|
|
if step == 0 {
|
|
step = ec.Step
|
|
}
|
|
window := re.Window.Duration(ec.Step)
|
|
|
|
ecSQ := copyEvalConfig(ec)
|
|
ecSQ.Start -= window + maxSilenceInterval + step
|
|
ecSQ.End += step
|
|
ecSQ.Step = step
|
|
ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
|
|
if err := ValidateMaxPointsPerSeries(ecSQ.Start, ecSQ.End, ecSQ.Step, ecSQ.MaxPointsPerSeries); err != nil {
|
|
return nil, fmt.Errorf("%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)", err)
|
|
}
|
|
// unconditionally align start and end args to step for subquery as Prometheus does.
|
|
ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step)
|
|
tssSQ, err := evalExpr(qt, ecSQ, re.Expr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(tssSQ) == 0 {
|
|
return nil, nil
|
|
}
|
|
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
|
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
|
|
var tssLock sync.Mutex
|
|
var samplesScannedTotal uint64
|
|
keepMetricNames := getKeepMetricNames(expr)
|
|
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) {
|
|
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
|
|
preFunc(values, timestamps)
|
|
for _, rc := range rcs {
|
|
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
|
|
samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
tssLock.Lock()
|
|
tss = tsm.AppendTimeseriesTo(tss)
|
|
tssLock.Unlock()
|
|
continue
|
|
}
|
|
var ts timeseries
|
|
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
tssLock.Lock()
|
|
tss = append(tss, &ts)
|
|
tssLock.Unlock()
|
|
}
|
|
return values, timestamps
|
|
})
|
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
|
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
|
|
return tss, nil
|
|
}
|
|
|
|
// rowsScannedPerQuery is the histogram, which is updated with the number of samples scanned by rollups.
var rowsScannedPerQuery = metrics.NewHistogram(`vm_rows_scanned_per_query`)
|
|
|
|
func getKeepMetricNames(expr metricsql.Expr) bool {
|
|
if ae, ok := expr.(*metricsql.AggrFuncExpr); ok {
|
|
// Extract rollupFunc(...) from aggrFunc(rollupFunc(...)).
|
|
// This case is possible when optimized aggrFunc calculations are used
|
|
// such as `sum(rate(...))`
|
|
if len(ae.Args) != 1 {
|
|
return false
|
|
}
|
|
expr = ae.Args[0]
|
|
}
|
|
if fe, ok := expr.(*metricsql.FuncExpr); ok {
|
|
return fe.KeepMetricNames
|
|
}
|
|
return false
|
|
}
|
|
|
|
func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) {
|
|
concurrency := cgroup.AvailableCPUs()
|
|
if concurrency > len(tss) {
|
|
concurrency = len(tss)
|
|
}
|
|
workCh := make(chan *timeseries, concurrency)
|
|
var wg sync.WaitGroup
|
|
wg.Add(concurrency)
|
|
for i := 0; i < concurrency; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
var tmpValues []float64
|
|
var tmpTimestamps []int64
|
|
for ts := range workCh {
|
|
tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps)
|
|
}
|
|
}()
|
|
}
|
|
for _, ts := range tss {
|
|
workCh <- ts
|
|
}
|
|
close(workCh)
|
|
wg.Wait()
|
|
}
|
|
|
|
// removeNanValues appends non-NaN values with the corresponding timestamps
// to dstValues and dstTimestamps and returns the resulting slices.
//
// values[i] corresponds to timestamps[i].
func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) {
	hasNan := false
	for _, v := range values {
		if math.IsNaN(v) {
			hasNan = true
			// A single NaN is enough for selecting the slow path.
			break
		}
	}
	if !hasNan {
		// Fast path - no NaNs.
		dstValues = append(dstValues, values...)
		dstTimestamps = append(dstTimestamps, timestamps...)
		return dstValues, dstTimestamps
	}

	// Slow path - remove NaNs.
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}
|
|
|
|
var (
|
|
rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`)
|
|
rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`)
|
|
rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
|
|
)
|
|
|
|
func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
|
|
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) {
|
|
var rollupMemorySize int64
|
|
window := windowExpr.Duration(ec.Step)
|
|
if qt.Enabled() {
|
|
qt = qt.NewChild("rollup %s(): timeRange=%s, step=%d, window=%d", funcName, ec.timeRangeString(), ec.Step, window)
|
|
defer func() {
|
|
qt.Donef("neededMemoryBytes=%d", rollupMemorySize)
|
|
}()
|
|
}
|
|
if me.IsEmpty() {
|
|
return evalNumber(ec, nan), nil
|
|
}
|
|
|
|
// Search for partial results in cache.
|
|
tssCached, start := rollupResultCacheV.Get(qt, ec, expr, window)
|
|
if start > ec.End {
|
|
// The result is fully cached.
|
|
rollupResultCacheFullHits.Inc()
|
|
return tssCached, nil
|
|
}
|
|
if start > ec.Start {
|
|
rollupResultCachePartialHits.Inc()
|
|
} else {
|
|
rollupResultCacheMiss.Inc()
|
|
}
|
|
|
|
// Obtain rollup configs before fetching data from db,
|
|
// so type errors can be caught earlier.
|
|
sharedTimestamps := getTimestamps(start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
|
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Fetch the remaining part of the result.
|
|
tfs := searchutils.ToTagFilters(me.LabelFilters)
|
|
tfss := searchutils.JoinTagFilterss([][]storage.TagFilter{tfs}, ec.EnforcedTagFilterss)
|
|
minTimestamp := start - maxSilenceInterval
|
|
if window > ec.Step {
|
|
minTimestamp -= window
|
|
} else {
|
|
minTimestamp -= ec.Step
|
|
}
|
|
sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries)
|
|
rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline)
|
|
if err != nil {
|
|
return nil, &UserReadableError{
|
|
Err: err,
|
|
}
|
|
}
|
|
rssLen := rss.Len()
|
|
if rssLen == 0 {
|
|
rss.Cancel()
|
|
tss := mergeTimeseries(tssCached, nil, start, ec)
|
|
return tss, nil
|
|
}
|
|
|
|
// Verify timeseries fit available memory after the rollup.
|
|
// Take into account points from tssCached.
|
|
pointsPerTimeseries := 1 + (ec.End-ec.Start)/ec.Step
|
|
timeseriesLen := rssLen
|
|
if iafc != nil {
|
|
// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
|
|
timeseriesLen = cgroup.AvailableCPUs()
|
|
if iafc.ae.Modifier.Op != "" {
|
|
if iafc.ae.Limit > 0 {
|
|
// There is an explicit limit on the number of output time series.
|
|
timeseriesLen *= iafc.ae.Limit
|
|
} else {
|
|
// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
|
|
// since each group can have own set of time series in memory.
|
|
timeseriesLen *= 1000
|
|
}
|
|
}
|
|
// The maximum number of output time series is limited by rssLen.
|
|
if timeseriesLen > rssLen {
|
|
timeseriesLen = rssLen
|
|
}
|
|
}
|
|
rollupPoints := mulNoOverflow(pointsPerTimeseries, int64(timeseriesLen*len(rcs)))
|
|
rollupMemorySize = sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16))
|
|
if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory {
|
|
rss.Cancel()
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf("not enough memory for processing %d data points across %d time series with %d points in each time series "+
|
|
"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+
|
|
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
|
|
"increasing -search.maxMemoryPerQuery",
|
|
rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3),
|
|
}
|
|
}
|
|
rml := getRollupMemoryLimiter()
|
|
if !rml.Get(uint64(rollupMemorySize)) {
|
|
rss.Cancel()
|
|
return nil, &UserReadableError{
|
|
Err: fmt.Errorf("not enough memory for processing %d data points across %d time series with %d points in each time series; "+
|
|
"total available memory for concurrent requests: %d bytes; "+
|
|
"requested memory: %d bytes; "+
|
|
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
|
|
"switching to node with more RAM; increasing -memory.allowedPercent",
|
|
rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3),
|
|
}
|
|
}
|
|
defer rml.Put(uint64(rollupMemorySize))
|
|
|
|
// Evaluate rollup
|
|
keepMetricNames := getKeepMetricNames(expr)
|
|
var tss []*timeseries
|
|
if iafc != nil {
|
|
tss, err = evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps)
|
|
} else {
|
|
tss, err = evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)
|
|
}
|
|
if err != nil {
|
|
return nil, &UserReadableError{
|
|
Err: err,
|
|
}
|
|
}
|
|
tss = mergeTimeseries(tssCached, tss, start, ec)
|
|
rollupResultCacheV.Put(qt, ec, expr, window, tss)
|
|
return tss, nil
|
|
}
|
|
|
|
var (
|
|
rollupMemoryLimiter memoryLimiter
|
|
rollupMemoryLimiterOnce sync.Once
|
|
)
|
|
|
|
func getRollupMemoryLimiter() *memoryLimiter {
|
|
rollupMemoryLimiterOnce.Do(func() {
|
|
rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 4
|
|
})
|
|
return &rollupMemoryLimiter
|
|
}
|
|
|
|
func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool,
|
|
iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
|
|
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
|
qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs)
|
|
defer qt.Done()
|
|
var samplesScannedTotal uint64
|
|
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
|
|
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
|
|
preFunc(rs.Values, rs.Timestamps)
|
|
ts := getTimeseries()
|
|
defer putTimeseries(ts)
|
|
for _, rc := range rcs {
|
|
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
|
|
samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
|
|
for _, ts := range tsm.m {
|
|
iafc.updateTimeseries(ts, workerID)
|
|
}
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
continue
|
|
}
|
|
ts.Reset()
|
|
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
iafc.updateTimeseries(ts, workerID)
|
|
|
|
// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
|
|
ts.Timestamps = nil
|
|
ts.denyReuse = false
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tss := iafc.finalizeTimeseries()
|
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
|
qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal)
|
|
return tss, nil
|
|
}
|
|
|
|
func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
|
|
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
|
qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
|
|
defer qt.Done()
|
|
tss := make([]*timeseries, 0, rss.Len()*len(rcs))
|
|
var tssLock sync.Mutex
|
|
var samplesScannedTotal uint64
|
|
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
|
|
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
|
|
preFunc(rs.Values, rs.Timestamps)
|
|
for _, rc := range rcs {
|
|
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
|
|
samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
tssLock.Lock()
|
|
tss = tsm.AppendTimeseriesTo(tss)
|
|
tssLock.Unlock()
|
|
continue
|
|
}
|
|
var ts timeseries
|
|
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
|
|
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
|
|
tssLock.Lock()
|
|
tss = append(tss, &ts)
|
|
tssLock.Unlock()
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
|
qt.Printf("samplesScanned=%d", samplesScannedTotal)
|
|
return tss, nil
|
|
}
|
|
|
|
func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName,
|
|
valuesSrc []float64, timestampsSrc []int64, sharedTimestamps []int64) uint64 {
|
|
tsDst.MetricName.CopyFrom(mnSrc)
|
|
if len(rc.TagValue) > 0 {
|
|
tsDst.MetricName.AddTag("rollup", rc.TagValue)
|
|
}
|
|
if !keepMetricNames && !rollupFuncsKeepMetricName[funcName] {
|
|
tsDst.MetricName.ResetMetricGroup()
|
|
}
|
|
var samplesScanned uint64
|
|
tsDst.Values, samplesScanned = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
|
|
tsDst.Timestamps = sharedTimestamps
|
|
tsDst.denyReuse = true
|
|
return samplesScanned
|
|
}
|
|
|
|
// bbPool is a package-level bytesutil.ByteBufferPool.
// NOTE(review): its users are outside this part of the file - check them before changing it.
var bbPool bytesutil.ByteBufferPool
|
|
|
|
func evalNumber(ec *EvalConfig, n float64) []*timeseries {
|
|
var ts timeseries
|
|
ts.denyReuse = true
|
|
timestamps := ec.getSharedTimestamps()
|
|
values := make([]float64, len(timestamps))
|
|
for i := range timestamps {
|
|
values[i] = n
|
|
}
|
|
ts.Values = values
|
|
ts.Timestamps = timestamps
|
|
return []*timeseries{&ts}
|
|
}
|
|
|
|
func evalString(ec *EvalConfig, s string) []*timeseries {
|
|
rv := evalNumber(ec, nan)
|
|
rv[0].MetricName.MetricGroup = append(rv[0].MetricName.MetricGroup[:0], s...)
|
|
return rv
|
|
}
|
|
|
|
func evalTime(ec *EvalConfig) []*timeseries {
|
|
rv := evalNumber(ec, nan)
|
|
timestamps := rv[0].Timestamps
|
|
values := rv[0].Values
|
|
for i, ts := range timestamps {
|
|
values[i] = float64(ts) / 1e3
|
|
}
|
|
return rv
|
|
}
|
|
|
|
// mulNoOverflow returns a*b, or math.MaxInt64 if the product doesn't fit int64.
//
// Both operands are expected to be non-negative; the overflow check isn't
// designed for negative values.
func mulNoOverflow(a, b int64) int64 {
	if b == 0 {
		// The product is zero. Return early, since the overflow check below
		// divides by b and would panic on a zero divisor.
		return 0
	}
	if math.MaxInt64/b < a {
		// Overflow
		return math.MaxInt64
	}
	return a * b
}
|
|
|
|
// sumNoOverflow returns a+b, or math.MaxInt64 if the sum exceeds it.
//
// Both operands are expected to be non-negative.
func sumNoOverflow(a, b int64) int64 {
	// headroom is how much can be added to a before math.MaxInt64 is exceeded.
	headroom := math.MaxInt64 - a
	if b > headroom {
		return math.MaxInt64
	}
	return a + b
}
|
|
|
|
func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) {
|
|
if *noStaleMarkers || funcName == "default_rollup" || funcName == "stale_samples_over_time" {
|
|
// Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function,
|
|
// since it uses them for Prometheus-style staleness detection.
|
|
// Do not drop staleness marks for stale_samples_over_time() function, since it needs
|
|
// to calculate the number of staleness markers.
|
|
return values, timestamps
|
|
}
|
|
// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
|
|
hasStaleSamples := false
|
|
for _, v := range values {
|
|
if decimal.IsStaleNaN(v) {
|
|
hasStaleSamples = true
|
|
break
|
|
}
|
|
}
|
|
if !hasStaleSamples {
|
|
// Fast path: values have no Prometheus staleness marks.
|
|
return values, timestamps
|
|
}
|
|
// Slow path: drop Prometheus staleness marks from values.
|
|
dstValues := values[:0]
|
|
dstTimestamps := timestamps[:0]
|
|
for i, v := range values {
|
|
if decimal.IsStaleNaN(v) {
|
|
continue
|
|
}
|
|
dstValues = append(dstValues, v)
|
|
dstTimestamps = append(dstTimestamps, timestamps[i])
|
|
}
|
|
return dstValues, dstTimestamps
|
|
}
|