From c916294b61fe138ad720e3be2ab2581e112bd97c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 11 Nov 2023 12:09:25 +0100 Subject: [PATCH] app/vmselect/promql: optimize instant queries with min_over_time() and max_over_time() rollup functions This is a follow-up for 41a0fdaf39ea99708cf1c899332910965d62dcfc --- app/vmselect/promql/eval.go | 368 ++++++++++++++++++++++++++++++------ docs/CHANGELOG.md | 20 +- 2 files changed, 324 insertions(+), 64 deletions(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 669e9fb726..06844cbb02 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -42,7 +42,7 @@ var ( "See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery") noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+ "so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets") - minWindowForInstantRollupOptimization = flagutil.NewDuration("search.minWindowForInstantRollupOptimization", "6h", "Enable cache-based optimization for repeated queries "+ + minWindowForInstantRollupOptimization = flagutil.NewDuration("search.minWindowForInstantRollupOptimization", "3h", "Enable cache-based optimization for repeated queries "+ "to /api/v1/query (aka instant queries), which contain rollup functions with lookbehind window exceeding the given value") ) @@ -1106,6 +1106,59 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, } return offset >= maxOffset } + deleteCachedSeries := func(qt *querytracer.Tracer) { + rollupResultCacheV.DeleteInstantValues(qt, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss) + } + getCachedSeries := func(qt *querytracer.Tracer) ([]*timeseries, int64, error) { + again: + offset := int64(0) + tssCached := rollupResultCacheV.GetInstantValues(qt, ec.AuthToken, expr, 
window, ec.Step, ec.EnforcedTagFilterss) + ec.QueryStats.addSeriesFetched(len(tssCached)) + if len(tssCached) == 0 { + // Cache miss. Re-populate the missing data. + start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds() + offset = timestamp - start + if offset < 0 { + start = timestamp + offset = 0 + } + if tooBigOffset(offset) { + qt.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+ + "for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window) + tss, err := evalAt(qt, timestamp, window) + return tss, 0, err + } + qt.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start)) + tss, err := evalAt(qt, start, window) + if err != nil { + return nil, 0, err + } + if hasDuplicateSeries(tss) { + qt.Printf("cannot apply instant rollup optimization because the result contains duplicate series") + tss, err := evalAt(qt, timestamp, window) + return tss, 0, err + } + rollupResultCacheV.PutInstantValues(qt, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss, tss) + return tss, offset, nil + } + // Cache hit. Verify whether it is OK to use the cached data. + offset = timestamp - tssCached[0].Timestamps[0] + if offset < 0 { + qt.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s", + storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp)) + // Delete the outdated cached values, so the cache could be re-populated with newer values. 
+ deleteCachedSeries(qt) + goto again + } + if tooBigOffset(offset) { + qt.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+ + "and the cached values is too big comparing to window=%d", offset, window) + // Delete the outdated cached values, so the cache could be re-populated with newer values. + deleteCachedSeries(qt) + goto again + } + return tssCached, offset, nil + } if !ec.mayCache() { qt.Printf("do not apply instant rollup optimization because of disabled cache") @@ -1181,6 +1234,136 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, }, } return evalExpr(qt, ec, be) + case "max_over_time": + if iafc != nil { + if strings.ToLower(iafc.ae.Name) != "max" { + qt.Printf("do not apply instant rollup optimization for non-max incremental aggregate %s()", iafc.ae.Name) + return evalAt(qt, timestamp, window) + } + } + + // Calculate + // + // max_over_time(m[window] @ timestamp) + // + // as the maximum of + // + // - max_over_time(m[window] @ (timestamp-offset)) + // - max_over_time(m[offset] @ timestamp) + // + // if max_over_time(m[offset] @ (timestamp-window)) < max_over_time(m[window] @ (timestamp-offset)) + // otherwise do not apply the optimization + // + // where + // + // - max_over_time(m[window] @ (timestamp-offset)) is obtained from cache + // - max_over_time(m[offset] @ timestamp) and max_over_time(m[offset] @ (timestamp-window)) are calculated from the storage + // These rollups are calculated faster than max_over_time(m[window]) because offset is smaller than window. 
+ qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d", + expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window) + defer qtChild.Done() + + tssCached, offset, err := getCachedSeries(qtChild) + if err != nil { + return nil, err + } + if offset == 0 { + return tssCached, nil + } + // Calculate max_over_time(m[offset] @ (timestamp - window)) + tssEnd, err := evalAt(qtChild, timestamp-window, offset) + if err != nil { + return nil, err + } + if hasDuplicateSeries(tssEnd) { + qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series") + return evalAt(qtChild, timestamp, window) + } + // Verify whether tssCached values are bigger than tssEnd values. + // If this isn't the case, then the optimization cannot be applied. + if !isLowerInstantValues(tssEnd, tssCached) { + qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached") + deleteCachedSeries(qtChild) + return evalAt(qt, timestamp, window) + } + + // Calculate max_over_time(m[offset] @ timestamp) + tssStart, err := evalAt(qtChild, timestamp, offset) + if err != nil { + return nil, err + } + if hasDuplicateSeries(tssStart) { + qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series") + return evalAt(qtChild, timestamp, window) + } + // Calculate the result + tss := getMaxInstantValues(qtChild, tssCached, tssStart) + return tss, nil + case "min_over_time": + if iafc != nil { + if strings.ToLower(iafc.ae.Name) != "min" { + qt.Printf("do not apply instant rollup optimization for non-min incremental aggregate %s()", iafc.ae.Name) + return evalAt(qt, timestamp, window) + } + } + + // Calculate + // + // min_over_time(m[window] @ timestamp) + // + // as the minimum of + // + // - min_over_time(m[window] @ (timestamp-offset)) + // - min_over_time(m[offset] @ timestamp) + // + // if min_over_time(m[offset] @ 
(timestamp-window)) > min_over_time(m[window] @ (timestamp-offset)) + // otherwise do not apply the optimization + // + // where + // + // - min_over_time(m[window] @ (timestamp-offset)) is obtained from cache + // - min_over_time(m[offset] @ timestamp) and min_over_time(m[offset] @ (timestamp-window)) are calculated from the storage + // These rollups are calculated faster than min_over_time(m[window]) because offset is smaller than window. + qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d", + expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window) + defer qtChild.Done() + + tssCached, offset, err := getCachedSeries(qtChild) + if err != nil { + return nil, err + } + if offset == 0 { + return tssCached, nil + } + // Calculate min_over_time(m[offset] @ (timestamp - window)) + tssEnd, err := evalAt(qtChild, timestamp-window, offset) + if err != nil { + return nil, err + } + if hasDuplicateSeries(tssEnd) { + qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series") + return evalAt(qtChild, timestamp, window) + } + // Verify whether tssCached values are smaller than tssEnd values. + // If this isn't the case, then the optimization cannot be applied. 
+ if !isLowerInstantValues(tssCached, tssEnd) { + qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached") + deleteCachedSeries(qtChild) + return evalAt(qt, timestamp, window) + } + + // Calculate min_over_time(m[offset] @ timestamp) + tssStart, err := evalAt(qtChild, timestamp, offset) + if err != nil { + return nil, err + } + if hasDuplicateSeries(tssStart) { + qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series") + return evalAt(qtChild, timestamp, window) + } + // Calculate the result + tss := getMinInstantValues(qtChild, tssCached, tssStart) + return tss, nil case "count_eq_over_time", "count_gt_over_time", @@ -1213,67 +1396,33 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window) defer qtChild.Done() - again: - offset := int64(0) - tssCached := rollupResultCacheV.GetInstantValues(qtChild, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss) - ec.QueryStats.addSeriesFetched(len(tssCached)) - if len(tssCached) == 0 { - // Cache miss. 
Re-populate it - start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds() - offset = timestamp - start - if offset < 0 { - start = timestamp - offset = 0 - } - if tooBigOffset(offset) { - qtChild.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+ - "for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window) - return evalAt(qtChild, timestamp, window) - } - qtChild.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start)) - tss, err := evalAt(qtChild, start, window) - if err != nil { - return nil, err - } - if !ec.IsPartialResponse.Load() { - rollupResultCacheV.PutInstantValues(qtChild, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss, tss) - } - tssCached = tss - } else { - offset = timestamp - tssCached[0].Timestamps[0] - if offset < 0 { - qtChild.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s", - storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp)) - // Delete the outdated cached values, so the cache could be re-populated with newer values. - rollupResultCacheV.DeleteInstantValues(qtChild, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss) - goto again - } - if tooBigOffset(offset) { - qtChild.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+ - "and the cached values is too big comparing to window=%d", offset, window) - // Delete the outdated cached values, so the cache could be re-populated with newer values. 
- rollupResultCacheV.DeleteInstantValues(qtChild, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss) - goto again - } + tssCached, offset, err := getCachedSeries(qtChild) + if err != nil { + return nil, err } if offset == 0 { - qtChild.Printf("return cached values, since they have the requested timestamp=%s", storage.TimestampToHumanReadableFormat(timestamp)) return tssCached, nil } - // Calculate count_over_time(m[offset] @ timestamp) + // Calculate rf(m[offset] @ timestamp) tssStart, err := evalAt(qtChild, timestamp, offset) if err != nil { return nil, err } - // Calculate count_over_time(m[offset] @ (timestamp - window)) + if hasDuplicateSeries(tssStart) { + qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series") + return evalAt(qtChild, timestamp, window) + } + // Calculate rf(m[offset] @ (timestamp - window)) tssEnd, err := evalAt(qtChild, timestamp-window, offset) if err != nil { return nil, err } - tss, err := mergeInstantValues(qtChild, tssCached, tssStart, tssEnd) - if err != nil { - return nil, fmt.Errorf("cannot merge instant series: %w", err) + if hasDuplicateSeries(tssEnd) { + qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series") + return evalAt(qtChild, timestamp, window) } + // Calculate the result + tss := getSumInstantValues(qtChild, tssCached, tssStart, tssEnd) return tss, nil default: qt.Printf("instant rollup optimization isn't implemented for %s()", funcName) @@ -1281,9 +1430,118 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, } } -// mergeInstantValues calculates tssCached + tssStart - tssEnd -func mergeInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, error) { - qt = qt.NewChild("merge instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd)) +func hasDuplicateSeries(tss []*timeseries) bool { + m := 
make(map[string]struct{}, len(tss)) + bb := bbPool.Get() + defer bbPool.Put(bb) + + for _, ts := range tss { + bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + if _, ok := m[string(bb.B)]; ok { + return true + } + m[string(bb.B)] = struct{}{} + } + return false +} + +// isLowerInstantValues verifies that tssA contains lower values than tssB +func isLowerInstantValues(tssA, tssB []*timeseries) bool { + assertInstantValues(tssA) + assertInstantValues(tssB) + + m := make(map[string]*timeseries, len(tssA)) + bb := bbPool.Get() + defer bbPool.Put(bb) + + for _, ts := range tssA { + bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + if _, ok := m[string(bb.B)]; ok { + logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName) + } + m[string(bb.B)] = ts + } + + for _, tsB := range tssB { + bb.B = marshalMetricNameSorted(bb.B[:0], &tsB.MetricName) + tsA := m[string(bb.B)] + if tsA != nil && !math.IsNaN(tsA.Values[0]) && !math.IsNaN(tsB.Values[0]) { + if tsA.Values[0] >= tsB.Values[0] { + return false + } + } + } + return true +} + +func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries { + qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart)) + defer qt.Done() + + getMin := func(a, b float64) float64 { + if a < b { + return a + } + return b + } + tss := getMinMaxInstantValues(tssCached, tssStart, getMin) + qt.Printf("resulting series=%d", len(tss)) + return tss +} + +func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries { + qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart)) + defer qt.Done() + + getMax := func(a, b float64) float64 { + if a > b { + return a + } + return b + } + tss := getMinMaxInstantValues(tssCached, tssStart, getMax) + qt.Printf("resulting series=%d", len(tss)) + return tss +} + +func 
getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float64) float64) []*timeseries { + assertInstantValues(tssCached) + assertInstantValues(tssStart) + + m := make(map[string]*timeseries, len(tssCached)) + bb := bbPool.Get() + defer bbPool.Put(bb) + + for _, ts := range tssCached { + bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + if _, ok := m[string(bb.B)]; ok { + logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName) + } + m[string(bb.B)] = ts + } + + for _, ts := range tssStart { + bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) + tsCached := m[string(bb.B)] + if tsCached != nil && !math.IsNaN(tsCached.Values[0]) { + if !math.IsNaN(ts.Values[0]) { + tsCached.Values[0] = f(ts.Values[0], tsCached.Values[0]) + } + } else { + m[string(bb.B)] = ts + } + } + + rvs := make([]*timeseries, 0, len(m)) + for _, ts := range m { + rvs = append(rvs, ts) + } + return rvs +} + +// getSumInstantValues calculates tssCached + tssStart - tssEnd +func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) []*timeseries { + qt = qt.NewChild("calculate the sum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd)) defer qt.Done() assertInstantValues(tssCached) @@ -1296,8 +1554,8 @@ func mergeInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*t for _, ts := range tssCached { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) - if tsExisting := m[string(bb.B)]; tsExisting != nil { - return nil, fmt.Errorf("duplicate series found: %s", &ts.MetricName) + if _, ok := m[string(bb.B)]; ok { + logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName) } m[string(bb.B)] = ts } @@ -1329,7 +1587,7 @@ func mergeInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*t rvs = append(rvs, ts) } qt.Printf("resulting series=%d", len(rvs)) - return rvs, nil + return rvs } func assertInstantValues(tss []*timeseries) { diff --git 
a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a7bd0e0c6b..17696e6620 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -34,15 +34,17 @@ The sandbox cluster installation is running under the constant load generated by * SECURITY: upgrade Go builder from Go1.21.1 to Go1.21.4. See [the list of issues addressed in Go1.21.2](https://github.com/golang/go/issues?q=milestone%3AGo1.21.2+label%3ACherryPickApproved), [the list of issues addressed in Go1.21.3](https://github.com/golang/go/issues?q=milestone%3AGo1.21.3+label%3ACherryPickApproved) and [the list of issues addressed in Go1.21.4](https://github.com/golang/go/issues?q=milestone%3AGo1.21.4+label%3ACherryPickApproved). * FEATURE: `vmselect`: improve performance for repeated [instant queries](https://docs.victoriametrics.com/keyConcepts.html#instant-query) if they contain one of the following [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions): - - [avg_over_time](https://docs.victoriametrics.com/MetricsQL.html#avg_over_time) - - [sum_over_time](https://docs.victoriametrics.com/MetricsQL.html#sum_over_time) - - [count_eq_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_eq_over_time) - - [count_gt_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_gt_over_time) - - [count_le_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_le_over_time) - - [count_ne_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_ne_over_time) - - [count_over_time](https://docs.victoriametrics.com/MetricsQL.html#count_over_time) - - [increase](https://docs.victoriametrics.com/MetricsQL.html#increase) - - [rate](https://docs.victoriametrics.com/MetricsQL.html#rate) + - [`avg_over_time`](https://docs.victoriametrics.com/MetricsQL.html#avg_over_time) + - [`sum_over_time`](https://docs.victoriametrics.com/MetricsQL.html#sum_over_time) + - [`count_eq_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_eq_over_time) + - 
[`count_gt_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_gt_over_time) + - [`count_le_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_le_over_time) + - [`count_ne_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_ne_over_time) + - [`count_over_time`](https://docs.victoriametrics.com/MetricsQL.html#count_over_time) + - [`increase`](https://docs.victoriametrics.com/MetricsQL.html#increase) + - [`max_over_time`](https://docs.victoriametrics.com/MetricsQL.html#max_over_time) + - [`min_over_time`](https://docs.victoriametrics.com/MetricsQL.html#min_over_time) + - [`rate`](https://docs.victoriametrics.com/MetricsQL.html#rate) The optimization is enabled when these functions contain lookbehind window in square brackets bigger or equal to `3h` (the threshold can be changed via `-search.minWindowForInstantRollupOptimization` command-line flag). The optimization improves performance for SLO/SLI-like queries such as `avg_over_time(up[30d])` or `sum(rate(http_request_errors_total[3d])) / sum(rate(http_requests_total[3d]))`, which can be generated by [sloth](https://github.com/slok/sloth) or similar projects. * FEATURE: `vmselect`: improve query performance on systems with big number of CPU cores (`>=32`). Add `-search.maxWorkersPerQuery` command-line flag, which can be used for fine-tuning query performance on systems with big number of CPU cores. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5195). * FEATURE: `vmselect`: expose `vm_memory_intensive_queries_total` counter metric which gets increased each time `-search.logQueryMemoryUsage` memory limit is exceeded by a query. This metric should help to identify expensive and heavy queries without inspecting the logs.