app/vmselect/promql: move common code from aggrFuncOutliersK and newAggrFuncRangeTopK into getRangeTopKTimeseries

This commit is contained in:
Aliaksandr Valialkin 2020-05-19 16:10:52 +03:00
parent 37068064dd
commit 7d46dd452a

View File

@ -484,11 +484,6 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
} }
} }
type tsWithValue struct {
ts *timeseries
value float64
}
func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc { func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc {
return func(afa *aggrFuncArg) ([]*timeseries, error) { return func(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args args := afa.args
@ -500,6 +495,17 @@ func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggr
return nil, err return nil, err
} }
afe := func(tss []*timeseries) []*timeseries { afe := func(tss []*timeseries) []*timeseries {
return getRangeTopKTimeseries(tss, ks, f, isReverse)
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
}
func getRangeTopKTimeseries(tss []*timeseries, ks []float64, f func(values []float64) float64, isReverse bool) []*timeseries {
type tsWithValue struct {
ts *timeseries
value float64
}
maxs := make([]tsWithValue, len(tss)) maxs := make([]tsWithValue, len(tss))
for i, ts := range tss { for i, ts := range tss {
value := f(ts.Values) value := f(ts.Values)
@ -524,9 +530,6 @@ func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggr
} }
return removeNaNs(tss) return removeNaNs(tss)
} }
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
}
func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) { func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) {
if math.IsNaN(k) { if math.IsNaN(k) {
@ -623,38 +626,16 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
} }
histogram.PutFast(h) histogram.PutFast(h)
// Calculate variation-like value for each tss. // Return topK time series with the highest variance from median.
type variation struct { f := func(values []float64) float64 {
sum2 float64
ts *timeseries
}
variations := make([]variation, len(tss))
for i, ts := range tss {
sum2 := float64(0) sum2 := float64(0)
for n, v := range ts.Values { for n, v := range values {
d := v - medians[n] d := v - medians[n]
sum2 += d * d sum2 += d * d
} }
variations[i] = variation{ return sum2
sum2: sum2,
ts: ts,
} }
} return getRangeTopKTimeseries(tss, ks, f, false)
// Sort variations by sum2.
sort.Slice(variations, func(i, j int) bool {
a, b := variations[i], variations[j]
return lessWithNaNs(a.sum2, b.sum2)
})
// Return only up to k time series with the highest variation.
for i := range variations {
tss[i] = variations[i].ts
}
for i, k := range ks {
fillNaNsAtIdx(i, k, tss)
}
return removeNaNs(tss)
} }
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
} }