VictoriaMetrics/app/vmselect/promql/aggr.go
Aliaksandr Valialkin 388d020b7c
app/vmselect/promql: follow-up for ce4f26db02
- Document the bugfix at docs/CHANGELOG.md
- Filter out NaN values before sorting as suggested at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5509#discussion_r1447369218
- Revert unrelated changes in lib/filestream and lib/fs
- Use simpler test at app/vmselect/promql/exec_test.go

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5509
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506
2024-01-16 02:55:08 +02:00

1287 lines
31 KiB
Go

package promql
import (
"flag"
"fmt"
"math"
"sort"
"strconv"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
"github.com/cespare/xxhash/v2"
)
// maxSeriesPerAggrFunc holds the value of the -search.maxSeriesPerAggrFunc command-line flag.
//
// In this file it is consulted by count_values() in order to cap the number of generated series.
var maxSeriesPerAggrFunc = flag.Int("search.maxSeriesPerAggrFunc", 1e6, "The maximum number of time series an aggregate MetricsQL function can generate")
// aggrFuncs maps lower-case aggregate function names to their implementations.
//
// Use getAggrFunc for lookups, since it lower-cases the name before consulting the map.
var aggrFuncs = map[string]aggrFunc{
	"any":            aggrFuncAny,
	"avg":            newAggrFunc(aggrFuncAvg),
	"bottomk":        newAggrFuncTopK(true),
	"bottomk_avg":    newAggrFuncRangeTopK(avgValue, true),
	"bottomk_max":    newAggrFuncRangeTopK(maxValue, true),
	"bottomk_median": newAggrFuncRangeTopK(medianValue, true),
	"bottomk_last":   newAggrFuncRangeTopK(lastValue, true),
	"bottomk_min":    newAggrFuncRangeTopK(minValue, true),
	"count":          newAggrFunc(aggrFuncCount),
	"count_values":   aggrFuncCountValues,
	"distinct":       newAggrFunc(aggrFuncDistinct),
	"geomean":        newAggrFunc(aggrFuncGeomean),
	"group":          newAggrFunc(aggrFuncGroup),
	"histogram":      newAggrFunc(aggrFuncHistogram),
	"limitk":         aggrFuncLimitK,
	"mad":            newAggrFunc(aggrFuncMAD),
	"max":            newAggrFunc(aggrFuncMax),
	"median":         aggrFuncMedian,
	"min":            newAggrFunc(aggrFuncMin),
	"mode":           newAggrFunc(aggrFuncMode),
	"outliers_iqr":   aggrFuncOutliersIQR,
	"outliers_mad":   aggrFuncOutliersMAD,
	"outliersk":      aggrFuncOutliersK,
	"quantile":       aggrFuncQuantile,
	"quantiles":      aggrFuncQuantiles,
	"share":          aggrFuncShare,
	"stddev":         newAggrFunc(aggrFuncStddev),
	"stdvar":         newAggrFunc(aggrFuncStdvar),
	"sum":            newAggrFunc(aggrFuncSum),
	"sum2":           newAggrFunc(aggrFuncSum2),
	"topk":           newAggrFuncTopK(false),
	"topk_avg":       newAggrFuncRangeTopK(avgValue, false),
	"topk_max":       newAggrFuncRangeTopK(maxValue, false),
	"topk_median":    newAggrFuncRangeTopK(medianValue, false),
	"topk_last":      newAggrFuncRangeTopK(lastValue, false),
	"topk_min":       newAggrFuncRangeTopK(minValue, false),
	"zscore":         aggrFuncZScore,
}
// aggrFunc is the signature shared by all the aggregate function implementations registered in aggrFuncs.
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
// aggrFuncArg bundles the inputs passed to an aggrFunc.
type aggrFuncArg struct {
	// args contains the arguments of the call; every argument is a list of time series.
	args [][]*timeseries
	// ae is the aggregate expression being evaluated; implementations read its Modifier and Limit fields.
	ae *metricsql.AggrFuncExpr
	// ec is the evaluation config; in this file it is only passed to evalNumber by aggrFuncMedian.
	ec *EvalConfig
}
// getAggrFunc returns the aggregate function registered in aggrFuncs under the name s.
//
// The lookup is case-insensitive. A nil aggrFunc is returned if s isn't registered.
func getAggrFunc(s string) aggrFunc {
	return aggrFuncs[strings.ToLower(s)]
}
// newAggrFunc wraps afe, which aggregates a single group of series, into an aggrFunc.
//
// The returned function merges all the call args into a single list of series,
// groups them according to the expression modifier and limit, and applies afe to every group.
func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc {
	// aggrFuncExt expects a callback accepting the modifier; afe doesn't need it.
	perGroup := func(group []*timeseries, _ *metricsql.ModifierExpr) []*timeseries {
		return afe(group)
	}
	return func(afa *aggrFuncArg) ([]*timeseries, error) {
		tss, err := getAggrTimeseries(afa.args)
		if err != nil {
			return nil, err
		}
		return aggrFuncExt(perGroup, tss, &afa.ae.Modifier, afa.ae.Limit, false)
	}
}
// getAggrTimeseries concatenates all the args into a single list of time series.
//
// An error is returned if args is empty.
// The result is built by appending to args[0], so it may share the backing array with args[0].
func getAggrTimeseries(args [][]*timeseries) ([]*timeseries, error) {
	if len(args) == 0 {
		return nil, fmt.Errorf("expecting at least one arg")
	}
	tss := args[0]
	for i := 1; i < len(args); i++ {
		tss = append(tss, args[i]...)
	}
	return tss, nil
}
// removeGroupTags adjusts tags in metricName according to the grouping modifier.
//
// An empty op and `by` call RemoveTagsOn(modifier.Args), while `without` calls
// RemoveTagsIgnoring(modifier.Args) and resets the metric group.
// Any other op is treated as a bug and is reported via logger.Panicf.
func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.ModifierExpr) {
	switch groupOp := strings.ToLower(modifier.Op); groupOp {
	case "without":
		metricName.RemoveTagsIgnoring(modifier.Args)
		// Reset metric group as Prometheus does on `aggr(...) without (...)` call.
		metricName.ResetMetricGroup()
	case "", "by":
		metricName.RemoveTagsOn(modifier.Args)
	default:
		logger.Panicf("BUG: unknown group modifier: %q", groupOp)
	}
}
// aggrFuncExt groups argOrig via aggrPrepareSeries, applies afe to every group
// and returns the concatenated per-group results.
//
// maxSeries and keepOriginal are passed to aggrPrepareSeries as is.
// The returned error is always nil.
func aggrFuncExt(afe func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries, argOrig []*timeseries,
	modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) {
	groups := aggrPrepareSeries(argOrig, modifier, maxSeries, keepOriginal)
	results := make([]*timeseries, 0, len(groups))
	for _, group := range groups {
		results = append(results, afe(group, modifier)...)
	}
	return results, nil
}
// aggrPrepareSeries splits argOrig into groups keyed by the marshaled metric name
// obtained after applying modifier to every series.
//
// Empty series are dropped before grouping. If maxSeries is positive, then at most maxSeries groups
// are created; series belonging to further groups are skipped.
// If keepOriginal is set, then the groups contain the series from argOrig (after removing empty ones)
// instead of the series returned by copyTimeseriesMetricNames.
func aggrPrepareSeries(argOrig []*timeseries, modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) map[string][]*timeseries {
	// Series with all NaN samples are ignored by aggregate functions, so drop them upfront.
	argOrig = removeEmptySeries(argOrig)
	arg := copyTimeseriesMetricNames(argOrig, keepOriginal)

	groups := make(map[string][]*timeseries)
	bb := bbPool.Get()
	for i, ts := range arg {
		removeGroupTags(&ts.MetricName, modifier)
		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
		key := string(bb.B)
		group, ok := groups[key]
		if !ok && maxSeries > 0 && len(groups) >= maxSeries {
			// The limit on the number of groups is already reached. Skip series from new groups.
			continue
		}
		if keepOriginal {
			ts = argOrig[i]
		}
		groups[key] = append(group, ts)
	}
	bbPool.Put(bb)
	return groups
}
// aggrFuncAny implements any(): it returns the first series from every group with its original metric name.
func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) {
	tss, err := getAggrTimeseries(afa.args)
	if err != nil {
		return nil, err
	}
	// Only a single time series per group must be returned.
	// NOTE(review): the limit bounds the number of groups in aggrPrepareSeries,
	// so clamping it to 1 leaves a single group when a bigger limit is given - confirm this is intended.
	limit := afa.ae.Limit
	if limit > 1 {
		limit = 1
	}
	pickFirst := func(group []*timeseries, _ *metricsql.ModifierExpr) []*timeseries {
		return group[:1]
	}
	return aggrFuncExt(pickFirst, tss, &afa.ae.Modifier, limit, true)
}
// aggrFuncGroup implements group(): every point of the result is 1 if at least a single series
// in tss has non-NaN value at this point, otherwise it is NaN.
//
// The result is written into tss[0], which is returned as the only series.
func aggrFuncGroup(tss []*timeseries) []*timeseries {
	// See https://github.com/prometheus/prometheus/commit/72425d4e3d14d209cc3f3f6e10e3240411303399
	dst := tss[0]
	for i := range dst.Values {
		result := nan
		for _, ts := range tss {
			if !math.IsNaN(ts.Values[i]) {
				result = 1
			}
		}
		dst.Values[i] = result
	}
	return tss[:1]
}
// aggrFuncSum implements sum(): per-point sum of non-NaN values across tss.
//
// A point is NaN if all the series have NaN at this point.
// The result is written into tss[0], which is returned as the only series.
func aggrFuncSum(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - a single series is its own sum.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		var total float64
		samples := 0
		for _, ts := range tss {
			if v := ts.Values[i]; !math.IsNaN(v) {
				total += v
				samples++
			}
		}
		if samples == 0 {
			total = nan
		}
		dst.Values[i] = total
	}
	return tss[:1]
}
// aggrFuncSum2 implements sum2(): per-point sum of squares of non-NaN values across tss.
//
// A point is NaN if all the series have NaN at this point.
// The result is written into tss[0], which is returned as the only series.
func aggrFuncSum2(tss []*timeseries) []*timeseries {
	dst := tss[0]
	for i := range dst.Values {
		var total float64
		samples := 0
		for _, ts := range tss {
			if v := ts.Values[i]; !math.IsNaN(v) {
				total += v * v
				samples++
			}
		}
		if samples == 0 {
			total = nan
		}
		dst.Values[i] = total
	}
	return tss[:1]
}
// aggrFuncGeomean implements geomean(): per-point geometric mean of non-NaN values across tss.
//
// The result is written into tss[0], which is returned as the only series.
func aggrFuncGeomean(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - geomean over a single series is the series itself.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		product := 1.0
		samples := 0
		for _, ts := range tss {
			if v := ts.Values[i]; !math.IsNaN(v) {
				product *= v
				samples++
			}
		}
		if samples == 0 {
			// There are no samples at this point; NaN is kept by math.Pow below.
			product = nan
		}
		dst.Values[i] = math.Pow(product, 1/float64(samples))
	}
	return tss[:1]
}
// aggrFuncHistogram implements histogram(): it feeds the values of all the series at every point
// into a histogram and returns a series per every non-zero bucket, converted via vmrangeBucketsToLE.
//
// tss must be non-empty, since tss[0] is used as a template for the generated series.
func aggrFuncHistogram(tss []*timeseries) []*timeseries {
	var h metrics.Histogram
	// m maps vmrange bucket label to the series with per-point hit counts for this bucket.
	m := make(map[string]*timeseries)
	for i := range tss[0].Values {
		// Re-use the same histogram across points.
		h.Reset()
		for _, ts := range tss {
			v := ts.Values[i]
			h.Update(v)
		}
		h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
			ts := m[vmrange]
			if ts == nil {
				// The bucket is seen for the first time - create a series for it from tss[0],
				// tag it with the bucket's vmrange and start from zero counts at every point.
				ts = &timeseries{}
				ts.CopyFromShallowTimestamps(tss[0])
				ts.MetricName.RemoveTag("vmrange")
				ts.MetricName.AddTag("vmrange", vmrange)
				values := ts.Values
				for k := range values {
					values[k] = 0
				}
				m[vmrange] = ts
			}
			ts.Values[i] = float64(count)
		})
	}
	// Collect the per-bucket series; the map iteration order is random.
	rvs := make([]*timeseries, 0, len(m))
	for _, ts := range m {
		rvs = append(rvs, ts)
	}
	return vmrangeBucketsToLE(rvs)
}
// aggrFuncMin implements min(): per-point minimum across tss, ignoring NaNs where possible.
//
// The result is written into tss[0], which is returned as the only series.
func aggrFuncMin(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - a single series is its own minimum.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		lowest := dst.Values[i]
		for _, ts := range tss {
			// A NaN candidate is replaced by whatever comes next; comparisons with NaN are always false.
			if v := ts.Values[i]; math.IsNaN(lowest) || v < lowest {
				lowest = v
			}
		}
		dst.Values[i] = lowest
	}
	return tss[:1]
}
// aggrFuncMax implements max(): per-point maximum across tss, ignoring NaNs where possible.
//
// The result is written into tss[0], which is returned as the only series.
func aggrFuncMax(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - a single series is its own maximum.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		highest := dst.Values[i]
		for _, ts := range tss {
			// A NaN candidate is replaced by whatever comes next; comparisons with NaN are always false.
			if v := ts.Values[i]; math.IsNaN(highest) || v > highest {
				highest = v
			}
		}
		dst.Values[i] = highest
	}
	return tss[:1]
}
// aggrFuncAvg implements avg(): per-point arithmetic mean of non-NaN values across tss.
//
// A point is NaN if all the series have NaN at this point.
// The result is written into tss[0], which is returned as the only series.
func aggrFuncAvg(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - a single series is its own average.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		// Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation,
		// since it is slower and has no obvious benefits in increased precision.
		total := float64(0)
		samples := 0
		for _, ts := range tss {
			if v := ts.Values[i]; !math.IsNaN(v) {
				samples++
				total += v
			}
		}
		if samples > 0 {
			dst.Values[i] = total / float64(samples)
		} else {
			dst.Values[i] = nan
		}
	}
	return tss[:1]
}
// aggrFuncStddev implements stddev(): per-point standard deviation across tss.
//
// It is calculated as the square root of aggrFuncStdvar results.
func aggrFuncStddev(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - stddev over a single time series is zero at every non-NaN point.
		single := tss[0].Values
		for i := range single {
			if !math.IsNaN(single[i]) {
				single[i] = 0
			}
		}
		return tss
	}
	rvs := aggrFuncStdvar(tss)
	variances := rvs[0].Values
	for i := range variances {
		variances[i] = math.Sqrt(variances[i])
	}
	return rvs
}
// aggrFuncStdvar implements stdvar(): per-point variance of non-NaN values across tss.
//
// A point is NaN if all the series have NaN at this point.
// The result is written into tss[0], which is returned as the only series.
func aggrFuncStdvar(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - stdvar over a single time series is zero at every non-NaN point.
		single := tss[0].Values
		for i := range single {
			if !math.IsNaN(single[i]) {
				single[i] = 0
			}
		}
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
		var mean, samples, q float64
		for _, ts := range tss {
			v := ts.Values[i]
			if math.IsNaN(v) {
				continue
			}
			samples++
			delta := v - mean
			mean = mean + delta/samples
			q += delta * (v - mean)
		}
		if samples == 0 {
			q = nan
		}
		dst.Values[i] = q / samples
	}
	return tss[:1]
}
// aggrFuncCount implements count(): per-point number of series with non-NaN values.
//
// A point is NaN if all the series have NaN at this point.
// The result is written into tss[0], which is returned as the only series.
func aggrFuncCount(tss []*timeseries) []*timeseries {
	dst := tss[0]
	for i := range dst.Values {
		samples := 0
		for _, ts := range tss {
			if !math.IsNaN(ts.Values[i]) {
				samples++
			}
		}
		if samples == 0 {
			dst.Values[i] = nan
		} else {
			dst.Values[i] = float64(samples)
		}
	}
	return tss[:1]
}
// aggrFuncDistinct implements distinct(): per-point number of distinct non-NaN values across tss.
//
// A point is NaN if all the series have NaN at this point.
// The result is written into tss[0], which is returned as the only series.
func aggrFuncDistinct(tss []*timeseries) []*timeseries {
	dst := tss[0]
	// seen is re-used across points; it is emptied at the end of every iteration.
	seen := make(map[float64]struct{}, len(tss))
	for i := range dst.Values {
		for _, ts := range tss {
			if v := ts.Values[i]; !math.IsNaN(v) {
				seen[v] = struct{}{}
			}
		}
		distinct := nan
		if len(seen) > 0 {
			distinct = float64(len(seen))
		}
		dst.Values[i] = distinct
		for v := range seen {
			delete(seen, v)
		}
	}
	return tss[:1]
}
// aggrFuncMode implements mode(): per-point most frequent non-NaN value across tss, as calculated by modeNoNaNs.
//
// The result is written into tss[0], which is returned as the only series.
func aggrFuncMode(tss []*timeseries) []*timeseries {
	dst := tss[0]
	// The scratch buffer never grows beyond len(tss), so it is allocated only once.
	scratch := make([]float64, 0, len(tss))
	for i := range dst.Values {
		column := scratch[:0]
		for _, ts := range tss {
			if v := ts.Values[i]; !math.IsNaN(v) {
				column = append(column, v)
			}
		}
		dst.Values[i] = modeNoNaNs(nan, column)
	}
	return tss[:1]
}
// aggrFuncShare implements share(): every non-negative value is replaced by its share
// in the per-group sum of non-negative values at the same point.
//
// NaN and negative values are replaced by NaN. Series are modified in place and keep their original names.
func aggrFuncShare(afa *aggrFuncArg) ([]*timeseries, error) {
	tss, err := getAggrTimeseries(afa.args)
	if err != nil {
		return nil, err
	}
	// hasShare reports whether v participates in share calculations.
	hasShare := func(v float64) bool {
		return !math.IsNaN(v) && v >= 0
	}
	shareInGroup := func(group []*timeseries, _ *metricsql.ModifierExpr) []*timeseries {
		for i := range group[0].Values {
			// Calculate the sum of participating points at position i.
			total := float64(0)
			for _, ts := range group {
				if v := ts.Values[i]; hasShare(v) {
					total += v
				}
			}
			// Replace every participating point at position i by its share in the sum.
			for _, ts := range group {
				v := ts.Values[i]
				if hasShare(v) {
					ts.Values[i] = v / total
				} else {
					ts.Values[i] = nan
				}
			}
		}
		return group
	}
	return aggrFuncExt(shareInGroup, tss, &afa.ae.Modifier, afa.ae.Limit, true)
}
// aggrFuncZScore implements zscore(): every non-NaN value is replaced by its standard score
// relative to the values of the same group at the same point.
//
// Series are modified in place and keep their original names. NaN values are left as is.
func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
	tss, err := getAggrTimeseries(afa.args)
	if err != nil {
		return nil, err
	}
	scoreGroup := func(group []*timeseries, _ *metricsql.ModifierExpr) []*timeseries {
		for i := range group[0].Values {
			// Calculate mean and stddev for group points at position i.
			// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
			var mean, samples, q float64
			for _, ts := range group {
				v := ts.Values[i]
				if math.IsNaN(v) {
					continue
				}
				samples++
				delta := v - mean
				mean = mean + delta/samples
				q += delta * (v - mean)
			}
			if samples == 0 {
				// All the points at position i are NaN, so there is nothing to score.
				continue
			}
			// See https://en.wikipedia.org/wiki/Standard_score
			stddev := math.Sqrt(q / samples)
			for _, ts := range group {
				if v := ts.Values[i]; !math.IsNaN(v) {
					ts.Values[i] = (v - mean) / stddev
				}
			}
		}
		return group
	}
	return aggrFuncExt(scoreGroup, tss, &afa.ae.Modifier, afa.ae.Limit, true)
}
// modeNoNaNs returns the most frequent value in a.
//
// prevValue is returned if a is empty. Otherwise prevValue takes part in the calculations
// as an additional item preceding the sorted a, unless it is NaN.
// The smallest value wins if multiple values have the same frequency.
//
// a mustn't contain NaNs. It is sorted in place, so the caller must pass a copy
// if the original order of items must be preserved.
//
// See https://en.wikipedia.org/wiki/Mode_(statistics)
func modeNoNaNs(prevValue float64, a []float64) float64 {
	n := len(a)
	if n == 0 {
		return prevValue
	}
	sort.Float64s(a)
	best := prevValue
	bestLen := 0
	// runStart is the index of the first item in the current run of equal values;
	// -1 means that the run starts at prevValue passed by the caller.
	runStart := -1
	for i := 0; i < n; i++ {
		v := a[i]
		if v != prevValue {
			// The run of prevValue items ends at i. Remember it if it is the longest run seen so far.
			if runLen := i - runStart; runLen > bestLen || math.IsNaN(best) {
				bestLen = runLen
				best = prevValue
			}
			runStart = i
			prevValue = v
		}
	}
	// Account for the last run.
	if n-runStart > bestLen || math.IsNaN(best) {
		best = prevValue
	}
	return best
}
// aggrFuncCountValues implements count_values(dstLabel, q).
//
// For every group and every distinct non-NaN value it generates a series with dstLabel set to the value.
// Every point of the generated series contains the number of group series with this value at the point,
// or NaN if there are no such series.
//
// An error is returned if more than -search.maxSeriesPerAggrFunc series are generated.
// Note that the function updates afa.ae.Modifier.Args in place.
func aggrFuncCountValues(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 2); err != nil {
		return nil, err
	}
	dstLabel, err := getString(args[0], 0)
	if err != nil {
		return nil, err
	}
	// Remove dstLabel from grouping like Prometheus does.
	modifier := &afa.ae.Modifier
	switch strings.ToLower(modifier.Op) {
	case "without":
		modifier.Args = append(modifier.Args, dstLabel)
	case "by":
		// Filter out dstLabel from modifier.Args in place.
		dstArgs := modifier.Args[:0]
		for _, arg := range modifier.Args {
			if arg == dstLabel {
				continue
			}
			dstArgs = append(dstArgs, arg)
		}
		modifier.Args = dstArgs
	default:
		// Do nothing
	}
	// afe counts values for a single group of series.
	afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) ([]*timeseries, error) {
		// m maps a distinct value to the series with per-point counters for this value.
		m := make(map[float64]*timeseries)
		for _, ts := range tss {
			for i, v := range ts.Values {
				if math.IsNaN(v) {
					continue
				}
				dst := m[v]
				if dst == nil {
					// The value is seen for the first time - create a series for it with NaNs at every point.
					if len(m) >= *maxSeriesPerAggrFunc {
						return nil, fmt.Errorf("more than -search.maxSeriesPerAggrFunc=%d are generated by count_values()", *maxSeriesPerAggrFunc)
					}
					dst = &timeseries{}
					dst.CopyFromShallowTimestamps(tss[0])
					dst.MetricName.RemoveTag(dstLabel)
					dst.MetricName.AddTag(dstLabel, strconv.FormatFloat(v, 'f', -1, 64))
					values := dst.Values
					for j := range values {
						values[j] = nan
					}
					m[v] = dst
				}
				// Increment the counter at point i; NaN means the counter isn't started yet.
				values := dst.Values
				if math.IsNaN(values[i]) {
					values[i] = 1
				} else {
					values[i]++
				}
			}
		}
		rvs := make([]*timeseries, 0, len(m))
		for _, ts := range m {
			rvs = append(rvs, ts)
		}
		return rvs, nil
	}
	// aggrFuncExt cannot be used here, since afe may return an error.
	m := aggrPrepareSeries(args[1], &afa.ae.Modifier, afa.ae.Limit, false)
	rvs := make([]*timeseries, 0, len(m))
	for _, tss := range m {
		rv, err := afe(tss, modifier)
		if err != nil {
			return nil, err
		}
		rvs = append(rvs, rv...)
		// Enforce the limit on the total number of series generated across all the groups.
		if len(rvs) > *maxSeriesPerAggrFunc {
			return nil, fmt.Errorf("more than -search.maxSeriesPerAggrFunc=%d are generated by count_values()", *maxSeriesPerAggrFunc)
		}
	}
	return rvs, nil
}
// newAggrFuncTopK returns an aggrFunc for topk(k, q) if isReverse is false and for bottomk(k, q) otherwise.
//
// The selection is performed independently at every point: values outside the k selected ones
// at the point are replaced by NaN. Series consisting of NaNs only are dropped from the result.
func newAggrFuncTopK(isReverse bool) aggrFunc {
	return func(afa *aggrFuncArg) ([]*timeseries, error) {
		args := afa.args
		if err := expectTransformArgsNum(args, 2); err != nil {
			return nil, err
		}
		// ks contains k per every point.
		ks, err := getScalar(args[0], 0)
		if err != nil {
			return nil, err
		}
		afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries {
			// tssNoNaNs is a scratch list re-used across points.
			var tssNoNaNs []*timeseries
			for n := range tss[0].Values {
				// Drop series with NaNs at Values[n] before sorting.
				// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506
				tssNoNaNs = tssNoNaNs[:0]
				for _, ts := range tss {
					if !math.IsNaN(ts.Values[n]) {
						tssNoNaNs = append(tssNoNaNs, ts)
					}
				}
				// Sort so that the series to keep are located at the end of tssNoNaNs:
				// ascending order for topk and descending order for bottomk.
				sort.Slice(tssNoNaNs, func(i, j int) bool {
					a := tssNoNaNs[i].Values[n]
					b := tssNoNaNs[j].Values[n]
					if isReverse {
						a, b = b, a
					}
					return a < b
				})
				// Replace values at point n by NaN for all the series except the last k ones.
				fillNaNsAtIdx(n, ks[n], tssNoNaNs)
			}
			tss = removeEmptySeries(tss)
			reverseSeries(tss)
			return tss
		}
		return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
	}
}
// newAggrFuncRangeTopK returns an aggrFunc, which selects top (or bottom if isReverse is set) series
// ranked by f applied to all the values of every series.
//
// The returned function accepts (k, q) or (k, q, remainingSumTagName) args.
// See getRangeTopKTimeseries for details.
func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc {
	return func(afa *aggrFuncArg) ([]*timeseries, error) {
		args := afa.args
		switch argsLen := len(args); {
		case argsLen < 2:
			return nil, fmt.Errorf(`unexpected number of args; got %d; want at least %d`, argsLen, 2)
		case argsLen > 3:
			return nil, fmt.Errorf(`unexpected number of args; got %d; want no more than %d`, argsLen, 3)
		}
		ks, err := getScalar(args[0], 0)
		if err != nil {
			return nil, err
		}
		// The third arg is optional; an empty tag name disables the `remaining sum` series.
		var remainingSumTagName string
		if len(args) == 3 {
			remainingSumTagName, err = getString(args[2], 2)
			if err != nil {
				return nil, err
			}
		}
		selectTopK := func(group []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
			return getRangeTopKTimeseries(group, modifier, ks, remainingSumTagName, f, isReverse)
		}
		return aggrFuncExt(selectTopK, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
	}
}
// getRangeTopKTimeseries ranks tss by f applied to the values of every series and leaves
// values at point i only for the last ks[i] series in the ranked order; the rest are replaced by NaN.
//
// Series are ranked in ascending order of f results (in descending order if isReverse is set);
// series with NaN rank are put after all the other series with their relative order preserved.
// If remainingSumTagName isn't empty, then an additional series returned by getRemainingSumTimeseries
// is added to the result. Series consisting of NaNs only are dropped from the result.
//
// tss is re-ordered and modified in place.
func getRangeTopKTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr, ks []float64, remainingSumTagName string,
	f func(values []float64) float64, isReverse bool) []*timeseries {
	type tsWithValue struct {
		ts    *timeseries
		value float64
	}
	// Calculate the rank for every series.
	maxs := make([]tsWithValue, len(tss))
	for i, ts := range tss {
		value := f(ts.Values)
		maxs[i] = tsWithValue{
			ts:    ts,
			value: value,
		}
	}
	// Drop maxs with NaNs before sorting.
	// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506
	maxsNoNaNs := make([]tsWithValue, 0, len(maxs))
	for _, tsv := range maxs {
		if !math.IsNaN(tsv.value) {
			maxsNoNaNs = append(maxsNoNaNs, tsv)
		}
	}
	sort.Slice(maxsNoNaNs, func(i, j int) bool {
		a := maxsNoNaNs[i].value
		b := maxsNoNaNs[j].value
		if isReverse {
			a, b = b, a
		}
		return a < b
	})
	// Put series with NaN ranks after the sorted ones, so maxsNoNaNs contains all the series again.
	for _, tsv := range maxs {
		if math.IsNaN(tsv.value) {
			maxsNoNaNs = append(maxsNoNaNs, tsv)
		}
	}
	// Re-order tss according to the ranking.
	for i := range maxsNoNaNs {
		tss[i] = maxsNoNaNs[i].ts
	}
	// The remaining sum must be calculated before the non-selected values are replaced by NaNs.
	remainingSumTS := getRemainingSumTimeseries(tss, modifier, ks, remainingSumTagName)
	for i, k := range ks {
		fillNaNsAtIdx(i, k, tss)
	}
	if remainingSumTS != nil {
		tss = append(tss, remainingSumTS)
	}
	tss = removeEmptySeries(tss)
	reverseSeries(tss)
	return tss
}
// reverseSeries reverses the order of items in tss in place.
func reverseSeries(tss []*timeseries) {
	for lo, hi := 0, len(tss)-1; lo < hi; lo, hi = lo+1, hi-1 {
		tss[lo], tss[hi] = tss[hi], tss[lo]
	}
}
// getRemainingSumTimeseries returns a series with per-point sums of non-NaN values
// for all the series in tss except the last ks[i] ones at every point i.
//
// nil is returned if remainingSumTagName is empty or tss is empty.
//
// The returned series is built from tss[0] with the group tags removed according to modifier
// and with an additional tag derived from remainingSumTagName: `name=value` sets the tag `name`
// to `value`, while a plain `name` sets the tag `name` to `name`.
func getRemainingSumTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr, ks []float64, remainingSumTagName string) *timeseries {
	if remainingSumTagName == "" || len(tss) == 0 {
		return nil
	}
	dst := &timeseries{}
	dst.CopyFromShallowTimestamps(tss[0])
	removeGroupTags(&dst.MetricName, modifier)

	tagName, tagValue := remainingSumTagName, remainingSumTagName
	if n := strings.IndexByte(remainingSumTagName, '='); n >= 0 {
		tagName, tagValue = remainingSumTagName[:n], remainingSumTagName[n+1:]
	}
	dst.MetricName.RemoveTag(tagName)
	dst.MetricName.AddTag(tagName, tagValue)

	for i, k := range ks {
		remaining := tss[:len(tss)-getIntK(k, len(tss))]
		total := float64(0)
		samples := 0
		for _, ts := range remaining {
			if v := ts.Values[i]; !math.IsNaN(v) {
				total += v
				samples++
			}
		}
		if samples == 0 {
			total = nan
		}
		dst.Values[i] = total
	}
	return dst
}
// fillNaNsAtIdx sets Values[idx] to NaN for all the series in tss except the last k ones.
//
// k is converted to an integer in the range [0..len(tss)] via getIntK.
func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) {
	dropLen := len(tss) - getIntK(k, len(tss))
	for i := 0; i < dropLen; i++ {
		tss[i].Values[idx] = nan
	}
}
// getIntK converts k to an integer limited by the range [0..max].
//
// Zero is returned for NaN and for negative k.
func getIntK(k float64, max int) int {
	if math.IsNaN(k) {
		return 0
	}
	switch kn := floatToIntBounded(k); {
	case kn < 0:
		return 0
	case kn > max:
		return max
	default:
		return kn
	}
}
// minValue returns the minimum non-NaN value from values.
//
// NaN is returned if values is empty or consists of NaNs only.
func minValue(values []float64) float64 {
	// Take the first non-NaN value as the initial candidate.
	result := nan
	i := 0
	for i < len(values) && math.IsNaN(result) {
		result = values[i]
		i++
	}
	for _, v := range values[i:] {
		if v < result && !math.IsNaN(v) {
			result = v
		}
	}
	return result
}
// maxValue returns the maximum non-NaN value from values.
//
// NaN is returned if values is empty or consists of NaNs only.
func maxValue(values []float64) float64 {
	// Take the first non-NaN value as the initial candidate.
	result := nan
	i := 0
	for i < len(values) && math.IsNaN(result) {
		result = values[i]
		i++
	}
	for _, v := range values[i:] {
		if v > result && !math.IsNaN(v) {
			result = v
		}
	}
	return result
}
// avgValue returns the arithmetic mean of non-NaN items in values.
//
// NaN is returned if values doesn't contain non-NaN items.
func avgValue(values []float64) float64 {
	var total float64
	samples := 0
	for _, v := range values {
		if !math.IsNaN(v) {
			samples++
			total += v
		}
	}
	if samples > 0 {
		return total / float64(samples)
	}
	return nan
}
// medianValue returns the median of values, calculated as the 0.5 quantile.
func medianValue(values []float64) float64 {
	const medianPhi = 0.5
	return quantile(medianPhi, values)
}
// lastValue returns the last item of values after trailing NaNs are stripped via skipTrailingNaNs.
//
// NaN is returned if nothing is left after the stripping.
func lastValue(values []float64) float64 {
	trimmed := skipTrailingNaNs(values)
	if n := len(trimmed); n > 0 {
		return trimmed[n-1]
	}
	return nan
}
// quantiles appends quantiles for the given phis over originValues to qs and returns the result.
//
// originValues isn't modified: its non-NaN items are copied to a pooled buffer and sorted there.
func quantiles(qs, phis []float64, originValues []float64) []float64 {
	sorted := getFloat64s()
	sorted.prepareForQuantileFloat64(originValues)
	result := quantilesSorted(qs, phis, sorted.A)
	putFloat64s(sorted)
	return result
}
// quantile returns the phi quantile over originValues.
//
// originValues isn't modified: its non-NaN items are copied to a pooled buffer and sorted there.
func quantile(phi float64, originValues []float64) float64 {
	sorted := getFloat64s()
	sorted.prepareForQuantileFloat64(originValues)
	result := quantileSorted(phi, sorted.A)
	putFloat64s(sorted)
	return result
}
// prepareForQuantileFloat64 replaces the contents of a.A with non-NaN items from src sorted in ascending order.
//
// The backing array of a.A is re-used when it has enough capacity.
func (a *float64s) prepareForQuantileFloat64(src []float64) {
	a.A = a.A[:0]
	for _, v := range src {
		if !math.IsNaN(v) {
			a.A = append(a.A, v)
		}
	}
	// Use sort.Sort instead of sort.Float64s in order to avoid a memory allocation
	sort.Sort(a)
}
// Len returns the number of items in a.A.
//
// It is used by sort.Sort in prepareForQuantileFloat64.
func (a *float64s) Len() int {
	return len(a.A)
}
// Swap exchanges the items at positions i and j in a.A.
//
// It is used by sort.Sort in prepareForQuantileFloat64.
func (a *float64s) Swap(i, j int) {
	a.A[i], a.A[j] = a.A[j], a.A[i]
}
// Less reports whether the item at position i in a.A is smaller than the item at position j.
//
// It is used by sort.Sort in prepareForQuantileFloat64.
func (a *float64s) Less(i, j int) bool {
	return a.A[i] < a.A[j]
}
// quantilesSorted appends quantiles for the given phis over sorted values to qs and returns the result.
//
// values must be sorted and mustn't contain NaN items.
// The implementation mimics Prometheus implementation for compatibility's sake.
func quantilesSorted(qs, phis []float64, values []float64) []float64 {
	for i := range phis {
		qs = append(qs, quantileSorted(phis[i], values))
	}
	return qs
}
// quantileSorted returns the phi quantile over sorted values using linear interpolation
// between the closest ranks.
//
// NaN is returned if values is empty or phi is NaN; -Inf is returned for phi < 0 and +Inf for phi > 1.
//
// values must be sorted and mustn't contain NaN items.
// The implementation mimics Prometheus implementation for compatibility's sake.
func quantileSorted(phi float64, values []float64) float64 {
	switch {
	case len(values) == 0 || math.IsNaN(phi):
		return nan
	case phi < 0:
		return math.Inf(-1)
	case phi > 1:
		return math.Inf(+1)
	}
	last := float64(len(values)) - 1
	rank := phi * last
	rankFloor := math.Floor(rank)
	lowerIndex := math.Max(0, rankFloor)
	upperIndex := math.Min(last, lowerIndex+1)
	weight := rank - rankFloor
	return values[int(lowerIndex)]*(1-weight) + values[int(upperIndex)]*weight
}
// aggrFuncMAD implements mad(): per-point median absolute deviation across tss.
//
// The result is written into tss[0], which is returned as the only series.
//
// See https://en.wikipedia.org/wiki/Median_absolute_deviation
func aggrFuncMAD(tss []*timeseries) []*timeseries {
	// MAD is the median of absolute deviations from the per-point medians.
	mads := getPerPointMADs(tss, getPerPointMedians(tss))
	dst := tss[0]
	dst.Values = append(dst.Values[:0], mads...)
	return tss[:1]
}
// aggrFuncOutliersIQR implements outliers_iqr(q): it returns series with at least a single point
// outside the per-point bounds returned by getPerPointIQRBounds for the group.
//
// See Outliers section at https://en.wikipedia.org/wiki/Interquartile_range
func aggrFuncOutliersIQR(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 1); err != nil {
		return nil, err
	}
	filterOutliers := func(group []*timeseries, _ *metricsql.ModifierExpr) []*timeseries {
		lower, upper := getPerPointIQRBounds(group)
		// hasOutlier reports whether ts has a point above the upper bound or below the lower bound.
		hasOutlier := func(ts *timeseries) bool {
			for i, v := range ts.Values {
				if v > upper[i] || v < lower[i] {
					return true
				}
			}
			return false
		}
		// Filter the group in place.
		outliers := group[:0]
		for _, ts := range group {
			if hasOutlier(ts) {
				outliers = append(outliers, ts)
			}
		}
		return outliers
	}
	return aggrFuncExt(filterOutliers, args[0], &afa.ae.Modifier, afa.ae.Limit, true)
}
// getPerPointIQRBounds returns per-point lower and upper bounds for detecting outliers across tss.
//
// The bounds at every point are q25 - 1.5*IQR and q75 + 1.5*IQR, where q25 and q75 are
// the corresponding quantiles over non-NaN values of tss at the point and IQR = q75 - q25.
// See Outliers section at https://en.wikipedia.org/wiki/Interquartile_range
//
// nil slices are returned for empty tss.
func getPerPointIQRBounds(tss []*timeseries) ([]float64, []float64) {
	if len(tss) == 0 {
		return nil, nil
	}
	pointsLen := len(tss[0].Values)
	values := make([]float64, 0, len(tss))
	// qs is a scratch buffer for quantiles; it is re-used across points.
	var qs []float64
	lower := make([]float64, pointsLen)
	upper := make([]float64, pointsLen)
	for i := 0; i < pointsLen; i++ {
		values = values[:0]
		for _, ts := range tss {
			v := ts.Values[i]
			if !math.IsNaN(v) {
				values = append(values, v)
			}
		}
		// Assign to the outer qs instead of shadowing it with `:=`,
		// so the buffer allocated at the first iteration is re-used by the next ones.
		qs = quantiles(qs[:0], iqrPhis, values)
		iqr := 1.5 * (qs[1] - qs[0])
		lower[i] = qs[0] - iqr
		upper[i] = qs[1] + iqr
	}
	return lower, upper
}
// iqrPhis contains phis for the first and the third quartiles used by getPerPointIQRBounds.
var iqrPhis = []float64{0.25, 0.75}
// aggrFuncOutliersMAD implements outliers_mad(tolerance, q): it returns series with at least
// a single point, which deviates from the per-point group median by more than tolerance * MAD.
//
// See https://en.wikipedia.org/wiki/Median_absolute_deviation
func aggrFuncOutliersMAD(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 2); err != nil {
		return nil, err
	}
	tolerances, err := getScalar(args[0], 0)
	if err != nil {
		return nil, err
	}
	filterOutliers := func(group []*timeseries, _ *metricsql.ModifierExpr) []*timeseries {
		medians := getPerPointMedians(group)
		// thresholds contain MAD values multiplied by tolerance for every point.
		thresholds := getPerPointMADs(group, medians)
		for n := range thresholds {
			thresholds[n] *= tolerances[n]
		}
		// Filter the group in place, leaving only series with at least a single peak above the threshold.
		outliers := group[:0]
		for _, ts := range group {
			for n, v := range ts.Values {
				if math.Abs(v-medians[n]) > thresholds[n] {
					outliers = append(outliers, ts)
					break
				}
			}
		}
		return outliers
	}
	return aggrFuncExt(filterOutliers, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
// aggrFuncOutliersK implements outliersk(k, q): it returns up to k series per group
// with the highest sum of squared deviations from the per-point group medians.
func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 2); err != nil {
		return nil, err
	}
	ks, err := getScalar(args[0], 0)
	if err != nil {
		return nil, err
	}
	selectOutliers := func(group []*timeseries, _ *metricsql.ModifierExpr) []*timeseries {
		medians := getPerPointMedians(group)
		// deviation returns the sum of squared distances from the per-point medians.
		// NOTE(review): a NaN in values or medians makes the whole sum NaN; getRangeTopKTimeseries
		// puts such series after all the series with non-NaN scores - confirm this is intended.
		deviation := func(values []float64) float64 {
			var total float64
			for n := range values {
				d := values[n] - medians[n]
				total += d * d
			}
			return total
		}
		return getRangeTopKTimeseries(group, &afa.ae.Modifier, ks, "", deviation, false)
	}
	return aggrFuncExt(selectOutliers, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
// getPerPointMedians returns medians over non-NaN values of tss for every point.
//
// The median is NaN for points with NaNs across all the series.
// Empty tss is treated as a bug and is reported via logger.Panicf.
func getPerPointMedians(tss []*timeseries) []float64 {
	if len(tss) == 0 {
		logger.Panicf("BUG: expecting non-empty tss")
	}
	medians := make([]float64, len(tss[0].Values))
	// Use a pooled buffer for collecting the values at every point.
	buf := getFloat64s()
	column := buf.A
	for n := range medians {
		column = column[:0]
		for _, ts := range tss {
			if v := ts.Values[n]; !math.IsNaN(v) {
				column = append(column, v)
			}
		}
		medians[n] = quantile(0.5, column)
	}
	buf.A = column
	putFloat64s(buf)
	return medians
}
// getPerPointMADs returns median absolute deviations from medians over non-NaN values of tss for every point.
//
// medians must contain a median per every point, e.g. the result of getPerPointMedians(tss).
//
// See https://en.wikipedia.org/wiki/Median_absolute_deviation
func getPerPointMADs(tss []*timeseries, medians []float64) []float64 {
	mads := make([]float64, len(medians))
	// Use a pooled buffer for collecting absolute deviations at every point.
	buf := getFloat64s()
	deviations := buf.A
	for n := range medians {
		deviations = deviations[:0]
		for _, ts := range tss {
			if v := ts.Values[n]; !math.IsNaN(v) {
				deviations = append(deviations, math.Abs(v-medians[n]))
			}
		}
		mads[n] = quantile(0.5, deviations)
	}
	buf.A = deviations
	putFloat64s(buf)
	return mads
}
// aggrFuncLimitK implements limitk(k, q): it returns up to k series per group.
//
// Series are selected by the hash of their metric names, so the same series are returned
// across multiple calls for the same input. Negative k is treated as zero.
func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 2); err != nil {
		return nil, err
	}
	limit, err := getIntNumber(args[0], 0)
	if err != nil {
		return nil, fmt.Errorf("cannot obtain limit arg: %w", err)
	}
	if limit < 0 {
		limit = 0
	}
	limitGroup := func(group []*timeseries, _ *metricsql.ModifierExpr) []*timeseries {
		// Sort series by metricName hash in order to get consistent set of output series
		// across multiple calls to limitk() function.
		// Sort series by hash in order to guarantee uniform selection across series.
		type hashedSeries struct {
			hash uint64
			ts   *timeseries
		}
		digest := xxhash.New()
		hashed := make([]hashedSeries, 0, len(group))
		for _, ts := range group {
			hashed = append(hashed, hashedSeries{
				hash: getHash(digest, &ts.MetricName),
				ts:   ts,
			})
		}
		sort.Slice(hashed, func(i, j int) bool {
			return hashed[i].hash < hashed[j].hash
		})
		for i := range hashed {
			group[i] = hashed[i].ts
		}
		if limit < len(group) {
			return group[:limit]
		}
		return group
	}
	return aggrFuncExt(limitGroup, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
// getHash returns the hash over the metric group and all the tag keys and values of mn.
//
// d is reset before use, so it may be re-used across calls.
func getHash(d *xxhash.Digest, mn *storage.MetricName) uint64 {
	d.Reset()
	// Results of Write calls are ignored in the same way for all the parts of the metric name.
	_, _ = d.Write(mn.MetricGroup)
	for i := range mn.Tags {
		tag := mn.Tags[i]
		_, _ = d.Write(tag.Key)
		_, _ = d.Write(tag.Value)
	}
	return d.Sum64()
}
// aggrFuncQuantiles implements quantiles(dstLabel, phi1, ..., phiN, q).
//
// For every group it returns a series per every phi with dstLabel set to the phi formatted via %g.
// Every point of the series contains the phi quantile over the values of the group at the point.
//
// Only the first sample of every phi arg is used, so phis are expected to be constant over time.
func aggrFuncQuantiles(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if len(args) < 3 {
		return nil, fmt.Errorf("unexpected number of args: %d; expecting at least 3 args", len(args))
	}
	dstLabel, err := getString(args[0], 0)
	if err != nil {
		return nil, fmt.Errorf("cannot obtain dstLabel: %w", err)
	}
	// All the args between dstLabel and the last arg are phis.
	phiArgs := args[1 : len(args)-1]
	phis := make([]float64, len(phiArgs))
	for i, phiArg := range phiArgs {
		phisLocal, err := getScalar(phiArg, i+1)
		if err != nil {
			return nil, err
		}
		// Verify phisLocal (not phis, which is never empty here) before reading its first item.
		if len(phisLocal) == 0 {
			logger.Panicf("BUG: expecting at least a single sample")
		}
		phis[i] = phisLocal[0]
	}
	argOrig := args[len(args)-1]
	afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
		// Prepare the output series - one per phi.
		tssDst := make([]*timeseries, len(phiArgs))
		for j := range tssDst {
			ts := &timeseries{}
			ts.CopyFromShallowTimestamps(tss[0])
			ts.MetricName.RemoveTag(dstLabel)
			ts.MetricName.AddTag(dstLabel, fmt.Sprintf("%g", phis[j]))
			tssDst[j] = ts
		}
		// Use pooled buffers for quantiles and for the values at every point.
		b := getFloat64s()
		qs := b.A
		a := getFloat64s()
		values := a.A
		for n := range tss[0].Values {
			values = values[:0]
			for j := range tss {
				values = append(values, tss[j].Values[n])
			}
			qs = quantiles(qs[:0], phis, values)
			for j := range tssDst {
				tssDst[j].Values[n] = qs[j]
			}
		}
		a.A = values
		putFloat64s(a)
		b.A = qs
		putFloat64s(b)
		return tssDst
	}
	return aggrFuncExt(afe, argOrig, &afa.ae.Modifier, afa.ae.Limit, false)
}
// aggrFuncQuantile implements quantile(phi, q): per-point phi quantile across every group of series.
//
// phi may vary over time, since a distinct phi value is used for every point.
func aggrFuncQuantile(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 2); err != nil {
		return nil, err
	}
	phis, err := getScalar(args[0], 0)
	if err != nil {
		return nil, err
	}
	return aggrFuncExt(newAggrQuantileFunc(phis), args[1], &afa.ae.Modifier, afa.ae.Limit, false)
}
// aggrFuncMedian implements median(): per-point 0.5 quantile across every group of series.
func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
	tss, err := getAggrTimeseries(afa.args)
	if err != nil {
		return nil, err
	}
	// newAggrQuantileFunc needs phi per every point, so obtain them from the series generated for 0.5.
	medianPhis := evalNumber(afa.ec, 0.5)[0].Values
	return aggrFuncExt(newAggrQuantileFunc(medianPhis), tss, &afa.ae.Modifier, afa.ae.Limit, false)
}
// newAggrQuantileFunc returns a function for aggrFuncExt, which calculates the phis[n] quantile
// across the given series at every point n.
//
// The result is written into the first series of the group, which is returned as the only series.
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
	return func(tss []*timeseries, _ *metricsql.ModifierExpr) []*timeseries {
		dst := tss[0]
		// Use a pooled buffer for collecting the values at every point.
		buf := getFloat64s()
		column := buf.A
		for n := range dst.Values {
			column = column[:0]
			for _, ts := range tss {
				column = append(column, ts.Values[n])
			}
			dst.Values[n] = quantile(phis[n], column)
		}
		buf.A = column
		putFloat64s(buf)
		return tss[:1]
	}
}
// floatToIntBounded converts f to int, returning math.MaxInt and math.MinInt
// for values outside the int range instead of overflowing.
//
// The fractional part of f is discarded. NaN isn't handled specially, so callers
// must filter it out before the call, as getIntK does.
func floatToIntBounded(f float64) int {
	// math.MaxInt isn't exactly representable as float64 on 64-bit platforms - it is rounded up to 2^63.
	// So `>=` is needed in order to clamp f == 2^63, since int(f) overflows for such f
	// and the result of the overflowing conversion is implementation-dependent.
	if f >= math.MaxInt {
		return math.MaxInt
	}
	if f < math.MinInt {
		return math.MinInt
	}
	return int(f)
}