From 4f7116d1ee6de056712e5eb2ffd9fdff85adbc84 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 4 Feb 2020 22:42:10 +0200 Subject: [PATCH] app/vmselect/promql: adjust rollup_candlestick calculations to the exepcted results Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309 --- app/vmselect/promql/exec_test.go | 10 ++--- app/vmselect/promql/rollup.go | 76 ++++++++++++++++++++++++++++++-- 2 files changed, 77 insertions(+), 9 deletions(-) diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index b8365d330f..cc0e85fa80 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -4708,25 +4708,25 @@ func TestExecSuccess(t *testing.T) { }} r2 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{0.85, 0.15, 0.43, 0.76, 0.47, 0.21}, + Values: []float64{0.1, 0.04, 0.49, 0.46, 0.57, 0.92}, Timestamps: timestampsExpected, } r2.MetricName.Tags = []storage.Tag{{ Key: []byte("rollup"), - Value: []byte("open"), + Value: []byte("close"), }} r3 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{0.32, 0.82, 0.13, 0.28, 0.86, 0.57}, + Values: []float64{0.9, 0.32, 0.82, 0.13, 0.28, 0.86}, Timestamps: timestampsExpected, } r3.MetricName.Tags = []storage.Tag{{ Key: []byte("rollup"), - Value: []byte("close"), + Value: []byte("open"), }} r4 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{0.85, 0.94, 0.97, 0.93, 0.98, 0.92}, + Values: []float64{0.9, 0.94, 0.97, 0.93, 0.98, 0.92}, Timestamps: timestampsExpected, } r4.MetricName.Tags = []storage.Tag{{ diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 0175b734cc..ca9a988ec4 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -259,10 +259,10 @@ func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, en } rcs = appendRollupConfigs(rcs) case "rollup_candlestick": - rcs = append(rcs, newRollupConfig(rollupFirst, "open")) - rcs = append(rcs, newRollupConfig(rollupLast, "close")) - rcs = append(rcs, newRollupConfig(rollupMin, "low")) - rcs = append(rcs, newRollupConfig(rollupMax, "high")) + rcs = append(rcs, newRollupConfig(rollupOpen, "open")) + rcs = append(rcs, newRollupConfig(rollupClose, "close")) + rcs = append(rcs, newRollupConfig(rollupLow, "low")) + rcs = append(rcs, newRollupConfig(rollupHigh, "high")) case "aggr_over_time": aggrFuncNames, err := getRollupAggrFuncNames(expr) if err != nil { @@ -1419,6 +1419,74 @@ func rollupResets(rfa *rollupFuncArg) float64 { return float64(n) } +// getCandlestickValues returns a subset of rfa.values suitable for rollup_candlestick +// +// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309 for details. +func getCandlestickValues(rfa *rollupFuncArg) []float64 { + currTimestamp := rfa.currTimestamp + timestamps := rfa.timestamps + for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= currTimestamp { + timestamps = timestamps[:len(timestamps)-1] + } + if len(timestamps) == 0 { + return nil + } + return rfa.values[:len(timestamps)] +} + +func rollupOpen(rfa *rollupFuncArg) float64 { + if !math.IsNaN(rfa.prevValue) { + return rfa.prevValue + } + values := getCandlestickValues(rfa) + if len(values) == 0 { + return nan + } + return values[0] +} + +func rollupClose(rfa *rollupFuncArg) float64 { + values := getCandlestickValues(rfa) + if len(values) == 0 { + return rfa.prevValue + } + return values[len(values)-1] +} + +func rollupHigh(rfa *rollupFuncArg) float64 { + values := getCandlestickValues(rfa) + max := rfa.prevValue + if math.IsNaN(max) { + if len(values) == 0 { + return nan + } + max = values[0] + } + for _, v := range values { + if v > max { + max = v + } + } + return max +} + +func rollupLow(rfa *rollupFuncArg) float64 { + values := getCandlestickValues(rfa) + min := rfa.prevValue + if math.IsNaN(min) { + if len(values) == 0 { + return nan + } + min = values[0] + } + for _, v := range values { + if v < min { + min = v + } + } + return min +} + func rollupFirst(rfa *rollupFuncArg) float64 { // There is no need in handling NaNs here, since they must be cleaned up // before calling rollup funcs.