Aliaksandr Valialkin 2021-09-27 18:55:35 +03:00
parent 6061464d80
commit eff31c10ec
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
4 changed files with 94 additions and 73 deletions

View File

@ -2,15 +2,15 @@ package promql
import ( import (
"fmt" "fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
"github.com/valyala/histogram"
"math" "math"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
) )
var aggrFuncs = map[string]aggrFunc{ var aggrFuncs = map[string]aggrFunc{
@ -803,36 +803,25 @@ func medianValue(values []float64) float64 {
return quantile(0.5, values) return quantile(0.5, values)
} }
// quantiles calculates the given phis from originValues // quantiles calculates the given phis from originValues without modifying originValues, appends them to qs and returns the result.
// without modifying originValues func quantiles(qs, phis []float64, originValues []float64) []float64 {
func quantiles(phis []float64, originValues []float64) []float64 { a := getFloat64s()
a := float64sPool.Get().(*float64s)
a.A = prepareForQuantileFloat64(a.A[:0], originValues) a.A = prepareForQuantileFloat64(a.A[:0], originValues)
res := quantilesSorted(phis, a.A) qs = quantilesSorted(qs, phis, a.A)
float64sPool.Put(a) putFloat64s(a)
return res return qs
} }
func quantilesSorted(phis []float64, values []float64) []float64 { // quantile calculates the given phi from originValues without modifying originValues
res := make([]float64, len(phis))
for i, phi := range phis {
res[i] = quantileSorted(phi, values)
}
return res
}
// quantile calculates the given phi from originValues
// without modifying originValues
func quantile(phi float64, originValues []float64) float64 { func quantile(phi float64, originValues []float64) float64 {
a := float64sPool.Get().(*float64s) a := getFloat64s()
a.A = prepareForQuantileFloat64(a.A[:0], originValues) a.A = prepareForQuantileFloat64(a.A[:0], originValues)
res := quantileSorted(phi, a.A) q := quantileSorted(phi, a.A)
float64sPool.Put(a) putFloat64s(a)
return res return q
} }
// prepareForQuantileFloat64 copies items from src // prepareForQuantileFloat64 copies items from src to dst but removes NaNs and sorts the dst
// to dst but removes NaNs and sorts the dst
func prepareForQuantileFloat64(dst, src []float64) []float64 { func prepareForQuantileFloat64(dst, src []float64) []float64 {
for _, v := range src { for _, v := range src {
if math.IsNaN(v) { if math.IsNaN(v) {
@ -844,7 +833,20 @@ func prepareForQuantileFloat64(dst, src []float64) []float64 {
return dst return dst
} }
// quantileSorted calculates the given quantile of a sorted list of values. // quantilesSorted calculates the given phis over a sorted list of values, appends them to qs and returns the result.
//
// It is expected that values won't contain NaN items.
// The implementation mimics Prometheus implementation for compatibility's sake.
func quantilesSorted(qs, phis []float64, values []float64) []float64 {
for _, phi := range phis {
q := quantileSorted(phi, values)
qs = append(qs, q)
}
return qs
}
// quantileSorted calculates the given quantile over a sorted list of values.
//
// It is expected that values won't contain NaN items. // It is expected that values won't contain NaN items.
// The implementation mimics Prometheus implementation for compatibility's sake. // The implementation mimics Prometheus implementation for compatibility's sake.
func quantileSorted(phi float64, values []float64) float64 { func quantileSorted(phi float64, values []float64) float64 {
@ -944,36 +946,40 @@ func getPerPointMedians(tss []*timeseries) []float64 {
logger.Panicf("BUG: expecting non-empty tss") logger.Panicf("BUG: expecting non-empty tss")
} }
medians := make([]float64, len(tss[0].Values)) medians := make([]float64, len(tss[0].Values))
h := histogram.GetFast() a := getFloat64s()
values := a.A
for n := range medians { for n := range medians {
h.Reset() values = values[:0]
for j := range tss { for j := range tss {
v := tss[j].Values[n] v := tss[j].Values[n]
if !math.IsNaN(v) { if !math.IsNaN(v) {
h.Update(v) values = append(values, v)
} }
} }
medians[n] = h.Quantile(0.5) medians[n] = quantile(0.5, values)
} }
histogram.PutFast(h) a.A = values
putFloat64s(a)
return medians return medians
} }
func getPerPointMADs(tss []*timeseries, medians []float64) []float64 { func getPerPointMADs(tss []*timeseries, medians []float64) []float64 {
mads := make([]float64, len(medians)) mads := make([]float64, len(medians))
h := histogram.GetFast() a := getFloat64s()
values := a.A
for n, median := range medians { for n, median := range medians {
h.Reset() values = values[:0]
for j := range tss { for j := range tss {
v := tss[j].Values[n] v := tss[j].Values[n]
if !math.IsNaN(v) { if !math.IsNaN(v) {
ad := math.Abs(v - median) ad := math.Abs(v - median)
h.Update(ad) values = append(values, ad)
} }
} }
mads[n] = h.Quantile(0.5) mads[n] = quantile(0.5, values)
} }
histogram.PutFast(h) a.A = values
putFloat64s(a)
return mads return mads
} }
@ -1043,24 +1049,24 @@ func aggrFuncQuantiles(afa *aggrFuncArg) ([]*timeseries, error) {
tssDst[j] = ts tssDst[j] = ts
} }
var qs []float64 b := getFloat64s()
values := float64sPool.Get().(*float64s) qs := b.A
a := getFloat64s()
values := a.A
for n := range tss[0].Values { for n := range tss[0].Values {
values.A = values.A[:0] values = values[:0]
for j := range tss { for j := range tss {
v := tss[j].Values[n] values = append(values, tss[j].Values[n])
if math.IsNaN(v) {
continue
}
values.A = append(values.A, v)
} }
sort.Float64s(values.A) qs = quantiles(qs[:0], phis, values)
qs = quantilesSorted(phis, values.A)
for j := range tssDst { for j := range tssDst {
tssDst[j].Values[n] = qs[j] tssDst[j].Values[n] = qs[j]
} }
} }
float64sPool.Put(values) a.A = values
putFloat64s(a)
b.A = qs
putFloat64s(b)
return tssDst return tssDst
} }
return aggrFuncExt(afe, argOrig, &afa.ae.Modifier, afa.ae.Limit, false) return aggrFuncExt(afe, argOrig, &afa.ae.Modifier, afa.ae.Limit, false)
@ -1092,20 +1098,17 @@ func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
return func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { return func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
dst := tss[0] dst := tss[0]
var values []float64 a := getFloat64s()
values := a.A
for n := range dst.Values { for n := range dst.Values {
values = values[:0] values = values[:0]
for j := range tss { for j := range tss {
v := tss[j].Values[n] values = append(values, tss[j].Values[n])
if math.IsNaN(v) {
continue
}
values = append(values, v)
} }
phi := phis[n] dst.Values[n] = quantile(phis[n], values)
sort.Float64s(values)
dst.Values[n] = quantileSorted(phi, values)
} }
a.A = values
putFloat64s(a)
tss[0] = dst tss[0] = dst
return tss[:1] return tss[:1]
} }

