app/vmselect/promql: properly override label values from group_left and group_right lists like Prometheus does

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/577
This commit is contained in:
Aliaksandr Valialkin 2020-06-21 16:32:00 +03:00
parent 50aa34bcbe
commit 70bf8218bb
3 changed files with 46 additions and 28 deletions

View File

@ -206,7 +206,7 @@ func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft,
resetMetricGroupIfRequired(be, tsLeft)
if len(tssRight) == 1 {
// Easy case - right part contains only a single matching time series.
tsLeft.MetricName.AddMissingTags(joinTags, &tssRight[0].MetricName)
tsLeft.MetricName.SetTags(joinTags, &tssRight[0].MetricName)
rvsLeft = append(rvsLeft, tsLeft)
rvsRight = append(rvsRight, tssRight[0])
continue
@ -225,7 +225,7 @@ func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft,
for _, tsRight := range tssRight {
var tsCopy timeseries
tsCopy.CopyFromShallowTimestamps(tsLeft)
tsCopy.MetricName.AddMissingTags(joinTags, &tsRight.MetricName)
tsCopy.MetricName.SetTags(joinTags, &tsRight.MetricName)
bb.B = marshalMetricTagsSorted(bb.B[:0], &tsCopy.MetricName)
if tsExisting := m[string(bb.B)]; tsExisting != nil {
// Try merging tsExisting with tsRight if they don't overlap.

View File

@ -2009,25 +2009,37 @@ func TestExecSuccess(t *testing.T) {
})
t.Run(`scalar * ignoring(foo) group_right vector`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(2 * ignoring(foo) group_right(a,foo) (label_set(time(), "foo", "bar") or label_set(10, "foo", "qwert")))`
q := `sort_desc(label_set(2, "a", "2") * ignoring(foo,a) group_right(a) (label_set(time(), "foo", "bar", "a", "1"), label_set(10, "foo", "qwert")))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{2000, 2400, 2800, 3200, 3600, 4000},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("a"),
Value: []byte("2"),
},
{
Key: []byte("foo"),
Value: []byte("bar"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{20, 20, 20, 20, 20, 20},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("qwert"),
}}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("a"),
Value: []byte("2"),
},
{
Key: []byte("foo"),
Value: []byte("qwert"),
},
}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
@ -2342,9 +2354,9 @@ func TestExecSuccess(t *testing.T) {
t.Run(`vector + vector on group_left matching`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(
(label_set(time(), "t1", "v123", "t2", "v3") or label_set(10, "t2", "v3", "xxx", "yy"))
(label_set(time(), "t1", "v123", "t2", "v3"), label_set(10, "t2", "v3", "xxx", "yy"))
+ on (foo, t2) group_left (t1, noxxx)
(label_set(100, "t1", "v1") or label_set(time(), "t2", "v3", "noxxx", "aa"))
(label_set(100, "t1", "v1"), label_set(time(), "t2", "v3", "noxxx", "aa"))
)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
@ -2356,10 +2368,6 @@ func TestExecSuccess(t *testing.T) {
Key: []byte("noxxx"),
Value: []byte("aa"),
},
{
Key: []byte("t1"),
Value: []byte("v123"),
},
{
Key: []byte("t2"),
Value: []byte("v3"),

View File

@ -298,20 +298,30 @@ func (mn *MetricName) GetTagValue(tagKey string) []byte {
return nil
}
// AddMissingTags adds tags from src with keys matching addTags.
func (mn *MetricName) AddMissingTags(addTags []string, src *MetricName) {
if hasTag(addTags, metricGroupTagKey) {
mn.MetricGroup = append(mn.MetricGroup[:0], src.MetricGroup...)
}
for i := range src.Tags {
srcTag := &src.Tags[i]
if !hasTag(addTags, srcTag.Key) {
// SetTags sets tags from src with keys matching addTags.
func (mn *MetricName) SetTags(addTags []string, src *MetricName) {
for _, tagName := range addTags {
if tagName == string(metricGroupTagKey) {
mn.MetricGroup = append(mn.MetricGroup[:0], src.MetricGroup...)
continue
}
var srcTag *Tag
for i := range src.Tags {
t := &src.Tags[i]
if string(t.Key) == tagName {
srcTag = t
break
}
}
if srcTag == nil {
mn.RemoveTag(tagName)
continue
}
found := false
for j := range mn.Tags {
tag := &mn.Tags[j]
if string(tag.Key) == string(srcTag.Key) {
for i := range mn.Tags {
t := &mn.Tags[i]
if string(t.Key) == tagName {
t.Value = append(t.Value[:0], srcTag.Value...)
found = true
break
}