diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index b1ed9b337..f0d2133f3 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -605,6 +605,14 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, rollupResultCacheMiss.Inc() } + // Obtain rollup configs before fetching data from db, + // so type errors can be caught earlier. + sharedTimestamps := getTimestamps(start, ec.End, ec.Step) + preFunc, rcs, err := getRollupConfigs(name, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) + if err != nil { + return nil, err + } + // Fetch the remaining part of the result. tfs := toTagFilters(me.LabelFilters) sq := &storage.SearchQuery{ @@ -629,12 +637,6 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, tss = mergeTimeseries(tssCached, tss, start, ec) return tss, nil } - sharedTimestamps := getTimestamps(start, ec.End, ec.Step) - preFunc, rcs, err := getRollupConfigs(name, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) - if err != nil { - rss.Cancel() - return nil, err - } // Verify timeseries fit available memory after the rollup. // Take into account points from tssCached. diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 646cbf47e..b4d37416a 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -4480,6 +4480,29 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`hoeffding_bound_lower()`, func(t *testing.T) { + t.Parallel() + q := `hoeffding_bound_lower(0.9, rand(0)[:10s])` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.2516770508510652, 0.2830570387745462, 0.27716232108436645, 0.3679356319931767, 0.3168460474120903, 0.23156726248243734}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`hoeffding_bound_upper()`, func(t *testing.T) { + t.Parallel() + q := `hoeffding_bound_upper(0.9, alias(rand(0), "foobar")[:10s])` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.6510581320042821, 0.7261021731890429, 0.7245290097397009, 0.8113950442584258, 0.7736122275568004, 0.6658564048254882}, + Timestamps: timestampsExpected, + } + r.MetricName.MetricGroup = []byte("foobar") + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`aggr_over_time(single-func)`, func(t *testing.T) { t.Parallel() q := `aggr_over_time("increase", rand(0)[:10s])` @@ -5267,6 +5290,12 @@ func TestExecError(t *testing.T) { f(`aggr_over_time()`) f(`aggr_over_time(foo)`) f(`aggr_over_time("foo", bar, 1)`) + f(`hoeffding_bound_lower()`) + f(`hoeffding_bound_lower(1)`) + f(`hoeffding_bound_lower(0.99, foo, 1)`) + f(`hoeffding_bound_upper()`) + f(`hoeffding_bound_upper(1)`) + f(`hoeffding_bound_upper(0.99, foo, 1)`) // Invalid argument type f(`median_over_time({}, 2)`) diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 481d90160..9bc1bf6c0 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -41,30 +41,32 @@ var rollupFuncs = map[string]newRollupFunc{ "absent_over_time": newRollupFuncOneArg(rollupAbsent), // Additional rollup funcs. - "sum2_over_time": newRollupFuncOneArg(rollupSum2), - "geomean_over_time": newRollupFuncOneArg(rollupGeomean), - "first_over_time": newRollupFuncOneArg(rollupFirst), - "last_over_time": newRollupFuncOneArg(rollupLast), - "distinct_over_time": newRollupFuncOneArg(rollupDistinct), - "increases_over_time": newRollupFuncOneArg(rollupIncreases), - "decreases_over_time": newRollupFuncOneArg(rollupDecreases), - "integrate": newRollupFuncOneArg(rollupIntegrate), - "ideriv": newRollupFuncOneArg(rollupIderiv), - "lifetime": newRollupFuncOneArg(rollupLifetime), - "lag": newRollupFuncOneArg(rollupLag), - "scrape_interval": newRollupFuncOneArg(rollupScrapeInterval), - "tmin_over_time": newRollupFuncOneArg(rollupTmin), - "tmax_over_time": newRollupFuncOneArg(rollupTmax), - "share_le_over_time": newRollupShareLE, - "share_gt_over_time": newRollupShareGT, - "histogram_over_time": newRollupFuncOneArg(rollupHistogram), - "rollup": newRollupFuncOneArg(rollupFake), - "rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets - "rollup_deriv": newRollupFuncOneArg(rollupFake), - "rollup_delta": newRollupFuncOneArg(rollupFake), - "rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets - "rollup_candlestick": newRollupFuncOneArg(rollupFake), - "aggr_over_time": newRollupFuncTwoArgs(rollupFake), + "sum2_over_time": newRollupFuncOneArg(rollupSum2), + "geomean_over_time": newRollupFuncOneArg(rollupGeomean), + "first_over_time": newRollupFuncOneArg(rollupFirst), + "last_over_time": newRollupFuncOneArg(rollupLast), + "distinct_over_time": newRollupFuncOneArg(rollupDistinct), + "increases_over_time": newRollupFuncOneArg(rollupIncreases), + "decreases_over_time": newRollupFuncOneArg(rollupDecreases), + "integrate": newRollupFuncOneArg(rollupIntegrate), + "ideriv": newRollupFuncOneArg(rollupIderiv), + "lifetime": newRollupFuncOneArg(rollupLifetime), + "lag": newRollupFuncOneArg(rollupLag), + "scrape_interval": newRollupFuncOneArg(rollupScrapeInterval), + "tmin_over_time": newRollupFuncOneArg(rollupTmin), + "tmax_over_time": newRollupFuncOneArg(rollupTmax), + "share_le_over_time": newRollupShareLE, + "share_gt_over_time": newRollupShareGT, + "histogram_over_time": newRollupFuncOneArg(rollupHistogram), + "rollup": newRollupFuncOneArg(rollupFake), + "rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets + "rollup_deriv": newRollupFuncOneArg(rollupFake), + "rollup_delta": newRollupFuncOneArg(rollupFake), + "rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets + "rollup_candlestick": newRollupFuncOneArg(rollupFake), + "aggr_over_time": newRollupFuncTwoArgs(rollupFake), + "hoeffding_bound_upper": newRollupHoeffdingBoundUpper, + "hoeffding_bound_lower": newRollupHoeffdingBoundLower, } // rollupAggrFuncs are functions that can be passed to `aggr_over_time()` @@ -136,13 +138,15 @@ var rollupFuncsRemoveCounterResets = map[string]bool{ } var rollupFuncsKeepMetricGroup = map[string]bool{ - "default_rollup": true, - "avg_over_time": true, - "min_over_time": true, - "max_over_time": true, - "quantile_over_time": true, - "rollup": true, - "geomean_over_time": true, + "default_rollup": true, + "avg_over_time": true, + "min_over_time": true, + "max_over_time": true, + "quantile_over_time": true, + "rollup": true, + "geomean_over_time": true, + "hoeffding_bound_lower": true, + "hoeffding_bound_upper": true, } func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) { @@ -200,7 +204,8 @@ func getRollupArgIdx(funcName string) int { logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", funcName) } switch funcName { - case "quantile_over_time", "aggr_over_time": + case "quantile_over_time", "aggr_over_time", + "hoeffding_bound_lower", "hoeffding_bound_upper": return 1 default: return 0 @@ -828,6 +833,66 @@ func newRollupShareFilter(args []interface{}, countFilter func(values []float64, return rf, nil } +func newRollupHoeffdingBoundLower(args []interface{}) (rollupFunc, error) { + if err := expectRollupArgsNum(args, 2); err != nil { + return nil, err + } + phis, err := getScalar(args[0], 0) + if err != nil { + return nil, err + } + rf := func(rfa *rollupFuncArg) float64 { + bound, avg := rollupHoeffdingBoundInternal(rfa, phis) + return avg - bound + } + return rf, nil +} + +func newRollupHoeffdingBoundUpper(args []interface{}) (rollupFunc, error) { + if err := expectRollupArgsNum(args, 2); err != nil { + return nil, err + } + phis, err := getScalar(args[0], 0) + if err != nil { + return nil, err + } + rf := func(rfa *rollupFuncArg) float64 { + bound, avg := rollupHoeffdingBoundInternal(rfa, phis) + return avg + bound + } + return rf, nil +} + +func rollupHoeffdingBoundInternal(rfa *rollupFuncArg, phis []float64) (float64, float64) { + // There is no need in handling NaNs here, since they must be cleaned up + // before calling rollup funcs. + values := rfa.values + if len(values) == 0 { + return nan, nan + } + if len(values) == 1 { + return 0, values[0] + } + vMax := rollupMax(rfa) + vMin := rollupMin(rfa) + vAvg := rollupAvg(rfa) + vRange := vMax - vMin + if vRange <= 0 { + return 0, vAvg + } + phi := phis[rfa.idx] + if phi >= 1 { + return inf, vAvg + } + if phi <= 0 { + return 0, vAvg + } + // See https://en.wikipedia.org/wiki/Hoeffding%27s_inequality + // and https://www.youtube.com/watch?v=6UwcqiNsZ8U&feature=youtu.be&t=1237 + bound := vRange * math.Sqrt(math.Log(1/(1-phi))/(2*float64(len(values)))) + return bound, vAvg +} + func newRollupQuantile(args []interface{}) (rollupFunc, error) { if err := expectRollupArgsNum(args, 2); err != nil { return nil, err diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index 41e013aa8..f93c917ab 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -310,6 +310,48 @@ func TestRollupHoltWinters(t *testing.T) { f(0.9, 0.9, 33.99637566941818) } +func TestRollupHoeffdingBoundLower(t *testing.T) { + f := func(phi, vExpected float64) { + t.Helper() + phis := []*timeseries{{ + Values: []float64{phi}, + Timestamps: []int64{123}, + }} + var me metricsql.MetricExpr + args := []interface{}{phis, &metricsql.RollupExpr{Expr: &me}} + testRollupFunc(t, "hoeffding_bound_lower", args, &me, vExpected) + } + + f(0.5, 28.21949401521037) + f(-1, 47.083333333333336) + f(0, 47.083333333333336) + f(1, -inf) + f(2, -inf) + f(0.1, 39.72878000047643) + f(0.9, 12.701803086472331) +} + +func TestRollupHoeffdingBoundUpper(t *testing.T) { + f := func(phi, vExpected float64) { + t.Helper() + phis := []*timeseries{{ + Values: []float64{phi}, + Timestamps: []int64{123}, + }} + var me metricsql.MetricExpr + args := []interface{}{phis, &metricsql.RollupExpr{Expr: &me}} + testRollupFunc(t, "hoeffding_bound_upper", args, &me, vExpected) + } + + f(0.5, 65.9471726514563) + f(-1, 47.083333333333336) + f(0, 47.083333333333336) + f(1, inf) + f(2, inf) + f(0.1, 54.43788666619024) + f(0.9, 81.46486358019433) +} + func TestRollupNewRollupFuncSuccess(t *testing.T) { f := func(funcName string, vExpected float64) { t.Helper() diff --git a/docs/ExtendedPromQL.md b/docs/ExtendedPromQL.md index a8ebf74c9..7e661cb27 100644 --- a/docs/ExtendedPromQL.md +++ b/docs/ExtendedPromQL.md @@ -103,3 +103,5 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `aggr_over_time(("aggr_func1", "aggr_func2", ...), m[d])` - simultaneously calculates all the listed `aggr_func*` for `m` over `d` time range. `aggr_func*` can contain any functions that accept range vector. For instance, `aggr_over_time(("min_over_time", "max_over_time", "rate"), m[d])` would calculate `min_over_time`, `max_over_time` and `rate` for `m[d]`. +- `hoeffding_bound_upper(phi, m[d])` and `hoeffding_bound_lower(phi, m[d])` - return upper and lower [Hoeffding bounds](https://en.wikipedia.org/wiki/Hoeffding%27s_inequality) + for the given `phi` in the range `[0..1]`. diff --git a/lib/metricsql/rollup.go b/lib/metricsql/rollup.go index 7156f92d7..0100d9a02 100644 --- a/lib/metricsql/rollup.go +++ b/lib/metricsql/rollup.go @@ -29,31 +29,33 @@ var rollupFuncs = map[string]bool{ "absent_over_time": true, // Additional rollup funcs. - "default_rollup": true, - "sum2_over_time": true, - "geomean_over_time": true, - "first_over_time": true, - "last_over_time": true, - "distinct_over_time": true, - "increases_over_time": true, - "decreases_over_time": true, - "integrate": true, - "ideriv": true, - "lifetime": true, - "lag": true, - "scrape_interval": true, - "tmin_over_time": true, - "tmax_over_time": true, - "share_le_over_time": true, - "share_gt_over_time": true, - "histogram_over_time": true, - "rollup": true, - "rollup_rate": true, - "rollup_deriv": true, - "rollup_delta": true, - "rollup_increase": true, - "rollup_candlestick": true, - "aggr_over_time": true, + "default_rollup": true, + "sum2_over_time": true, + "geomean_over_time": true, + "first_over_time": true, + "last_over_time": true, + "distinct_over_time": true, + "increases_over_time": true, + "decreases_over_time": true, + "integrate": true, + "ideriv": true, + "lifetime": true, + "lag": true, + "scrape_interval": true, + "tmin_over_time": true, + "tmax_over_time": true, + "share_le_over_time": true, + "share_gt_over_time": true, + "histogram_over_time": true, + "rollup": true, + "rollup_rate": true, + "rollup_deriv": true, + "rollup_delta": true, + "rollup_increase": true, + "rollup_candlestick": true, + "aggr_over_time": true, + "hoeffding_bound_upper": true, + "hoeffding_bound_lower": true, } // IsRollupFunc returns whether funcName is known rollup function.