View File

@ -12,7 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql" "github.com/VictoriaMetrics/metricsql"
"github.com/valyala/histogram"
) )
var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+ var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
@ -643,18 +642,20 @@ func getScrapeInterval(timestamps []int64) int64 {
} }
// Estimate scrape interval as 0.6 quantile for the first 20 intervals. // Estimate scrape interval as 0.6 quantile for the first 20 intervals.
h := histogram.GetFast()
tsPrev := timestamps[0] tsPrev := timestamps[0]
timestamps = timestamps[1:] timestamps = timestamps[1:]
if len(timestamps) > 20 { if len(timestamps) > 20 {
timestamps = timestamps[:20] timestamps = timestamps[:20]
} }
a := getFloat64s()
intervals := a.A[:0]
for _, ts := range timestamps { for _, ts := range timestamps {
h.Update(float64(ts - tsPrev)) intervals = append(intervals, float64(ts-tsPrev))
tsPrev = ts tsPrev = ts
} }
scrapeInterval := int64(h.Quantile(0.6)) scrapeInterval := int64(quantile(0.6, intervals))
histogram.PutFast(h) a.A = intervals
putFloat64s(a)
if scrapeInterval <= 0 { if scrapeInterval <= 0 {
return int64(maxSilenceInterval) return int64(maxSilenceInterval)
} }
@ -1066,13 +1067,15 @@ func newRollupQuantiles(args []interface{}) (rollupFunc, error) {
// Fast path - only a single value. // Fast path - only a single value.
return values[0] return values[0]
} }
qs := quantiles(phis, values) qs := getFloat64s()
qs.A = quantiles(qs.A[:0], phis, values)
idx := rfa.idx idx := rfa.idx
tsm := rfa.tsm tsm := rfa.tsm
for i, phiStr := range phiStrs { for i, phiStr := range phiStrs {
ts := tsm.GetOrCreateTimeseries(phiLabel, phiStr) ts := tsm.GetOrCreateTimeseries(phiLabel, phiStr)
ts.Values[idx] = qs[i] ts.Values[idx] = qs.A[i]
} }
putFloat64s(qs)
return nan return nan
} }
return rf, nil return rf, nil
@ -1772,19 +1775,28 @@ func rollupModeOverTime(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
// Copy rfa.values to a.A, since modeNoNaNs modifies a.A contents. // Copy rfa.values to a.A, since modeNoNaNs modifies a.A contents.
a := float64sPool.Get().(*float64s) a := getFloat64s()
a.A = append(a.A[:0], rfa.values...) a.A = append(a.A[:0], rfa.values...)
result := modeNoNaNs(rfa.prevValue, a.A) result := modeNoNaNs(rfa.prevValue, a.A)
float64sPool.Put(a) putFloat64s(a)
return result return result
} }
var float64sPool = &sync.Pool{ func getFloat64s() *float64s {
New: func() interface{} { v := float64sPool.Get()
return &float64s{} if v == nil {
}, v = &float64s{}
}
return v.(*float64s)
} }
func putFloat64s(a *float64s) {
a.A = a.A[:0]
float64sPool.Put(a)
}
var float64sPool sync.Pool
type float64s struct { type float64s struct {
A []float64 A []float64
} }

