mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 22:32:20 +01:00
app/vmselect/promql: properly calculate rate
on the first data point
It is calculated as `value / scrape_interval`, since the value was missing on the previous scrape, i.e. we can assume its value was 0 at this time.
This commit is contained in:
parent
97de50dd4c
commit
9a2554691c
@ -25,7 +25,7 @@ var rollupFuncs = map[string]newRollupFunc{
|
|||||||
"increase": newRollupFuncOneArg(rollupIncrease), // + rollupFuncsRemoveCounterResets
|
"increase": newRollupFuncOneArg(rollupIncrease), // + rollupFuncsRemoveCounterResets
|
||||||
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets
|
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets
|
||||||
"predict_linear": newRollupPredictLinear,
|
"predict_linear": newRollupPredictLinear,
|
||||||
"rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets
|
"rate": newRollupFuncOneArg(rollupDerivIncrease), // + rollupFuncsRemoveCounterResets
|
||||||
"resets": newRollupFuncOneArg(rollupResets),
|
"resets": newRollupFuncOneArg(rollupResets),
|
||||||
"avg_over_time": newRollupFuncOneArg(rollupAvg),
|
"avg_over_time": newRollupFuncOneArg(rollupAvg),
|
||||||
"min_over_time": newRollupFuncOneArg(rollupMin),
|
"min_over_time": newRollupFuncOneArg(rollupMin),
|
||||||
@ -116,7 +116,13 @@ type rollupFuncArg struct {
|
|||||||
currTimestamp int64
|
currTimestamp int64
|
||||||
idx int
|
idx int
|
||||||
step int64
|
step int64
|
||||||
|
|
||||||
|
// Real previous value even if it is located too far from the current window.
|
||||||
|
// It matches prevValue if prevValue is not nan.
|
||||||
realPrevValue float64
|
realPrevValue float64
|
||||||
|
|
||||||
|
// Global scrape interval across all the data points in [Start...End] time range.
|
||||||
|
scrapeInterval int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rfa *rollupFuncArg) reset() {
|
func (rfa *rollupFuncArg) reset() {
|
||||||
@ -128,6 +134,7 @@ func (rfa *rollupFuncArg) reset() {
|
|||||||
rfa.idx = 0
|
rfa.idx = 0
|
||||||
rfa.step = 0
|
rfa.step = 0
|
||||||
rfa.realPrevValue = nan
|
rfa.realPrevValue = nan
|
||||||
|
rfa.scrapeInterval = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// rollupFunc must return rollup value for the given rfa.
|
// rollupFunc must return rollup value for the given rfa.
|
||||||
@ -192,7 +199,8 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
|
|||||||
// Extend dstValues in order to remove mallocs below.
|
// Extend dstValues in order to remove mallocs below.
|
||||||
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
|
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
|
||||||
|
|
||||||
maxPrevInterval := getMaxPrevInterval(timestamps)
|
scrapeInterval := getScrapeInterval(timestamps)
|
||||||
|
maxPrevInterval := getMaxPrevInterval(scrapeInterval)
|
||||||
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
|
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
|
||||||
maxPrevInterval = rc.LookbackDelta
|
maxPrevInterval = rc.LookbackDelta
|
||||||
}
|
}
|
||||||
@ -207,6 +215,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
|
|||||||
rfa.idx = 0
|
rfa.idx = 0
|
||||||
rfa.step = rc.Step
|
rfa.step = rc.Step
|
||||||
rfa.realPrevValue = nan
|
rfa.realPrevValue = nan
|
||||||
|
rfa.scrapeInterval = scrapeInterval
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
j := 0
|
j := 0
|
||||||
@ -296,7 +305,7 @@ func binarySearchInt64(a []int64, v int64) uint {
|
|||||||
return i
|
return i
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMaxPrevInterval(timestamps []int64) int64 {
|
func getScrapeInterval(timestamps []int64) int64 {
|
||||||
if len(timestamps) < 2 {
|
if len(timestamps) < 2 {
|
||||||
return int64(maxSilenceInterval)
|
return int64(maxSilenceInterval)
|
||||||
}
|
}
|
||||||
@ -312,30 +321,34 @@ func getMaxPrevInterval(timestamps []int64) int64 {
|
|||||||
h.Update(float64(ts - tsPrev))
|
h.Update(float64(ts - tsPrev))
|
||||||
tsPrev = ts
|
tsPrev = ts
|
||||||
}
|
}
|
||||||
d := int64(h.Quantile(0.6))
|
scrapeInterval := int64(h.Quantile(0.6))
|
||||||
histogram.PutFast(h)
|
histogram.PutFast(h)
|
||||||
if d <= 0 {
|
if scrapeInterval <= 0 {
|
||||||
return int64(maxSilenceInterval)
|
return int64(maxSilenceInterval)
|
||||||
}
|
}
|
||||||
// Increase d more for smaller scrape intervals in order to hide possible gaps
|
return scrapeInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMaxPrevInterval(scrapeInterval int64) int64 {
|
||||||
|
// Increase scrapeInterval more for smaller scrape intervals in order to hide possible gaps
|
||||||
// when high jitter is present.
|
// when high jitter is present.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/139 .
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/139 .
|
||||||
if d <= 2*1000 {
|
if scrapeInterval <= 2*1000 {
|
||||||
return d + 4*d
|
return scrapeInterval + 4*scrapeInterval
|
||||||
}
|
}
|
||||||
if d <= 4*1000 {
|
if scrapeInterval <= 4*1000 {
|
||||||
return d + 2*d
|
return scrapeInterval + 2*scrapeInterval
|
||||||
}
|
}
|
||||||
if d <= 8*1000 {
|
if scrapeInterval <= 8*1000 {
|
||||||
return d + d
|
return scrapeInterval + scrapeInterval
|
||||||
}
|
}
|
||||||
if d <= 16*1000 {
|
if scrapeInterval <= 16*1000 {
|
||||||
return d + d/2
|
return scrapeInterval + scrapeInterval/2
|
||||||
}
|
}
|
||||||
if d <= 32*1000 {
|
if scrapeInterval <= 32*1000 {
|
||||||
return d + d/4
|
return scrapeInterval + scrapeInterval/4
|
||||||
}
|
}
|
||||||
return d + d/8
|
return scrapeInterval + scrapeInterval/8
|
||||||
}
|
}
|
||||||
|
|
||||||
func removeCounterResets(values []float64) {
|
func removeCounterResets(values []float64) {
|
||||||
@ -766,6 +779,14 @@ func rollupDerivSlow(rfa *rollupFuncArg) float64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func rollupDerivFast(rfa *rollupFuncArg) float64 {
|
func rollupDerivFast(rfa *rollupFuncArg) float64 {
|
||||||
|
return rollupDerivFastInternal(rfa, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func rollupDerivIncrease(rfa *rollupFuncArg) float64 {
|
||||||
|
return rollupDerivFastInternal(rfa, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func rollupDerivFastInternal(rfa *rollupFuncArg, canUseRealPrevValue bool) float64 {
|
||||||
// There is no need in handling NaNs here, since they must be cleaned up
|
// There is no need in handling NaNs here, since they must be cleaned up
|
||||||
// before calling rollup funcs.
|
// before calling rollup funcs.
|
||||||
values := rfa.values
|
values := rfa.values
|
||||||
@ -773,10 +794,18 @@ func rollupDerivFast(rfa *rollupFuncArg) float64 {
|
|||||||
prevValue := rfa.prevValue
|
prevValue := rfa.prevValue
|
||||||
prevTimestamp := rfa.prevTimestamp
|
prevTimestamp := rfa.prevTimestamp
|
||||||
if math.IsNaN(prevValue) {
|
if math.IsNaN(prevValue) {
|
||||||
if len(values) < 2 {
|
if len(values) == 0 {
|
||||||
// It is impossible to calculate derivative on 0 or 1 values.
|
|
||||||
return nan
|
return nan
|
||||||
}
|
}
|
||||||
|
if len(values) == 1 {
|
||||||
|
// Assume that the value changed from 0 to the current value during rfa.scrapeInterval.
|
||||||
|
delta := values[0]
|
||||||
|
if canUseRealPrevValue && !math.IsNaN(rfa.realPrevValue) {
|
||||||
|
// Fix against removeCounterResets.
|
||||||
|
delta -= rfa.realPrevValue
|
||||||
|
}
|
||||||
|
return float64(delta) / float64(rfa.scrapeInterval)
|
||||||
|
}
|
||||||
prevValue = values[0]
|
prevValue = values[0]
|
||||||
prevTimestamp = timestamps[0]
|
prevTimestamp = timestamps[0]
|
||||||
values = values[1:]
|
values = values[1:]
|
||||||
|
@ -772,6 +772,20 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
|||||||
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||||
})
|
})
|
||||||
|
t.Run("deriv_fast", func(t *testing.T) {
|
||||||
|
rc := rollupConfig{
|
||||||
|
Func: rollupDerivFast,
|
||||||
|
Start: 0,
|
||||||
|
End: 20,
|
||||||
|
Step: 4,
|
||||||
|
Window: 0,
|
||||||
|
}
|
||||||
|
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||||
|
values := rc.Do(nil, testValues, testTimestamps)
|
||||||
|
valuesExpected := []float64{nan, nan, 10.25, 0, -8900, 0}
|
||||||
|
timestampsExpected := []int64{0, 4, 8, 12, 16, 20}
|
||||||
|
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||||
|
})
|
||||||
t.Run("ideriv", func(t *testing.T) {
|
t.Run("ideriv", func(t *testing.T) {
|
||||||
rc := rollupConfig{
|
rc := rollupConfig{
|
||||||
Func: rollupIderiv,
|
Func: rollupIderiv,
|
||||||
|
Loading…
Reference in New Issue
Block a user