mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 16:30:55 +01:00
app/vmselect/promql: properly handle instant query optimization conrner cases for min_over_time() and max_over_time()
- If min_over_time(m[offset] @ timestamp) <= min_over_time(m[offset] @ (timestamp-window)), then the optimization can be applied. - If max_over_time(m[offset] @ timestamp) >= max_over_time(m[offset] @ (timestamp-window)), then the optimization can be applied.
This commit is contained in:
parent
0fe02e8d9d
commit
9ff1ee333f
@ -1271,23 +1271,6 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
|
||||
if offset == 0 {
|
||||
return tssCached, nil
|
||||
}
|
||||
// Calculate max_over_time(m[offset] @ (timestamp - window))
|
||||
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if hasDuplicateSeries(tssEnd) {
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
|
||||
return evalAt(qtChild, timestamp, window)
|
||||
}
|
||||
// Verify whether tssCached values are bigger than tssEnd values.
|
||||
// If this isn't the case, then the optimization cannot be applied.
|
||||
if !isLowerInstantValues(tssEnd, tssCached) {
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
|
||||
deleteCachedSeries(qtChild)
|
||||
return evalAt(qt, timestamp, window)
|
||||
}
|
||||
|
||||
// Calculate max_over_time(m[offset] @ timestamp)
|
||||
tssStart, err := evalAt(qtChild, timestamp, offset)
|
||||
if err != nil {
|
||||
@ -1297,8 +1280,22 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
|
||||
return evalAt(qtChild, timestamp, window)
|
||||
}
|
||||
// Calculate max_over_time(m[offset] @ (timestamp - window))
|
||||
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if hasDuplicateSeries(tssEnd) {
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
|
||||
return evalAt(qtChild, timestamp, window)
|
||||
}
|
||||
// Calculate the result
|
||||
tss := getMaxInstantValues(qtChild, tssCached, tssStart)
|
||||
tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd)
|
||||
if !ok {
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
|
||||
deleteCachedSeries(qtChild)
|
||||
return evalAt(qt, timestamp, window)
|
||||
}
|
||||
return tss, nil
|
||||
case "min_over_time":
|
||||
if iafc != nil {
|
||||
@ -1336,23 +1333,6 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
|
||||
if offset == 0 {
|
||||
return tssCached, nil
|
||||
}
|
||||
// Calculate min_over_time(m[offset] @ (timestamp - window))
|
||||
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if hasDuplicateSeries(tssEnd) {
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
|
||||
return evalAt(qtChild, timestamp, window)
|
||||
}
|
||||
// Verify whether tssCached values are smaller than tssEnd values.
|
||||
// If this isn't the case, then the optimization cannot be applied.
|
||||
if !isLowerInstantValues(tssCached, tssEnd) {
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
|
||||
deleteCachedSeries(qtChild)
|
||||
return evalAt(qt, timestamp, window)
|
||||
}
|
||||
|
||||
// Calculate min_over_time(m[offset] @ timestamp)
|
||||
tssStart, err := evalAt(qtChild, timestamp, offset)
|
||||
if err != nil {
|
||||
@ -1362,8 +1342,22 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
|
||||
return evalAt(qtChild, timestamp, window)
|
||||
}
|
||||
// Calculate min_over_time(m[offset] @ (timestamp - window))
|
||||
tssEnd, err := evalAt(qtChild, timestamp-window, offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if hasDuplicateSeries(tssEnd) {
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
|
||||
return evalAt(qtChild, timestamp, window)
|
||||
}
|
||||
// Calculate the result
|
||||
tss := getMinInstantValues(qtChild, tssCached, tssStart)
|
||||
tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd)
|
||||
if !ok {
|
||||
qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
|
||||
deleteCachedSeries(qtChild)
|
||||
return evalAt(qt, timestamp, window)
|
||||
}
|
||||
return tss, nil
|
||||
case
|
||||
"count_eq_over_time",
|
||||
@ -1450,37 +1444,8 @@ func hasDuplicateSeries(tss []*timeseries) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// isLowerInstantValues verifies that tssA contains lower values than tssB
|
||||
func isLowerInstantValues(tssA, tssB []*timeseries) bool {
|
||||
assertInstantValues(tssA)
|
||||
assertInstantValues(tssB)
|
||||
|
||||
m := make(map[string]*timeseries, len(tssA))
|
||||
bb := bbPool.Get()
|
||||
defer bbPool.Put(bb)
|
||||
|
||||
for _, ts := range tssA {
|
||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||
if _, ok := m[string(bb.B)]; ok {
|
||||
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
|
||||
}
|
||||
m[string(bb.B)] = ts
|
||||
}
|
||||
|
||||
for _, tsB := range tssB {
|
||||
bb.B = marshalMetricNameSorted(bb.B[:0], &tsB.MetricName)
|
||||
tsA := m[string(bb.B)]
|
||||
if tsA != nil && !math.IsNaN(tsA.Values[0]) && !math.IsNaN(tsB.Values[0]) {
|
||||
if tsA.Values[0] >= tsB.Values[0] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries {
|
||||
qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart))
|
||||
func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) {
|
||||
qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
|
||||
defer qt.Done()
|
||||
|
||||
getMin := func(a, b float64) float64 {
|
||||
@ -1489,13 +1454,13 @@ func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseri
|
||||
}
|
||||
return b
|
||||
}
|
||||
tss := getMinMaxInstantValues(tssCached, tssStart, getMin)
|
||||
qt.Printf("resulting series=%d", len(tss))
|
||||
return tss
|
||||
tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMin)
|
||||
qt.Printf("resulting series=%d; ok=%v", len(tss), ok)
|
||||
return tss, ok
|
||||
}
|
||||
|
||||
func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseries) []*timeseries {
|
||||
qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d", len(tssCached), len(tssStart))
|
||||
func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries) ([]*timeseries, bool) {
|
||||
qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d", len(tssCached), len(tssStart), len(tssEnd))
|
||||
defer qt.Done()
|
||||
|
||||
getMax := func(a, b float64) float64 {
|
||||
@ -1504,19 +1469,20 @@ func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart []*timeseri
|
||||
}
|
||||
return b
|
||||
}
|
||||
tss := getMinMaxInstantValues(tssCached, tssStart, getMax)
|
||||
tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, getMax)
|
||||
qt.Printf("resulting series=%d", len(tss))
|
||||
return tss
|
||||
return tss, ok
|
||||
}
|
||||
|
||||
func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float64) float64) []*timeseries {
|
||||
func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, f func(a, b float64) float64) ([]*timeseries, bool) {
|
||||
assertInstantValues(tssCached)
|
||||
assertInstantValues(tssStart)
|
||||
assertInstantValues(tssEnd)
|
||||
|
||||
m := make(map[string]*timeseries, len(tssCached))
|
||||
bb := bbPool.Get()
|
||||
defer bbPool.Put(bb)
|
||||
|
||||
m := make(map[string]*timeseries, len(tssCached))
|
||||
for _, ts := range tssCached {
|
||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||
if _, ok := m[string(bb.B)]; ok {
|
||||
@ -1525,8 +1491,13 @@ func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float
|
||||
m[string(bb.B)] = ts
|
||||
}
|
||||
|
||||
mStart := make(map[string]*timeseries, len(tssStart))
|
||||
for _, ts := range tssStart {
|
||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||
if _, ok := m[string(bb.B)]; ok {
|
||||
logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
|
||||
}
|
||||
m[string(bb.B)] = ts
|
||||
tsCached := m[string(bb.B)]
|
||||
if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
|
||||
if !math.IsNaN(ts.Values[0]) {
|
||||
@ -1537,11 +1508,24 @@ func getMinMaxInstantValues(tssCached, tssStart []*timeseries, f func(a, b float
|
||||
}
|
||||
}
|
||||
|
||||
for _, ts := range tssEnd {
|
||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||
tsCached := m[string(bb.B)]
|
||||
if tsCached != nil && !math.IsNaN(tsCached.Values[0]) && !math.IsNaN(ts.Values[0]) {
|
||||
if ts.Values[0] == f(ts.Values[0], tsCached.Values[0]) {
|
||||
tsStart := mStart[string(bb.B)]
|
||||
if tsStart == nil || math.IsNaN(tsStart.Values[0]) || tsStart.Values[0] != f(ts.Values[0], tsStart.Values[0]) {
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rvs := make([]*timeseries, 0, len(m))
|
||||
for _, ts := range m {
|
||||
rvs = append(rvs, ts)
|
||||
}
|
||||
return rvs
|
||||
return rvs, true
|
||||
}
|
||||
|
||||
// getSumInstantValues calculates tssCached + tssStart - tssEnd
|
||||
|
Loading…
Reference in New Issue
Block a user