From 02e609b1410704e5c88e2446a58a0c674f5c5e24 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Tue, 30 Jan 2024 21:03:34 +0100 Subject: [PATCH] app/vmselect: set proper timestamp for cached instant responses (#5723) * app/vmselect: set proper timestamp for cached instant responses The change updates `getSumInstantValues` to prefer timestamp from the most recent results. Before, timestamp from cached series was used. The old behavior had negative impact on recording rules as they were getting responses with shifted timestamps in past. Subsequent recording or alerting rules fetching results of these recording rules could get no result due to staleness interval. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5659 Signed-off-by: hagen1778 * wip --------- Signed-off-by: hagen1778 Co-authored-by: Aliaksandr Valialkin --- app/vmselect/promql/eval.go | 39 +++++++++++------ app/vmselect/promql/eval_test.go | 75 ++++++++++++++++++++++++++++++++ docs/CHANGELOG.md | 1 + 3 files changed, 102 insertions(+), 13 deletions(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 1494b825ed..4398c59b6d 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -1287,7 +1287,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, return evalAt(qtChild, timestamp, window) } // Calculate the result - tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd) + tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp) if !ok { qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached") deleteCachedSeries(qtChild) @@ -1349,7 +1349,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, return evalAt(qtChild, timestamp, window) } // Calculate the result - tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd) + tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd, 
timestamp) if !ok { qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached") deleteCachedSeries(qtChild) @@ -1414,7 +1414,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, return evalAt(qtChild, timestamp, window) } // Calculate the result - tss := getSumInstantValues(qtChild, tssCached, tssStart, tssEnd) + tss := getSumInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp) return tss, nil default: qt.Printf("instant rollup optimization isn't implemented for %s()", funcName) @@ -1441,8 +1441,8 @@ func hasDuplicateSeries(tss []*timeseries) bool { return false } -func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) { - qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd)) +func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) ([]*timeseries, bool) { + qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp) defer qt.Done() getMin := func(a, b float64) float64 { @@ -1451,13 +1451,13 @@ func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []* } return b } - tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMin) + tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, timestamp, getMin) qt.Printf("resulting series=%d; ok=%v", len(tss), ok) return tss, ok } -func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) { - qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd)) +func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd 
[]*timeseries, timestamp int64) ([]*timeseries, bool) { + qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp) defer qt.Done() getMax := func(a, b float64) float64 { @@ -1466,12 +1466,12 @@ func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []* } return b } - tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMax) + tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, timestamp, getMax) qt.Printf("resulting series=%d", len(tss)) return tss, ok } -func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, f func(a, b float64) float64) ([]*timeseries, bool) { +func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, timestamp int64, f func(a, b float64) float64) ([]*timeseries, bool) { assertInstantValues(tssCached) assertInstantValues(tssStart) assertInstantValues(tssEnd) @@ -1522,12 +1522,16 @@ func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, f func(a, for _, ts := range m { rvs = append(rvs, ts) } + + setInstantTimestamp(rvs, timestamp) + return rvs, true } -// getSumInstantValues calculates tssCached + tssStart - tssEnd -func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) []*timeseries { - qt = qt.NewChild("calculate the sum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd)) +// getSumInstantValues aggregates tssCached, tssStart, tssEnd time series +// into a new time series with value = tssCached + tssStart - tssEnd +func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) []*timeseries { + qt = qt.NewChild("calculate the sum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp) defer qt.Done() 
assertInstantValues(tssCached) @@ -1572,10 +1576,19 @@ func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []* for _, ts := range m { rvs = append(rvs, ts) } + + setInstantTimestamp(rvs, timestamp) + qt.Printf("resulting series=%d", len(rvs)) return rvs } +func setInstantTimestamp(tss []*timeseries, timestamp int64) { + for _, ts := range tss { + ts.Timestamps[0] = timestamp + } +} + func assertInstantValues(tss []*timeseries) { for _, ts := range tss { if len(ts.Values) != 1 { diff --git a/app/vmselect/promql/eval_test.go b/app/vmselect/promql/eval_test.go index 30a1d9d71d..741ecde618 100644 --- a/app/vmselect/promql/eval_test.go +++ b/app/vmselect/promql/eval_test.go @@ -1,6 +1,7 @@ package promql import ( + "reflect" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" @@ -95,3 +96,77 @@ func TestQueryStats_addSeriesFetched(t *testing.T) { t.Fatalf("expected to get 4; got %d instead", qs.SeriesFetched) } } + +func TestGetSumInstantValues(t *testing.T) { + f := func(cached, start, end []*timeseries, timestamp int64, expectedResult []*timeseries) { + t.Helper() + + result := getSumInstantValues(nil, cached, start, end, timestamp) + if !reflect.DeepEqual(result, expectedResult) { + t.Errorf("unexpected result; got\n%v\nwant\n%v", result, expectedResult) + } + } + ts := func(name string, timestamp int64, value float64) *timeseries { + return &timeseries{ + MetricName: storage.MetricName{ + MetricGroup: []byte(name), + }, + Timestamps: []int64{timestamp}, + Values: []float64{value}, + } + } + + // start - end + cached = 1 + f( + nil, + []*timeseries{ts("foo", 42, 1)}, + nil, + 100, + []*timeseries{ts("foo", 100, 1)}, + ) + + // start - end + cached = 0 + f( + nil, + []*timeseries{ts("foo", 100, 1)}, + []*timeseries{ts("foo", 10, 1)}, + 100, + []*timeseries{ts("foo", 100, 0)}, + ) + + // start - end + cached = 2 + f( + []*timeseries{ts("foo", 10, 1)}, + []*timeseries{ts("foo", 100, 1)}, + nil, + 100, + 
[]*timeseries{ts("foo", 100, 2)}, + ) + + // start - end + cached = 1 + f( + []*timeseries{ts("foo", 50, 1)}, + []*timeseries{ts("foo", 100, 1)}, + []*timeseries{ts("foo", 10, 1)}, + 100, + []*timeseries{ts("foo", 100, 1)}, + ) + + // start - end + cached = 0 + f( + []*timeseries{ts("foo", 50, 1)}, + nil, + []*timeseries{ts("foo", 10, 1)}, + 100, + []*timeseries{ts("foo", 100, 0)}, + ) + + // start - end + cached = 1 + f( + []*timeseries{ts("foo", 50, 1)}, + nil, + nil, + 100, + []*timeseries{ts("foo", 100, 1)}, + ) +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f22656dda1..5997fabd40 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -95,6 +95,7 @@ The sandbox cluster installation is running under the constant load generated by * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly process queries with too big lookbehind window such as `foo[100y]`. Previously, such queries could return empty responses even if `foo` is present in database. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5553). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle possible negative results caused by float operations precision error in rollup functions like rate() or increase(). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5571). * BUGFIX: `vmselect`: vmsingle/vmselect returns http status 429 (TooManyRequests) instead of 503 (ServiceUnavailable) when max concurrent requests limit is reached. +* BUGFIX: `vmselect`: set proper timestamps in [instant query](https://docs.victoriametrics.com/keyconcepts/#instant-query) results if the query contains [rollup functions](https://docs.victoriametrics.com/metricsql/#rollup-functions) with lookbehind windows in square brackets exceeding 3 hours. For example, `sum(avg_over_time(up[24h]))`. This bug has been introduced in [v1.95.0](https://docs.victoriametrics.com/changelog_2023/#v1950). 
See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5659) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5730) issues. ## [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0)