diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index 0d703a1184..0a4349f623 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -1271,23 +1271,6 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
 		if offset == 0 {
 			return tssCached, nil
 		}
-		// Calculate max_over_time(m[offset] @ (timestamp - window))
-		tssEnd, err := evalAt(qtChild, timestamp-window, offset)
-		if err != nil {
-			return nil, err
-		}
-		if hasDuplicateSeries(tssEnd) {
-			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
-			return evalAt(qtChild, timestamp, window)
-		}
-		// Verify whether tssCached values are bigger than tssEnd values.
-		// If this isn't the case, then the optimization cannot be applied.
-		if !isLowerInstantValues(tssEnd, tssCached) {
-			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
-			deleteCachedSeries(qtChild)
-			return evalAt(qt, timestamp, window)
-		}
-
 		// Calculate max_over_time(m[offset] @ timestamp)
 		tssStart, err := evalAt(qtChild, timestamp, offset)
 		if err != nil {
@@ -1297,8 +1280,22 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
 			qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
 			return evalAt(qtChild, timestamp, window)
 		}
+		// Calculate max_over_time(m[offset] @ (timestamp - window))
+		tssEnd, err := evalAt(qtChild, timestamp-window, offset)
+		if err != nil {
+			return nil, err
+		}
+		if hasDuplicateSeries(tssEnd) {
+			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
+			return evalAt(qtChild, timestamp, window)
+		}
 		// Calculate the result
-		tss := getMaxInstantValues(qtChild, tssCached, tssStart)
+		tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd)
+		if !ok {
+			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
+			deleteCachedSeries(qtChild)
+			return evalAt(qt, timestamp, window)
+		}
 		return tss, nil
 	case "min_over_time":
 		if iafc != nil {
@@ -1336,23 +1333,6 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
 		if offset == 0 {
 			return tssCached, nil
 		}
-		// Calculate min_over_time(m[offset] @ (timestamp - window))
-		tssEnd, err := evalAt(qtChild, timestamp-window, offset)
-		if err != nil {
-			return nil, err
-		}
-		if hasDuplicateSeries(tssEnd) {
-			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
-			return evalAt(qtChild, timestamp, window)
-		}
-		// Verify whether tssCached values are smaller than tssEnd values.
-		// If this isn't the case, then the optimization cannot be applied.
-		if !isLowerInstantValues(tssCached, tssEnd) {
-			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
-			deleteCachedSeries(qtChild)
-			return evalAt(qt, timestamp, window)
-		}
-
 		// Calculate min_over_time(m[offset] @ timestamp)
 		tssStart, err := evalAt(qtChild, timestamp, offset)
 		if err != nil {
@@ -1362,8 +1342,22 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
 			qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
 			return evalAt(qtChild, timestamp, window)
 		}
+		// Calculate min_over_time(m[offset] @ (timestamp - window))
+		tssEnd, err := evalAt(qtChild, timestamp-window, offset)
+		if err != nil {
+			return nil, err
+		}
+		if hasDuplicateSeries(tssEnd) {
+			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
+			return evalAt(qtChild, timestamp, window)
+		}
 		// Calculate the result
-		tss := getMinInstantValues(qtChild, tssCached, tssStart)
+		tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd)
+		if !ok {
+			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
+			deleteCachedSeries(qtChild)
+			return evalAt(qt, timestamp, window)
+		}
 		return tss, nil
 	case
 		"count_eq_over_time",
@@ -1450,37 +1444,8 @@ func hasDuplicateSeries(tss []*timeseries) bool {
 	return false
 }
 
