app/vmselect: set proper timestamp for cached instant responses (#5723)

* app/vmselect: set proper timestamp for cached instant responses

The change updates `getSumInstantValues` to prefer timestamp
from the most recent results. Before, timestamp from cached series
was used.

The old behavior had negative impact on recording rules as they
were getting responses with shifted timestamps in past.
Subsequent recording or alerting rules fetching results of these
recording rules could get no result due to staleness interval.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5659
Signed-off-by: hagen1778 <roman@victoriametrics.com>

* wip

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2024-01-30 21:03:34 +01:00 committed by Aliaksandr Valialkin
parent 3164f7526c
commit 02e609b141
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
3 changed files with 102 additions and 13 deletions

View File

@ -1287,7 +1287,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
return evalAt(qtChild, timestamp, window) return evalAt(qtChild, timestamp, window)
} }
// Calculate the result // Calculate the result
tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd) tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp)
if !ok { if !ok {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached") qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
deleteCachedSeries(qtChild) deleteCachedSeries(qtChild)
@ -1349,7 +1349,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
return evalAt(qtChild, timestamp, window) return evalAt(qtChild, timestamp, window)
} }
// Calculate the result // Calculate the result
tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd) tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp)
if !ok { if !ok {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached") qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
deleteCachedSeries(qtChild) deleteCachedSeries(qtChild)
@ -1414,7 +1414,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
return evalAt(qtChild, timestamp, window) return evalAt(qtChild, timestamp, window)
} }
// Calculate the result // Calculate the result
tss := getSumInstantValues(qtChild, tssCached, tssStart, tssEnd) tss := getSumInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp)
return tss, nil return tss, nil
default: default:
qt.Printf("instant rollup optimization isn't implemented for %s()", funcName) qt.Printf("instant rollup optimization isn't implemented for %s()", funcName)
@ -1441,8 +1441,8 @@ func hasDuplicateSeries(tss []*timeseries) bool {
return false return false
} }
func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) { func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) ([]*timeseries, bool) {
qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd)) qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp)
defer qt.Done() defer qt.Done()
getMin := func(a, b float64) float64 { getMin := func(a, b float64) float64 {
@ -1451,13 +1451,13 @@ func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*
} }
return b return b
} }
tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMin) tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, timestamp, getMin)
qt.Printf("resulting series=%d; ok=%v", len(tss), ok) qt.Printf("resulting series=%d; ok=%v", len(tss), ok)
return tss, ok return tss, ok
} }
func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) { func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) ([]*timeseries, bool) {
qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd)) qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp)
defer qt.Done() defer qt.Done()
getMax := func(a, b float64) float64 { getMax := func(a, b float64) float64 {
@ -1466,12 +1466,12 @@ func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*
} }
return b return b
} }
tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMax) tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, timestamp, getMax)
qt.Printf("resulting series=%d", len(tss)) qt.Printf("resulting series=%d", len(tss))
return tss, ok return tss, ok
} }
func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, f func(a, b float64) float64) ([]*timeseries, bool) { func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, timestamp int64, f func(a, b float64) float64) ([]*timeseries, bool) {
assertInstantValues(tssCached) assertInstantValues(tssCached)
assertInstantValues(tssStart) assertInstantValues(tssStart)
assertInstantValues(tssEnd) assertInstantValues(tssEnd)
@ -1522,12 +1522,16 @@ func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, f func(a,
for _, ts := range m { for _, ts := range m {
rvs = append(rvs, ts) rvs = append(rvs, ts)
} }
setInstantTimestamp(rvs, timestamp)
return rvs, true return rvs, true
} }
// getSumInstantValues calculates tssCached + tssStart - tssEnd // getSumInstantValues aggregates tssCached, tssStart, tssEnd time series
func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) []*timeseries { // into a new time series with value = tssCached + tssStart - tssEnd
qt = qt.NewChild("calculate the sum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd)) func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) []*timeseries {
qt = qt.NewChild("calculate the sum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp)
defer qt.Done() defer qt.Done()
assertInstantValues(tssCached) assertInstantValues(tssCached)
@ -1572,10 +1576,19 @@ func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*
for _, ts := range m { for _, ts := range m {
rvs = append(rvs, ts) rvs = append(rvs, ts)
} }
setInstantTimestamp(rvs, timestamp)
qt.Printf("resulting series=%d", len(rvs)) qt.Printf("resulting series=%d", len(rvs))
return rvs return rvs
} }
func setInstantTimestamp(tss []*timeseries, timestamp int64) {
for _, ts := range tss {
ts.Timestamps[0] = timestamp
}
}
func assertInstantValues(tss []*timeseries) { func assertInstantValues(tss []*timeseries) {
for _, ts := range tss { for _, ts := range tss {
if len(ts.Values) != 1 { if len(ts.Values) != 1 {

View File

@ -1,6 +1,7 @@
package promql package promql
import ( import (
"reflect"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
@ -95,3 +96,77 @@ func TestQueryStats_addSeriesFetched(t *testing.T) {
t.Fatalf("expected to get 4; got %d instead", qs.SeriesFetched) t.Fatalf("expected to get 4; got %d instead", qs.SeriesFetched)
} }
} }
func TestGetSumInstantValues(t *testing.T) {
f := func(cached, start, end []*timeseries, timestamp int64, expectedResult []*timeseries) {
t.Helper()
result := getSumInstantValues(nil, cached, start, end, timestamp)
if !reflect.DeepEqual(result, expectedResult) {
t.Errorf("unexpected result; got\n%v\nwant\n%v", result, expectedResult)
}
}
ts := func(name string, timestamp int64, value float64) *timeseries {
return &timeseries{
MetricName: storage.MetricName{
MetricGroup: []byte(name),
},
Timestamps: []int64{timestamp},
Values: []float64{value},
}
}
// start - end + cached = 1
f(
nil,
[]*timeseries{ts("foo", 42, 1)},
nil,
100,
[]*timeseries{ts("foo", 100, 1)},
)
// start - end + cached = 0
f(
nil,
[]*timeseries{ts("foo", 100, 1)},
[]*timeseries{ts("foo", 10, 1)},
100,
[]*timeseries{ts("foo", 100, 0)},
)
// start - end + cached = 2
f(
[]*timeseries{ts("foo", 10, 1)},
[]*timeseries{ts("foo", 100, 1)},
nil,
100,
[]*timeseries{ts("foo", 100, 2)},
)
// start - end + cached = 1
f(
[]*timeseries{ts("foo", 50, 1)},
[]*timeseries{ts("foo", 100, 1)},
[]*timeseries{ts("foo", 10, 1)},
100,
[]*timeseries{ts("foo", 100, 1)},
)
// start - end + cached = 0
f(
[]*timeseries{ts("foo", 50, 1)},
nil,
[]*timeseries{ts("foo", 10, 1)},
100,
[]*timeseries{ts("foo", 100, 0)},
)
// start - end + cached = 1
f(
[]*timeseries{ts("foo", 50, 1)},
nil,
nil,
100,
[]*timeseries{ts("foo", 100, 1)},
)
}

View File

@ -95,6 +95,7 @@ The sandbox cluster installation is running under the constant load generated by
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly process queries with too big lookbehind window such as `foo[100y]`. Previously, such queries could return empty responses even if `foo` is present in database. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5553). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly process queries with too big lookbehind window such as `foo[100y]`. Previously, such queries could return empty responses even if `foo` is present in database. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5553).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle possible negative results caused by float operations precision error in rollup functions like rate() or increase(). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5571). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle possible negative results caused by float operations precision error in rollup functions like rate() or increase(). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5571).
* BUGFIX: `vmselect`: vmsingle/vmselect returns http status 429 (TooManyRequests) instead of 503 (ServiceUnavailable) when max concurrent requests limit is reached. * BUGFIX: `vmselect`: vmsingle/vmselect returns http status 429 (TooManyRequests) instead of 503 (ServiceUnavailable) when max concurrent requests limit is reached.
* BUGFIX: `vmselect`: set proper timestamps in [instant query](https://docs.victoriametrics.com/keyconcepts/#instant-query) results if the query contains [rollup functions](https://docs.victoriametrics.com/metricsql/#rollup-functions) with lookbehind windows in square brackets exceeding 3 hours. For example, `sum(avg_over_time(up[24h]))`. This bug has been introduced in [v1.95.0](https://docs.victoriametrics.com/changelog_2023/#v1950). See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5659) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5730) issues.
## [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0) ## [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0)