app/vmselect/promql: allow passing optional third argument to topk_* and bottomk_* functions in order to obtain sum of time series outside top/bottom K

This commit is contained in:
Aliaksandr Valialkin 2020-10-20 19:41:48 +03:00
parent 7599e5c835
commit c4464594b7
4 changed files with 163 additions and 30 deletions

View File

@ -10,6 +10,7 @@
* FEATURE: improve time series search for queries with multiple label filters. I.e. `foo{label1="value", label2=~"regexp"}`.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/781
* FEATURE: vmalert: add `-dryRun` command-line option for validating the provided config files without the need to start `vmalert` service.
* FEATURE: accept optional third argument of string type at `topk_*` and `bottomk_*` functions. This is label name for additional time series to return with the sum of time series outside top/bottom K. See [MetricsQL docs](https://victoriametrics.github.io/MetricsQL.html) for more details.
* BUGFIX: vmagent: properly handle OpenStack endpoint ending with `v3.0` such as `https://ostack.example.com:5000/v3.0`
in the same way as Prometheus does. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/728#issuecomment-709914803

View File

@ -69,7 +69,9 @@ func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc {
if err != nil {
return nil, err
}
return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false)
return aggrFuncExt(func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries {
return afe(tss)
}, tss, &afa.ae.Modifier, afa.ae.Limit, false)
}
}
@ -98,7 +100,8 @@ func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.Modifie
}
}
func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeseries, modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) {
func aggrFuncExt(afe func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries, argOrig []*timeseries,
modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) {
arg := copyTimeseriesMetricNames(argOrig, keepOriginal)
// Perform grouping.
@ -124,7 +127,7 @@ func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeserie
dstTssCount := 0
rvs := make([]*timeseries, 0, len(m))
for _, tss := range m {
rv := afe(tss)
rv := afe(tss, modifier)
rvs = append(rvs, rv...)
srcTssCount += len(tss)
dstTssCount += len(rv)
@ -141,7 +144,7 @@ func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) {
if err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
return tss[:1]
}
limit := afa.ae.Limit
@ -178,10 +181,11 @@ func aggrFuncSum(tss []*timeseries) []*timeseries {
sum := float64(0)
count := 0
for _, ts := range tss {
if math.IsNaN(ts.Values[i]) {
v := ts.Values[i]
if math.IsNaN(v) {
continue
}
sum += ts.Values[i]
sum += v
count++
}
if count == 0 {
@ -449,7 +453,7 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
if err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
for i := range tss[0].Values {
// Calculate avg and stddev for tss points at position i.
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
@ -550,7 +554,7 @@ func aggrFuncCountValues(afa *aggrFuncArg) ([]*timeseries, error) {
// Do nothing
}
afe := func(tss []*timeseries) []*timeseries {
afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries {
m := make(map[float64]bool)
for _, ts := range tss {
for _, v := range ts.Values {
@ -602,7 +606,7 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
if err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries {
for n := range tss[0].Values {
sort.Slice(tss, func(i, j int) bool {
a := tss[i].Values[n]
@ -623,21 +627,32 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc {
return func(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 2); err != nil {
return nil, err
if len(args) < 2 {
return nil, fmt.Errorf(`unexpected number of args; got %d; want at least %d`, len(args), 2)
}
if len(args) > 3 {
return nil, fmt.Errorf(`unexpected number of args; got %d; want no more than %d`, len(args), 3)
}
ks, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
return getRangeTopKTimeseries(tss, ks, f, isReverse)
remainingSumTagName := ""
if len(args) == 3 {
remainingSumTagName, err = getString(args[2], 2)
if err != nil {
return nil, err
}
}
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
return getRangeTopKTimeseries(tss, modifier, ks, remainingSumTagName, f, isReverse)
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
}
func getRangeTopKTimeseries(tss []*timeseries, ks []float64, f func(values []float64) float64, isReverse bool) []*timeseries {
func getRangeTopKTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr, ks []float64, remainingSumTagName string,
f func(values []float64) float64, isReverse bool) []*timeseries {
type tsWithValue struct {
ts *timeseries
value float64
@ -661,28 +676,66 @@ func getRangeTopKTimeseries(tss []*timeseries, ks []float64, f func(values []flo
for i := range maxs {
tss[i] = maxs[i].ts
}
remainingSumTS := getRemainingSumTimeseries(tss, modifier, ks, remainingSumTagName)
for i, k := range ks {
fillNaNsAtIdx(i, k, tss)
}
if remainingSumTS != nil {
tss = append(tss, remainingSumTS)
}
return removeNaNs(tss)
}
func getRemainingSumTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr, ks []float64, remainingSumTagName string) *timeseries {
if len(remainingSumTagName) == 0 || len(tss) == 0 {
return nil
}
var dst timeseries
dst.CopyFromShallowTimestamps(tss[0])
removeGroupTags(&dst.MetricName, modifier)
dst.MetricName.RemoveTag(remainingSumTagName)
dst.MetricName.AddTag(remainingSumTagName, remainingSumTagName)
for i, k := range ks {
kn := getIntK(k, len(tss))
var sum float64
count := 0
for _, ts := range tss[:len(tss)-kn] {
v := ts.Values[i]
if math.IsNaN(v) {
continue
}
sum += v
count++
}
if count == 0 {
sum = nan
}
dst.Values[i] = sum
}
return &dst
}
func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) {
if math.IsNaN(k) {
k = 0
}
kn := int(k)
if kn < 0 {
kn = 0
}
if kn > len(tss) {
kn = len(tss)
}
kn := getIntK(k, len(tss))
for _, ts := range tss[:len(tss)-kn] {
ts.Values[idx] = nan
}
}
func getIntK(k float64, kMax int) int {
if math.IsNaN(k) {
return 0
}
kn := int(k)
if kn < 0 {
return 0
}
if kn > kMax {
return kMax
}
return kn
}
func minValue(values []float64) float64 {
if len(values) == 0 {
return nan
@ -746,7 +799,7 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
if err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
// Calculate medians for each point across tss.
medians := make([]float64, len(ks))
h := histogram.GetFast()
@ -771,7 +824,7 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
}
return sum2
}
return getRangeTopKTimeseries(tss, ks, f, false)
return getRangeTopKTimeseries(tss, &afa.ae.Modifier, ks, "", f, false)
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
@ -792,7 +845,7 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
maxK = k
}
}
afe := func(tss []*timeseries) []*timeseries {
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
if len(tss) > maxK {
tss = tss[:maxK]
}
@ -833,8 +886,8 @@ func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false)
}
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries {
return func(tss []*timeseries) []*timeseries {
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
return func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
dst := tss[0]
h := histogram.GetFast()
defer histogram.PutFast(h)

View File

@ -4203,7 +4203,7 @@ func TestExecSuccess(t *testing.T) {
})
t.Run(`topk_max(1)`, func(t *testing.T) {
t.Parallel()
q := `sort(topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
q := `topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334},
@ -4216,6 +4216,84 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1}
f(q, resultExpected)
})
t.Run(`topk_max(1, remaining_sum)`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"), "remaining_sum"))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("baz"),
Value: []byte("sss"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10, 10, 10, 10, 10, 10},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("remaining_sum"),
Value: []byte("remaining_sum"),
},
}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`topk_max(2, remaining_sum)`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(topk_max(2, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"), "remaining_sum"))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("baz"),
Value: []byte("sss"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10, 10, 10, 10, 10, 10},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`topk_max(3, remaining_sum)`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(topk_max(3, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"), "remaining_sum"))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("baz"),
Value: []byte("sss"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10, 10, 10, 10, 10, 10},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`bottomk_max(1)`, func(t *testing.T) {
t.Parallel()
q := `sort(bottomk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`

View File

@ -113,6 +113,7 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `bottomk_max(k, q)` - returns bottom K time series with the min maximums on the given time range
- `bottomk_avg(k, q)` - returns bottom K time series with the min averages on the given time range
- `bottomk_median(k, q)` - returns bottom K time series with the min medians on the given time range
All the `topk_*` and `bottomk_*` functions accept optional third argument - label name for the sum of the remaining time series outside top K or bottom K time series. For example, `topk_max(3, process_resident_memory_bytes, "remaining_sum")` would return up to 3 time series with the maximum value for `process_resident_memory_bytes` plus fourth time series with the sum of the remaining time series if any. The fourth time series will contain `remaining_sum="remaining_sum"` additional label.
- `share_le_over_time(m[d], le)` - returns share (in the range 0..1) of values in `m` over `d`, which are smaller or equal to `le`. Useful for calculating SLI and SLO.
Example: `share_le_over_time(memory_usage_bytes[24h], 100*1024*1024)` returns the share of time series values for the last 24 hours when memory usage was below or equal to 100MB.
- `share_gt_over_time(m[d], gt)` - returns share (in the range 0..1) of values in `m` over `d`, which are bigger than `gt`. Useful for calculating SLI and SLO.