mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
app/vmselect/promql: return lower
and upper
bounds for the estimated percentile from histogram_quantile
if third arg is passed
Updates https://github.com/prometheus/prometheus/issues/5706
This commit is contained in:
parent
557909aa81
commit
c25b97829f
@ -2334,6 +2334,35 @@ func TestExecSuccess(t *testing.T) {
|
|||||||
resultExpected := []netstorage.Result{r}
|
resultExpected := []netstorage.Result{r}
|
||||||
f(q, resultExpected)
|
f(q, resultExpected)
|
||||||
})
|
})
|
||||||
|
t.Run(`histogram_quantile(single-value-valid-le, boundsLabel)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(histogram_quantile(0.6, label_set(100, "le", "200"), "foobar"))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{0, 0, 0, 0, 0, 0},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("foobar"),
|
||||||
|
Value: []byte("lower"),
|
||||||
|
}}
|
||||||
|
r2 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{120, 120, 120, 120, 120, 120},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r3 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{200, 200, 200, 200, 200, 200},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r3.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("foobar"),
|
||||||
|
Value: []byte("upper"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1, r2, r3}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
t.Run(`histogram_quantile(single-value-valid-le-max-phi)`, func(t *testing.T) {
|
t.Run(`histogram_quantile(single-value-valid-le-max-phi)`, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := `histogram_quantile(1, (
|
q := `histogram_quantile(1, (
|
||||||
@ -2471,6 +2500,56 @@ func TestExecSuccess(t *testing.T) {
|
|||||||
resultExpected := []netstorage.Result{r}
|
resultExpected := []netstorage.Result{r}
|
||||||
f(q, resultExpected)
|
f(q, resultExpected)
|
||||||
})
|
})
|
||||||
|
t.Run(`histogram_quantile(normal-bucket-count, boundsLabel)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(histogram_quantile(0.2,
|
||||||
|
label_set(0, "foo", "bar", "le", "10")
|
||||||
|
or label_set(100, "foo", "bar", "le", "30")
|
||||||
|
or label_set(300, "foo", "bar", "le", "+Inf"),
|
||||||
|
"xxx"
|
||||||
|
))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{10, 10, 10, 10, 10, 10},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("xxx"),
|
||||||
|
Value: []byte("lower"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r2 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{22, 22, 22, 22, 22, 22},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r2.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
}}
|
||||||
|
r3 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{30, 30, 30, 30, 30, 30},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r3.MetricName.Tags = []storage.Tag{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("xxx"),
|
||||||
|
Value: []byte("upper"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
resultExpected := []netstorage.Result{r1, r2, r3}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
t.Run(`histogram_quantile(zero-bucket-count)`, func(t *testing.T) {
|
t.Run(`histogram_quantile(zero-bucket-count)`, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := `histogram_quantile(0.6,
|
q := `histogram_quantile(0.6,
|
||||||
|
@ -400,17 +400,27 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
|
|||||||
|
|
||||||
func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
args := tfa.args
|
args := tfa.args
|
||||||
if err := expectTransformArgsNum(args, 2); err != nil {
|
if len(args) < 2 || len(args) > 3 {
|
||||||
return nil, err
|
return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args))
|
||||||
}
|
}
|
||||||
phis, err := getScalar(args[0], 0)
|
phis, err := getScalar(args[0], 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("cannot parse phi: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert buckets with `vmrange` labels to buckets with `le` labels.
|
// Convert buckets with `vmrange` labels to buckets with `le` labels.
|
||||||
tss := vmrangeBucketsToLE(args[1])
|
tss := vmrangeBucketsToLE(args[1])
|
||||||
|
|
||||||
|
// Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details.
|
||||||
|
var boundsLabel string
|
||||||
|
if len(args) > 2 {
|
||||||
|
s, err := getString(args[2], 2)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %s", err)
|
||||||
|
}
|
||||||
|
boundsLabel = s
|
||||||
|
}
|
||||||
|
|
||||||
// Group metrics by all tags excluding "le"
|
// Group metrics by all tags excluding "le"
|
||||||
type x struct {
|
type x struct {
|
||||||
le float64
|
le float64
|
||||||
@ -453,10 +463,10 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||||||
}
|
}
|
||||||
return nan
|
return nan
|
||||||
}
|
}
|
||||||
quantile := func(i int, phis []float64, xss []x) float64 {
|
quantile := func(i int, phis []float64, xss []x) (q, lower, upper float64) {
|
||||||
phi := phis[i]
|
phi := phis[i]
|
||||||
if math.IsNaN(phi) {
|
if math.IsNaN(phi) {
|
||||||
return nan
|
return nan, nan, nan
|
||||||
}
|
}
|
||||||
// Fix broken buckets.
|
// Fix broken buckets.
|
||||||
// They are already sorted by le, so their values must be in ascending order,
|
// They are already sorted by le, so their values must be in ascending order,
|
||||||
@ -479,13 +489,13 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||||||
xss = xss[:len(xss)-1]
|
xss = xss[:len(xss)-1]
|
||||||
}
|
}
|
||||||
if vLast == 0 || math.IsNaN(vLast) {
|
if vLast == 0 || math.IsNaN(vLast) {
|
||||||
return nan
|
return nan, nan, nan
|
||||||
}
|
}
|
||||||
if phi < 0 {
|
if phi < 0 {
|
||||||
return -inf
|
return -inf, -inf, xss[0].ts.Values[i]
|
||||||
}
|
}
|
||||||
if phi > 1 {
|
if phi > 1 {
|
||||||
return inf
|
return inf, vLast, inf
|
||||||
}
|
}
|
||||||
vReq := vLast * phi
|
vReq := vLast * phi
|
||||||
vPrev = 0
|
vPrev = 0
|
||||||
@ -509,14 +519,17 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if math.IsInf(le, 0) {
|
if math.IsInf(le, 0) {
|
||||||
return lastNonInf(i, xss)
|
vv := lastNonInf(i, xss)
|
||||||
|
return vv, vv, inf
|
||||||
}
|
}
|
||||||
if v == vPrev {
|
if v == vPrev {
|
||||||
return lePrev
|
return lePrev, lePrev, v
|
||||||
}
|
}
|
||||||
return lePrev + (le-lePrev)*(vReq-vPrev)/(v-vPrev)
|
vv := lePrev + (le-lePrev)*(vReq-vPrev)/(v-vPrev)
|
||||||
|
return vv, lePrev, le
|
||||||
}
|
}
|
||||||
return lastNonInf(i, xss)
|
vv := lastNonInf(i, xss)
|
||||||
|
return vv, vv, inf
|
||||||
}
|
}
|
||||||
rvs := make([]*timeseries, 0, len(m))
|
rvs := make([]*timeseries, 0, len(m))
|
||||||
for _, xss := range m {
|
for _, xss := range m {
|
||||||
@ -524,10 +537,30 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||||||
return xss[i].le < xss[j].le
|
return xss[i].le < xss[j].le
|
||||||
})
|
})
|
||||||
dst := xss[0].ts
|
dst := xss[0].ts
|
||||||
|
var tsLower, tsUpper *timeseries
|
||||||
|
if len(boundsLabel) > 0 {
|
||||||
|
tsLower = ×eries{}
|
||||||
|
tsLower.CopyFromShallowTimestamps(dst)
|
||||||
|
tsLower.MetricName.RemoveTag(boundsLabel)
|
||||||
|
tsLower.MetricName.AddTag(boundsLabel, "lower")
|
||||||
|
tsUpper = ×eries{}
|
||||||
|
tsUpper.CopyFromShallowTimestamps(dst)
|
||||||
|
tsUpper.MetricName.RemoveTag(boundsLabel)
|
||||||
|
tsUpper.MetricName.AddTag(boundsLabel, "upper")
|
||||||
|
}
|
||||||
for i := range dst.Values {
|
for i := range dst.Values {
|
||||||
dst.Values[i] = quantile(i, phis, xss)
|
v, lower, upper := quantile(i, phis, xss)
|
||||||
|
dst.Values[i] = v
|
||||||
|
if len(boundsLabel) > 0 {
|
||||||
|
tsLower.Values[i] = lower
|
||||||
|
tsUpper.Values[i] = upper
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rvs = append(rvs, dst)
|
rvs = append(rvs, dst)
|
||||||
|
if len(boundsLabel) > 0 {
|
||||||
|
rvs = append(rvs, tsLower)
|
||||||
|
rvs = append(rvs, tsUpper)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return rvs, nil
|
return rvs, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user