// removeDuplicateMetricIDs returns sortedMetricIDs with every run of equal
// adjacent values collapsed into a single value.
//
// Only adjacent items are compared, so the input must already be sorted for
// all the duplicates to be removed. flushPendingMetricIDs calls it right after
// sort.Sort(&tmm.pendingMetricIDs), before marshaling the pending metricIDs.
//
// The de-duplication is performed in place and doesn't allocate: the returned
// slice shares the backing array with sortedMetricIDs, so callers must use
// the returned slice instead of the original one. If the input contains
// no adjacent duplicates, it is returned unchanged.
func removeDuplicateMetricIDs(sortedMetricIDs []uint64) []uint64 {
	if len(sortedMetricIDs) < 2 {
		return sortedMetricIDs
	}

	// Fast path: find the first duplicate and stop scanning as soon as
	// it is found. The common case with no duplicates costs a single
	// read-only pass over the slice.
	firstDup := -1
	for i := 1; i < len(sortedMetricIDs); i++ {
		if sortedMetricIDs[i] == sortedMetricIDs[i-1] {
			firstDup = i
			break
		}
	}
	if firstDup < 0 {
		return sortedMetricIDs
	}

	// Items in front of firstDup are already unique, so keep them as is
	// and compact only the tail. The write position always stays behind
	// the read position, so it is safe to reuse the backing array.
	dstMetricIDs := sortedMetricIDs[:firstDup]
	prevMetricID := sortedMetricIDs[firstDup]
	for _, metricID := range sortedMetricIDs[firstDup+1:] {
		if metricID == prevMetricID {
			continue
		}
		dstMetricIDs = append(dstMetricIDs, metricID)
		prevMetricID = metricID
	}
	return dstMetricIDs
}
--- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -15,6 +15,23 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) +func TestRemoveDuplicateMetricIDs(t *testing.T) { + f := func(metricIDs, expectedMetricIDs []uint64) { + a := removeDuplicateMetricIDs(metricIDs) + if !reflect.DeepEqual(a, expectedMetricIDs) { + t.Fatalf("unexpected result from removeDuplicateMetricIDs:\ngot\n%d\nwant\n%d", a, expectedMetricIDs) + } + } + f(nil, nil) + f([]uint64{123}, []uint64{123}) + f([]uint64{123, 123}, []uint64{123}) + f([]uint64{123, 123, 123}, []uint64{123}) + f([]uint64{0, 1, 1, 2}, []uint64{0, 1, 2}) + f([]uint64{0, 0, 0, 1, 1, 2}, []uint64{0, 1, 2}) + f([]uint64{0, 1, 1, 2, 2}, []uint64{0, 1, 2}) + f([]uint64{0, 1, 2, 2}, []uint64{0, 1, 2}) +} + func TestMarshalUnmarshalTSIDs(t *testing.T) { f := func(tsids []TSID) { t.Helper()