2019-05-22 23:16:55 +02:00
package promql
import (
"flag"
"fmt"
"math"
2022-01-31 18:32:36 +01:00
"regexp"
"sort"
2021-09-17 22:33:15 +02:00
"strings"
2019-05-22 23:16:55 +02:00
"sync"
2022-06-28 18:26:17 +02:00
"sync/atomic"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
2020-09-11 12:18:57 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2021-08-15 12:20:02 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
2022-10-08 00:07:42 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2022-06-01 01:31:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
2020-04-28 14:28:22 +02:00
"github.com/VictoriaMetrics/metricsql"
2019-05-22 23:16:55 +02:00
)
var (
2022-08-24 14:25:18 +02:00
disableCache = flag . Bool ( "search.disableCache" , false , "Whether to disable response caching. This may be useful during data backfilling" )
maxPointsSubqueryPerTimeseries = flag . Int ( "search.maxPointsSubqueryPerTimeseries" , 100e3 , "The maximum number of points per series, which can be generated by subquery. " +
"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3" )
2022-10-08 00:07:42 +02:00
maxMemoryPerQuery = flagutil . NewBytes ( "search.maxMemoryPerQuery" , 0 , "The maximum amounts of memory a single query may consume. " +
2022-10-10 20:43:36 +02:00
"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated " +
"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests" )
2022-10-08 00:07:42 +02:00
noStaleMarkers = flag . Bool ( "search.noStaleMarkers" , false , "Set this flag to true if the database doesn't contain Prometheus stale markers, " +
"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets" )
2019-05-22 23:16:55 +02:00
)
// minTimeseriesPointsForTimeRounding is the smallest number of points per time series
// for which AdjustStartEnd aligns start and end to the step.
// The alignment increases the cache hit ratio for frequently requested queries
// over big time ranges.
const minTimeseriesPointsForTimeRounding = 50
// ValidateMaxPointsPerSeries validates that the number of points for the given start, end and step do not exceed maxPoints.
//
// The number of points is computed as 1 + (end-start)/step. A zero step is rejected
// with an error instead of causing a division by zero.
func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error {
	if step == 0 {
		return fmt.Errorf("step can't be equal to zero")
	}
	if points := 1 + (end-start)/step; points > int64(maxPoints) {
		return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d",
			start, end, step, points, maxPoints)
	}
	return nil
}
// AdjustStartEnd adjusts start and end values, so response caching may be enabled.
//
// See EvalConfig.mayCache for details.
func AdjustStartEnd ( start , end , step int64 ) ( int64 , int64 ) {
2020-07-30 22:14:15 +02:00
if * disableCache {
// Do not adjust start and end values when cache is disabled.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563
return start , end
}
2019-05-22 23:16:55 +02:00
points := ( end - start ) / step + 1
if points < minTimeseriesPointsForTimeRounding {
// Too small number of points for rounding.
return start , end
}
// Round start and end to values divisible by step in order
// to enable response caching (see EvalConfig.mayCache).
2020-09-03 12:21:51 +02:00
start , end = alignStartEnd ( start , end , step )
2019-12-24 21:44:24 +01:00
// Make sure that the new number of points is the same as the initial number of points.
newPoints := ( end - start ) / step + 1
for newPoints > points {
end -= step
newPoints --
}
2019-05-22 23:16:55 +02:00
return start , end
}
// alignStartEnd rounds start down and end up to the nearest values divisible by step.
//
// step must be positive. Go's % operator returns a remainder with the sign of the
// dividend. A floored (non-negative) remainder is therefore used, so negative
// timestamps are rounded in the same direction as positive ones. Without it,
// a negative start would be rounded up and a negative end would stay unaligned.
func alignStartEnd(start, end, step int64) (int64, int64) {
	// Round start to the nearest smaller value divisible by step.
	rem := start % step
	if rem < 0 {
		rem += step
	}
	start -= rem
	// Round end to the nearest bigger value divisible by step.
	adjust := end % step
	if adjust < 0 {
		adjust += step
	}
	if adjust > 0 {
		end += step - adjust
	}
	return start, end
}
2019-05-22 23:16:55 +02:00
// EvalConfig is the configuration required for query evaluation via Exec
type EvalConfig struct {
2019-05-22 23:23:23 +02:00
AuthToken * auth . Token
Start int64
End int64
Step int64
2019-05-22 23:16:55 +02:00
2022-03-26 09:17:37 +01:00
// MaxSeries is the maximum number of time series, which can be scanned by the query.
// Zero means 'no limit'
MaxSeries int
2022-08-24 14:25:18 +02:00
// MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series.
MaxPointsPerSeries int
2020-07-31 17:00:21 +02:00
// QuotedRemoteAddr contains quoted remote address.
QuotedRemoteAddr string
2020-09-11 12:18:57 +02:00
Deadline searchutils . Deadline
2019-05-22 23:16:55 +02:00
2022-06-01 01:31:40 +02:00
// Whether the response can be cached.
2019-05-22 23:16:55 +02:00
MayCache bool
2019-10-15 18:12:27 +02:00
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
2021-03-15 11:35:44 +01:00
// How many decimal digits after the point to leave in response.
RoundDigits int
2021-12-06 16:07:06 +01:00
// EnforcedTagFilterss may contain additional label filters to use in the query.
EnforcedTagFilterss [ ] [ ] storage . TagFilter
2021-02-01 16:42:35 +01:00
2021-03-15 11:35:44 +01:00
// Whether to deny partial response.
2019-06-30 00:27:03 +02:00
DenyPartialResponse bool
2020-11-14 11:36:21 +01:00
// IsPartialResponse is set during query execution and can be used by Exec caller after query execution.
IsPartialResponse bool
2019-05-22 23:16:55 +02:00
timestamps [ ] int64
timestampsOnce sync . Once
}
2022-03-26 09:17:37 +01:00
// copyEvalConfig returns src copy.
func copyEvalConfig ( src * EvalConfig ) * EvalConfig {
2019-05-22 23:16:55 +02:00
var ec EvalConfig
2019-05-22 23:23:23 +02:00
ec . AuthToken = src . AuthToken
2019-05-22 23:16:55 +02:00
ec . Start = src . Start
ec . End = src . End
ec . Step = src . Step
2022-03-26 09:17:37 +01:00
ec . MaxSeries = src . MaxSeries
2022-08-24 14:25:18 +02:00
ec . MaxPointsPerSeries = src . MaxPointsPerSeries
2019-05-22 23:16:55 +02:00
ec . Deadline = src . Deadline
ec . MayCache = src . MayCache
2019-10-15 18:12:27 +02:00
ec . LookbackDelta = src . LookbackDelta
2021-03-15 11:35:44 +01:00
ec . RoundDigits = src . RoundDigits
2021-12-06 16:07:06 +01:00
ec . EnforcedTagFilterss = src . EnforcedTagFilterss
2019-06-30 00:27:03 +02:00
ec . DenyPartialResponse = src . DenyPartialResponse
2020-11-14 11:36:21 +01:00
ec . IsPartialResponse = src . IsPartialResponse
2019-05-22 23:16:55 +02:00
// do not copy src.timestamps - they must be generated again.
return & ec
}
2020-11-14 11:36:21 +01:00
func ( ec * EvalConfig ) updateIsPartialResponse ( isPartialResponse bool ) {
if ! ec . IsPartialResponse {
ec . IsPartialResponse = isPartialResponse
}
}
2019-05-22 23:16:55 +02:00
func ( ec * EvalConfig ) validate ( ) {
if ec . Start > ec . End {
logger . Panicf ( "BUG: start cannot exceed end; got %d vs %d" , ec . Start , ec . End )
}
if ec . Step <= 0 {
logger . Panicf ( "BUG: step must be greater than 0; got %d" , ec . Step )
}
}
func ( ec * EvalConfig ) mayCache ( ) bool {
2020-07-30 22:14:15 +02:00
if * disableCache {
return false
}
2019-05-22 23:16:55 +02:00
if ! ec . MayCache {
return false
}
if ec . Start % ec . Step != 0 {
return false
}
if ec . End % ec . Step != 0 {
return false
}
return true
}
2022-06-27 12:32:47 +02:00
func ( ec * EvalConfig ) timeRangeString ( ) string {
start := storage . TimestampToHumanReadableFormat ( ec . Start )
end := storage . TimestampToHumanReadableFormat ( ec . End )
return fmt . Sprintf ( "[%s..%s]" , start , end )
}
2019-05-22 23:16:55 +02:00
func ( ec * EvalConfig ) getSharedTimestamps ( ) [ ] int64 {
ec . timestampsOnce . Do ( ec . timestampsInit )
return ec . timestamps
}
func ( ec * EvalConfig ) timestampsInit ( ) {
2022-08-24 14:25:18 +02:00
ec . timestamps = getTimestamps ( ec . Start , ec . End , ec . Step , ec . MaxPointsPerSeries )
2019-05-22 23:16:55 +02:00
}
2022-08-24 14:25:18 +02:00
func getTimestamps ( start , end , step int64 , maxPointsPerSeries int ) [ ] int64 {
2019-05-22 23:16:55 +02:00
// Sanity checks.
if step <= 0 {
logger . Panicf ( "BUG: Step must be bigger than 0; got %d" , step )
}
if start > end {
logger . Panicf ( "BUG: Start cannot exceed End; got %d vs %d" , start , end )
}
2022-08-24 14:25:18 +02:00
if err := ValidateMaxPointsPerSeries ( start , end , step , maxPointsPerSeries ) ; err != nil {
2019-05-22 23:16:55 +02:00
logger . Panicf ( "BUG: %s; this must be validated before the call to getTimestamps" , err )
}
// Prepare timestamps.
points := 1 + ( end - start ) / step
timestamps := make ( [ ] int64 , points )
for i := range timestamps {
timestamps [ i ] = start
start += step
}
return timestamps
}
2022-06-01 01:31:40 +02:00
func evalExpr ( qt * querytracer . Tracer , ec * EvalConfig , e metricsql . Expr ) ( [ ] * timeseries , error ) {
2022-06-08 20:05:17 +02:00
if qt . Enabled ( ) {
2022-06-30 17:17:07 +02:00
query := string ( e . AppendString ( nil ) )
query = bytesutil . LimitStringLen ( query , 300 )
2022-06-08 20:05:17 +02:00
mayCache := ec . mayCache ( )
2022-06-27 12:32:47 +02:00
qt = qt . NewChild ( "eval: query=%s, timeRange=%s, step=%d, mayCache=%v" , query , ec . timeRangeString ( ) , ec . Step , mayCache )
2022-06-08 20:05:17 +02:00
}
2022-06-01 01:31:40 +02:00
rv , err := evalExprInternal ( qt , ec , e )
if err != nil {
return nil , err
}
if qt . Enabled ( ) {
seriesCount := len ( rv )
pointsPerSeries := 0
if len ( rv ) > 0 {
pointsPerSeries = len ( rv [ 0 ] . Timestamps )
}
pointsCount := seriesCount * pointsPerSeries
2022-06-08 20:05:17 +02:00
qt . Donef ( "series=%d, points=%d, pointsPerSeries=%d" , seriesCount , pointsCount , pointsPerSeries )
2022-06-01 01:31:40 +02:00
}
return rv , nil
}
func evalExprInternal ( qt * querytracer . Tracer , ec * EvalConfig , e metricsql . Expr ) ( [ ] * timeseries , error ) {
2019-12-25 20:35:47 +01:00
if me , ok := e . ( * metricsql . MetricExpr ) ; ok {
re := & metricsql . RollupExpr {
2019-05-22 23:16:55 +02:00
Expr : me ,
}
2022-06-01 01:31:40 +02:00
rv , err := evalRollupFunc ( qt , ec , "default_rollup" , rollupDefault , e , re , nil )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( ` cannot evaluate %q: %w ` , me . AppendString ( nil ) , err )
2019-05-22 23:16:55 +02:00
}
return rv , nil
}
2019-12-25 20:35:47 +01:00
if re , ok := e . ( * metricsql . RollupExpr ) ; ok {
2022-06-01 01:31:40 +02:00
rv , err := evalRollupFunc ( qt , ec , "default_rollup" , rollupDefault , e , re , nil )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( ` cannot evaluate %q: %w ` , re . AppendString ( nil ) , err )
2019-05-22 23:16:55 +02:00
}
return rv , nil
}
2019-12-25 20:35:47 +01:00
if fe , ok := e . ( * metricsql . FuncExpr ) ; ok {
2019-05-22 23:16:55 +02:00
nrf := getRollupFunc ( fe . Name )
if nrf == nil {
2022-06-08 20:05:17 +02:00
qtChild := qt . NewChild ( "transform %s()" , fe . Name )
2022-06-01 01:31:40 +02:00
rv , err := evalTransformFunc ( qtChild , ec , fe )
2022-06-08 20:05:17 +02:00
qtChild . Donef ( "series=%d" , len ( rv ) )
2022-06-01 01:31:40 +02:00
return rv , err
2019-05-22 23:16:55 +02:00
}
2022-06-01 01:31:40 +02:00
args , re , err := evalRollupFuncArgs ( qt , ec , fe )
2019-05-22 23:16:55 +02:00
if err != nil {
return nil , err
}
rf , err := nrf ( args )
if err != nil {
return nil , err
}
2022-06-01 01:31:40 +02:00
rv , err := evalRollupFunc ( qt , ec , fe . Name , rf , e , re , nil )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( ` cannot evaluate %q: %w ` , fe . AppendString ( nil ) , err )
2019-05-22 23:16:55 +02:00
}
return rv , nil
}
2019-12-25 20:35:47 +01:00
if ae , ok := e . ( * metricsql . AggrFuncExpr ) ; ok {
2022-06-08 20:05:17 +02:00
qtChild := qt . NewChild ( "aggregate %s()" , ae . Name )
2022-06-01 01:31:40 +02:00
rv , err := evalAggrFunc ( qtChild , ec , ae )
2022-06-08 20:05:17 +02:00
qtChild . Donef ( "series=%d" , len ( rv ) )
2022-06-01 01:31:40 +02:00
return rv , err
2019-05-22 23:16:55 +02:00
}
2019-12-25 20:35:47 +01:00
if be , ok := e . ( * metricsql . BinaryOpExpr ) ; ok {
2022-06-08 20:05:17 +02:00
qtChild := qt . NewChild ( "binary op %q" , be . Op )
2022-06-01 01:31:40 +02:00
rv , err := evalBinaryOp ( qtChild , ec , be )
2022-06-08 20:05:17 +02:00
qtChild . Donef ( "series=%d" , len ( rv ) )
2022-06-01 01:31:40 +02:00
return rv , err
2019-05-22 23:16:55 +02:00
}
2019-12-25 20:35:47 +01:00
if ne , ok := e . ( * metricsql . NumberExpr ) ; ok {
2019-05-22 23:16:55 +02:00
rv := evalNumber ( ec , ne . N )
return rv , nil
}
2019-12-25 20:35:47 +01:00
if se , ok := e . ( * metricsql . StringExpr ) ; ok {
2019-05-22 23:16:55 +02:00
rv := evalString ( ec , se . S )
return rv , nil
}
2021-07-12 16:16:38 +02:00
if de , ok := e . ( * metricsql . DurationExpr ) ; ok {
d := de . Duration ( ec . Step )
dSec := float64 ( d ) / 1000
rv := evalNumber ( ec , dSec )
return rv , nil
}
2019-05-22 23:16:55 +02:00
return nil , fmt . Errorf ( "unexpected expression %q" , e . AppendString ( nil ) )
}
2022-06-01 01:31:40 +02:00
func evalTransformFunc ( qt * querytracer . Tracer , ec * EvalConfig , fe * metricsql . FuncExpr ) ( [ ] * timeseries , error ) {
tf := getTransformFunc ( fe . Name )
if tf == nil {
2022-08-15 12:50:14 +02:00
return nil , & UserReadableError {
Err : fmt . Errorf ( ` unknown func %q ` , fe . Name ) ,
}
2022-06-01 01:31:40 +02:00
}
2022-09-02 18:46:25 +02:00
var args [ ] [ ] * timeseries
var err error
switch fe . Name {
case "" , "union" :
args , err = evalExprsInParallel ( qt , ec , fe . Args )
default :
args , err = evalExprsSequentially ( qt , ec , fe . Args )
}
2022-07-12 18:48:24 +02:00
if err != nil {
return nil , err
}
2022-06-01 01:31:40 +02:00
tfa := & transformFuncArg {
ec : ec ,
fe : fe ,
args : args ,
}
rv , err := tf ( tfa )
if err != nil {
2022-08-27 00:35:46 +02:00
return nil , & UserReadableError {
Err : fmt . Errorf ( ` cannot evaluate %q: %w ` , fe . AppendString ( nil ) , err ) ,
}
2022-06-01 01:31:40 +02:00
}
return rv , nil
}
func evalAggrFunc ( qt * querytracer . Tracer , ec * EvalConfig , ae * metricsql . AggrFuncExpr ) ( [ ] * timeseries , error ) {
if callbacks := getIncrementalAggrFuncCallbacks ( ae . Name ) ; callbacks != nil {
fe , nrf := tryGetArgRollupFuncWithMetricExpr ( ae )
if fe != nil {
// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
// The optimized path saves RAM for aggregates over big number of time series.
args , re , err := evalRollupFuncArgs ( qt , ec , fe )
if err != nil {
return nil , err
}
rf , err := nrf ( args )
if err != nil {
return nil , err
}
iafc := newIncrementalAggrFuncContext ( ae , callbacks )
return evalRollupFunc ( qt , ec , fe . Name , rf , ae , re , iafc )
}
}
2022-09-02 18:46:25 +02:00
args , err := evalExprsInParallel ( qt , ec , ae . Args )
2022-06-01 01:31:40 +02:00
if err != nil {
return nil , err
}
af := getAggrFunc ( ae . Name )
if af == nil {
2022-08-15 12:50:14 +02:00
return nil , & UserReadableError {
Err : fmt . Errorf ( ` unknown func %q ` , ae . Name ) ,
}
2022-06-01 01:31:40 +02:00
}
afa := & aggrFuncArg {
ae : ae ,
args : args ,
ec : ec ,
}
rv , err := af ( afa )
if err != nil {
return nil , fmt . Errorf ( ` cannot evaluate %q: %w ` , ae . AppendString ( nil ) , err )
}
return rv , nil
}
// evalBinaryOp evaluates the binary operation be.
//
// For `and` and `if` the right operand is evaluated first (see the comment below).
// The operand results are always passed to the operator function in their original
// left/right positions.
func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) {
	bf := getBinaryOpFunc(be.Op)
	if bf == nil {
		return nil, fmt.Errorf(` unknown binary op %q `, be.Op)
	}
	var (
		tssLeft, tssRight []*timeseries
		err               error
	)
	if op := strings.ToLower(be.Op); op == "and" || op == "if" {
		// Fetch right-side series at first, since it usually contains
		// lower number of time series for `and` and `if` operator.
		// This should produce more specific label filters for the left side of the query.
		// This, in turn, should reduce the time to select series for the left side of the query.
		tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be)
	} else {
		tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be)
	}
	if err != nil {
		return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
	}
	rv, err := bf(&binaryOpFuncArg{
		be:    be,
		left:  tssLeft,
		right: tssRight,
	})
	if err != nil {
		return nil, fmt.Errorf(` cannot evaluate %q: %w `, be.AppendString(nil), err)
	}
	return rv, nil
}
2022-07-19 13:27:45 +02:00
func canPushdownCommonFilters ( be * metricsql . BinaryOpExpr ) bool {
switch strings . ToLower ( be . Op ) {
case "or" , "default" :
return false
}
if isAggrFuncWithoutGrouping ( be . Left ) || isAggrFuncWithoutGrouping ( be . Right ) {
return false
}
return true
}
// isAggrFuncWithoutGrouping reports whether e is an aggregate function call whose
// modifier has no args, e.g. `sum(m)` as opposed to `sum(m) by (job)`.
func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
	if afe, ok := e.(*metricsql.AggrFuncExpr); ok {
		return len(afe.Modifier.Args) == 0
	}
	return false
}
2022-06-01 01:31:40 +02:00
func execBinaryOpArgs ( qt * querytracer . Tracer , ec * EvalConfig , exprFirst , exprSecond metricsql . Expr , be * metricsql . BinaryOpExpr ) ( [ ] * timeseries , [ ] * timeseries , error ) {
2022-07-19 13:27:45 +02:00
if ! canPushdownCommonFilters ( be ) {
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
// from exprFirst to exprSecond.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
qt = qt . NewChild ( "execute left and right sides of %q in parallel" , be . Op )
defer qt . Done ( )
var wg sync . WaitGroup
var tssFirst [ ] * timeseries
var errFirst error
qtFirst := qt . NewChild ( "expr1" )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
tssFirst , errFirst = evalExpr ( qtFirst , ec , exprFirst )
qtFirst . Done ( )
} ( )
var tssSecond [ ] * timeseries
var errSecond error
qtSecond := qt . NewChild ( "expr2" )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
tssSecond , errSecond = evalExpr ( qtSecond , ec , exprSecond )
qtSecond . Done ( )
} ( )
wg . Wait ( )
if errFirst != nil {
return nil , nil , errFirst
}
if errSecond != nil {
2022-07-20 16:44:28 +02:00
return nil , nil , errSecond
2022-07-19 13:27:45 +02:00
}
return tssFirst , tssSecond , nil
}
2022-01-31 18:32:36 +01:00
// Execute binary operation in the following way:
//
// 1) execute the exprFirst
// 2) get common label filters for series returned at step 1
// 3) push down the found common label filters to exprSecond. This filters out unneeded series
// during exprSecond exection instead of spending compute resources on extracting and processing these series
// before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
// 4) execute the exprSecond with possible additional filters found at step 3
//
// Typical use cases:
// - Kubernetes-related: show pod creation time with the node name:
//
// kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
//
// Without the optimization `kube_pod_info` would select and spend compute resources
// for more time series than needed. The selected time series would be dropped later
// when matching time series on the right and left sides of binary operand.
//
// - Generic alerting queries, which rely on `info` metrics.
// See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
//
// - Queries, which get additional labels from `info` metrics.
// See https://www.robustperception.io/exposing-the-software-version-to-prometheus
2022-06-01 01:31:40 +02:00
tssFirst , err := evalExpr ( qt , ec , exprFirst )
2022-01-31 18:32:36 +01:00
if err != nil {
return nil , nil , err
}
2022-11-21 15:08:48 +01:00
if len ( tssFirst ) == 0 && strings . ToLower ( be . Op ) != "or" {
// Fast path: there is no sense in executing the exprSecond when exprFirst returns an empty result,
// since the "exprFirst op exprSecond" would return an empty result in any case.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3349
return nil , nil , nil
}
2022-07-19 13:27:45 +02:00
lfs := getCommonLabelFilters ( tssFirst )
lfs = metricsql . TrimFiltersByGroupModifier ( lfs , be )
exprSecond = metricsql . PushdownBinaryOpFilters ( exprSecond , lfs )
2022-06-01 01:31:40 +02:00
tssSecond , err := evalExpr ( qt , ec , exprSecond )
2022-01-31 18:32:36 +01:00
if err != nil {
return nil , nil , err
}
return tssFirst , tssSecond , nil
}
func getCommonLabelFilters ( tss [ ] * timeseries ) [ ] metricsql . LabelFilter {
m := make ( map [ string ] [ ] string )
for _ , ts := range tss {
for _ , tag := range ts . MetricName . Tags {
2023-01-12 10:26:19 +01:00
k := bytesutil . InternBytes ( tag . Key )
v := bytesutil . InternBytes ( tag . Value )
m [ k ] = append ( m [ k ] , v )
2022-01-31 18:32:36 +01:00
}
}
lfs := make ( [ ] metricsql . LabelFilter , 0 , len ( m ) )
for key , values := range m {
if len ( values ) != len ( tss ) {
// Skip the tag, since it doesn't belong to all the time series.
continue
}
values = getUniqueValues ( values )
2022-02-24 03:03:25 +01:00
if len ( values ) > 1000 {
2022-02-02 22:37:35 +01:00
// Skip the filter on the given tag, since it needs to enumerate too many unique values.
// This may slow down the search for matching time series.
continue
}
2022-01-31 18:32:36 +01:00
lf := metricsql . LabelFilter {
Label : key ,
}
if len ( values ) == 1 {
lf . Value = values [ 0 ]
} else {
2022-02-02 22:37:35 +01:00
sort . Strings ( values )
2022-01-31 18:32:36 +01:00
lf . Value = joinRegexpValues ( values )
lf . IsRegexp = true
}
lfs = append ( lfs , lf )
}
sort . Slice ( lfs , func ( i , j int ) bool {
return lfs [ i ] . Label < lfs [ j ] . Label
} )
return lfs
}
// getUniqueValues returns the distinct strings of a in the order of their first occurrence.
// The result is never nil.
func getUniqueValues(a []string) []string {
	seen := make(map[string]struct{}, len(a))
	uniq := make([]string, 0, len(a))
	for _, v := range a {
		if _, dup := seen[v]; dup {
			continue
		}
		seen[v] = struct{}{}
		uniq = append(uniq, v)
	}
	return uniq
}
// joinRegexpValues builds a regexp alternation matching any of the literal strings in a.
// Each value is escaped with regexp.QuoteMeta before joining with "|".
func joinRegexpValues(a []string) string {
	quoted := make([]string, len(a))
	for i, s := range a {
		quoted[i] = regexp.QuoteMeta(s)
	}
	return strings.Join(quoted, "|")
}
2019-12-25 20:35:47 +01:00
func tryGetArgRollupFuncWithMetricExpr ( ae * metricsql . AggrFuncExpr ) ( * metricsql . FuncExpr , newRollupFunc ) {
2019-07-10 11:57:27 +02:00
if len ( ae . Args ) != 1 {
return nil , nil
}
e := ae . Args [ 0 ]
// Make sure e contains one of the following:
// - metricExpr
// - metricExpr[d]
// - rollupFunc(metricExpr)
// - rollupFunc(metricExpr[d])
2019-12-25 20:35:47 +01:00
if me , ok := e . ( * metricsql . MetricExpr ) ; ok {
2019-07-10 11:57:27 +02:00
// e = metricExpr
if me . IsEmpty ( ) {
return nil , nil
}
2019-12-25 20:35:47 +01:00
fe := & metricsql . FuncExpr {
2019-07-10 11:57:27 +02:00
Name : "default_rollup" ,
2019-12-25 20:35:47 +01:00
Args : [ ] metricsql . Expr { me } ,
2019-07-10 11:57:27 +02:00
}
nrf := getRollupFunc ( fe . Name )
return fe , nrf
}
2019-12-25 20:35:47 +01:00
if re , ok := e . ( * metricsql . RollupExpr ) ; ok {
if me , ok := re . Expr . ( * metricsql . MetricExpr ) ; ! ok || me . IsEmpty ( ) || re . ForSubquery ( ) {
2019-07-10 11:57:27 +02:00
return nil , nil
}
2019-09-13 20:40:46 +02:00
// e = metricExpr[d]
2019-12-25 20:35:47 +01:00
fe := & metricsql . FuncExpr {
2019-07-10 11:57:27 +02:00
Name : "default_rollup" ,
2019-12-25 20:35:47 +01:00
Args : [ ] metricsql . Expr { re } ,
2019-07-10 11:57:27 +02:00
}
nrf := getRollupFunc ( fe . Name )
return fe , nrf
}
2019-12-25 20:35:47 +01:00
fe , ok := e . ( * metricsql . FuncExpr )
2019-07-10 11:57:27 +02:00
if ! ok {
return nil , nil
}
nrf := getRollupFunc ( fe . Name )
if nrf == nil {
return nil , nil
}
2022-01-13 21:12:02 +01:00
rollupArgIdx := metricsql . GetRollupArgIdx ( fe )
2020-01-15 15:26:02 +01:00
if rollupArgIdx >= len ( fe . Args ) {
// Incorrect number of args for rollup func.
return nil , nil
}
2019-07-10 11:57:27 +02:00
arg := fe . Args [ rollupArgIdx ]
2019-12-25 20:35:47 +01:00
if me , ok := arg . ( * metricsql . MetricExpr ) ; ok {
2019-07-10 11:57:27 +02:00
if me . IsEmpty ( ) {
return nil , nil
}
2019-09-13 20:40:46 +02:00
// e = rollupFunc(metricExpr)
2019-12-25 20:35:47 +01:00
return & metricsql . FuncExpr {
2019-07-10 11:57:27 +02:00
Name : fe . Name ,
2019-12-25 20:35:47 +01:00
Args : [ ] metricsql . Expr { me } ,
2019-07-10 11:57:27 +02:00
} , nrf
}
2019-12-25 20:35:47 +01:00
if re , ok := arg . ( * metricsql . RollupExpr ) ; ok {
if me , ok := re . Expr . ( * metricsql . MetricExpr ) ; ! ok || me . IsEmpty ( ) || re . ForSubquery ( ) {
2019-07-10 11:57:27 +02:00
return nil , nil
}
2019-09-13 20:40:46 +02:00
// e = rollupFunc(metricExpr[d])
2019-07-10 11:57:27 +02:00
return fe , nrf
}
return nil , nil
}
2022-09-02 18:46:25 +02:00
func evalExprsSequentially ( qt * querytracer . Tracer , ec * EvalConfig , es [ ] metricsql . Expr ) ( [ ] [ ] * timeseries , error ) {
2019-05-22 23:16:55 +02:00
var rvs [ ] [ ] * timeseries
for _ , e := range es {
2022-06-01 01:31:40 +02:00
rv , err := evalExpr ( qt , ec , e )
2019-05-22 23:16:55 +02:00
if err != nil {
return nil , err
}
rvs = append ( rvs , rv )
}
return rvs , nil
}
2022-09-02 18:46:25 +02:00
func evalExprsInParallel ( qt * querytracer . Tracer , ec * EvalConfig , es [ ] metricsql . Expr ) ( [ ] [ ] * timeseries , error ) {
if len ( es ) < 2 {
return evalExprsSequentially ( qt , ec , es )
}
rvs := make ( [ ] [ ] * timeseries , len ( es ) )
errs := make ( [ ] error , len ( es ) )
2023-01-11 07:23:27 +01:00
qt . Printf ( "eval function args in parallel" )
2022-09-02 18:46:25 +02:00
var wg sync . WaitGroup
for i , e := range es {
wg . Add ( 1 )
qtChild := qt . NewChild ( "eval arg %d" , i )
go func ( e metricsql . Expr , i int ) {
defer func ( ) {
qtChild . Done ( )
wg . Done ( )
} ( )
rv , err := evalExpr ( qtChild , ec , e )
rvs [ i ] = rv
errs [ i ] = err
} ( e , i )
}
wg . Wait ( )
for _ , err := range errs {
if err != nil {
return nil , err
}
}
return rvs , nil
}
2022-06-01 01:31:40 +02:00
func evalRollupFuncArgs ( qt * querytracer . Tracer , ec * EvalConfig , fe * metricsql . FuncExpr ) ( [ ] interface { } , * metricsql . RollupExpr , error ) {
2019-12-25 20:35:47 +01:00
var re * metricsql . RollupExpr
2022-01-13 21:12:02 +01:00
rollupArgIdx := metricsql . GetRollupArgIdx ( fe )
2020-01-10 20:16:14 +01:00
if len ( fe . Args ) <= rollupArgIdx {
2020-01-15 15:26:02 +01:00
return nil , nil , fmt . Errorf ( "expecting at least %d args to %q; got %d args; expr: %q" , rollupArgIdx + 1 , fe . Name , len ( fe . Args ) , fe . AppendString ( nil ) )
2020-01-10 20:16:14 +01:00
}
2019-05-22 23:16:55 +02:00
args := make ( [ ] interface { } , len ( fe . Args ) )
for i , arg := range fe . Args {
if i == rollupArgIdx {
re = getRollupExprArg ( arg )
args [ i ] = re
continue
}
2022-06-01 01:31:40 +02:00
ts , err := evalExpr ( qt , ec , arg )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , nil , fmt . Errorf ( "cannot evaluate arg #%d for %q: %w" , i + 1 , fe . AppendString ( nil ) , err )
2019-05-22 23:16:55 +02:00
}
args [ i ] = ts
}
return args , re , nil
}
2019-12-25 20:35:47 +01:00
func getRollupExprArg ( arg metricsql . Expr ) * metricsql . RollupExpr {
re , ok := arg . ( * metricsql . RollupExpr )
2019-05-22 23:16:55 +02:00
if ! ok {
2019-12-25 20:35:47 +01:00
// Wrap non-rollup arg into metricsql.RollupExpr.
return & metricsql . RollupExpr {
2019-05-22 23:16:55 +02:00
Expr : arg ,
}
}
2019-09-13 20:40:46 +02:00
if ! re . ForSubquery ( ) {
// Return standard rollup if it doesn't contain subquery.
2019-05-22 23:16:55 +02:00
return re
}
2019-12-25 20:35:47 +01:00
me , ok := re . Expr . ( * metricsql . MetricExpr )
2019-05-22 23:16:55 +02:00
if ! ok {
// arg contains subquery.
return re
}
// Convert me[w:step] -> default_rollup(me)[w:step]
reNew := * re
2019-12-25 20:35:47 +01:00
reNew . Expr = & metricsql . FuncExpr {
2019-05-22 23:16:55 +02:00
Name : "default_rollup" ,
2019-12-25 20:35:47 +01:00
Args : [ ] metricsql . Expr {
& metricsql . RollupExpr { Expr : me } ,
2019-05-22 23:16:55 +02:00
} ,
}
return & reNew
}
2022-01-17 14:27:00 +01:00
// expr may contain:
// - rollupFunc(m) if iafc is nil
// - aggrFunc(rollupFunc(m)) if iafc isn't nil
2022-06-01 01:31:40 +02:00
func evalRollupFunc ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc , expr metricsql . Expr ,
re * metricsql . RollupExpr , iafc * incrementalAggrFuncContext ) ( [ ] * timeseries , error ) {
2022-01-13 21:12:02 +01:00
if re . At == nil {
2022-06-01 01:31:40 +02:00
return evalRollupFuncWithoutAt ( qt , ec , funcName , rf , expr , re , iafc )
2022-01-13 21:12:02 +01:00
}
2022-06-01 01:31:40 +02:00
tssAt , err := evalExpr ( qt , ec , re . At )
2022-01-13 21:12:02 +01:00
if err != nil {
2022-08-15 12:50:14 +02:00
return nil , & UserReadableError {
Err : fmt . Errorf ( "cannot evaluate `@` modifier: %w" , err ) ,
}
2022-01-13 21:12:02 +01:00
}
if len ( tssAt ) != 1 {
2022-08-15 12:50:14 +02:00
return nil , & UserReadableError {
Err : fmt . Errorf ( "`@` modifier must return a single series; it returns %d series instead" , len ( tssAt ) ) ,
}
2022-01-13 21:12:02 +01:00
}
atTimestamp := int64 ( tssAt [ 0 ] . Values [ 0 ] * 1000 )
2022-03-26 09:17:37 +01:00
ecNew := copyEvalConfig ( ec )
2022-01-13 21:12:02 +01:00
ecNew . Start = atTimestamp
ecNew . End = atTimestamp
2022-06-01 01:31:40 +02:00
tss , err := evalRollupFuncWithoutAt ( qt , ecNew , funcName , rf , expr , re , iafc )
2022-01-13 21:12:02 +01:00
if err != nil {
return nil , err
}
2022-01-14 03:05:39 +01:00
// expand single-point tss to the original time range.
2022-01-13 21:12:02 +01:00
timestamps := ec . getSharedTimestamps ( )
for _ , ts := range tss {
v := ts . Values [ 0 ]
values := make ( [ ] float64 , len ( timestamps ) )
for i := range timestamps {
values [ i ] = v
}
ts . Timestamps = timestamps
ts . Values = values
}
return tss , nil
}
2022-06-01 01:31:40 +02:00
// evalRollupFuncWithoutAt evaluates the rollup rf (named funcName) over re for ec's time range.
// It does not look at re.At; evalRollupFunc handles the `@` modifier before calling it.
//
// A non-nil re.Offset shifts the evaluated range back by the offset, using a copy of ec.
// rollup_candlestick is additionally shifted forward by one step. The accumulated
// shift is added back to the returned timestamps, so the results line up with ec's range.
//
// Evaluation errors are returned wrapped into *UserReadableError.
// Panics if iafc is non-nil while re.Expr is not a *metricsql.MetricExpr.
func evalRollupFuncWithoutAt ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc ,
expr metricsql . Expr , re * metricsql . RollupExpr , iafc * incrementalAggrFuncContext ) ( [ ] * timeseries , error ) {
2021-09-17 22:33:15 +02:00
// Normalize the name, so the funcName comparisons below are case-insensitive.
funcName = strings . ToLower ( funcName )
2019-05-22 23:16:55 +02:00
// ecNew aliases ec until the range must be shifted; it is copied before any modification,
// so ec itself keeps the original range.
ecNew := ec
// offset accumulates the total shift applied to ecNew; it is added back to the result timestamps below.
var offset int64
2021-07-12 16:16:38 +02:00
if re . Offset != nil {
offset = re . Offset . Duration ( ec . Step )
2022-03-26 09:17:37 +01:00
ecNew = copyEvalConfig ( ecNew )
2019-05-22 23:16:55 +02:00
ecNew . Start -= offset
ecNew . End -= offset
2020-12-27 13:09:22 +01:00
// There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true,
// since the time range alignment has been already performed by the caller,
// so cache hit rate should be quite good.
// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
2019-05-22 23:16:55 +02:00
}
2021-09-17 22:33:15 +02:00
if funcName == "rollup_candlestick" {
2020-02-04 22:23:37 +01:00
// Automatically apply `offset -step` to `rollup_candlestick` function
// in order to obtain expected OHLC results.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
step := ecNew . Step
2022-03-26 09:17:37 +01:00
ecNew = copyEvalConfig ( ecNew )
2020-02-04 22:23:37 +01:00
ecNew . Start += step
ecNew . End += step
offset -= step
}
2019-05-22 23:16:55 +02:00
var rvs [ ] * timeseries
var err error
2019-12-25 20:35:47 +01:00
if me , ok := re . Expr . ( * metricsql . MetricExpr ) ; ok {
2022-06-01 01:31:40 +02:00
rvs , err = evalRollupFuncWithMetricExpr ( qt , ecNew , funcName , rf , expr , me , iafc , re . Window )
2019-05-22 23:16:55 +02:00
} else {
2019-07-10 11:57:27 +02:00
// Incremental aggregation (iafc) is only supported over plain selectors, never over subqueries.
if iafc != nil {
2021-09-17 22:33:15 +02:00
logger . Panicf ( "BUG: iafc must be nil for rollup %q over subquery %q" , funcName , re . AppendString ( nil ) )
2019-07-10 11:57:27 +02:00
}
2022-06-01 01:31:40 +02:00
rvs , err = evalRollupFuncWithSubquery ( qt , ecNew , funcName , rf , expr , re )
2019-05-22 23:16:55 +02:00
}
if err != nil {
2022-08-15 12:50:14 +02:00
return nil , & UserReadableError {
Err : err ,
}
2019-05-22 23:16:55 +02:00
}
2022-02-12 14:45:06 +01:00
// NOTE(review): aggregateAbsentOverTime receives ec rather than ecNew, while the offset shift
// below is applied to whatever timestamps it returns. Whether this is correct for
// absent_over_time(m[d] offset x) depends on its implementation (not visible here) - confirm.
if funcName == "absent_over_time" {
rvs = aggregateAbsentOverTime ( ec , re . Expr , rvs )
}
2020-11-14 11:36:21 +01:00
// Propagate partial-response status from the (possibly copied) ecNew to ec.
// NOTE(review): the same ec may be shared by concurrent evalExpr calls
// (see evalExprsInParallel and execBinaryOpArgs), so this write may race - confirm.
ec . updateIsPartialResponse ( ecNew . IsPartialResponse )
2019-05-22 23:16:55 +02:00
// Undo the accumulated shift, so the result timestamps match ec's range.
if offset != 0 && len ( rvs ) > 0 {
// Make a copy of timestamps, since they may be used in other values.
srcTimestamps := rvs [ 0 ] . Timestamps
dstTimestamps := append ( [ ] int64 { } , srcTimestamps ... )
for i := range dstTimestamps {
dstTimestamps [ i ] += offset
}
// All the series share the single shifted timestamps slice.
for _ , ts := range rvs {
ts . Timestamps = dstTimestamps
}
}
return rvs , nil
}
2022-02-12 14:45:06 +01:00
// aggregateAbsentOverTime collapses tss into the single series returned by getAbsentTimeseries(ec, expr).
//
// The result has 1 and nan values. A point of the returned series is overwritten with nan when
// at least one tss series holds NaN at that position. Such a NaN means that series had non-empty
// results at that point. All other points keep the value produced by getAbsentTimeseries.
// When tss is empty, the getAbsentTimeseries result is returned as is.
// This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130
func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries {
	rvs := getAbsentTimeseries(ec, expr)
	if len(tss) == 0 {
		return rvs
	}
	dstValues := rvs[0].Values
	pointsCount := len(tss[0].Values)
	for pos := 0; pos < pointsCount; pos++ {
		for _, src := range tss {
			if !math.IsNaN(src.Values[pos]) {
				continue
			}
			// A single series with data at pos is enough to mark the point as "not absent".
			dstValues[pos] = nan
			break
		}
	}
	return rvs
}
2022-06-01 01:31:40 +02:00
func evalRollupFuncWithSubquery ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc , expr metricsql . Expr , re * metricsql . RollupExpr ) ( [ ] * timeseries , error ) {
2020-01-03 19:42:51 +01:00
// TODO: determine whether to use rollupResultCacheV here.
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "subquery" )
defer qt . Done ( )
2021-07-12 16:16:38 +02:00
step := re . Step . Duration ( ec . Step )
if step == 0 {
2019-05-22 23:16:55 +02:00
step = ec . Step
}
2021-07-12 16:16:38 +02:00
window := re . Window . Duration ( ec . Step )
2019-05-22 23:16:55 +02:00
2022-03-26 09:17:37 +01:00
ecSQ := copyEvalConfig ( ec )
2019-06-21 20:50:44 +02:00
ecSQ . Start -= window + maxSilenceInterval + step
2020-09-03 12:21:51 +02:00
ecSQ . End += step
2019-05-22 23:16:55 +02:00
ecSQ . Step = step
2022-08-24 14:25:18 +02:00
ecSQ . MaxPointsPerSeries = * maxPointsSubqueryPerTimeseries
if err := ValidateMaxPointsPerSeries ( ecSQ . Start , ecSQ . End , ecSQ . Step , ecSQ . MaxPointsPerSeries ) ; err != nil {
2022-09-06 12:25:59 +02:00
return nil , fmt . Errorf ( "%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)" , err )
2019-05-22 23:16:55 +02:00
}
2020-09-03 12:21:51 +02:00
// unconditionally align start and end args to step for subquery as Prometheus does.
ecSQ . Start , ecSQ . End = alignStartEnd ( ecSQ . Start , ecSQ . End , ecSQ . Step )
2022-06-01 01:31:40 +02:00
tssSQ , err := evalExpr ( qt , ecSQ , re . Expr )
2019-05-22 23:16:55 +02:00
if err != nil {
return nil , err
}
2020-11-14 11:36:21 +01:00
ec . updateIsPartialResponse ( ecSQ . IsPartialResponse )
2020-01-03 23:46:39 +01:00
if len ( tssSQ ) == 0 {
return nil , nil
}
2022-08-24 14:25:18 +02:00
sharedTimestamps := getTimestamps ( ec . Start , ec . End , ec . Step , ec . MaxPointsPerSeries )
preFunc , rcs , err := getRollupConfigs ( funcName , rf , expr , ec . Start , ec . End , ec . Step , ec . MaxPointsPerSeries , window , ec . LookbackDelta , sharedTimestamps )
2020-01-10 20:16:14 +01:00
if err != nil {
return nil , err
}
2019-05-22 23:16:55 +02:00
tss := make ( [ ] * timeseries , 0 , len ( tssSQ ) * len ( rcs ) )
var tssLock sync . Mutex
2022-06-28 18:26:17 +02:00
var samplesScannedTotal uint64
2022-01-14 03:05:39 +01:00
keepMetricNames := getKeepMetricNames ( expr )
2019-05-22 23:16:55 +02:00
doParallel ( tssSQ , func ( tsSQ * timeseries , values [ ] float64 , timestamps [ ] int64 ) ( [ ] float64 , [ ] int64 ) {
values , timestamps = removeNanValues ( values [ : 0 ] , timestamps [ : 0 ] , tsSQ . Values , tsSQ . Timestamps )
preFunc ( values , timestamps )
for _ , rc := range rcs {
2022-01-14 03:05:39 +01:00
if tsm := newTimeseriesMap ( funcName , keepMetricNames , sharedTimestamps , & tsSQ . MetricName ) ; tsm != nil {
2022-06-28 18:26:17 +02:00
samplesScanned := rc . DoTimeseriesMap ( tsm , values , timestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2020-01-03 22:50:47 +01:00
tssLock . Lock ( )
tss = tsm . AppendTimeseriesTo ( tss )
tssLock . Unlock ( )
continue
}
2019-05-22 23:16:55 +02:00
var ts timeseries
2022-06-28 18:26:17 +02:00
samplesScanned := doRollupForTimeseries ( funcName , keepMetricNames , rc , & ts , & tsSQ . MetricName , values , timestamps , sharedTimestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2019-05-22 23:16:55 +02:00
tssLock . Lock ( )
tss = append ( tss , & ts )
tssLock . Unlock ( )
}
return values , timestamps
} )
2022-06-28 19:18:08 +02:00
rowsScannedPerQuery . Update ( float64 ( samplesScannedTotal ) )
2022-06-28 18:26:17 +02:00
qt . Printf ( "rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d" , funcName , len ( tssSQ ) , len ( tss ) , samplesScannedTotal )
2019-05-22 23:16:55 +02:00
return tss , nil
}
2022-06-28 19:18:08 +02:00
// rowsScannedPerQuery is a histogram of the number of raw samples scanned by a single rollup evaluation.
var rowsScannedPerQuery = metrics.NewHistogram("vm_rows_scanned_per_query")
2022-01-14 03:05:39 +01:00
func getKeepMetricNames ( expr metricsql . Expr ) bool {
2022-01-17 14:27:00 +01:00
if ae , ok := expr . ( * metricsql . AggrFuncExpr ) ; ok {
// Extract rollupFunc(...) from aggrFunc(rollupFunc(...)).
// This case is possible when optimized aggrFunc calculations are used
// such as `sum(rate(...))`
if len ( ae . Args ) != 1 {
return false
}
expr = ae . Args [ 0 ]
}
2022-01-14 03:05:39 +01:00
if fe , ok := expr . ( * metricsql . FuncExpr ) ; ok {
return fe . KeepMetricNames
}
return false
}
2019-05-22 23:16:55 +02:00
func doParallel ( tss [ ] * timeseries , f func ( ts * timeseries , values [ ] float64 , timestamps [ ] int64 ) ( [ ] float64 , [ ] int64 ) ) {
2020-12-08 19:49:32 +01:00
concurrency := cgroup . AvailableCPUs ( )
2019-05-22 23:16:55 +02:00
if concurrency > len ( tss ) {
concurrency = len ( tss )
}
workCh := make ( chan * timeseries , concurrency )
var wg sync . WaitGroup
wg . Add ( concurrency )
for i := 0 ; i < concurrency ; i ++ {
go func ( ) {
defer wg . Done ( )
var tmpValues [ ] float64
var tmpTimestamps [ ] int64
for ts := range workCh {
tmpValues , tmpTimestamps = f ( ts , tmpValues , tmpTimestamps )
}
} ( )
}
for _ , ts := range tss {
workCh <- ts
}
close ( workCh )
wg . Wait ( )
}
// removeNanValues appends the non-NaN entries of values, together with the timestamps at the same
// positions, to dstValues and dstTimestamps and returns the extended slices.
//
// timestamps is indexed by the positions of values, so it must be at least as long as values;
// callers pass slices of equal length.
func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) {
	hasNan := false
	for _, v := range values {
		if math.IsNaN(v) {
			hasNan = true
			// A single NaN is enough to take the slow path, so there is no need to scan further.
			break
		}
	}
	if !hasNan {
		// Fast path - no NaNs.
		dstValues = append(dstValues, values...)
		dstTimestamps = append(dstTimestamps, timestamps...)
		return dstValues, dstTimestamps
	}
	// Slow path - remove NaNs.
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}
// Outcome counters for rollupResultCacheV lookups: full hits, partial hits and misses.
var rollupResultCacheFullHits = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`)
var rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`)
var rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
2022-06-01 01:31:40 +02:00
func evalRollupFuncWithMetricExpr ( qt * querytracer . Tracer , ec * EvalConfig , funcName string , rf rollupFunc ,
2021-07-12 16:16:38 +02:00
expr metricsql . Expr , me * metricsql . MetricExpr , iafc * incrementalAggrFuncContext , windowExpr * metricsql . DurationExpr ) ( [ ] * timeseries , error ) {
2022-06-01 01:31:40 +02:00
var rollupMemorySize int64
window := windowExpr . Duration ( ec . Step )
2022-06-27 12:32:47 +02:00
if qt . Enabled ( ) {
qt = qt . NewChild ( "rollup %s(): timeRange=%s, step=%d, window=%d" , funcName , ec . timeRangeString ( ) , ec . Step , window )
defer func ( ) {
qt . Donef ( "neededMemoryBytes=%d" , rollupMemorySize )
} ( )
}
2019-07-10 11:57:27 +02:00
if me . IsEmpty ( ) {
return evalNumber ( ec , nan ) , nil
}
2019-05-22 23:16:55 +02:00
// Search for partial results in cache.
2022-06-01 01:31:40 +02:00
tssCached , start := rollupResultCacheV . Get ( qt , ec , expr , window )
2019-05-22 23:16:55 +02:00
if start > ec . End {
// The result is fully cached.
rollupResultCacheFullHits . Inc ( )
return tssCached , nil
}
if start > ec . Start {
rollupResultCachePartialHits . Inc ( )
} else {
rollupResultCacheMiss . Inc ( )
}
2020-01-11 13:40:32 +01:00
// Obtain rollup configs before fetching data from db,
// so type errors can be caught earlier.
2022-08-24 14:25:18 +02:00
sharedTimestamps := getTimestamps ( start , ec . End , ec . Step , ec . MaxPointsPerSeries )
preFunc , rcs , err := getRollupConfigs ( funcName , rf , expr , start , ec . End , ec . Step , ec . MaxPointsPerSeries , window , ec . LookbackDelta , sharedTimestamps )
2020-01-11 13:40:32 +01:00
if err != nil {
return nil , err
}
2019-05-22 23:16:55 +02:00
// Fetch the remaining part of the result.
2021-12-06 16:07:06 +01:00
tfs := searchutils . ToTagFilters ( me . LabelFilters )
tfss := searchutils . JoinTagFilterss ( [ ] [ ] storage . TagFilter { tfs } , ec . EnforcedTagFilterss )
2020-02-05 18:20:54 +01:00
minTimestamp := start - maxSilenceInterval
if window > ec . Step {
minTimestamp -= window
} else {
minTimestamp -= ec . Step
}
2022-03-26 09:17:37 +01:00
sq := storage . NewSearchQuery ( ec . AuthToken . AccountID , ec . AuthToken . ProjectID , minTimestamp , ec . End , tfss , ec . MaxSeries )
2022-07-05 23:11:59 +02:00
rss , isPartial , err := netstorage . ProcessSearchQuery ( qt , ec . DenyPartialResponse , sq , ec . Deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2022-08-15 12:50:14 +02:00
return nil , & UserReadableError {
Err : err ,
}
2019-05-22 23:16:55 +02:00
}
2020-11-14 11:36:21 +01:00
ec . updateIsPartialResponse ( isPartial )
2019-05-22 23:16:55 +02:00
rssLen := rss . Len ( )
if rssLen == 0 {
rss . Cancel ( )
2022-02-12 14:45:06 +01:00
tss := mergeTimeseries ( tssCached , nil , start , ec )
2019-05-22 23:16:55 +02:00
return tss , nil
}
// Verify timeseries fit available memory after the rollup.
// Take into account points from tssCached.
pointsPerTimeseries := 1 + ( ec . End - ec . Start ) / ec . Step
2019-11-08 17:45:25 +01:00
timeseriesLen := rssLen
if iafc != nil {
2020-05-12 18:06:54 +02:00
// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
2020-12-08 19:49:32 +01:00
timeseriesLen = cgroup . AvailableCPUs ( )
2019-11-08 17:45:25 +01:00
if iafc . ae . Modifier . Op != "" {
2020-05-12 18:06:54 +02:00
if iafc . ae . Limit > 0 {
// There is an explicit limit on the number of output time series.
timeseriesLen *= iafc . ae . Limit
} else {
// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
// since each group can have own set of time series in memory.
timeseriesLen *= 1000
}
2019-11-08 17:45:25 +01:00
}
2020-04-28 23:18:47 +02:00
// The maximum number of output time series is limited by rssLen.
if timeseriesLen > rssLen {
timeseriesLen = rssLen
}
2019-11-08 17:45:25 +01:00
}
rollupPoints := mulNoOverflow ( pointsPerTimeseries , int64 ( timeseriesLen * len ( rcs ) ) )
2022-10-08 00:07:42 +02:00
rollupMemorySize = sumNoOverflow ( mulNoOverflow ( int64 ( rssLen ) , 1000 ) , mulNoOverflow ( rollupPoints , 16 ) )
2022-10-12 08:23:43 +02:00
if maxMemory := int64 ( maxMemoryPerQuery . N ) ; maxMemory > 0 && rollupMemorySize > maxMemory {
2019-05-22 23:16:55 +02:00
rss . Cancel ( )
2022-08-15 12:50:14 +02:00
return nil , & UserReadableError {
2022-10-08 00:07:42 +02:00
Err : fmt . Errorf ( "not enough memory for processing %d data points across %d time series with %d points in each time series " +
"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; " +
2022-10-10 20:43:36 +02:00
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); " +
"increasing -search.maxMemoryPerQuery" ,
2022-10-12 08:23:43 +02:00
rollupPoints , timeseriesLen * len ( rcs ) , pointsPerTimeseries , maxMemory , rollupMemorySize , float64 ( ec . Step ) / 1e3 ) ,
2022-08-15 12:50:14 +02:00
}
2019-05-22 23:16:55 +02:00
}
2022-10-10 20:43:36 +02:00
rml := getRollupMemoryLimiter ( )
if ! rml . Get ( uint64 ( rollupMemorySize ) ) {
rss . Cancel ( )
return nil , & UserReadableError {
Err : fmt . Errorf ( "not enough memory for processing %d data points across %d time series with %d points in each time series; " +
"total available memory for concurrent requests: %d bytes; " +
"requested memory: %d bytes; " +
"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); " +
"switching to node with more RAM; increasing -memory.allowedPercent" ,
rollupPoints , timeseriesLen * len ( rcs ) , pointsPerTimeseries , rml . MaxSize , uint64 ( rollupMemorySize ) , float64 ( ec . Step ) / 1e3 ) ,
}
}
defer rml . Put ( uint64 ( rollupMemorySize ) )
2019-05-22 23:16:55 +02:00
// Evaluate rollup
2022-01-14 03:05:39 +01:00
keepMetricNames := getKeepMetricNames ( expr )
2019-07-10 11:57:27 +02:00
var tss [ ] * timeseries
if iafc != nil {
2022-06-01 01:31:40 +02:00
tss , err = evalRollupWithIncrementalAggregate ( qt , funcName , keepMetricNames , iafc , rss , rcs , preFunc , sharedTimestamps )
2019-07-10 11:57:27 +02:00
} else {
2022-06-01 01:31:40 +02:00
tss , err = evalRollupNoIncrementalAggregate ( qt , funcName , keepMetricNames , rss , rcs , preFunc , sharedTimestamps )
2019-07-10 11:57:27 +02:00
}
2019-05-22 23:16:55 +02:00
if err != nil {
2022-08-15 12:50:14 +02:00
return nil , & UserReadableError {
Err : err ,
}
2019-05-22 23:16:55 +02:00
}
tss = mergeTimeseries ( tssCached , tss , start , ec )
2019-06-30 00:27:03 +02:00
if ! isPartial {
2022-06-01 01:31:40 +02:00
rollupResultCacheV . Put ( qt , ec , expr , window , tss )
2019-05-22 23:23:23 +02:00
}
2019-05-22 23:16:55 +02:00
return tss , nil
}
2022-10-10 20:43:36 +02:00
// rollupMemoryLimiter bounds the total memory reserved by concurrently running rollups.
// Access it via getRollupMemoryLimiter, which initializes it once through rollupMemoryLimiterOnce.
var rollupMemoryLimiter memoryLimiter

