From eff31c10eced336b01ca23e713a64c8f690f50c9 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 27 Sep 2021 18:55:35 +0300 Subject: [PATCH] app/vmselect/promql: follow-up after 526dd93b3272ad6182bb563f06b6824ced159ec1 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612 --- app/vmselect/promql/aggr.go | 121 ++++++++++++++++--------------- app/vmselect/promql/rollup.go | 38 ++++++---- app/vmselect/promql/transform.go | 5 +- docs/CHANGELOG.md | 3 + 4 files changed, 94 insertions(+), 73 deletions(-) diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index 6288e03695..661d547a8d 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -2,15 +2,15 @@ package promql import ( "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" - "github.com/VictoriaMetrics/metrics" - "github.com/VictoriaMetrics/metricsql" - "github.com/valyala/histogram" "math" "sort" "strconv" "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/metricsql" ) var aggrFuncs = map[string]aggrFunc{ @@ -803,36 +803,25 @@ func medianValue(values []float64) float64 { return quantile(0.5, values) } -// quantiles calculates the given phis from originValues -// without modifying originValues -func quantiles(phis []float64, originValues []float64) []float64 { - a := float64sPool.Get().(*float64s) +// quantiles calculates the given phis from originValues without modifying originValues, appends them to qs and returns the result. +func quantiles(qs, phis []float64, originValues []float64) []float64 { + a := getFloat64s() a.A = prepareForQuantileFloat64(a.A[:0], originValues) - res := quantilesSorted(phis, a.A) - float64sPool.Put(a) - return res + qs = quantilesSorted(qs, phis, a.A) + putFloat64s(a) + return qs } -func quantilesSorted(phis []float64, values []float64) []float64 { - res := make([]float64, len(phis)) - for i, phi := range phis { - res[i] = quantileSorted(phi, values) - } - return res -} - -// quantile calculates the given phi from originValues -// without modifying originValues +// quantile calculates the given phi from originValues without modifying originValues func quantile(phi float64, originValues []float64) float64 { - a := float64sPool.Get().(*float64s) + a := getFloat64s() a.A = prepareForQuantileFloat64(a.A[:0], originValues) - res := quantileSorted(phi, a.A) - float64sPool.Put(a) - return res + q := quantileSorted(phi, a.A) + putFloat64s(a) + return q } -// prepareForQuantileFloat64 copies items from src -// to dst but removes NaNs and sorts the dst +// prepareForQuantileFloat64 copies items from src to dst but removes NaNs and sorts the dst func prepareForQuantileFloat64(dst, src []float64) []float64 { for _, v := range src { if math.IsNaN(v) { @@ -844,7 +833,20 @@ func prepareForQuantileFloat64(dst, src []float64) []float64 { return dst } -// quantileSorted calculates the given quantile of a sorted list of values. +// quantilesSorted calculates the given phis over a sorted list of values, appends them to qs and returns the result. +// +// It is expected that values won't contain NaN items. +// The implementation mimics Prometheus implementation for compatibility's sake. +func quantilesSorted(qs, phis []float64, values []float64) []float64 { + for _, phi := range phis { + q := quantileSorted(phi, values) + qs = append(qs, q) + } + return qs +} + +// quantileSorted calculates the given quantile over a sorted list of values. +// // It is expected that values won't contain NaN items. // The implementation mimics Prometheus implementation for compatibility's sake. func quantileSorted(phi float64, values []float64) float64 { @@ -944,36 +946,40 @@ func getPerPointMedians(tss []*timeseries) []float64 { logger.Panicf("BUG: expecting non-empty tss") } medians := make([]float64, len(tss[0].Values)) - h := histogram.GetFast() + a := getFloat64s() + values := a.A for n := range medians { - h.Reset() + values = values[:0] for j := range tss { v := tss[j].Values[n] if !math.IsNaN(v) { - h.Update(v) + values = append(values, v) } } - medians[n] = h.Quantile(0.5) + medians[n] = quantile(0.5, values) } - histogram.PutFast(h) + a.A = values + putFloat64s(a) return medians } func getPerPointMADs(tss []*timeseries, medians []float64) []float64 { mads := make([]float64, len(medians)) - h := histogram.GetFast() + a := getFloat64s() + values := a.A for n, median := range medians { - h.Reset() + values = values[:0] for j := range tss { v := tss[j].Values[n] if !math.IsNaN(v) { ad := math.Abs(v - median) - h.Update(ad) + values = append(values, ad) } } - mads[n] = h.Quantile(0.5) + mads[n] = quantile(0.5, values) } - histogram.PutFast(h) + a.A = values + putFloat64s(a) return mads } @@ -1043,24 +1049,24 @@ func aggrFuncQuantiles(afa *aggrFuncArg) ([]*timeseries, error) { tssDst[j] = ts } - var qs []float64 - values := float64sPool.Get().(*float64s) + b := getFloat64s() + qs := b.A + a := getFloat64s() + values := a.A for n := range tss[0].Values { - values.A = values.A[:0] + values = values[:0] for j := range tss { - v := tss[j].Values[n] - if math.IsNaN(v) { - continue - } - values.A = append(values.A, v) + values = append(values, tss[j].Values[n]) } - sort.Float64s(values.A) - qs = quantilesSorted(phis, values.A) + qs = quantiles(qs[:0], phis, values) for j := range tssDst { tssDst[j].Values[n] = qs[j] } } - float64sPool.Put(values) + a.A = values + putFloat64s(a) + b.A = qs + putFloat64s(b) return tssDst } return aggrFuncExt(afe, argOrig, &afa.ae.Modifier, afa.ae.Limit, false) @@ -1092,20 +1098,17 @@ func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) { func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { return func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { dst := tss[0] - var values []float64 + a := getFloat64s() + values := a.A for n := range dst.Values { values = values[:0] for j := range tss { - v := tss[j].Values[n] - if math.IsNaN(v) { - continue - } - values = append(values, v) + values = append(values, tss[j].Values[n]) } - phi := phis[n] - sort.Float64s(values) - dst.Values[n] = quantileSorted(phi, values) + dst.Values[n] = quantile(phis[n], values) } + a.A = values + putFloat64s(a) tss[0] = dst return tss[:1] } diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index f871c2a582..9203e65196 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -12,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metricsql" - "github.com/valyala/histogram" ) var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+ @@ -643,18 +642,20 @@ func getScrapeInterval(timestamps []int64) int64 { } // Estimate scrape interval as 0.6 quantile for the first 20 intervals. - h := histogram.GetFast() tsPrev := timestamps[0] timestamps = timestamps[1:] if len(timestamps) > 20 { timestamps = timestamps[:20] } + a := getFloat64s() + intervals := a.A[:0] for _, ts := range timestamps { - h.Update(float64(ts - tsPrev)) + intervals = append(intervals, float64(ts-tsPrev)) tsPrev = ts } - scrapeInterval := int64(h.Quantile(0.6)) - histogram.PutFast(h) + scrapeInterval := int64(quantile(0.6, intervals)) + a.A = intervals + putFloat64s(a) if scrapeInterval <= 0 { return int64(maxSilenceInterval) } @@ -1066,13 +1067,15 @@ func newRollupQuantiles(args []interface{}) (rollupFunc, error) { // Fast path - only a single value. return values[0] } - qs := quantiles(phis, values) + qs := getFloat64s() + qs.A = quantiles(qs.A[:0], phis, values) idx := rfa.idx tsm := rfa.tsm for i, phiStr := range phiStrs { ts := tsm.GetOrCreateTimeseries(phiLabel, phiStr) - ts.Values[idx] = qs[i] + ts.Values[idx] = qs.A[i] } + putFloat64s(qs) return nan } return rf, nil @@ -1772,19 +1775,28 @@ func rollupModeOverTime(rfa *rollupFuncArg) float64 { // before calling rollup funcs. // Copy rfa.values to a.A, since modeNoNaNs modifies a.A contents. - a := float64sPool.Get().(*float64s) + a := getFloat64s() a.A = append(a.A[:0], rfa.values...) result := modeNoNaNs(rfa.prevValue, a.A) - float64sPool.Put(a) + putFloat64s(a) return result } -var float64sPool = &sync.Pool{ - New: func() interface{} { - return &float64s{} - }, +func getFloat64s() *float64s { + v := float64sPool.Get() + if v == nil { + v = &float64s{} + } + return v.(*float64s) } +func putFloat64s(a *float64s) { + a.A = a.A[:0] + float64sPool.Put(a) +} + +var float64sPool sync.Pool + type float64s struct { A []float64 } diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index c4a3e11cf5..722db3bbb4 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -1177,7 +1177,8 @@ func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) { } phi := phis[0] rvs := args[1] - var values []float64 + a := getFloat64s() + values := a.A[:0] for _, ts := range rvs { lastIdx := -1 originValues := ts.Values @@ -1194,6 +1195,8 @@ func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) { originValues[lastIdx] = quantileSorted(phi, values) } } + a.A = values + putFloat64s(a) setLastValues(rvs) return rvs, nil } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 47c6c5a06d..5797e8db79 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,9 @@ sort: 15 ## tip +* FEATURE: vmagent [enterprise](https://victoriametrics.com/enterprise.html): add support for data reading from [Apache Kafka](https://kafka.apache.org/). +* FEATURE: calculate quantiles in the same way as Prometheus does in such functions as [quantile_over_time](https://docs.victoriametrics.com/MetricsQL.html#quantile_over_time) and [quantile](https://docs.victoriametrics.com/MetricsQL.html#quantile). Previously results from VictoriaMetrics could be slightly different than results from Prometheus. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612) issues. + * BUGFIX: align behavior of the queries `a or on (labels) b`, `a and on (labels) b` and `a unless on (labels) b` where `b` has multiple time series with the given `labels` to Prometheus behavior. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1643).