app/vmselect/promql: move common code from aggrFuncOutliersK and newAggrFuncRangeTopK into getRangeTopKTimeseries

This commit is contained in:
Aliaksandr Valialkin 2020-05-19 16:10:52 +03:00
parent f52769f6ee
commit 538fdfe133

View File

@ -484,11 +484,6 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
}
}
type tsWithValue struct {
ts *timeseries
value float64
}
func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc {
return func(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
@ -500,34 +495,42 @@ func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggr
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
maxs := make([]tsWithValue, len(tss))
for i, ts := range tss {
value := f(ts.Values)
maxs[i] = tsWithValue{
ts: ts,
value: value,
}
}
sort.Slice(maxs, func(i, j int) bool {
a := maxs[i].value
b := maxs[j].value
if isReverse {
a, b = b, a
}
return lessWithNaNs(a, b)
})
for i := range maxs {
tss[i] = maxs[i].ts
}
for i, k := range ks {
fillNaNsAtIdx(i, k, tss)
}
return removeNaNs(tss)
return getRangeTopKTimeseries(tss, ks, f, isReverse)
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
}
func getRangeTopKTimeseries(tss []*timeseries, ks []float64, f func(values []float64) float64, isReverse bool) []*timeseries {
type tsWithValue struct {
ts *timeseries
value float64
}
maxs := make([]tsWithValue, len(tss))
for i, ts := range tss {
value := f(ts.Values)
maxs[i] = tsWithValue{
ts: ts,
value: value,
}
}
sort.Slice(maxs, func(i, j int) bool {
a := maxs[i].value
b := maxs[j].value
if isReverse {
a, b = b, a
}
return lessWithNaNs(a, b)
})
for i := range maxs {
tss[i] = maxs[i].ts
}
for i, k := range ks {
fillNaNsAtIdx(i, k, tss)
}
return removeNaNs(tss)
}
func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) {
if math.IsNaN(k) {
k = 0
@ -623,38 +626,16 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
}
histogram.PutFast(h)
// Calculate variation-like value for each tss.
type variation struct {
sum2 float64
ts *timeseries
}
variations := make([]variation, len(tss))
for i, ts := range tss {
// Return topK time series with the highest variance from median.
f := func(values []float64) float64 {
sum2 := float64(0)
for n, v := range ts.Values {
for n, v := range values {
d := v - medians[n]
sum2 += d * d
}
variations[i] = variation{
sum2: sum2,
ts: ts,
}
return sum2
}
// Sort variations by sum2.
sort.Slice(variations, func(i, j int) bool {
a, b := variations[i], variations[j]
return lessWithNaNs(a.sum2, b.sum2)
})
// Return only up to k time series with the highest variation.
for i := range variations {
tss[i] = variations[i].ts
}
for i, k := range ks {
fillNaNsAtIdx(i, k, tss)
}
return removeNaNs(tss)
return getRangeTopKTimeseries(tss, ks, f, false)
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}