VictoriaMetrics/app/vmselect/promql/rollup.go
Aliaksandr Valialkin b3bb18d674
app/vmselect/promql: fix panic when calculating aggr_func(rollup*())
The panic has been introduced in dac21d874b
2023-02-27 11:48:27 -08:00

2282 lines
64 KiB
Go

package promql
import (
"flag"
"fmt"
"math"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
)
var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
"See also '-search.maxStalenessInterval'")
var rollupFuncs = map[string]newRollupFunc{
"absent_over_time": newRollupFuncOneArg(rollupAbsent),
"aggr_over_time": newRollupFuncTwoArgs(rollupFake),
"ascent_over_time": newRollupFuncOneArg(rollupAscentOverTime),
"avg_over_time": newRollupFuncOneArg(rollupAvg),
"changes": newRollupFuncOneArg(rollupChanges),
"changes_prometheus": newRollupFuncOneArg(rollupChangesPrometheus),
"count_eq_over_time": newRollupCountEQ,
"count_gt_over_time": newRollupCountGT,
"count_le_over_time": newRollupCountLE,
"count_ne_over_time": newRollupCountNE,
"count_over_time": newRollupFuncOneArg(rollupCount),
"decreases_over_time": newRollupFuncOneArg(rollupDecreases),
"default_rollup": newRollupFuncOneArg(rollupDefault), // default rollup func
"delta": newRollupFuncOneArg(rollupDelta),
"delta_prometheus": newRollupFuncOneArg(rollupDeltaPrometheus),
"deriv": newRollupFuncOneArg(rollupDerivSlow),
"deriv_fast": newRollupFuncOneArg(rollupDerivFast),
"descent_over_time": newRollupFuncOneArg(rollupDescentOverTime),
"distinct_over_time": newRollupFuncOneArg(rollupDistinct),
"duration_over_time": newRollupDurationOverTime,
"first_over_time": newRollupFuncOneArg(rollupFirst),
"geomean_over_time": newRollupFuncOneArg(rollupGeomean),
"histogram_over_time": newRollupFuncOneArg(rollupHistogram),
"hoeffding_bound_lower": newRollupHoeffdingBoundLower,
"hoeffding_bound_upper": newRollupHoeffdingBoundUpper,
"holt_winters": newRollupHoltWinters,
"idelta": newRollupFuncOneArg(rollupIdelta),
"ideriv": newRollupFuncOneArg(rollupIderiv),
"increase": newRollupFuncOneArg(rollupDelta), // + rollupFuncsRemoveCounterResets
"increase_prometheus": newRollupFuncOneArg(rollupDeltaPrometheus), // + rollupFuncsRemoveCounterResets
"increase_pure": newRollupFuncOneArg(rollupIncreasePure), // + rollupFuncsRemoveCounterResets
"increases_over_time": newRollupFuncOneArg(rollupIncreases),
"integrate": newRollupFuncOneArg(rollupIntegrate),
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets
"lag": newRollupFuncOneArg(rollupLag),
"last_over_time": newRollupFuncOneArg(rollupLast),
"lifetime": newRollupFuncOneArg(rollupLifetime),
"mad_over_time": newRollupFuncOneArg(rollupMAD),
"max_over_time": newRollupFuncOneArg(rollupMax),
"min_over_time": newRollupFuncOneArg(rollupMin),
"mode_over_time": newRollupFuncOneArg(rollupModeOverTime),
"predict_linear": newRollupPredictLinear,
"present_over_time": newRollupFuncOneArg(rollupPresent),
"quantile_over_time": newRollupQuantile,
"quantiles_over_time": newRollupQuantiles,
"range_over_time": newRollupFuncOneArg(rollupRange),
"rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets
"rate_over_sum": newRollupFuncOneArg(rollupRateOverSum),
"resets": newRollupFuncOneArg(rollupResets),
"rollup": newRollupFuncOneOrTwoArgs(rollupFake),
"rollup_candlestick": newRollupFuncOneOrTwoArgs(rollupFake),
"rollup_delta": newRollupFuncOneOrTwoArgs(rollupFake),
"rollup_deriv": newRollupFuncOneOrTwoArgs(rollupFake),
"rollup_increase": newRollupFuncOneOrTwoArgs(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_rate": newRollupFuncOneOrTwoArgs(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_scrape_interval": newRollupFuncOneOrTwoArgs(rollupFake),
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
"share_gt_over_time": newRollupShareGT,
"share_le_over_time": newRollupShareLE,
"stale_samples_over_time": newRollupFuncOneArg(rollupStaleSamples),
"stddev_over_time": newRollupFuncOneArg(rollupStddev),
"stdvar_over_time": newRollupFuncOneArg(rollupStdvar),
"sum_over_time": newRollupFuncOneArg(rollupSum),
"sum2_over_time": newRollupFuncOneArg(rollupSum2),
"tfirst_over_time": newRollupFuncOneArg(rollupTfirst),
// `timestamp` function must return timestamp for the last datapoint on the current window
// in order to properly handle offset and timestamps unaligned to the current step.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details.
"timestamp": newRollupFuncOneArg(rollupTlast),
"timestamp_with_name": newRollupFuncOneArg(rollupTlast), // + rollupFuncsKeepMetricName
"tlast_change_over_time": newRollupFuncOneArg(rollupTlastChange),
"tlast_over_time": newRollupFuncOneArg(rollupTlast),
"tmax_over_time": newRollupFuncOneArg(rollupTmax),
"tmin_over_time": newRollupFuncOneArg(rollupTmin),
"zscore_over_time": newRollupFuncOneArg(rollupZScoreOverTime),
}
// rollupAggrFuncs are functions that can be passed to `aggr_over_time()`
var rollupAggrFuncs = map[string]rollupFunc{
"absent_over_time": rollupAbsent,
"ascent_over_time": rollupAscentOverTime,
"avg_over_time": rollupAvg,
"changes": rollupChanges,
"count_over_time": rollupCount,
"decreases_over_time": rollupDecreases,
"default_rollup": rollupDefault,
"delta": rollupDelta,
"deriv": rollupDerivSlow,
"deriv_fast": rollupDerivFast,
"descent_over_time": rollupDescentOverTime,
"distinct_over_time": rollupDistinct,
"first_over_time": rollupFirst,
"geomean_over_time": rollupGeomean,
"idelta": rollupIdelta,
"ideriv": rollupIderiv,
"increase": rollupDelta,
"increase_pure": rollupIncreasePure,
"increases_over_time": rollupIncreases,
"integrate": rollupIntegrate,
"irate": rollupIderiv,
"lag": rollupLag,
"last_over_time": rollupLast,
"lifetime": rollupLifetime,
"mad_over_time": rollupMAD,
"max_over_time": rollupMax,
"min_over_time": rollupMin,
"mode_over_time": rollupModeOverTime,
"present_over_time": rollupPresent,
"range_over_time": rollupRange,
"rate": rollupDerivFast,
"rate_over_sum": rollupRateOverSum,
"resets": rollupResets,
"scrape_interval": rollupScrapeInterval,
"stale_samples_over_time": rollupStaleSamples,
"stddev_over_time": rollupStddev,
"stdvar_over_time": rollupStdvar,
"sum_over_time": rollupSum,
"sum2_over_time": rollupSum2,
"tfirst_over_time": rollupTfirst,
"timestamp": rollupTlast,
"timestamp_with_name": rollupTlast,
"tlast_change_over_time": rollupTlastChange,
"tlast_over_time": rollupTlast,
"tmax_over_time": rollupTmax,
"tmin_over_time": rollupTmin,
"zscore_over_time": rollupZScoreOverTime,
}
// VictoriaMetrics can extends lookbehind window for these functions
// in order to make sure it contains enough points for returning non-empty results.
//
// This is needed for returning the expected non-empty graphs when zooming in the graph in Grafana,
// which is built with `func_name(metric)` query.
var rollupFuncsCanAdjustWindow = map[string]bool{
"default_rollup": true,
"deriv": true,
"deriv_fast": true,
"ideriv": true,
"irate": true,
"rate": true,
"rate_over_sum": true,
"rollup": true,
"rollup_candlestick": true,
"rollup_deriv": true,
"rollup_rate": true,
"rollup_scrape_interval": true,
"scrape_interval": true,
"timestamp": true,
}
// rollupFuncsRemoveCounterResets contains functions, which need to call removeCounterResets
// over input samples before calling the corresponding rollup functions.
var rollupFuncsRemoveCounterResets = map[string]bool{
"increase": true,
"increase_prometheus": true,
"increase_pure": true,
"irate": true,
"rate": true,
"rollup_increase": true,
"rollup_rate": true,
}
// rollupFuncsSamplesScannedPerCall contains functions, which scan lower number of samples
// than is passed to the rollup func.
//
// It is expected that the remaining rollupFuncs scan all the samples passed to them.
var rollupFuncsSamplesScannedPerCall = map[string]int{
"absent_over_time": 1,
"count_over_time": 1,
"default_rollup": 1,
"delta": 2,
"delta_prometheus": 2,
"deriv_fast": 2,
"first_over_time": 1,
"idelta": 2,
"ideriv": 2,
"increase": 2,
"increase_prometheus": 2,
"increase_pure": 2,
"irate": 2,
"lag": 1,
"last_over_time": 1,
"lifetime": 2,
"present_over_time": 1,
"rate": 2,
"scrape_interval": 2,
"tfirst_over_time": 1,
"timestamp": 1,
"timestamp_with_name": 1,
"tlast_over_time": 1,
}
// These functions don't change physical meaning of input time series,
// so they don't drop metric name
var rollupFuncsKeepMetricName = map[string]bool{
"avg_over_time": true,
"default_rollup": true,
"first_over_time": true,
"geomean_over_time": true,
"hoeffding_bound_lower": true,
"hoeffding_bound_upper": true,
"holt_winters": true,
"last_over_time": true,
"max_over_time": true,
"min_over_time": true,
"mode_over_time": true,
"predict_linear": true,
"quantile_over_time": true,
"quantiles_over_time": true,
"rollup": true,
"rollup_candlestick": true,
"timestamp_with_name": true,
}
func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
afe, ok := expr.(*metricsql.AggrFuncExpr)
if ok {
// This is for incremental aggregate function case:
//
// sum(aggr_over_time(...))
//
// See aggr_incremental.go for details.
expr = afe.Args[0]
}
fe, ok := expr.(*metricsql.FuncExpr)
if !ok {
logger.Panicf("BUG: unexpected expression; want metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil))
}
if fe.Name != "aggr_over_time" {
logger.Panicf("BUG: unexpected function name: %q; want `aggr_over_time`", fe.Name)
}
if len(fe.Args) != 2 {
return nil, fmt.Errorf("unexpected number of args to aggr_over_time(); got %d; want %d", len(fe.Args), 2)
}
arg := fe.Args[0]
var aggrFuncNames []string
if se, ok := arg.(*metricsql.StringExpr); ok {
aggrFuncNames = append(aggrFuncNames, se.S)
} else {
fe, ok := arg.(*metricsql.FuncExpr)
if !ok || fe.Name != "" {
return nil, fmt.Errorf("%s cannot be passed to aggr_over_time(); expecting quoted aggregate function name or a list of quoted aggregate function names",
arg.AppendString(nil))
}
for _, e := range fe.Args {
se, ok := e.(*metricsql.StringExpr)
if !ok {
return nil, fmt.Errorf("%s cannot be passed here; expecting quoted aggregate function name", e.AppendString(nil))
}
aggrFuncNames = append(aggrFuncNames, se.S)
}
}
if len(aggrFuncNames) == 0 {
return nil, fmt.Errorf("aggr_over_time() must contain at least a single aggregate function name")
}
for _, s := range aggrFuncNames {
if rollupAggrFuncs[s] == nil {
return nil, fmt.Errorf("%q cannot be used in `aggr_over_time` function; expecting quoted aggregate function name", s)
}
}
return aggrFuncNames, nil
}
// getRollupTag returns the possible second arg from the expr.
//
// The expr can have the following forms:
// - rollup_func(q, tag)
// - aggr_func(rollup_func(q, tag)) - this form is used during incremental aggregate calculations
func getRollupTag(expr metricsql.Expr) (string, error) {
af, ok := expr.(*metricsql.AggrFuncExpr)
if ok {
// extract rollup_func() from aggr_func(rollup_func(q, tag))
if len(af.Args) != 1 {
logger.Panicf("BUG: unexpected number of args to %s; got %d; want 1", af.AppendString(nil), len(af.Args))
}
expr = af.Args[0]
}
fe, ok := expr.(*metricsql.FuncExpr)
if !ok {
logger.Panicf("BUG: unexpected expression; want *metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil))
}
if len(fe.Args) < 2 {
return "", nil
}
if len(fe.Args) != 2 {
return "", fmt.Errorf("unexpected number of args; got %d; want %d", len(fe.Args), 2)
}
arg := fe.Args[1]
se, ok := arg.(*metricsql.StringExpr)
if !ok {
return "", fmt.Errorf("unexpected rollup tag type: %s; expecting string", arg.AppendString(nil))
}
if se.S == "" {
return "", fmt.Errorf("rollup tag cannot be empty")
}
return se.S, nil
}
func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start, end, step int64, maxPointsPerSeries int,
window, lookbackDelta int64, sharedTimestamps []int64) (
func(values []float64, timestamps []int64), []*rollupConfig, error) {
preFunc := func(values []float64, timestamps []int64) {}
funcName = strings.ToLower(funcName)
if rollupFuncsRemoveCounterResets[funcName] {
preFunc = func(values []float64, timestamps []int64) {
removeCounterResets(values)
}
}
samplesScannedPerCall := rollupFuncsSamplesScannedPerCall[funcName]
newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
return &rollupConfig{
TagValue: tagValue,
Func: rf,
Start: start,
End: end,
Step: step,
Window: window,
MaxPointsPerSeries: maxPointsPerSeries,
MayAdjustWindow: rollupFuncsCanAdjustWindow[funcName],
LookbackDelta: lookbackDelta,
Timestamps: sharedTimestamps,
isDefaultRollup: funcName == "default_rollup",
samplesScannedPerCall: samplesScannedPerCall,
}
}
appendRollupConfigs := func(dst []*rollupConfig, expr metricsql.Expr) ([]*rollupConfig, error) {
tag, err := getRollupTag(expr)
if err != nil {
return nil, fmt.Errorf("invalid args for %s: %w", expr.AppendString(nil), err)
}
switch tag {
case "min":
dst = append(dst, newRollupConfig(rollupMin, ""))
case "max":
dst = append(dst, newRollupConfig(rollupMax, ""))
case "avg":
dst = append(dst, newRollupConfig(rollupAvg, ""))
case "":
dst = append(dst, newRollupConfig(rollupMin, "min"))
dst = append(dst, newRollupConfig(rollupMax, "max"))
dst = append(dst, newRollupConfig(rollupAvg, "avg"))
default:
return nil, fmt.Errorf("unexpected second arg for %s: %q; want `min`, `max` or `avg`", expr.AppendString(nil), tag)
}
return dst, nil
}
var rcs []*rollupConfig
var err error
switch funcName {
case "rollup":
rcs, err = appendRollupConfigs(rcs, expr)
case "rollup_rate", "rollup_deriv":
preFuncPrev := preFunc
preFunc = func(values []float64, timestamps []int64) {
preFuncPrev(values, timestamps)
derivValues(values, timestamps)
}
rcs, err = appendRollupConfigs(rcs, expr)
case "rollup_increase", "rollup_delta":
preFuncPrev := preFunc
preFunc = func(values []float64, timestamps []int64) {
preFuncPrev(values, timestamps)
deltaValues(values)
}
rcs, err = appendRollupConfigs(rcs, expr)
case "rollup_candlestick":
tag, err := getRollupTag(expr)
if err != nil {
return nil, nil, fmt.Errorf("invalid args for %s: %w", expr.AppendString(nil), err)
}
switch tag {
case "open":
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
case "close":
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
case "low":
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
case "high":
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
case "":
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
default:
return nil, nil, fmt.Errorf("unexpected second arg for %s: %q; want `min`, `max` or `avg`", expr.AppendString(nil), tag)
}
case "rollup_scrape_interval":
preFuncPrev := preFunc
preFunc = func(values []float64, timestamps []int64) {
preFuncPrev(values, timestamps)
// Calculate intervals in seconds between samples.
tsSecsPrev := nan
for i, ts := range timestamps {
tsSecs := float64(ts) / 1000
values[i] = tsSecs - tsSecsPrev
tsSecsPrev = tsSecs
}
if len(values) > 1 {
// Overwrite the first NaN interval with the second interval,
// So min, max and avg rollups could be calculated properly, since they don't expect to receive NaNs.
values[0] = values[1]
}
}
rcs, err = appendRollupConfigs(rcs, expr)
case "aggr_over_time":
aggrFuncNames, err := getRollupAggrFuncNames(expr)
if err != nil {
return nil, nil, fmt.Errorf("invalid args to %s: %w", expr.AppendString(nil), err)
}
for _, aggrFuncName := range aggrFuncNames {
if rollupFuncsRemoveCounterResets[aggrFuncName] {
// There is no need to save the previous preFunc, since it is either empty or the same.
preFunc = func(values []float64, timestamps []int64) {
removeCounterResets(values)
}
}
rf := rollupAggrFuncs[aggrFuncName]
rcs = append(rcs, newRollupConfig(rf, aggrFuncName))
}
default:
rcs = append(rcs, newRollupConfig(rf, ""))
}
if err != nil {
return nil, nil, err
}
return preFunc, rcs, nil
}
func getRollupFunc(funcName string) newRollupFunc {
funcName = strings.ToLower(funcName)
return rollupFuncs[funcName]
}
type rollupFuncArg struct {
// The value preceding values if it fits staleness interval.
prevValue float64
// The timestamp for prevValue.
prevTimestamp int64
// Values that fit window ending at currTimestamp.
values []float64
// Timestamps for values.
timestamps []int64
// Real value preceding values without restrictions on staleness interval.
realPrevValue float64
// Real value which goes after values.
realNextValue float64
// Current timestamp for rollup evaluation.
currTimestamp int64
// Index for the currently evaluated point relative to time range for query evaluation.
idx int
// Time window for rollup calculations.
window int64
tsm *timeseriesMap
}
func (rfa *rollupFuncArg) reset() {
rfa.prevValue = 0
rfa.prevTimestamp = 0
rfa.values = nil
rfa.timestamps = nil
rfa.currTimestamp = 0
rfa.idx = 0
rfa.window = 0
rfa.tsm = nil
}
// rollupFunc must return rollup value for the given rfa.
//
// prevValue may be nan, values and timestamps may be empty.
type rollupFunc func(rfa *rollupFuncArg) float64
type rollupConfig struct {
// This tag value must be added to "rollup" tag if non-empty.
TagValue string
Func rollupFunc
Start int64
End int64
Step int64
Window int64
// The maximum number of points, which can be generated per each series.
MaxPointsPerSeries int
// Whether window may be adjusted to 2 x interval between data points.
// This is needed for functions which have dt in the denominator
// such as rate, deriv, etc.
// Without the adjustment their value would jump in unexpected directions
// when using window smaller than 2 x scrape_interval.
MayAdjustWindow bool
Timestamps []int64
// LoookbackDelta is the analog to `-query.lookback-delta` from Prometheus world.
LookbackDelta int64
// Whether default_rollup is used.
isDefaultRollup bool
// The estimated number of samples scanned per Func call.
//
// If zero, then it is considered that Func scans all the samples passed to it.
samplesScannedPerCall int
}
func (rc *rollupConfig) getTimestamps() []int64 {
return getTimestamps(rc.Start, rc.End, rc.Step, rc.MaxPointsPerSeries)
}
func (rc *rollupConfig) String() string {
start := storage.TimestampToHumanReadableFormat(rc.Start)
end := storage.TimestampToHumanReadableFormat(rc.End)
return fmt.Sprintf("timeRange=[%s..%s], step=%d, window=%d, points=%d", start, end, rc.Step, rc.Window, len(rc.Timestamps))
}
var (
nan = math.NaN()
inf = math.Inf(1)
)
// The maximum interval without previous rows.
const maxSilenceInterval = 5 * 60 * 1000
type timeseriesMap struct {
origin *timeseries
h metrics.Histogram
m map[string]*timeseries
}
func newTimeseriesMap(funcName string, keepMetricNames bool, sharedTimestamps []int64, mnSrc *storage.MetricName) *timeseriesMap {
funcName = strings.ToLower(funcName)
switch funcName {
case "histogram_over_time", "quantiles_over_time":
default:
return nil
}
values := make([]float64, len(sharedTimestamps))
for i := range values {
values[i] = nan
}
var origin timeseries
origin.MetricName.CopyFrom(mnSrc)
if !keepMetricNames && !rollupFuncsKeepMetricName[funcName] {
origin.MetricName.ResetMetricGroup()
}
origin.Timestamps = sharedTimestamps
origin.Values = values
return &timeseriesMap{
origin: &origin,
m: make(map[string]*timeseries),
}
}
func (tsm *timeseriesMap) AppendTimeseriesTo(dst []*timeseries) []*timeseries {
for _, ts := range tsm.m {
dst = append(dst, ts)
}
return dst
}
func (tsm *timeseriesMap) GetOrCreateTimeseries(labelName, labelValue string) *timeseries {
ts := tsm.m[labelValue]
if ts != nil {
return ts
}
ts = &timeseries{}
ts.CopyFromShallowTimestamps(tsm.origin)
ts.MetricName.RemoveTag(labelName)
ts.MetricName.AddTag(labelName, labelValue)
tsm.m[labelValue] = ts
return ts
}
// Do calculates rollups for the given timestamps and values, appends
// them to dstValues and returns results.
//
// rc.Timestamps are used as timestamps for dstValues.
//
// timestamps must cover time range [rc.Start - rc.Window - maxSilenceInterval ... rc.End].
//
// Do cannot be called from concurrent goroutines.
func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []int64) ([]float64, uint64) {
return rc.doInternal(dstValues, nil, values, timestamps)
}
// DoTimeseriesMap calculates rollups for the given timestamps and values and puts them to tsm.
func (rc *rollupConfig) DoTimeseriesMap(tsm *timeseriesMap, values []float64, timestamps []int64) uint64 {
ts := getTimeseries()
var samplesScanned uint64
ts.Values, samplesScanned = rc.doInternal(ts.Values[:0], tsm, values, timestamps)
putTimeseries(ts)
return samplesScanned
}
func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, values []float64, timestamps []int64) ([]float64, uint64) {
// Sanity checks.
if rc.Step <= 0 {
logger.Panicf("BUG: Step must be bigger than 0; got %d", rc.Step)
}
if rc.Start > rc.End {
logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", rc.Start, rc.End)
}
if rc.Window < 0 {
logger.Panicf("BUG: Window must be non-negative; got %d", rc.Window)
}
if err := ValidateMaxPointsPerSeries(rc.Start, rc.End, rc.Step, rc.MaxPointsPerSeries); err != nil {
logger.Panicf("BUG: %s; this must be validated before the call to rollupConfig.Do", err)
}
// Extend dstValues in order to remove mallocs below.
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
scrapeInterval := getScrapeInterval(timestamps, rc.Step)
maxPrevInterval := getMaxPrevInterval(scrapeInterval)
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
maxPrevInterval = rc.LookbackDelta
}
if *minStalenessInterval > 0 {
if msi := minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
maxPrevInterval = msi
}
}
window := rc.Window
if window <= 0 {
window = rc.Step
if rc.MayAdjustWindow && window < maxPrevInterval {
// Adjust lookbehind window only if it isn't set explicitly, e.g. rate(foo).
// In the case of missing lookbehind window it should be adjusted in order to return non-empty graph
// when the window doesn't cover at least two raw samples (this is what most users expect).
//
// If the user explicitly sets the lookbehind window to some fixed value, e.g. rate(foo[1s]),
// then it is expected he knows what he is doing. Do not adjust the lookbehind window then.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3483
window = maxPrevInterval
}
if rc.isDefaultRollup && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
// Implicit window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval
// according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784
window = rc.LookbackDelta
}
}
rfa := getRollupFuncArg()
rfa.idx = 0
rfa.window = window
rfa.tsm = tsm
i := 0
j := 0
ni := 0
nj := 0
f := rc.Func
samplesScanned := uint64(len(values))
samplesScannedPerCall := uint64(rc.samplesScannedPerCall)
for _, tEnd := range rc.Timestamps {
tStart := tEnd - window
ni = seekFirstTimestampIdxAfter(timestamps[i:], tStart, ni)
i += ni
if j < i {
j = i
}
nj = seekFirstTimestampIdxAfter(timestamps[j:], tEnd, nj)
j += nj
rfa.prevValue = nan
rfa.prevTimestamp = tStart - maxPrevInterval
if i < len(timestamps) && i > 0 && timestamps[i-1] > rfa.prevTimestamp {
rfa.prevValue = values[i-1]
rfa.prevTimestamp = timestamps[i-1]
}
rfa.values = values[i:j]
rfa.timestamps = timestamps[i:j]
if i > 0 {
rfa.realPrevValue = values[i-1]
} else {
rfa.realPrevValue = nan
}
if j < len(values) {
rfa.realNextValue = values[j]
} else {
rfa.realNextValue = nan
}
rfa.currTimestamp = tEnd
value := f(rfa)
rfa.idx++
if samplesScannedPerCall > 0 {
samplesScanned += samplesScannedPerCall
} else {
samplesScanned += uint64(len(rfa.values))
}
dstValues = append(dstValues, value)
}
putRollupFuncArg(rfa)
return dstValues, samplesScanned
}
func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int {
if len(timestamps) == 0 || timestamps[0] > seekTimestamp {
return 0
}
startIdx := nHint - 2
if startIdx < 0 {
startIdx = 0
}
if startIdx >= len(timestamps) {
startIdx = len(timestamps) - 1
}
endIdx := nHint + 2
if endIdx > len(timestamps) {
endIdx = len(timestamps)
}
if startIdx > 0 && timestamps[startIdx] <= seekTimestamp {
timestamps = timestamps[startIdx:]
endIdx -= startIdx
} else {
startIdx = 0
}
if endIdx < len(timestamps) && timestamps[endIdx] > seekTimestamp {
timestamps = timestamps[:endIdx]
}
if len(timestamps) < 16 {
// Fast path: the number of timestamps to search is small, so scan them all.
for i, timestamp := range timestamps {
if timestamp > seekTimestamp {
return startIdx + i
}
}
return startIdx + len(timestamps)
}
// Slow path: too big len(timestamps), so use binary search.
i := binarySearchInt64(timestamps, seekTimestamp+1)
return startIdx + int(i)
}
func binarySearchInt64(a []int64, v int64) uint {
// Copy-pasted sort.Search from https://golang.org/src/sort/search.go?s=2246:2286#L49
i, j := uint(0), uint(len(a))
for i < j {
h := (i + j) >> 1
if h < uint(len(a)) && a[h] < v {
i = h + 1
} else {
j = h
}
}
return i
}
func getScrapeInterval(timestamps []int64, defaultInterval int64) int64 {
if len(timestamps) < 2 {
// can't calculate scrape interval with less than 2 timestamps
// return defaultInterval
return defaultInterval
}
// Estimate scrape interval as 0.6 quantile for the first 20 intervals.
tsPrev := timestamps[0]
timestamps = timestamps[1:]
if len(timestamps) > 20 {
timestamps = timestamps[:20]
}
a := getFloat64s()
intervals := a.A[:0]
for _, ts := range timestamps {
intervals = append(intervals, float64(ts-tsPrev))
tsPrev = ts
}
scrapeInterval := int64(quantile(0.6, intervals))
a.A = intervals
putFloat64s(a)
if scrapeInterval <= 0 {
return defaultInterval
}
return scrapeInterval
}
func getMaxPrevInterval(scrapeInterval int64) int64 {
// Increase scrapeInterval more for smaller scrape intervals in order to hide possible gaps
// when high jitter is present.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/139 .
if scrapeInterval <= 2*1000 {
return scrapeInterval + 4*scrapeInterval
}
if scrapeInterval <= 4*1000 {
return scrapeInterval + 2*scrapeInterval
}
if scrapeInterval <= 8*1000 {
return scrapeInterval + scrapeInterval
}
if scrapeInterval <= 16*1000 {
return scrapeInterval + scrapeInterval/2
}
if scrapeInterval <= 32*1000 {
return scrapeInterval + scrapeInterval/4
}
return scrapeInterval + scrapeInterval/8
}
func removeCounterResets(values []float64) {
// There is no need in handling NaNs here, since they are impossible
// on values from vmstorage.
if len(values) == 0 {
return
}
var correction float64
prevValue := values[0]
for i, v := range values {
d := v - prevValue
if d < 0 {
if (-d * 8) < prevValue {
// This is likely a partial counter reset.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2787
correction += prevValue - v
} else {
correction += prevValue
}
}
prevValue = v
values[i] = v + correction
}
}
func deltaValues(values []float64) {
// There is no need in handling NaNs here, since they are impossible
// on values from vmstorage.
if len(values) == 0 {
return
}
prevDelta := float64(0)
prevValue := values[0]
for i, v := range values[1:] {
prevDelta = v - prevValue
values[i] = prevDelta
prevValue = v
}
values[len(values)-1] = prevDelta
}
func derivValues(values []float64, timestamps []int64) {
// There is no need in handling NaNs here, since they are impossible
// on values from vmstorage.
if len(values) == 0 {
return
}
prevDeriv := float64(0)
prevValue := values[0]
prevTs := timestamps[0]
for i, v := range values[1:] {
ts := timestamps[i+1]
if ts == prevTs {
// Use the previous value for duplicate timestamps.
values[i] = prevDeriv
continue
}
dt := float64(ts-prevTs) / 1e3
prevDeriv = (v - prevValue) / dt
values[i] = prevDeriv
prevValue = v
prevTs = ts
}
values[len(values)-1] = prevDeriv
}
type newRollupFunc func(args []interface{}) (rollupFunc, error)
func newRollupFuncOneArg(rf rollupFunc) newRollupFunc {
return func(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 1); err != nil {
return nil, err
}
return rf, nil
}
}
func newRollupFuncTwoArgs(rf rollupFunc) newRollupFunc {
return func(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
return rf, nil
}
}
func newRollupFuncOneOrTwoArgs(rf rollupFunc) newRollupFunc {
return func(args []interface{}) (rollupFunc, error) {
if len(args) < 1 || len(args) > 2 {
return nil, fmt.Errorf("unexpected number of args; got %d; want 1...2", len(args))
}
return rf, nil
}
}
func newRollupHoltWinters(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 3); err != nil {
return nil, err
}
sfs, err := getScalar(args[1], 1)
if err != nil {
return nil, err
}
tfs, err := getScalar(args[2], 2)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
sf := sfs[rfa.idx]
if sf <= 0 || sf >= 1 {
return nan
}
tf := tfs[rfa.idx]
if tf <= 0 || tf >= 1 {
return nan
}
// See https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing .
// TODO: determine whether this shit really works.
s0 := rfa.prevValue
if math.IsNaN(s0) {
s0 = values[0]
values = values[1:]
if len(values) == 0 {
return s0
}
}
b0 := values[0] - s0
for _, v := range values {
s1 := sf*v + (1-sf)*(s0+b0)
b1 := tf*(s1-s0) + (1-tf)*b0
s0 = s1
b0 = b1
}
return s0
}
return rf, nil
}
func newRollupPredictLinear(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
secs, err := getScalar(args[1], 1)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
v, k := linearRegression(rfa.values, rfa.timestamps, rfa.currTimestamp)
if math.IsNaN(v) {
return nan
}
sec := secs[rfa.idx]
return v + k*sec
}
return rf, nil
}
func linearRegression(values []float64, timestamps []int64, interceptTime int64) (float64, float64) {
if len(values) == 0 {
return nan, nan
}
if areConstValues(values) {
return values[0], 0
}
// See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example
vSum := float64(0)
tSum := float64(0)
tvSum := float64(0)
ttSum := float64(0)
n := 0
for i, v := range values {
if math.IsNaN(v) {
continue
}
dt := float64(timestamps[i]-interceptTime) / 1e3
vSum += v
tSum += dt
tvSum += dt * v
ttSum += dt * dt
n++
}
if n == 0 {
return nan, nan
}
k := float64(0)
tDiff := ttSum - tSum*tSum/float64(n)
if math.Abs(tDiff) >= 1e-6 {
// Prevent from incorrect division for too small tDiff values.
k = (tvSum - tSum*vSum/float64(n)) / tDiff
}
v := vSum/float64(n) - k*tSum/float64(n)
return v, k
}
func areConstValues(values []float64) bool {
if len(values) <= 1 {
return true
}
vPrev := values[0]
for _, v := range values[1:] {
if v != vPrev {
return false
}
vPrev = v
}
return true
}
func newRollupDurationOverTime(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
dMaxs, err := getScalar(args[1], 1)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
return nan
}
tPrev := timestamps[0]
dSum := int64(0)
dMax := int64(dMaxs[rfa.idx] * 1000)
for _, t := range timestamps {
d := t - tPrev
if d <= dMax {
dSum += d
}
tPrev = t
}
return float64(dSum) / 1000
}
return rf, nil
}
func newRollupShareLE(args []interface{}) (rollupFunc, error) {
return newRollupShareFilter(args, countFilterLE)
}
func countFilterLE(values []float64, le float64) int {
n := 0
for _, v := range values {
if v <= le {
n++
}
}
return n
}
func newRollupShareGT(args []interface{}) (rollupFunc, error) {
return newRollupShareFilter(args, countFilterGT)
}
func countFilterGT(values []float64, gt float64) int {
n := 0
for _, v := range values {
if v > gt {
n++
}
}
return n
}
func countFilterEQ(values []float64, eq float64) int {
n := 0
for _, v := range values {
if v == eq {
n++
}
}
return n
}
func countFilterNE(values []float64, ne float64) int {
n := 0
for _, v := range values {
if v != ne {
n++
}
}
return n
}
func newRollupShareFilter(args []interface{}, countFilter func(values []float64, limit float64) int) (rollupFunc, error) {
rf, err := newRollupCountFilter(args, countFilter)
if err != nil {
return nil, err
}
return func(rfa *rollupFuncArg) float64 {
n := rf(rfa)
return n / float64(len(rfa.values))
}, nil
}
func newRollupCountLE(args []interface{}) (rollupFunc, error) {
return newRollupCountFilter(args, countFilterLE)
}
func newRollupCountGT(args []interface{}) (rollupFunc, error) {
return newRollupCountFilter(args, countFilterGT)
}
func newRollupCountEQ(args []interface{}) (rollupFunc, error) {
return newRollupCountFilter(args, countFilterEQ)
}
func newRollupCountNE(args []interface{}) (rollupFunc, error) {
return newRollupCountFilter(args, countFilterNE)
}
func newRollupCountFilter(args []interface{}, countFilter func(values []float64, limit float64) int) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
limits, err := getScalar(args[1], 1)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return nan
}
limit := limits[rfa.idx]
return float64(countFilter(values, limit))
}
return rf, nil
}
func newRollupHoeffdingBoundLower(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
bound, avg := rollupHoeffdingBoundInternal(rfa, phis)
return avg - bound
}
return rf, nil
}
func newRollupHoeffdingBoundUpper(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
bound, avg := rollupHoeffdingBoundInternal(rfa, phis)
return avg + bound
}
return rf, nil
}
func rollupHoeffdingBoundInternal(rfa *rollupFuncArg, phis []float64) (float64, float64) {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return nan, nan
}
if len(values) == 1 {
return 0, values[0]
}
vMax := rollupMax(rfa)
vMin := rollupMin(rfa)
vAvg := rollupAvg(rfa)
vRange := vMax - vMin
if vRange <= 0 {
return 0, vAvg
}
phi := phis[rfa.idx]
if phi >= 1 {
return inf, vAvg
}
if phi <= 0 {
return 0, vAvg
}
// See https://en.wikipedia.org/wiki/Hoeffding%27s_inequality
// and https://www.youtube.com/watch?v=6UwcqiNsZ8U&feature=youtu.be&t=1237
bound := vRange * math.Sqrt(math.Log(1/(1-phi))/(2*float64(len(values))))
return bound, vAvg
}
func newRollupQuantiles(args []interface{}) (rollupFunc, error) {
if len(args) < 3 {
return nil, fmt.Errorf("unexpected number of args: %d; want at least 3 args", len(args))
}
tssPhi, ok := args[0].([]*timeseries)
if !ok {
return nil, fmt.Errorf("unexpected type for phi arg: %T; want string", args[0])
}
phiLabel, err := getString(tssPhi, 0)
if err != nil {
return nil, err
}
phiArgs := args[1 : len(args)-1]
phis := make([]float64, len(phiArgs))
phiStrs := make([]string, len(phiArgs))
for i, phiArg := range phiArgs {
phiValues, err := getScalar(phiArg, i+1)
if err != nil {
return nil, fmt.Errorf("cannot obtain phi from arg #%d: %w", i+1, err)
}
phis[i] = phiValues[0]
phiStrs[i] = fmt.Sprintf("%g", phiValues[0])
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return nan
}
qs := getFloat64s()
qs.A = quantiles(qs.A[:0], phis, values)
idx := rfa.idx
tsm := rfa.tsm
for i, phiStr := range phiStrs {
ts := tsm.GetOrCreateTimeseries(phiLabel, phiStr)
ts.Values[idx] = qs.A[i]
}
putFloat64s(qs)
return nan
}
return rf, nil
}
func newRollupQuantile(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
phi := phis[rfa.idx]
qv := quantile(phi, values)
return qv
}
return rf, nil
}
func rollupMAD(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
return mad(rfa.values)
}
func mad(values []float64) float64 {
// See https://en.wikipedia.org/wiki/Median_absolute_deviation
median := quantile(0.5, values)
a := getFloat64s()
ds := a.A[:0]
for _, v := range values {
ds = append(ds, math.Abs(v-median))
}
v := quantile(0.5, ds)
a.A = ds
putFloat64s(a)
return v
}
func rollupHistogram(rfa *rollupFuncArg) float64 {
values := rfa.values
tsm := rfa.tsm
tsm.h.Reset()
for _, v := range values {
tsm.h.Update(v)
}
idx := rfa.idx
tsm.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ts := tsm.GetOrCreateTimeseries("vmrange", vmrange)
ts.Values[idx] = float64(count)
})
return nan
}
func rollupAvg(rfa *rollupFuncArg) float64 {
// Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation,
// since it is slower and has no significant benefits in precision.
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
var sum float64
for _, v := range values {
sum += v
}
return sum / float64(len(values))
}
func rollupMin(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
minValue := values[0]
for _, v := range values {
if v < minValue {
minValue = v
}
}
return minValue
}
func rollupMax(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
maxValue := values[0]
for _, v := range values {
if v > maxValue {
maxValue = v
}
}
return maxValue
}
func rollupTmin(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) == 0 {
return nan
}
minValue := values[0]
minTimestamp := timestamps[0]
for i, v := range values {
// Get the last timestamp for the minimum value as most users expect.
if v <= minValue {
minValue = v
minTimestamp = timestamps[i]
}
}
return float64(minTimestamp) / 1e3
}
func rollupTmax(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) == 0 {
return nan
}
maxValue := values[0]
maxTimestamp := timestamps[0]
for i, v := range values {
// Get the last timestamp for the maximum value as most users expect.
if v >= maxValue {
maxValue = v
maxTimestamp = timestamps[i]
}
}
return float64(maxTimestamp) / 1e3
}
func rollupTfirst(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
// Do not take into account rfa.prevTimestamp, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return float64(timestamps[0]) / 1e3
}
func rollupTlast(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
// Do not take into account rfa.prevTimestamp, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return float64(timestamps[len(timestamps)-1]) / 1e3
}
func rollupTlastChange(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return nan
}
timestamps := rfa.timestamps
lastValue := values[len(values)-1]
values = values[:len(values)-1]
for i := len(values) - 1; i >= 0; i-- {
if values[i] != lastValue {
return float64(timestamps[i+1]) / 1e3
}
}
if math.IsNaN(rfa.prevValue) || rfa.prevValue != lastValue {
return float64(timestamps[0]) / 1e3
}
return nan
}
func rollupSum(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
var sum float64
for _, v := range values {
sum += v
}
return sum
}
func rollupRateOverSum(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
// Assume that the value didn't change since rfa.prevValue.
return 0
}
sum := float64(0)
for _, v := range rfa.values {
sum += v
}
return sum / (float64(rfa.window) / 1e3)
}
func rollupRange(rfa *rollupFuncArg) float64 {
max := rollupMax(rfa)
min := rollupMin(rfa)
return max - min
}
func rollupSum2(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue * rfa.prevValue
}
var sum2 float64
for _, v := range values {
sum2 += v * v
}
return sum2
}
func rollupGeomean(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
p := 1.0
for _, v := range values {
p *= v
}
return math.Pow(p, 1/float64(len(values)))
}
func rollupAbsent(rfa *rollupFuncArg) float64 {
if len(rfa.values) == 0 {
return 1
}
return nan
}
func rollupPresent(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
if len(rfa.values) > 0 {
return 1
}
return nan
}
func rollupCount(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return nan
}
return float64(len(values))
}
func rollupStaleSamples(rfa *rollupFuncArg) float64 {
values := rfa.values
if len(values) == 0 {
return nan
}
n := 0
for _, v := range rfa.values {
if decimal.IsStaleNaN(v) {
n++
}
}
return float64(n)
}
func rollupStddev(rfa *rollupFuncArg) float64 {
return stddev(rfa.values)
}
func rollupStdvar(rfa *rollupFuncArg) float64 {
return stdvar(rfa.values)
}
func stddev(values []float64) float64 {
v := stdvar(values)
return math.Sqrt(v)
}
func stdvar(values []float64) float64 {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
if len(values) == 0 {
return nan
}
if len(values) == 1 {
// Fast path.
return 0
}
var avg float64
var count float64
var q float64
for _, v := range values {
if math.IsNaN(v) {
continue
}
count++
avgNew := avg + (v-avg)/count
q += (v - avg) * (v - avgNew)
avg = avgNew
}
if count == 0 {
return nan
}
return q / count
}
func rollupIncreasePure(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
// restore to the real value because of potential staleness reset
prevValue := rfa.realPrevValue
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
// Assume the counter starts from 0.
prevValue = 0
}
if len(values) == 0 {
// Assume the counter didn't change since prevValue.
return 0
}
return values[len(values)-1] - prevValue
}
func rollupDelta(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
if !math.IsNaN(rfa.realPrevValue) {
// Assume that the value didn't change during the current gap.
// This should fix high delta() and increase() values at the end of gaps.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/894
return values[len(values)-1] - rfa.realPrevValue
}
// Assume that the previous non-existing value was 0
// only if the first value doesn't exceed too much the delta with the next value.
//
// This should prevent from improper increase() results for os-level counters
// such as cpu time or bytes sent over the network interface.
// These counters may start long ago before the first value appears in the db.
//
// This also should prevent from improper increase() results when a part of label values are changed
// without counter reset.
var d float64
if len(values) > 1 {
d = values[1] - values[0]
} else if !math.IsNaN(rfa.realNextValue) {
d = rfa.realNextValue - values[0]
}
if math.Abs(values[0]) < 10*(math.Abs(d)+1) {
prevValue = 0
} else {
prevValue = values[0]
values = values[1:]
}
}
if len(values) == 0 {
// Assume that the value didn't change on the given interval.
return 0
}
return values[len(values)-1] - prevValue
}
func rollupDeltaPrometheus(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
// Just return the difference between the last and the first sample like Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1962
if len(values) < 2 {
return nan
}
return values[len(values)-1] - values[0]
}
func rollupIdelta(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
// Assume that the value didn't change on the given interval.
return 0
}
lastValue := values[len(values)-1]
values = values[:len(values)-1]
if len(values) == 0 {
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
// Assume that the previous non-existing value was 0.
return lastValue
}
return lastValue - prevValue
}
return lastValue - values[len(values)-1]
}
func rollupDerivSlow(rfa *rollupFuncArg) float64 {
// Use linear regression like Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/73
_, k := linearRegression(rfa.values, rfa.timestamps, rfa.currTimestamp)
return k
}
func rollupDerivFast(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
prevValue := rfa.prevValue
prevTimestamp := rfa.prevTimestamp
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
if len(values) == 1 {
// It is impossible to determine the duration during which the value changed
// from 0 to the current value.
// The following attempts didn't work well:
// - using scrape interval as the duration. It fails on Prometheus restarts when it
// skips scraping for the counter. This results in too high rate() value for the first point
// after Prometheus restarts.
// - using window or step as the duration. It results in too small rate() values for the first
// points of time series.
//
// So just return nan
return nan
}
prevValue = values[0]
prevTimestamp = timestamps[0]
} else if len(values) == 0 {
// Assume that the value didn't change on the given interval.
return 0
}
vEnd := values[len(values)-1]
tEnd := timestamps[len(timestamps)-1]
dv := vEnd - prevValue
dt := float64(tEnd-prevTimestamp) / 1e3
return dv / dt
}
func rollupIderiv(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) < 2 {
if len(values) == 0 {
return nan
}
if math.IsNaN(rfa.prevValue) {
// It is impossible to determine the duration during which the value changed
// from 0 to the current value.
// The following attempts didn't work well:
// - using scrape interval as the duration. It fails on Prometheus restarts when it
// skips scraping for the counter. This results in too high rate() value for the first point
// after Prometheus restarts.
// - using window or step as the duration. It results in too small rate() values for the first
// points of time series.
//
// So just return nan
return nan
}
return (values[0] - rfa.prevValue) / (float64(timestamps[0]-rfa.prevTimestamp) / 1e3)
}
vEnd := values[len(values)-1]
tEnd := timestamps[len(timestamps)-1]
values = values[:len(values)-1]
timestamps = timestamps[:len(timestamps)-1]
// Skip data points with duplicate timestamps.
for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= tEnd {
timestamps = timestamps[:len(timestamps)-1]
}
var tStart int64
var vStart float64
if len(timestamps) == 0 {
if math.IsNaN(rfa.prevValue) {
return 0
}
tStart = rfa.prevTimestamp
vStart = rfa.prevValue
} else {
tStart = timestamps[len(timestamps)-1]
vStart = values[len(timestamps)-1]
}
dv := vEnd - vStart
dt := tEnd - tStart
return dv / (float64(dt) / 1e3)
}
func rollupLifetime(rfa *rollupFuncArg) float64 {
// Calculate the duration between the first and the last data points.
timestamps := rfa.timestamps
if math.IsNaN(rfa.prevValue) {
if len(timestamps) < 2 {
return nan
}
return float64(timestamps[len(timestamps)-1]-timestamps[0]) / 1e3
}
if len(timestamps) == 0 {
return nan
}
return float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) / 1e3
}
func rollupLag(rfa *rollupFuncArg) float64 {
// Calculate the duration between the current timestamp and the last data point.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return float64(rfa.currTimestamp-rfa.prevTimestamp) / 1e3
}
return float64(rfa.currTimestamp-timestamps[len(timestamps)-1]) / 1e3
}
func rollupScrapeInterval(rfa *rollupFuncArg) float64 {
// Calculate the average interval between data points.
timestamps := rfa.timestamps
if math.IsNaN(rfa.prevValue) {
if len(timestamps) < 2 {
return nan
}
return (float64(timestamps[len(timestamps)-1]-timestamps[0]) / 1e3) / float64(len(timestamps)-1)
}
if len(timestamps) == 0 {
return nan
}
return (float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) / 1e3) / float64(len(timestamps))
}
func rollupChangesPrometheus(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
// Do not take into account rfa.prevValue like Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1962
if len(values) < 1 {
return nan
}
prevValue := values[0]
n := 0
for _, v := range values[1:] {
if v != prevValue {
n++
prevValue = v
}
}
return float64(n)
}
func rollupChanges(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
n := 0
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
values = values[1:]
n++
}
for _, v := range values {
if v != prevValue {
n++
prevValue = v
}
}
return float64(n)
}
func rollupIncreases(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
prevValue = values[0]
values = values[1:]
}
if len(values) == 0 {
return 0
}
n := 0
for _, v := range values {
if v > prevValue {
n++
}
prevValue = v
}
return float64(n)
}
// `decreases_over_time` logic is the same as `resets` logic.
var rollupDecreases = rollupResets
func rollupResets(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
prevValue = values[0]
values = values[1:]
}
if len(values) == 0 {
return 0
}
n := 0
for _, v := range values {
if v < prevValue {
n++
}
prevValue = v
}
return float64(n)
}
// getCandlestickValues returns a subset of rfa.values suitable for rollup_candlestick
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309 for details.
func getCandlestickValues(rfa *rollupFuncArg) []float64 {
currTimestamp := rfa.currTimestamp
timestamps := rfa.timestamps
for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= currTimestamp {
timestamps = timestamps[:len(timestamps)-1]
}
if len(timestamps) == 0 {
return nil
}
return rfa.values[:len(timestamps)]
}
func getFirstValueForCandlestick(rfa *rollupFuncArg) float64 {
if rfa.prevTimestamp+rfa.window >= rfa.currTimestamp {
return rfa.prevValue
}
return nan
}
func rollupOpen(rfa *rollupFuncArg) float64 {
v := getFirstValueForCandlestick(rfa)
if !math.IsNaN(v) {
return v
}
values := getCandlestickValues(rfa)
if len(values) == 0 {
return nan
}
return values[0]
}
func rollupClose(rfa *rollupFuncArg) float64 {
values := getCandlestickValues(rfa)
if len(values) == 0 {
return getFirstValueForCandlestick(rfa)
}
return values[len(values)-1]
}
func rollupHigh(rfa *rollupFuncArg) float64 {
values := getCandlestickValues(rfa)
max := getFirstValueForCandlestick(rfa)
if math.IsNaN(max) {
if len(values) == 0 {
return nan
}
max = values[0]
values = values[1:]
}
for _, v := range values {
if v > max {
max = v
}
}
return max
}
func rollupLow(rfa *rollupFuncArg) float64 {
values := getCandlestickValues(rfa)
min := getFirstValueForCandlestick(rfa)
if math.IsNaN(min) {
if len(values) == 0 {
return nan
}
min = values[0]
values = values[1:]
}
for _, v := range values {
if v < min {
min = v
}
}
return min
}
func rollupModeOverTime(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
// Copy rfa.values to a.A, since modeNoNaNs modifies a.A contents.
a := getFloat64s()
a.A = append(a.A[:0], rfa.values...)
result := modeNoNaNs(rfa.prevValue, a.A)
putFloat64s(a)
return result
}
func getFloat64s() *float64s {
v := float64sPool.Get()
if v == nil {
v = &float64s{}
}
return v.(*float64s)
}
func putFloat64s(a *float64s) {
a.A = a.A[:0]
float64sPool.Put(a)
}
var float64sPool sync.Pool
type float64s struct {
A []float64
}
func rollupAscentOverTime(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
values = values[1:]
}
s := float64(0)
for _, v := range values {
d := v - prevValue
if d > 0 {
s += d
}
prevValue = v
}
return s
}
func rollupDescentOverTime(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
values = values[1:]
}
s := float64(0)
for _, v := range values {
d := prevValue - v
if d > 0 {
s += d
}
prevValue = v
}
return s
}
func rollupZScoreOverTime(rfa *rollupFuncArg) float64 {
// See https://about.gitlab.com/blog/2019/07/23/anomaly-detection-using-prometheus/#using-z-score-for-anomaly-detection
scrapeInterval := rollupScrapeInterval(rfa)
lag := rollupLag(rfa)
if math.IsNaN(scrapeInterval) || math.IsNaN(lag) || lag > scrapeInterval {
return nan
}
d := rollupLast(rfa) - rollupAvg(rfa)
if d == 0 {
return 0
}
return d / rollupStddev(rfa)
}
func rollupFirst(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return values[0]
}
func rollupDefault(rfa *rollupFuncArg) float64 {
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
// Intentionally do not skip the possible last Prometheus staleness mark.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 .
return values[len(values)-1]
}
func rollupLast(rfa *rollupFuncArg) float64 {
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return values[len(values)-1]
}
func rollupDistinct(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
m := make(map[float64]struct{})
for _, v := range values {
m[v] = struct{}{}
}
return float64(len(m))
}
func rollupIntegrate(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
prevValue := rfa.prevValue
prevTimestamp := rfa.currTimestamp - rfa.window
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
prevTimestamp = timestamps[0]
values = values[1:]
timestamps = timestamps[1:]
}
var sum float64
for i, v := range values {
timestamp := timestamps[i]
dt := float64(timestamp-prevTimestamp) / 1e3
sum += prevValue * dt
prevTimestamp = timestamp
prevValue = v
}
dt := float64(rfa.currTimestamp-prevTimestamp) / 1e3
sum += prevValue * dt
return sum
}
func rollupFake(rfa *rollupFuncArg) float64 {
logger.Panicf("BUG: rollupFake shouldn't be called")
return 0
}
func getScalar(arg interface{}, argNum int) ([]float64, error) {
ts, ok := arg.([]*timeseries)
if !ok {
return nil, fmt.Errorf(`unexpected type for arg #%d; got %T; want %T`, argNum+1, arg, ts)
}
if len(ts) != 1 {
return nil, fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(ts))
}
return ts[0].Values, nil
}
func getIntNumber(arg interface{}, argNum int) (int, error) {
v, err := getScalar(arg, argNum)
if err != nil {
return 0, err
}
n := 0
if len(v) > 0 {
n = floatToIntBounded(v[0])
}
return n, nil
}
func getString(tss []*timeseries, argNum int) (string, error) {
if len(tss) != 1 {
return "", fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(tss))
}
ts := tss[0]
for _, v := range ts.Values {
if !math.IsNaN(v) {
return "", fmt.Errorf(`arg #%d contains non-string timeseries`, argNum+1)
}
}
return string(ts.MetricName.MetricGroup), nil
}
func expectRollupArgsNum(args []interface{}, expectedNum int) error {
if len(args) == expectedNum {
return nil
}
return fmt.Errorf(`unexpected number of args; got %d; want %d`, len(args), expectedNum)
}
func getRollupFuncArg() *rollupFuncArg {
v := rfaPool.Get()
if v == nil {
return &rollupFuncArg{}
}
return v.(*rollupFuncArg)
}
func putRollupFuncArg(rfa *rollupFuncArg) {
rfa.reset()
rfaPool.Put(rfa)
}
var rfaPool sync.Pool