var rollupMemoryLimiterOnce sync.Once
2019-06-12 21:25:37 +02:00
2022-10-10 20:43:36 +02:00
func getRollupMemoryLimiter ( ) * memoryLimiter {
rollupMemoryLimiterOnce . Do ( func ( ) {
rollupMemoryLimiter . MaxSize = uint64 ( memory . Allowed ( ) ) / 2
} )
return & rollupMemoryLimiter
2019-06-12 21:25:37 +02:00
}
2022-06-01 01:31:40 +02:00
func evalRollupWithIncrementalAggregate ( qt * querytracer . Tracer , funcName string , keepMetricNames bool ,
iafc * incrementalAggrFuncContext , rss * netstorage . Results , rcs [ ] * rollupConfig ,
2021-09-17 22:33:15 +02:00
preFunc func ( values [ ] float64 , timestamps [ ] int64 ) , sharedTimestamps [ ] int64 ) ( [ ] * timeseries , error ) {
2022-06-28 18:26:17 +02:00
qt = qt . NewChild ( "rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s" , funcName , iafc . ae . Name , rss . Len ( ) , rcs )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2022-06-28 18:26:17 +02:00
var samplesScannedTotal uint64
2022-06-01 01:31:40 +02:00
err := rss . RunParallel ( qt , func ( rs * netstorage . Result , workerID uint ) error {
2021-09-17 22:33:15 +02:00
rs . Values , rs . Timestamps = dropStaleNaNs ( funcName , rs . Values , rs . Timestamps )
2019-07-10 11:57:27 +02:00
preFunc ( rs . Values , rs . Timestamps )
ts := getTimeseries ( )
defer putTimeseries ( ts )
for _ , rc := range rcs {
2022-01-14 03:05:39 +01:00
if tsm := newTimeseriesMap ( funcName , keepMetricNames , sharedTimestamps , & rs . MetricName ) ; tsm != nil {
2022-06-28 18:26:17 +02:00
samplesScanned := rc . DoTimeseriesMap ( tsm , rs . Values , rs . Timestamps )
2020-01-03 22:50:47 +01:00
for _ , ts := range tsm . m {
iafc . updateTimeseries ( ts , workerID )
}
2022-06-28 18:26:17 +02:00
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2020-01-03 22:50:47 +01:00
continue
}
2019-07-10 11:57:27 +02:00
ts . Reset ( )
2022-06-28 18:26:17 +02:00
samplesScanned := doRollupForTimeseries ( funcName , keepMetricNames , rc , ts , & rs . MetricName , rs . Values , rs . Timestamps , sharedTimestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2019-07-12 14:51:02 +02:00
iafc . updateTimeseries ( ts , workerID )
2019-07-25 20:48:12 +02:00
// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
2019-07-10 11:57:27 +02:00
ts . Timestamps = nil
2019-07-25 20:48:12 +02:00
ts . denyReuse = false
2019-07-10 11:57:27 +02:00
}
2020-09-27 22:17:14 +02:00
return nil
2019-07-10 11:57:27 +02:00
} )
if err != nil {
return nil , err
}
tss := iafc . finalizeTimeseries ( )
2022-06-28 19:18:08 +02:00
rowsScannedPerQuery . Update ( float64 ( samplesScannedTotal ) )
2022-06-28 18:26:17 +02:00
qt . Printf ( "series after aggregation with %s(): %d; samplesScanned=%d" , iafc . ae . Name , len ( tss ) , samplesScannedTotal )
2019-07-10 11:57:27 +02:00
return tss , nil
}
2022-06-01 01:31:40 +02:00
func evalRollupNoIncrementalAggregate ( qt * querytracer . Tracer , funcName string , keepMetricNames bool , rss * netstorage . Results , rcs [ ] * rollupConfig ,
2021-09-17 22:33:15 +02:00
preFunc func ( values [ ] float64 , timestamps [ ] int64 ) , sharedTimestamps [ ] int64 ) ( [ ] * timeseries , error ) {
2022-06-28 18:26:17 +02:00
qt = qt . NewChild ( "rollup %s() over %d series; rollupConfigs=%s" , funcName , rss . Len ( ) , rcs )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2019-07-10 11:57:27 +02:00
tss := make ( [ ] * timeseries , 0 , rss . Len ( ) * len ( rcs ) )
var tssLock sync . Mutex
2022-06-28 18:26:17 +02:00
var samplesScannedTotal uint64
2022-06-01 01:31:40 +02:00
err := rss . RunParallel ( qt , func ( rs * netstorage . Result , workerID uint ) error {
2021-09-17 22:33:15 +02:00
rs . Values , rs . Timestamps = dropStaleNaNs ( funcName , rs . Values , rs . Timestamps )
2019-07-10 11:57:27 +02:00
preFunc ( rs . Values , rs . Timestamps )
for _ , rc := range rcs {
2022-01-14 03:05:39 +01:00
if tsm := newTimeseriesMap ( funcName , keepMetricNames , sharedTimestamps , & rs . MetricName ) ; tsm != nil {
2022-06-28 18:26:17 +02:00
samplesScanned := rc . DoTimeseriesMap ( tsm , rs . Values , rs . Timestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2020-01-03 22:50:47 +01:00
tssLock . Lock ( )
tss = tsm . AppendTimeseriesTo ( tss )
tssLock . Unlock ( )
continue
}
2019-07-10 11:57:27 +02:00
var ts timeseries
2022-06-28 18:26:17 +02:00
samplesScanned := doRollupForTimeseries ( funcName , keepMetricNames , rc , & ts , & rs . MetricName , rs . Values , rs . Timestamps , sharedTimestamps )
atomic . AddUint64 ( & samplesScannedTotal , samplesScanned )
2019-07-10 11:57:27 +02:00
tssLock . Lock ( )
tss = append ( tss , & ts )
tssLock . Unlock ( )
}
2020-09-27 22:17:14 +02:00
return nil
2019-07-10 11:57:27 +02:00
} )
if err != nil {
return nil , err
}
2022-06-28 19:18:08 +02:00
rowsScannedPerQuery . Update ( float64 ( samplesScannedTotal ) )
qt . Printf ( "samplesScanned=%d" , samplesScannedTotal )
2019-07-10 11:57:27 +02:00
return tss , nil
}
2022-01-14 03:05:39 +01:00
func doRollupForTimeseries ( funcName string , keepMetricNames bool , rc * rollupConfig , tsDst * timeseries , mnSrc * storage . MetricName ,
2022-06-28 18:26:17 +02:00
valuesSrc [ ] float64 , timestampsSrc [ ] int64 , sharedTimestamps [ ] int64 ) uint64 {
2019-07-25 20:48:12 +02:00
tsDst . MetricName . CopyFrom ( mnSrc )
if len ( rc . TagValue ) > 0 {
tsDst . MetricName . AddTag ( "rollup" , rc . TagValue )
}
2022-01-14 03:05:39 +01:00
if ! keepMetricNames && ! rollupFuncsKeepMetricName [ funcName ] {
2019-07-25 20:48:12 +02:00
tsDst . MetricName . ResetMetricGroup ( )
}
2022-06-28 18:26:17 +02:00
var samplesScanned uint64
tsDst . Values , samplesScanned = rc . Do ( tsDst . Values [ : 0 ] , valuesSrc , timestampsSrc )
2019-07-25 20:48:12 +02:00
tsDst . Timestamps = sharedTimestamps
tsDst . denyReuse = true
2022-06-28 18:26:17 +02:00
return samplesScanned
2019-07-25 20:48:12 +02:00
}
2019-05-22 23:16:55 +02:00
var (
	// bbPool is a package-level pool of reusable byte buffers (see bytesutil.ByteBufferPool).
	bbPool bytesutil.ByteBufferPool
)
func evalNumber ( ec * EvalConfig , n float64 ) [ ] * timeseries {
var ts timeseries
ts . denyReuse = true
2019-05-22 23:23:23 +02:00
ts . MetricName . AccountID = ec . AuthToken . AccountID
ts . MetricName . ProjectID = ec . AuthToken . ProjectID
2019-05-22 23:16:55 +02:00
timestamps := ec . getSharedTimestamps ( )
values := make ( [ ] float64 , len ( timestamps ) )
for i := range timestamps {
values [ i ] = n
}
ts . Values = values
ts . Timestamps = timestamps
return [ ] * timeseries { & ts }
}
// evalString returns a single all-NaN series whose metric group is set to s.
func evalString(ec *EvalConfig, s string) []*timeseries {
	rv := evalNumber(ec, nan)
	mn := &rv[0].MetricName
	mn.MetricGroup = append(mn.MetricGroup[:0], s...)
	return rv
}
func evalTime ( ec * EvalConfig ) [ ] * timeseries {
rv := evalNumber ( ec , nan )
timestamps := rv [ 0 ] . Timestamps
values := rv [ 0 ] . Values
for i , ts := range timestamps {
2020-09-08 13:00:47 +02:00
values [ i ] = float64 ( ts ) / 1e3
2019-05-22 23:16:55 +02:00
}
return rv
}
2019-06-12 21:25:37 +02:00
// mulNoOverflow returns a*b, saturating at math.MaxInt64 when the product would overflow int64.
//
// It is intended for non-negative operands such as point counts and byte sizes.
// A zero operand yields 0; previously a zero b caused an integer division-by-zero panic.
func mulNoOverflow(a, b int64) int64 {
	if a == 0 || b == 0 {
		// The product is zero; also avoids dividing by zero in the overflow check below.
		return 0
	}
	if math.MaxInt64/b < a {
		// Overflow
		return math.MaxInt64
	}
	return a * b
}
2019-12-25 20:35:47 +01:00
2022-10-08 00:07:42 +02:00
// sumNoOverflow returns a+b, saturating at math.MaxInt64 when the sum would overflow int64.
// The overflow check assumes a is non-negative.
func sumNoOverflow(a, b int64) int64 {
	headroom := math.MaxInt64 - a
	if b <= headroom {
		return a + b
	}
	// Overflow
	return math.MaxInt64
}
2021-09-17 22:33:15 +02:00
func dropStaleNaNs ( funcName string , values [ ] float64 , timestamps [ ] int64 ) ( [ ] float64 , [ ] int64 ) {
2022-01-14 00:48:04 +01:00
if * noStaleMarkers || funcName == "default_rollup" || funcName == "stale_samples_over_time" {
2021-08-17 10:00:17 +02:00
// Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function,
// since it uses them for Prometheus-style staleness detection.
2022-01-14 00:48:04 +01:00
// Do not drop staleness marks for stale_samples_over_time() function, since it needs
// to calculate the number of staleness markers.
2021-08-17 10:00:17 +02:00
return values , timestamps
}
// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
2021-08-15 12:20:02 +02:00
hasStaleSamples := false
for _ , v := range values {
if decimal . IsStaleNaN ( v ) {
hasStaleSamples = true
break
}
}
if ! hasStaleSamples {
// Fast path: values have no Prometheus staleness marks.
return values , timestamps
}
// Slow path: drop Prometheus staleness marks from values.
dstValues := values [ : 0 ]
dstTimestamps := timestamps [ : 0 ]
for i , v := range values {
if decimal . IsStaleNaN ( v ) {
continue
}
dstValues = append ( dstValues , v )
dstTimestamps = append ( dstTimestamps , timestamps [ i ] )
}
return dstValues , dstTimestamps
}