app/vmselect/promql: optimize instant queries with min_over_time() and max_over_time() rollup functions

This is a follow-up for 41a0fdaf39
This commit is contained in:
Aliaksandr Valialkin 2023-11-11 12:09:25 +01:00
parent 7bbdecb79a
commit c916294b61
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 324 additions and 64 deletions

View File

@ -42,7 +42,7 @@ var (
"See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery")
noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+
"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")
minWindowForInstantRollupOptimization = flagutil.NewDuration("search.minWindowForInstantRollupOptimization", "6h", "Enable cache-based optimization for repeated queries "+
minWindowForInstantRollupOptimization = flagutil.NewDuration("search.minWindowForInstantRollupOptimization", "3h", "Enable cache-based optimization for repeated queries "+
"to /api/v1/query (aka instant queries), which contain rollup functions with lookbehind window exceeding the given value")
)
@ -1106,6 +1106,59 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
}
return offset >= maxOffset
}
deleteCachedSeries := func(qt *querytracer.Tracer) {
rollupResultCacheV.DeleteInstantValues(qt, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
}
getCachedSeries := func(qt *querytracer.Tracer) ([]*timeseries, int64, error) {
again:
offset := int64(0)
tssCached := rollupResultCacheV.GetInstantValues(qt, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
ec.QueryStats.addSeriesFetched(len(tssCached))
if len(tssCached) == 0 {
// Cache miss. Re-populate the missing data.
start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds()
offset = timestamp - start
if offset < 0 {
start = timestamp
offset = 0
}
if tooBigOffset(offset) {
qt.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+
"for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window)
tss, err := evalAt(qt, timestamp, window)
return tss, 0, err
}
qt.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start))
tss, err := evalAt(qt, start, window)
if err != nil {
return nil, 0, err
}
if hasDuplicateSeries(tss) {
qt.Printf("cannot apply instant rollup optimization because the result contains duplicate series")
tss, err := evalAt(qt, timestamp, window)
return tss, 0, err
}
rollupResultCacheV.PutInstantValues(qt, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss, tss)
return tss, offset, nil
}
// Cache hit. Verify whether it is OK to use the cached data.
offset = timestamp - tssCached[0].Timestamps[0]
if offset < 0 {
qt.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s",
storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp))
// Delete the outdated cached values, so the cache could be re-populated with newer values.
deleteCachedSeries(qt)
goto again
}
if tooBigOffset(offset) {
qt.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+
"and the cached values is too big comparing to window=%d", offset, window)
// Delete the outdated cached values, so the cache could be re-populated with newer values.
deleteCachedSeries(qt)
goto again
}
return tssCached, offset, nil
}
if !ec.mayCache() {
qt.Printf("do not apply instant rollup optimization because of disabled cache")
@ -1181,6 +1234,136 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
},
}
return evalExpr(qt, ec, be)
case "max_over_time":
if iafc != nil {
if strings.ToLower(iafc.ae.Name) != "max" {
qt.Printf("do not apply instant rollup optimization for non-max incremental aggregate %s()", iafc.ae.Name)
return evalAt(qt, timestamp, window)
}
}
// Calculate
//
// max_over_time(m[window] @ timestamp)
//
// as the maximum of
//
// - max_over_time(m[window] @ (timestamp-offset))
// - max_over_time(m[offset] @ timestamp)
//
// if max_over_time(m[offset] @ (timestamp-window)) < max_over_time(m[window] @ (timestamp-offset))
// otherwise do not apply the optimization
//
// where
//
// - max_over_time(m[window] @ (timestamp-offset)) is obtained from cache
// - max_over_time(m[offset] @ timestamp) and max_over_time(m[offset] @ (timestamp-window)) are calculated from the storage
// These rollups are calculated faster than max_over_time(m[window]) because offset is smaller than window.
qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
defer qtChild.Done()
tssCached, offset, err := getCachedSeries(qtChild)
if err != nil {
return nil, err
}
if offset == 0 {
return tssCached, nil
}
// Calculate max_over_time(m[offset] @ (timestamp - window))
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
if err != nil {
return nil, err
}
if hasDuplicateSeries(tssEnd) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Verify whether tssCached values are bigger than tssEnd values.
// If this isn't the case, then the optimization cannot be applied.
if !isLowerInstantValues(tssEnd, tssCached) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
deleteCachedSeries(qtChild)
return evalAt(qt, timestamp, window)
}
// Calculate max_over_time(m[offset] @ timestamp)
tssStart, err := evalAt(qtChild, timestamp, offset)
if err != nil {
return nil, err
}
if hasDuplicateSeries(tssStart) {
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Calculate the result
tss := getMaxInstantValues(qtChild, tssCached, tssStart)
return tss, nil
case "min_over_time":
if iafc != nil {
if strings.ToLower(iafc.ae.Name) != "min" {
qt.Printf("do not apply instant rollup optimization for non-min incremental aggregate %s()", iafc.ae.Name)
return evalAt(qt, timestamp, window)
}
}
// Calculate
//
// min_over_time(m[window] @ timestamp)
//
// as the minimum of
//
// - min_over_time(m[window] @ (timestamp-offset))
// - min_over_time(m[offset] @ timestamp)
//
// if min_over_time(m[offset] @ (timestamp-window)) > min_over_time(m[window] @ (timestamp-offset))
// otherwise do not apply the optimization
//
// where
//
// - min_over_time(m[window] @ (timestamp-offset)) is obtained from cache
// - min_over_time(m[offset] @ timestamp) and min_over_time(m[offset] @ (timestamp-window)) are calculated from the storage
// These rollups are calculated faster than min_over_time(m[window]) because offset is smaller than window.
qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
defer qtChild.Done()
tssCached, offset, err := getCachedSeries(qtChild)
if err != nil {
return nil, err
}
if offset == 0 {
return tssCached, nil
}
// Calculate min_over_time(m[offset] @ (timestamp - window))
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
if err != nil {
return nil, err
}
if hasDuplicateSeries(tssEnd) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Verify whether tssCached values are smaller than tssEnd values.
// If this isn't the case, then the optimization cannot be applied.
if !isLowerInstantValues(tssCached, tssEnd) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
deleteCachedSeries(qtChild)
return evalAt(qt, timestamp, window)
}
// Calculate min_over_time(m[offset] @ timestamp)
tssStart, err := evalAt(qtChild, timestamp, offset)
if err != nil {
return nil, err
}
if hasDuplicateSeries(tssStart) {
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Calculate the result
tss := getMinInstantValues(qtChild, tssCached, tssStart)
return tss, nil
case
"count_eq_over_time",
"count_gt_over_time",
@ -1213,67 +1396,33 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
defer qtChild.Done()
again:
offset := int64(0)
tssCached := rollupResultCacheV.GetInstantValues(qtChild, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
ec.QueryStats.addSeriesFetched(len(tssCached))
if len(tssCached) == 0 {
// Cache miss. Re-populate it
start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds()
offset = timestamp - start
if offset < 0 {
start = timestamp
offset = 0
}
if tooBigOffset(offset) {
qtChild.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+
"for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window)
return evalAt(qtChild, timestamp, window)
}
qtChild.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start))
tss, err := evalAt(qtChild, start, window)
if err != nil {
return nil, err
}
if !ec.IsPartialResponse.Load() {
rollupResultCacheV.PutInstantValues(qtChild, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss, tss)
}
tssCached = tss
} else {
offset = timestamp - tssCached[0].Timestamps[0]
if offset < 0 {
qtChild.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s",
storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp))
// Delete the outdated cached values, so the cache could be re-populated with newer values.
rollupResultCacheV.DeleteInstantValues(qtChild, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
goto again
}
if tooBigOffset(offset) {
qtChild.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+
"and the cached values is too big comparing to window=%d", offset, window)
// Delete the outdated cached values, so the cache could be re-populated with newer values.
rollupResultCacheV.DeleteInstantValues(qtChild, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
goto again
}
tssCached, offset, err := getCachedSeries(qtChild)
if err != nil {
return nil, err
}
if offset == 0 {
qtChild.Printf("return cached values, since they have the requested timestamp=%s", storage.TimestampToHumanReadableFormat(timestamp))
return tssCached, nil
}
// Calculate count_over_time(m[offset] @ timestamp)
// Calculate rf(m[offset] @ timestamp)
tssStart, err := evalAt(qtChild, timestamp, offset)
if err != nil {
return nil, err
}
// Calculate count_over_time(m[offset] @ (timestamp - window))
if hasDuplicateSeries(tssStart) {
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Calculate rf(m[offset] @ (timestamp - window))
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
if err != nil {
return nil, err
}
tss, err := mergeInstantValues(qtChild, tssCached, tssStart, tssEnd)
if err != nil {
return nil, fmt.Errorf("cannot merge instant series: %w", err)
if hasDuplicateSeries(tssEnd) {
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
return evalAt(qtChild, timestamp, window)
}
// Calculate the result
tss := getSumInstantValues(qtChild, tssCached, tssStart, tssEnd)
return tss, nil
default:
qt.Printf("instant rollup optimization isn't implemented for %s()", funcName)
@ -1281,9 +1430,118 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
}
}
// mergeInstantValues calculates tssCached + tssStart - tssEnd
func mergeInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, error) {
qt = qt.NewChild("merge instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
func hasDuplicateSeries(tss []*timeseries) bool {
m := make(map[string]struct{}, len(tss))
bb := bbPool.Get()
defer bbPool.Put(bb)
for _, ts := range tss {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
return true
}
m[string(bb.B)] = struct{}{}
}
return false
}
// isLowerInstantValues verifies that tssA contains lower values than tssB
func isLowerInstantValues(tssA, tssB []*timeseries) bool {
assertInstantValues(tssA)
assertInstantValues(tssB)
m := make(map[string]*timeseries, len(tssA))
bb := bbPool.Get()
defer bbPool.Put(bb)
for _, ts := range tssA {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
}
m[string(bb.B)] = ts
}
for _, tsB := range tssB {
bb.B = marshalMetricNameSorted(bb.B[:0], &tsB.MetricName)
tsA := m[string(bb.B)]
if tsA != nil && !math.IsNaN(tsA.Values[0]) && !math.IsNaN(tsB.Values[0]) {
if tsA.Values[0] >= tsB.Values[0] {
return false
}
}
}
return true
}
func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries {
qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart))
defer qt.Done()
getMin := func(a, b float64) float64 {
if a < b {
return a
}
return b
}
tss := getMinMaxInstantValues(tssCached, tssStart, getMin)
qt.Printf("resulting series=%d", len(tss))
return tss
}
func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries {
qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart))
defer qt.Done()
getMax := func(a, b float64) float64 {
if a > b {
return a
}
return b
}
tss := getMinMaxInstantValues(tssCached, tssStart, getMax)
qt.Printf("resulting series=%d", len(tss))
return tss
}
func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float64) float64) []*timeseries {
assertInstantValues(tssCached)
assertInstantValues(tssStart)
m := make(map[string]*timeseries, len(tssCached))
bb := bbPool.Get()
defer bbPool.Put(bb)
for _, ts := range tssCached {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
}
m[string(bb.B)] = ts
}
for _, ts := range tssStart {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
tsCached := m[string(bb.B)]
if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
if !math.IsNaN(ts.Values[0]) {
tsCached.Values[0] = f(ts.Values[0], tsCached.Values[0])
}
} else {
m[string(bb.B)] = ts
}
}
rvs := make([]*timeseries, 0, len(m))
for _, ts := range m {
rvs = append(rvs, ts)
}
return rvs
}
// getSumInstantValues calculates tssCached + tssStart - tssEnd
func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) []*timeseries {
qt = qt.NewChild("calculate the sum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
defer qt.Done()
assertInstantValues(tssCached)
@ -1296,8 +1554,8 @@ func mergeInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*t
for _, ts := range tssCached {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if tsExisting := m[string(bb.B)]; tsExisting != nil {
return nil, fmt.Errorf("duplicate series found: %s", &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
}
m[string(bb.B)] = ts
}
@ -1329,7 +1587,7 @@ func mergeInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*t
rvs = append(rvs, ts)
}
qt.Printf("resulting series=%d", len(rvs))
return rvs, nil
return rvs
}
func assertInstantValues(tss []*timeseries) {

View File

@ -34,15 +34,17 @@ The sandbox cluster installation is running under the constant load generated by
* SECURITY: upgrade Go builder from Go1.21.1 to Go1.21.4. See [the list of issues addressed in Go1.21.2](https://github.com/golang/go/issues?q=milestone%3AGo1.21.2+label%3ACherryPickApproved), [the list of issues addressed in Go1.21.3](https://github.com/golang/go/issues?q=milestone%3AGo1.21.3+label%3ACherryPickApproved) and [the list of issues addressed in Go1.21.4](https://github.com/golang/go/issues?q=milestone%3AGo1.21.4+label%3ACherryPickApproved).
* FEATURE: `vmselect`: improve performance for repeated [instant queries](https://docs.victoriametrics.com/keyConcepts.html#instant-query) if they contain one of the following [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions):
- [avg_over_time](https://docs.victoriametrics.com/MetricsQL.html#avg_over_time)
- [sum_over_time](https://docs.victoriametrics.com/MetricsQL.html#sum_over_time)
- [count_eq_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_eq_over_time)
- [count_gt_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_gt_over_time)
- [count_le_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_le_over_time)
- [count_ne_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_ne_over_time)
- [count_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_over_time)
- [increase](https://docs.victoriametrics.com/MetricsQL.html#increase)
- [rate](https://docs.victoriametrics.com/MetricsQL.html#rate)
- [`avg_over_time`](https://docs.victoriametrics.com/MetricsQL.html#avg_over_time)
- [`sum_over_time`](https://docs.victoriametrics.com/MetricsQL.html#sum_over_time)
- [`count_eq_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_eq_over_time)
- [`count_gt_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_gt_over_time)
- [`count_le_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_le_over_time)
- [`count_ne_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_ne_over_time)
- [`count_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_over_time)
- [`increase`](https://docs.victoriametrics.com/MetricsQL.html#increase)
- [`max_over_time`](https://docs.victoriametrics.com/MetricsQL.html#max_over_time)
- [`min_over_time`](https://docs.victoriametrics.com/MetricsQL.html#min_over_time)
- [`rate`](https://docs.victoriametrics.com/MetricsQL.html#rate)
The optimization is enabled when these functions contain lookbehind window in square brackets bigger or equal to `6h` (the threshold can be changed via `-search.minWindowForInstantRollupOptimization` command-line flag). The optimization improves performance for SLO/SLI-like queries such as `avg_over_time(up[30d])` or `sum(rate(http_request_errors_total[3d])) / sum(rate(http_requests_total[3d]))`, which can be generated by [sloth](https://github.com/slok/sloth) or similar projects.
* FEATURE: `vmselect`: improve query performance on systems with big number of CPU cores (`>=32`). Add `-search.maxWorkersPerQuery` command-line flag, which can be used for fine-tuning query performance on systems with big number of CPU cores. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5195).
* FEATURE: `vmselect`: expose `vm_memory_intensive_queries_total` counter metric which gets increased each time `-search.logQueryMemoryUsage` memory limit is exceeded by a query. This metric should help to identify expensive and heavy queries without inspecting the logs.