View File

@ -1177,7 +1177,8 @@ func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
} }
phi := phis[0] phi := phis[0]
rvs := args[1] rvs := args[1]
var values []float64 a := getFloat64s()
values := a.A[:0]
for _, ts := range rvs { for _, ts := range rvs {
lastIdx := -1 lastIdx := -1
originValues := ts.Values originValues := ts.Values
@ -1194,6 +1195,8 @@ func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
originValues[lastIdx] = quantileSorted(phi, values) originValues[lastIdx] = quantileSorted(phi, values)
} }
} }
a.A = values
putFloat64s(a)
setLastValues(rvs) setLastValues(rvs)
return rvs, nil return rvs, nil
} }

View File

@ -6,6 +6,9 @@ sort: 15
## tip ## tip
* FEATURE: vmagent [enterprise](https://victoriametrics.com/enterprise.html): add support for data reading from [Apache Kafka](https://kafka.apache.org/).
* FEATURE: calculate quantiles in the same way as Prometheus does in such functions as [quantile_over_time](https://docs.victoriametrics.com/MetricsQL.html#quantile_over_time) and [quantile](https://docs.victoriametrics.com/MetricsQL.html#quantile). Previously results from VictoriaMetrics could be slightly different than results from Prometheus. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612) issues.
* BUGFIX: align behavior of the queries `a or on (labels) b`, `a and on (labels) b` and `a unless on (labels) b` where `b` has multiple time series with the given `labels` to Prometheus behavior. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1643). * BUGFIX: align behavior of the queries `a or on (labels) b`, `a and on (labels) b` and `a unless on (labels) b` where `b` has multiple time series with the given `labels` to Prometheus behavior. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1643).