mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
app/vmselect/promql: add lag(q[d])
function, which returns the lag between the current timestamp and the timstamp for the last data point in q
This commit is contained in:
parent
bd065aad5e
commit
4a8251feff
@ -3720,6 +3720,17 @@ func TestExecSuccess(t *testing.T) {
|
|||||||
resultExpected := []netstorage.Result{r}
|
resultExpected := []netstorage.Result{r}
|
||||||
f(q, resultExpected)
|
f(q, resultExpected)
|
||||||
})
|
})
|
||||||
|
t.Run(`lag()`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `lag(time()[60s:17s])`
|
||||||
|
r := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{14, 10, 6, 2, 15, 11},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
resultExpected := []netstorage.Result{r}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
t.Run(`()`, func(t *testing.T) {
|
t.Run(`()`, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := `()`
|
q := `()`
|
||||||
@ -4189,6 +4200,8 @@ func TestExecError(t *testing.T) {
|
|||||||
f(`alias()`)
|
f(`alias()`)
|
||||||
f(`alias(1)`)
|
f(`alias(1)`)
|
||||||
f(`alias(1, "foo", "bar")`)
|
f(`alias(1, "foo", "bar")`)
|
||||||
|
f(`lifetime()`)
|
||||||
|
f(`lag()`)
|
||||||
|
|
||||||
// Invalid argument type
|
// Invalid argument type
|
||||||
f(`median_over_time({}, 2)`)
|
f(`median_over_time({}, 2)`)
|
||||||
|
@ -48,6 +48,7 @@ var rollupFuncs = map[string]newRollupFunc{
|
|||||||
"integrate": newRollupFuncOneArg(rollupIntegrate),
|
"integrate": newRollupFuncOneArg(rollupIntegrate),
|
||||||
"ideriv": newRollupFuncOneArg(rollupIderiv),
|
"ideriv": newRollupFuncOneArg(rollupIderiv),
|
||||||
"lifetime": newRollupFuncOneArg(rollupLifetime),
|
"lifetime": newRollupFuncOneArg(rollupLifetime),
|
||||||
|
"lag": newRollupFuncOneArg(rollupLag),
|
||||||
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
|
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
|
||||||
"rollup": newRollupFuncOneArg(rollupFake),
|
"rollup": newRollupFuncOneArg(rollupFake),
|
||||||
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
|
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
|
||||||
@ -113,6 +114,7 @@ type rollupFuncArg struct {
|
|||||||
values []float64
|
values []float64
|
||||||
timestamps []int64
|
timestamps []int64
|
||||||
|
|
||||||
|
currTimestamp int64
|
||||||
idx int
|
idx int
|
||||||
step int64
|
step int64
|
||||||
}
|
}
|
||||||
@ -122,6 +124,7 @@ func (rfa *rollupFuncArg) reset() {
|
|||||||
rfa.prevTimestamp = 0
|
rfa.prevTimestamp = 0
|
||||||
rfa.values = nil
|
rfa.values = nil
|
||||||
rfa.timestamps = nil
|
rfa.timestamps = nil
|
||||||
|
rfa.currTimestamp = 0
|
||||||
rfa.idx = 0
|
rfa.idx = 0
|
||||||
rfa.step = 0
|
rfa.step = 0
|
||||||
}
|
}
|
||||||
@ -226,6 +229,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
|
|||||||
|
|
||||||
rfa.values = values[i:j]
|
rfa.values = values[i:j]
|
||||||
rfa.timestamps = timestamps[i:j]
|
rfa.timestamps = timestamps[i:j]
|
||||||
|
rfa.currTimestamp = tEnd
|
||||||
value := rc.Func(rfa)
|
value := rc.Func(rfa)
|
||||||
rfa.idx++
|
rfa.idx++
|
||||||
dstValues = append(dstValues, value)
|
dstValues = append(dstValues, value)
|
||||||
@ -799,6 +803,18 @@ func rollupLifetime(rfa *rollupFuncArg) float64 {
|
|||||||
return float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) * 1e-3
|
return float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) * 1e-3
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func rollupLag(rfa *rollupFuncArg) float64 {
|
||||||
|
// Calculate the duration between the current timestamp and the last data point.
|
||||||
|
timestamps := rfa.timestamps
|
||||||
|
if len(timestamps) == 0 {
|
||||||
|
if math.IsNaN(rfa.prevValue) {
|
||||||
|
return nan
|
||||||
|
}
|
||||||
|
return float64(rfa.currTimestamp-rfa.prevTimestamp) * 1e-3
|
||||||
|
}
|
||||||
|
return float64(rfa.currTimestamp-timestamps[len(timestamps)-1]) * 1e-3
|
||||||
|
}
|
||||||
|
|
||||||
func rollupScrapeInterval(rfa *rollupFuncArg) float64 {
|
func rollupScrapeInterval(rfa *rollupFuncArg) float64 {
|
||||||
// Calculate the average interval between data points.
|
// Calculate the average interval between data points.
|
||||||
timestamps := rfa.timestamps
|
timestamps := rfa.timestamps
|
||||||
|
@ -632,6 +632,20 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
|||||||
timestampsExpected := []int64{10, 50, 90, 130}
|
timestampsExpected := []int64{10, 50, 90, 130}
|
||||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||||
})
|
})
|
||||||
|
t.Run("lag", func(t *testing.T) {
|
||||||
|
rc := rollupConfig{
|
||||||
|
Func: rollupLag,
|
||||||
|
Start: 0,
|
||||||
|
End: 160,
|
||||||
|
Step: 40,
|
||||||
|
Window: 0,
|
||||||
|
}
|
||||||
|
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||||
|
values := rc.Do(nil, testValues, testTimestamps)
|
||||||
|
valuesExpected := []float64{nan, 0.004, 0, 0, 0.03}
|
||||||
|
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||||
|
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||||
|
})
|
||||||
t.Run("lifetime_1", func(t *testing.T) {
|
t.Run("lifetime_1", func(t *testing.T) {
|
||||||
rc := rollupConfig{
|
rc := rollupConfig{
|
||||||
Func: rollupLifetime,
|
Func: rollupLifetime,
|
||||||
|
Loading…
Reference in New Issue
Block a user