app/vmselect/promql: add sum2 and sum2_over_time, geomean and geomean_over_time funcs.

These functions may be useful for statistic calculations.
This commit is contained in:
Aliaksandr Valialkin 2019-06-24 16:17:28 +03:00
parent ccd8b7a003
commit 90e72c2a42
4 changed files with 159 additions and 0 deletions

View File

@ -26,6 +26,8 @@ var aggrFuncs = map[string]aggrFunc{
"median": aggrFuncMedian,
"limitk": aggrFuncLimitK,
"distinct": newAggrFunc(aggrFuncDistinct),
"sum2": newAggrFunc(aggrFuncSum2),
"geomean": newAggrFunc(aggrFuncGeomean),
}
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
@ -140,6 +142,52 @@ func aggrFuncSum(tss []*timeseries) []*timeseries {
return tss[:1]
}
func aggrFuncSum2(tss []*timeseries) []*timeseries {
dst := tss[0]
for i := range dst.Values {
sum2 := float64(0)
count := 0
for _, ts := range tss {
v := ts.Values[i]
if math.IsNaN(v) {
continue
}
sum2 += v * v
count++
}
if count == 0 {
sum2 = nan
}
dst.Values[i] = sum2
}
return tss[:1]
}
func aggrFuncGeomean(tss []*timeseries) []*timeseries {
if len(tss) == 1 {
// Fast path - nothing to geomean.
return tss
}
dst := tss[0]
for i := range dst.Values {
p := 1.0
count := 0
for _, ts := range tss {
v := ts.Values[i]
if math.IsNaN(v) {
continue
}
p *= v
count++
}
if count == 0 {
p = nan
}
dst.Values[i] = math.Pow(p, 1/float64(count))
}
return tss[:1]
}
func aggrFuncMin(tss []*timeseries) []*timeseries {
if len(tss) == 1 {
// Fast path - nothing to min.

View File

@ -2195,6 +2195,51 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`geomean(time)`, func(t *testing.T) {
t.Parallel()
q := `geomean(time()/100)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10, 12, 14, 16, 18, 20},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`geomean_over_time(time)`, func(t *testing.T) {
t.Parallel()
q := `round(geomean_over_time(alias(time()/100, "foobar")[3i]), 0.1)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{6.8, 8.8, 10.9, 12.9, 14.9, 16.9},
Timestamps: timestampsExpected,
}
r.MetricName.MetricGroup = []byte("foobar")
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum2(time)`, func(t *testing.T) {
t.Parallel()
q := `sum2(time()/100)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{100, 144, 196, 256, 324, 400},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum2_over_time(time)`, func(t *testing.T) {
t.Parallel()
q := `sum2_over_time(alias(time()/100, "foobar")[3i])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{155, 251, 371, 515, 683, 875},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum(multi-vector)`, func(t *testing.T) {
t.Parallel()
q := `sum(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss"))`
@ -2206,6 +2251,39 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`geomean(multi-vector)`, func(t *testing.T) {
t.Parallel()
q := `round(geomean(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss")), 0.1)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10, 11, 11.8, 12.6, 13.4, 14.1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum2(multi-vector)`, func(t *testing.T) {
t.Parallel()
q := `sum2(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss"))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{200, 244, 296, 356, 424, 500},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sqrt(sum2(multi-vector))`, func(t *testing.T) {
t.Parallel()
q := `round(sqrt(sum2(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss"))))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{14, 16, 17, 19, 21, 22},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`avg(multi-vector)`, func(t *testing.T) {
t.Parallel()
q := `avg(label_set(10, "foo", "bar") or label_set(time()/100, "baz", "sss"))`

View File

@ -38,6 +38,8 @@ var rollupFuncs = map[string]newRollupFunc{
"stdvar_over_time": newRollupFuncOneArg(rollupStdvar),
// Additional rollup funcs.
"sum2_over_time": newRollupFuncOneArg(rollupSum2),
"geomean_over_time": newRollupFuncOneArg(rollupGeomean),
"first_over_time": newRollupFuncOneArg(rollupFirst),
"last_over_time": newRollupFuncOneArg(rollupLast),
"distinct_over_time": newRollupFuncOneArg(rollupDistinct),
@ -72,6 +74,7 @@ var rollupFuncsKeepMetricGroup = map[string]bool{
"max_over_time": true,
"quantile_over_time": true,
"rollup": true,
"geomean_over_time": true,
}
func getRollupArgIdx(funcName string) int {
@ -497,6 +500,34 @@ func rollupSum(rfa *rollupFuncArg) float64 {
return sum
}
func rollupSum2(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue * rfa.prevValue
}
var sum2 float64
for _, v := range values {
sum2 += v * v
}
return sum2
}
func rollupGeomean(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
p := 1.0
for _, v := range values {
p *= v
}
return math.Pow(p, 1/float64(len(values)))
}
func rollupCount(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.

View File

@ -203,6 +203,8 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
f("min_over_time", 12)
f("max_over_time", 123)
f("sum_over_time", 565)
f("sum2_over_time", 37951)
f("geomean_over_time", 39.33466603189148)
f("count_over_time", 12)
f("stddev_over_time", 30.752935722554287)
f("stdvar_over_time", 945.7430555555555)