app/vmselect/promql: reset timeseries name on group_left and group_right as Prometheus does

This commit is contained in:
Aliaksandr Valialkin 2019-09-03 20:42:45 +03:00
parent 458d412bb6
commit b08f085082
2 changed files with 80 additions and 6 deletions

View File

@ -322,6 +322,7 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
} }
src := tssRight[0] src := tssRight[0]
for _, ts := range tssLeft { for _, ts := range tssLeft {
resetMetricGroupIfRequired(be, ts)
ts.MetricName.AddMissingTags(joinTags, &src.MetricName) ts.MetricName.AddMissingTags(joinTags, &src.MetricName)
rvsLeft = append(rvsLeft, ts) rvsLeft = append(rvsLeft, ts)
rvsRight = append(rvsRight, src) rvsRight = append(rvsRight, src)
@ -332,6 +333,7 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
} }
src := tssLeft[0] src := tssLeft[0]
for _, ts := range tssRight { for _, ts := range tssRight {
resetMetricGroupIfRequired(be, ts)
ts.MetricName.AddMissingTags(joinTags, &src.MetricName) ts.MetricName.AddMissingTags(joinTags, &src.MetricName)
rvsLeft = append(rvsLeft, src) rvsLeft = append(rvsLeft, src)
rvsRight = append(rvsRight, ts) rvsRight = append(rvsRight, ts)

View File

@ -1869,9 +1869,9 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`vector * on(foo) group_left() duplicate_timeseries`, func(t *testing.T) { t.Run(`vector * on(foo) group_left() duplicate_nonoverlapping_timeseries`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `label_set(time()/10, "foo", "bar") + on(foo) group_left() ( q := `label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_left() (
label_set(time() < 1400, "foo", "bar", "op", "le"), label_set(time() < 1400, "foo", "bar", "op", "le"),
label_set(time() >= 1400, "foo", "bar", "op", "ge"), label_set(time() >= 1400, "foo", "bar", "op", "ge"),
)` )`
@ -1880,13 +1880,85 @@ func TestExecSuccess(t *testing.T) {
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200}, Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
Timestamps: timestampsExpected, Timestamps: timestampsExpected,
} }
r1.MetricName.Tags = []storage.Tag{{ r1.MetricName.Tags = []storage.Tag{
Key: []byte("foo"), {
Value: []byte("bar"), Key: []byte("foo"),
}} Value: []byte("bar"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r1} resultExpected := []netstorage.Result{r1}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`vector * on(foo) group_left(__name__)`, func(t *testing.T) {
t.Parallel()
q := `label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_left(__name__)
label_set(time(), "foo", "bar", "__name__", "aaa")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
Timestamps: timestampsExpected,
}
r1.MetricName.MetricGroup = []byte("aaa")
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r1}
f(q, resultExpected)
})
t.Run(`vector * on(foo) group_right()`, func(t *testing.T) {
t.Parallel()
q := `sort(label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_right(xx) (
label_set(time(), "foo", "bar", "__name__", "aaa"),
label_set(time()+3, "foo", "bar", "__name__", "yyy","ppp", "123"),
))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1103, 1323, 1543, 1763, 1983, 2203},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("ppp"),
Value: []byte("123"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`vector * on() group_left scalar`, func(t *testing.T) { t.Run(`vector * on() group_left scalar`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `sort_desc((label_set(time(), "foo", "bar") or label_set(10, "foo", "qwert")) * on() group_left 2)` q := `sort_desc((label_set(time(), "foo", "bar") or label_set(10, "foo", "qwert")) * on() group_left 2)`