app/vmselect/promql: attempt to repair invalid bucket counts passed to histogram_quantile

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/136
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/154
This commit is contained in:
Aliaksandr Valialkin 2019-08-22 14:38:21 +03:00
parent 5f33fc8e46
commit 1272e407b2
2 changed files with 95 additions and 19 deletions

View File

@ -2198,21 +2198,78 @@ func TestExecSuccess(t *testing.T) {
}) })
t.Run(`histogram_quantile(negative-bucket-count)`, func(t *testing.T) { t.Run(`histogram_quantile(negative-bucket-count)`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `sort(histogram_quantile(0.6, q := `histogram_quantile(0.6,
label_set(90, "foo", "bar", "le", "10") label_set(90, "foo", "bar", "le", "10")
or label_set(-100, "foo", "bar", "le", "30") or label_set(-100, "foo", "bar", "le", "30")
or label_set(300, "foo", "bar", "le", "+Inf") or label_set(300, "foo", "bar", "le", "+Inf")
))` )`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{30, 30, 30, 30, 30, 30},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_quantile(nan-bucket-count)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0.6,
label_set(90, "foo", "bar", "le", "10")
or label_set(NaN, "foo", "bar", "le", "30")
or label_set(300, "foo", "bar", "le", "+Inf")
)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{30, 30, 30, 30, 30, 30},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_quantile(nan-bucket-count)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0.2,
label_set(0, "foo", "bar", "le", "10")
or label_set(100, "foo", "bar", "le", "30")
or label_set(300, "foo", "bar", "le", "+Inf")
)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{22, 22, 22, 22, 22, 22},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_quantile(zero-bucket-count)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0.6,
label_set(0, "foo", "bar", "le", "10")
or label_set(0, "foo", "bar", "le", "30")
or label_set(0, "foo", "bar", "le", "+Inf")
)`
resultExpected := []netstorage.Result{} resultExpected := []netstorage.Result{}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`histogram_quantile(nan-bucket-count)`, func(t *testing.T) { t.Run(`histogram_quantile(nan-bucket-count)`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `sort(histogram_quantile(0.6, q := `histogram_quantile(0.6,
label_set(90, "foo", "bar", "le", "10") label_set(nan, "foo", "bar", "le", "10")
or label_set(NaN, "foo", "bar", "le", "30") or label_set(nan, "foo", "bar", "le", "30")
or label_set(300, "foo", "bar", "le", "+Inf") or label_set(nan, "foo", "bar", "le", "+Inf")
))` )`
resultExpected := []netstorage.Result{} resultExpected := []netstorage.Result{}
f(q, resultExpected) f(q, resultExpected)
}) })

View File

@ -309,8 +309,16 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
bbPool.Put(bb) bbPool.Put(bb)
// Calculate quantile for each group in m // Calculate quantile for each group in m
lastNonInf := func(xss []x) float64 {
for len(xss) > 0 && math.IsInf(xss[len(xss)-1].le, 0) { lastNonInf := func(i int, xss []x) float64 {
for len(xss) > 0 {
xsLast := xss[len(xss)-1]
if xsLast.ts.Values[i] == 0 {
return nan
}
if !math.IsInf(xsLast.le, 0) {
break
}
xss = xss[:len(xss)-1] xss = xss[:len(xss)-1]
} }
if len(xss) == 0 { if len(xss) == 0 {
@ -319,27 +327,38 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
return xss[len(xss)-1].le return xss[len(xss)-1].le
} }
quantile := func(i int, phis []float64, xss []x) float64 { quantile := func(i int, phis []float64, xss []x) float64 {
vPrev := float64(0)
lePrev := float64(0)
phi := phis[i] phi := phis[i]
if math.IsNaN(phi) { if math.IsNaN(phi) {
return nan return nan
} }
// Verify for broken buckets with NaN or negative values. // Fix broken buckets.
// They are already sorted by le, so their values must be in ascending order,
// since the next bucket value includes all the previous buckets.
vPrev := float64(0)
for _, xs := range xss { for _, xs := range xss {
v := xs.ts.Values[i] v := xs.ts.Values[i]
if math.IsNaN(v) || v < 0 { if math.IsNaN(v) || v < vPrev {
// Broken bucket. xs.ts.Values[i] = vPrev
return nan } else {
vPrev = v
} }
} }
if len(xss) == 0 {
return nan
}
if phi < 0 { if phi < 0 {
return -inf return -inf
} }
if phi > 1 { if phi > 1 {
return inf return inf
} }
vReq := xss[len(xss)-1].ts.Values[i] * phi vLast := xss[len(xss)-1].ts.Values[i]
if vLast == 0 {
return nan
}
vReq := vLast * phi
vPrev = 0
lePrev := float64(0)
for _, xs := range xss { for _, xs := range xss {
v := xs.ts.Values[i] v := xs.ts.Values[i]
le := xs.le le := xs.le
@ -349,16 +368,16 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
continue continue
} }
if math.IsInf(le, 0) { if math.IsInf(le, 0) {
return lastNonInf(xss) return lastNonInf(i, xss)
} }
if v == vPrev { if v == vPrev {
return lePrev return lePrev
} }
return lePrev + (le-lePrev)*(vReq-vPrev)/(v-vPrev) return lePrev + (le-lePrev)*(vReq-vPrev)/(v-vPrev)
} }
return lastNonInf(xss) return lastNonInf(i, xss)
} }
var rvs []*timeseries rvs := make([]*timeseries, 0, len(m))
for _, xss := range m { for _, xss := range m {
sort.Slice(xss, func(i, j int) bool { sort.Slice(xss, func(i, j int) bool {
return xss[i].le < xss[j].le return xss[i].le < xss[j].le