diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index bce21a095..2890ef349 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -26,6 +26,8 @@ var aggrFuncs = map[string]aggrFunc{ "median": aggrFuncMedian, "limitk": aggrFuncLimitK, "distinct": newAggrFunc(aggrFuncDistinct), + "sum2": newAggrFunc(aggrFuncSum2), + "geomean": newAggrFunc(aggrFuncGeomean), } type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error) @@ -140,6 +142,52 @@ func aggrFuncSum(tss []*timeseries) []*timeseries { return tss[:1] } +func aggrFuncSum2(tss []*timeseries) []*timeseries { + dst := tss[0] + for i := range dst.Values { + sum2 := float64(0) + count := 0 + for _, ts := range tss { + v := ts.Values[i] + if math.IsNaN(v) { + continue + } + sum2 += v * v + count++ + } + if count == 0 { + sum2 = nan + } + dst.Values[i] = sum2 + } + return tss[:1] +} + +func aggrFuncGeomean(tss []*timeseries) []*timeseries { + if len(tss) == 1 { + // Fast path - nothing to geomean. + return tss + } + dst := tss[0] + for i := range dst.Values { + p := 1.0 + count := 0 + for _, ts := range tss { + v := ts.Values[i] + if math.IsNaN(v) { + continue + } + p *= v + count++ + } + if count == 0 { + p = nan + } + dst.Values[i] = math.Pow(p, 1/float64(count)) + } + return tss[:1] +} + func aggrFuncMin(tss []*timeseries) []*timeseries { if len(tss) == 1 { // Fast path - nothing to min. diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index f12ca25d0..50e4a7b5f 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -2195,6 +2195,51 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`geomean(time)`, func(t *testing.T) { + t.Parallel() + q := `geomean(time()/100)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 12, 14, 16, 18, 20}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`geomean_over_time(time)`, func(t *testing.T) { + t.Parallel() + q := `round(geomean_over_time(alias(time()/100, "foobar")[3i]), 0.1)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{6.8, 8.8, 10.9, 12.9, 14.9, 16.9}, + Timestamps: timestampsExpected, + } + r.MetricName.MetricGroup = []byte("foobar") + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`sum2(time)`, func(t *testing.T) { + t.Parallel() + q := `sum2(time()/100)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{100, 144, 196, 256, 324, 400}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`sum2_over_time(time)`, func(t *testing.T) { + t.Parallel() + q := `sum2_over_time(alias(time()/100, "foobar")[3i])` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{155, 251, 371, 515, 683, 875}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`sum(multi-vector)`, func(t *testing.T) { t.Parallel() q := `sum(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss"))` @@ -2206,6 +2251,39 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`geomean(multi-vector)`, func(t *testing.T) { + t.Parallel() + q := `round(geomean(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss")), 0.1)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 11, 11.8, 12.6, 13.4, 14.1}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`sum2(multi-vector)`, func(t *testing.T) { + t.Parallel() + q := `sum2(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss"))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{200, 244, 296, 356, 424, 500}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`sqrt(sum2(multi-vector))`, func(t *testing.T) { + t.Parallel() + q := `round(sqrt(sum2(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss"))))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{14, 16, 17, 19, 21, 22}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`avg(multi-vector)`, func(t *testing.T) { t.Parallel() q := `avg(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss"))` diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 667c7c724..2a170d57d 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -38,6 +38,8 @@ var rollupFuncs = map[string]newRollupFunc{ "stdvar_over_time": newRollupFuncOneArg(rollupStdvar), // Additional rollup funcs. + "sum2_over_time": newRollupFuncOneArg(rollupSum2), + "geomean_over_time": newRollupFuncOneArg(rollupGeomean), "first_over_time": newRollupFuncOneArg(rollupFirst), "last_over_time": newRollupFuncOneArg(rollupLast), "distinct_over_time": newRollupFuncOneArg(rollupDistinct), @@ -72,6 +74,7 @@ var rollupFuncsKeepMetricGroup = map[string]bool{ "max_over_time": true, "quantile_over_time": true, "rollup": true, + "geomean_over_time": true, } func getRollupArgIdx(funcName string) int { @@ -497,6 +500,34 @@ func rollupSum(rfa *rollupFuncArg) float64 { return sum } +func rollupSum2(rfa *rollupFuncArg) float64 { + // There is no need in handling NaNs here, since they must be cleaned up + // before calling rollup funcs. + values := rfa.values + if len(values) == 0 { + return rfa.prevValue * rfa.prevValue + } + var sum2 float64 + for _, v := range values { + sum2 += v * v + } + return sum2 +} + +func rollupGeomean(rfa *rollupFuncArg) float64 { + // There is no need in handling NaNs here, since they must be cleaned up + // before calling rollup funcs. + values := rfa.values + if len(values) == 0 { + return rfa.prevValue + } + p := 1.0 + for _, v := range values { + p *= v + } + return math.Pow(p, 1/float64(len(values))) +} + func rollupCount(rfa *rollupFuncArg) float64 { // There is no need in handling NaNs here, since they must be cleaned up // before calling rollup funcs. diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index 202aa1879..f1b9d55c0 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -203,6 +203,8 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) { f("min_over_time", 12) f("max_over_time", 123) f("sum_over_time", 565) + f("sum2_over_time", 37951) + f("geomean_over_time", 39.33466603189148) f("count_over_time", 12) f("stddev_over_time", 30.752935722554287) f("stdvar_over_time", 945.7430555555555)