app/vmselect/promql: return non-empty value from rate_over_sum(m[d]) even if a single data point is located in the given [d] window

Just divide the data point value by the window duration in this case.
This commit is contained in:
Aliaksandr Valialkin 2020-07-29 12:36:10 +03:00
parent 717c554fb0
commit 338ee47d60
2 changed files with 12 additions and 18 deletions

View File

@ -325,7 +325,7 @@ type rollupFuncArg struct {
currTimestamp int64 currTimestamp int64
idx int idx int
step int64 window int64
tsm *timeseriesMap tsm *timeseriesMap
} }
@ -337,7 +337,7 @@ func (rfa *rollupFuncArg) reset() {
rfa.timestamps = nil rfa.timestamps = nil
rfa.currTimestamp = 0 rfa.currTimestamp = 0
rfa.idx = 0 rfa.idx = 0
rfa.step = 0 rfa.window = 0
rfa.tsm = nil rfa.tsm = nil
} }
@ -481,7 +481,7 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
} }
rfa := getRollupFuncArg() rfa := getRollupFuncArg()
rfa.idx = 0 rfa.idx = 0
rfa.step = rc.Step rfa.window = window
rfa.tsm = tsm rfa.tsm = tsm
i := 0 i := 0
@ -1080,17 +1080,7 @@ func rollupSum(rfa *rollupFuncArg) float64 {
func rollupRateOverSum(rfa *rollupFuncArg) float64 { func rollupRateOverSum(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up // There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps timestamps := rfa.timestamps
prevTimestamp := rfa.prevTimestamp
if math.IsNaN(rfa.prevValue) {
if len(timestamps) == 0 {
return nan
}
prevTimestamp = timestamps[0]
values = values[1:]
timestamps = timestamps[1:]
}
if len(timestamps) == 0 { if len(timestamps) == 0 {
if math.IsNaN(rfa.prevValue) { if math.IsNaN(rfa.prevValue) {
return nan return nan
@ -1098,11 +1088,14 @@ func rollupRateOverSum(rfa *rollupFuncArg) float64 {
// Assume that the value didn't change since rfa.prevValue. // Assume that the value didn't change since rfa.prevValue.
return 0 return 0
} }
dt := rfa.window
if !math.IsNaN(rfa.prevValue) {
dt = timestamps[len(timestamps)-1] - rfa.prevTimestamp
}
sum := float64(0) sum := float64(0)
for _, v := range values { for _, v := range rfa.values {
sum += v sum += v
} }
dt := timestamps[len(timestamps)-1] - prevTimestamp
return sum / (float64(dt) / 1e3) return sum / (float64(dt) / 1e3)
} }
@ -1486,7 +1479,7 @@ func getCandlestickValues(rfa *rollupFuncArg) []float64 {
} }
func getFirstValueForCandlestick(rfa *rollupFuncArg) float64 { func getFirstValueForCandlestick(rfa *rollupFuncArg) float64 {
if rfa.prevTimestamp+rfa.step >= rfa.currTimestamp { if rfa.prevTimestamp+rfa.window >= rfa.currTimestamp {
return rfa.prevValue return rfa.prevValue
} }
return nan return nan

View File

@ -174,6 +174,7 @@ func testRollupFunc(t *testing.T, funcName string, args []interface{}, meExpecte
rfa.prevTimestamp = 0 rfa.prevTimestamp = 0
rfa.values = append(rfa.values, testValues...) rfa.values = append(rfa.values, testValues...)
rfa.timestamps = append(rfa.timestamps, testTimestamps...) rfa.timestamps = append(rfa.timestamps, testTimestamps...)
rfa.window = rfa.timestamps[len(rfa.timestamps)-1] - rfa.timestamps[0]
if rollupFuncsRemoveCounterResets[funcName] { if rollupFuncsRemoveCounterResets[funcName] {
removeCounterResets(rfa.values) removeCounterResets(rfa.values)
} }
@ -393,7 +394,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
f("descent_over_time", 231) f("descent_over_time", 231)
f("timestamp", 0.13) f("timestamp", 0.13)
f("mode_over_time", 34) f("mode_over_time", 34)
f("rate_over_sum", 3536) f("rate_over_sum", 4520)
} }
func TestRollupNewRollupFuncError(t *testing.T) { func TestRollupNewRollupFuncError(t *testing.T) {
@ -978,7 +979,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 2870.967741935484, 3240, 4059.523809523809, 6200} valuesExpected := []float64{nan, 1262.5, 3187.5, 4059.523809523809, 6200}
timestampsExpected := []int64{0, 40, 80, 120, 160} timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })