app/vmselect: adjust label_map() handling for corner cases

The following corner cases now supported:
* label_map(q, "label", "", "foo") - adds `label="foo"` to series with missing `label`
* label_map(q, "label", "foo", "") - removes `label="foo"` from series

All the unmatched labels are kept unchanged.
This commit is contained in:
Aliaksandr Valialkin 2020-03-13 18:41:49 +02:00
parent 5c5a30734e
commit 58cb7fc476
2 changed files with 29 additions and 9 deletions

View File

@ -991,7 +991,8 @@ func TestExecSuccess(t *testing.T) {
label_set(time()+100, "label", "v2"),
label_set(time()+200, "label", "v3"),
label_set(time()+300, "x", "y"),
), "label", "v1", "foo", "v2", "bar"))`
label_set(time()+400, "label", "v4"),
), "label", "v1", "foo", "v2", "bar", "", "qwe", "v4", ""))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
@ -1015,16 +1016,31 @@ func TestExecSuccess(t *testing.T) {
Values: []float64{1200, 1400, 1600, 1800, 2000, 2200},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{{
Key: []byte("label"),
Value: []byte("v3"),
}}
r4 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1300, 1500, 1700, 1900, 2100, 2300},
Timestamps: timestampsExpected,
}
r4.MetricName.Tags = []storage.Tag{{
r4.MetricName.Tags = []storage.Tag{
{
Key: []byte("label"),
Value: []byte("qwe"),
},
{
Key: []byte("x"),
Value: []byte("y"),
}}
resultExpected := []netstorage.Result{r1, r2, r3, r4}
},
}
r5 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1400, 1600, 1800, 2000, 2200, 2400},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r1, r2, r3, r4, r5}
f(q, resultExpected)
})
t.Run(`label_copy(new_tag)`, func(t *testing.T) {
@ -5428,6 +5444,8 @@ func TestExecError(t *testing.T) {
f(`label_transform(1)`)
f(`label_set()`)
f(`label_set(1, "foo")`)
f(`label_map()`)
f(`label_map(1)`)
f(`label_del()`)
f(`label_keep()`)
f(`label_match()`)

View File

@ -1048,9 +1048,11 @@ func transformLabelMap(tfa *transformFuncArg) ([]*timeseries, error) {
for _, ts := range rvs {
mn := &ts.MetricName
dstValue := getDstValue(mn, label)
value := m[string(*dstValue)]
value, ok := m[string(*dstValue)]
if ok {
*dstValue = append((*dstValue)[:0], value...)
if len(value) == 0 {
}
if len(*dstValue) == 0 {
mn.RemoveTag(label)
}
}