diff --git a/CHANGELOG.md b/CHANGELOG.md index e3671e5dc1..eff96ea6f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * FEATURE: improve time series search for queries with multiple label filters. I.e. `foo{label1="value", label2=~"regexp"}`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/781 * FEATURE: vmalert: add `-dryRun` command-line option for validating the provided config files without the need to start `vmalert` service. +* FEATURE: accept optional third argument of string type at `topk_*` and `bottomk_*` functions. This is label name for additional time series to return with the sum of time series outside top/bottom K. See [MetricsQL docs](https://victoriametrics.github.io/MetricsQL.html) for more details. * BUGFIX: vmagent: properly handle OpenStack endpoint ending with `v3.0` such as `https://ostack.example.com:5000/v3.0` in the same way as Prometheus does. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/728#issuecomment-709914803 diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index b87b34a64a..050a8cbd48 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -69,7 +69,9 @@ func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc { if err != nil { return nil, err } - return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false) + return aggrFuncExt(func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries { + return afe(tss) + }, tss, &afa.ae.Modifier, afa.ae.Limit, false) } } @@ -98,7 +100,8 @@ func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.Modifie } } -func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeseries, modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) { +func aggrFuncExt(afe func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries, argOrig []*timeseries, + modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) { arg := copyTimeseriesMetricNames(argOrig, keepOriginal) // Perform grouping. @@ -124,7 +127,7 @@ func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeserie dstTssCount := 0 rvs := make([]*timeseries, 0, len(m)) for _, tss := range m { - rv := afe(tss) + rv := afe(tss, modifier) rvs = append(rvs, rv...) srcTssCount += len(tss) dstTssCount += len(rv) @@ -141,7 +144,7 @@ func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) { if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { return tss[:1] } limit := afa.ae.Limit @@ -178,10 +181,11 @@ func aggrFuncSum(tss []*timeseries) []*timeseries { sum := float64(0) count := 0 for _, ts := range tss { - if math.IsNaN(ts.Values[i]) { + v := ts.Values[i] + if math.IsNaN(v) { continue } - sum += ts.Values[i] + sum += v count++ } if count == 0 { @@ -449,7 +453,7 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) { if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { for i := range tss[0].Values { // Calculate avg and stddev for tss points at position i. // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation @@ -550,7 +554,7 @@ func aggrFuncCountValues(afa *aggrFuncArg) ([]*timeseries, error) { // Do nothing } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries { m := make(map[float64]bool) for _, ts := range tss { for _, v := range ts.Values { @@ -602,7 +606,7 @@ func newAggrFuncTopK(isReverse bool) aggrFunc { if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries { for n := range tss[0].Values { sort.Slice(tss, func(i, j int) bool { a := tss[i].Values[n] @@ -623,21 +627,32 @@ func newAggrFuncTopK(isReverse bool) aggrFunc { func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc { return func(afa *aggrFuncArg) ([]*timeseries, error) { args := afa.args - if err := expectTransformArgsNum(args, 2); err != nil { - return nil, err + if len(args) < 2 { + return nil, fmt.Errorf(`unexpected number of args; got %d; want at least %d`, len(args), 2) + } + if len(args) > 3 { + return nil, fmt.Errorf(`unexpected number of args; got %d; want no more than %d`, len(args), 3) } ks, err := getScalar(args[0], 0) if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { - return getRangeTopKTimeseries(tss, ks, f, isReverse) + remainingSumTagName := "" + if len(args) == 3 { + remainingSumTagName, err = getString(args[2], 2) + if err != nil { + return nil, err + } + } + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { + return getRangeTopKTimeseries(tss, modifier, ks, remainingSumTagName, f, isReverse) } return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) } } -func getRangeTopKTimeseries(tss []*timeseries, ks []float64, f func(values []float64) float64, isReverse bool) []*timeseries { +func getRangeTopKTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr, ks []float64, remainingSumTagName string, + f func(values []float64) float64, isReverse bool) []*timeseries { type tsWithValue struct { ts *timeseries value float64 @@ -661,28 +676,66 @@ func getRangeTopKTimeseries(tss []*timeseries, ks []float64, f func(values []flo for i := range maxs { tss[i] = maxs[i].ts } + remainingSumTS := getRemainingSumTimeseries(tss, modifier, ks, remainingSumTagName) for i, k := range ks { fillNaNsAtIdx(i, k, tss) } + if remainingSumTS != nil { + tss = append(tss, remainingSumTS) + } return removeNaNs(tss) } +func getRemainingSumTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr, ks []float64, remainingSumTagName string) *timeseries { + if len(remainingSumTagName) == 0 || len(tss) == 0 { + return nil + } + var dst timeseries + dst.CopyFromShallowTimestamps(tss[0]) + removeGroupTags(&dst.MetricName, modifier) + dst.MetricName.RemoveTag(remainingSumTagName) + dst.MetricName.AddTag(remainingSumTagName, remainingSumTagName) + for i, k := range ks { + kn := getIntK(k, len(tss)) + var sum float64 + count := 0 + for _, ts := range tss[:len(tss)-kn] { + v := ts.Values[i] + if math.IsNaN(v) { + continue + } + sum += v + count++ + } + if count == 0 { + sum = nan + } + dst.Values[i] = sum + } + return &dst +} + func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) { - if math.IsNaN(k) { - k = 0 - } - kn := int(k) - if kn < 0 { - kn = 0 - } - if kn > len(tss) { - kn = len(tss) - } + kn := getIntK(k, len(tss)) for _, ts := range tss[:len(tss)-kn] { ts.Values[idx] = nan } } +func getIntK(k float64, kMax int) int { + if math.IsNaN(k) { + return 0 + } + kn := int(k) + if kn < 0 { + return 0 + } + if kn > kMax { + return kMax + } + return kn +} + func minValue(values []float64) float64 { if len(values) == 0 { return nan @@ -746,7 +799,7 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) { if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { // Calculate medians for each point across tss. medians := make([]float64, len(ks)) h := histogram.GetFast() @@ -771,7 +824,7 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) { } return sum2 } - return getRangeTopKTimeseries(tss, ks, f, false) + return getRangeTopKTimeseries(tss, &afa.ae.Modifier, ks, "", f, false) } return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) } @@ -792,7 +845,7 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { maxK = k } } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { if len(tss) > maxK { tss = tss[:maxK] } @@ -833,8 +886,8 @@ func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) { return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false) } -func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries { - return func(tss []*timeseries) []*timeseries { +func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { + return func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { dst := tss[0] h := histogram.GetFast() defer histogram.PutFast(h) diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 4b05851756..e1e51653d5 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -4203,7 +4203,7 @@ func TestExecSuccess(t *testing.T) { }) t.Run(`topk_max(1)`, func(t *testing.T) { t.Parallel() - q := `sort(topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + q := `topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))` r1 := netstorage.Result{ MetricName: metricNameExpected, Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, @@ -4216,6 +4216,84 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r1} f(q, resultExpected) }) + t.Run(`topk_max(1, remaining_sum)`, func(t *testing.T) { + t.Parallel() + q := `sort_desc(topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"), "remaining_sum"))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, 10, 10, 10}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("remaining_sum"), + Value: []byte("remaining_sum"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) + t.Run(`topk_max(2, remaining_sum)`, func(t *testing.T) { + t.Parallel() + q := `sort_desc(topk_max(2, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"), "remaining_sum"))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, 10, 10, 10}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) + t.Run(`topk_max(3, remaining_sum)`, func(t *testing.T) { + t.Parallel() + q := `sort_desc(topk_max(3, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"), "remaining_sum"))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, 10, 10, 10}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) t.Run(`bottomk_max(1)`, func(t *testing.T) { t.Parallel() q := `sort(bottomk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index c9c085019a..c1f3a9872c 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -113,6 +113,7 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `bottomk_max(k, q)` - returns bottom K time series with the min maximums on the given time range - `bottomk_avg(k, q)` - returns bottom K time series with the min averages on the given time range - `bottomk_median(k, q)` - returns bottom K time series with the min medians on the given time range + All the `topk_*` and `bottomk_*` functions accept optional third argument - label name for the sum of the remaining time series outside top K or bottom K time series. For example, `topk_max(3, process_resident_memory_bytes, "remaining_sum")` would return up to 3 time series with the maximum value for `process_resident_memory_bytes` plus fourth time series with the sum of the remaining time series if any. The fourth time series will contain `remaining_sum="remaining_sum"` additional label. - `share_le_over_time(m[d], le)` - returns share (in the range 0..1) of values in `m` over `d`, which are smaller or equal to `le`. Useful for calculating SLI and SLO. Example: `share_le_over_time(memory_usage_bytes[24h], 100*1024*1024)` returns the share of time series values for the last 24 hours when memory usage was below or equal to 100MB. - `share_gt_over_time(m[d], gt)` - returns share (in the range 0..1) of values in `m` over `d`, which are bigger than `gt`. Useful for calculating SLI and SLO.