-// isLowerInstantValues verifies that tssA contains lower values than tssB
-func isLowerInstantValues(tssA, tssB []*timeseries) bool {
-	assertInstantValues(tssA)
-	assertInstantValues(tssB)
-
-	m := make(map[string]*timeseries, len(tssA))
-	bb := bbPool.Get()
-	defer bbPool.Put(bb)
-
-	for _, ts := range tssA {
-		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
-		if _, ok := m[string(bb.B)]; ok {
-			logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
-		}
-		m[string(bb.B)] = ts
-	}
-
-	for _, tsB := range tssB {
-		bb.B = marshalMetricNameSorted(bb.B[:0], &tsB.MetricName)
-		tsA := m[string(bb.B)]
-		if tsA != nil && !math.IsNaN(tsA.Values[0]) && !math.IsNaN(tsB.Values[0]) {
-			if tsA.Values[0] >= tsB.Values[0] {
-				return false
-			}
-		}
-	}
-	return true
-}
-
-func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries {
-	qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart))
+func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) {
+	qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
 	defer qt.Done()
 
 	getMin := func(a, b float64) float64 {
@@ -1489,13 +1454,13 @@ func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseri
 		}
 		return b
 	}
-	tss := getMinMaxInstantValues(tssCached, tssStart, getMin)
-	qt.Printf("resulting series=%d", len(tss))
-	return tss
+	tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMin)
+	qt.Printf("resulting series=%d; ok=%v", len(tss), ok)
+	return tss, ok
 }
 
-func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries {
-	qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart))
+func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) {
+	qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
 	defer qt.Done()
 
 	getMax := func(a, b float64) float64 {
@@ -1504,19 +1469,24 @@ func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseri
 		}
 		return b
 	}
-	tss := getMinMaxInstantValues(tssCached, tssStart, getMax)
+	tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMax)
 	qt.Printf("resulting series=%d", len(tss))
-	return tss
+	return tss, ok
 }
 
-func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float64) float64) []*timeseries {
+// getMinMaxInstantValues returns the per-series values built from tssCached and tssStart.
+//
+// It returns false if a tssEnd value is at least as extreme (per f) as the cached value,
+// while the matching tssStart value is missing, NaN or less extreme than the tssEnd value.
+func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, f func(a, b float64) float64) ([]*timeseries, bool) {
 	assertInstantValues(tssCached)
 	assertInstantValues(tssStart)
+	assertInstantValues(tssEnd)
 
-	m := make(map[string]*timeseries, len(tssCached))
 	bb := bbPool.Get()
 	defer bbPool.Put(bb)
 
+	m := make(map[string]*timeseries, len(tssCached))
 	for _, ts := range tssCached {
 		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
 		if _, ok := m[string(bb.B)]; ok {
@@ -1525,8 +1495,14 @@ func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float
 		m[string(bb.B)] = ts
 	}
 
+	// tssStart series are indexed in mStart (not in m), so the tssEnd check below can look them up.
+	mStart := make(map[string]*timeseries, len(tssStart))
 	for _, ts := range tssStart {
 		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
+		if _, ok := mStart[string(bb.B)]; ok {
+			logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
+		}
+		mStart[string(bb.B)] = ts
 		tsCached := m[string(bb.B)]
 		if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
 			if !math.IsNaN(ts.Values[0]) {
@@ -1537,11 +1513,24 @@ func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float
 		}
 	}
 
+	for _, ts := range tssEnd {
+		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
+		tsCached := m[string(bb.B)]
+		if tsCached != nil && !math.IsNaN(tsCached.Values[0]) && !math.IsNaN(ts.Values[0]) {
+			if ts.Values[0] == f(ts.Values[0], tsCached.Values[0]) {
+				tsStart := mStart[string(bb.B)]
+				if tsStart == nil || math.IsNaN(tsStart.Values[0]) || tsStart.Values[0] != f(ts.Values[0], tsStart.Values[0]) {
+					return nil, false
+				}
+			}
+		}
+	}
+
 	rvs := make([]*timeseries, 0, len(m))
 	for _, ts := range m {
 		rvs = append(rvs, ts)
 	}
-	return rvs
+	return rvs, true
 }
 
 // getSumInstantValues calculates tssCached + tssStart - tssEnd