lib/storage: remove duplicate tag keys on MetricName.Marshal call

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/172
This commit is contained in:
Aliaksandr Valialkin 2019-09-04 18:12:07 +03:00
parent cd2c9e39da
commit 16dd145586
2 changed files with 40 additions and 12 deletions

View File

@ -367,17 +367,8 @@ func (mn *MetricName) Unmarshal(src []byte) error {
}
}
// Verify no identical tag keys.
if len(mn.Tags) > 0 {
prevKey := mn.Tags[0].Key
for i := range mn.Tags[1:] {
t := &mn.Tags[1+i]
if bytes.Equal(t.Key, prevKey) {
return fmt.Errorf("found duplicate key %q", prevKey)
}
prevKey = t.Key
}
}
// There is no need in verifying for identical tag keys,
// since they must be handled in MetricName.Marshal inside marshalTags.
return nil
}
@ -584,8 +575,15 @@ func (ts *canonicalTagsSort) Swap(i, j int) {
}
func marshalTags(dst []byte, tags []Tag) []byte {
var prevKey []byte
for i := range tags {
dst = tags[i].Marshal(dst)
t := &tags[i]
if string(prevKey) == string(t.Key) {
// Skip duplicate keys, since they aren't allowed in Prometheus data model.
continue
}
prevKey = t.Key
dst = t.Marshal(dst)
}
return dst
}

View File

@ -34,6 +34,36 @@ func testMetricNameSortTags(t *testing.T, tags, expectedTags []string) {
}
}
func TestMetricNameMarshalDuplicateKeys(t *testing.T) {
var mn MetricName
mn.AccountID = 123
mn.ProjectID = 324
mn.MetricGroup = []byte("xxx")
mn.AddTag("foo", "bar")
mn.AddTag("duplicate", "tag")
mn.AddTag("duplicate", "tag")
mn.AddTag("tt", "xx")
mn.AddTag("duplicate", "tag2")
var mnExpected MetricName
mnExpected.AccountID = 123
mnExpected.ProjectID = 324
mnExpected.MetricGroup = []byte("xxx")
mnExpected.AddTag("duplicate", "tag")
mnExpected.AddTag("foo", "bar")
mnExpected.AddTag("tt", "xx")
mn.sortTags()
data := mn.Marshal(nil)
var mn1 MetricName
if err := mn1.Unmarshal(data); err != nil {
t.Fatalf("cannot unmarshal mn %s: %s", &mn, err)
}
if !reflect.DeepEqual(&mnExpected, &mn1) {
t.Fatalf("unexpected mn unmarshaled;\ngot\n%+v\nwant\n%+v", &mn1, &mnExpected)
}
}
func TestMetricNameMarshalUnmarshal(t *testing.T) {
for i := 0; i < 10; i++ {
for tagsCount := 0; tagsCount < 10; tagsCount++ {