mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
lib/storage: remove duplicate MetricIDs in tag->metricIDs
items before writing them into inverted index
This commit is contained in:
parent
aebc45ad26
commit
ea4c828bae
@ -2320,7 +2320,9 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt
|
||||
}
|
||||
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations.
|
||||
sort.Sort(&tmm.pendingMetricIDs)
|
||||
tmm.pendingMetricIDs = removeDuplicateMetricIDs(tmm.pendingMetricIDs)
|
||||
|
||||
// Marshal pendingMetricIDs
|
||||
dstDataLen := len(dstData)
|
||||
dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs)
|
||||
dstData = mp.Tag.Marshal(dstData)
|
||||
@ -2332,6 +2334,33 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt
|
||||
return dstData, dstItems
|
||||
}
|
||||
|
||||
func removeDuplicateMetricIDs(sortedMetricIDs []uint64) []uint64 {
|
||||
if len(sortedMetricIDs) < 2 {
|
||||
return sortedMetricIDs
|
||||
}
|
||||
prevMetricID := sortedMetricIDs[0]
|
||||
hasDuplicates := false
|
||||
for _, metricID := range sortedMetricIDs[1:] {
|
||||
if prevMetricID == metricID {
|
||||
hasDuplicates = true
|
||||
}
|
||||
prevMetricID = metricID
|
||||
}
|
||||
if !hasDuplicates {
|
||||
return sortedMetricIDs
|
||||
}
|
||||
dstMetricIDs := sortedMetricIDs[:1]
|
||||
prevMetricID = sortedMetricIDs[0]
|
||||
for _, metricID := range sortedMetricIDs[1:] {
|
||||
if prevMetricID == metricID {
|
||||
continue
|
||||
}
|
||||
dstMetricIDs = append(dstMetricIDs, metricID)
|
||||
prevMetricID = metricID
|
||||
}
|
||||
return dstMetricIDs
|
||||
}
|
||||
|
||||
func getTagToMetricIDsRowsMerger() *tagToMetricIDsRowsMerger {
|
||||
v := tmmPool.Get()
|
||||
if v == nil {
|
||||
|
@ -15,6 +15,23 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
||||
)
|
||||
|
||||
func TestRemoveDuplicateMetricIDs(t *testing.T) {
|
||||
f := func(metricIDs, expectedMetricIDs []uint64) {
|
||||
a := removeDuplicateMetricIDs(metricIDs)
|
||||
if !reflect.DeepEqual(a, expectedMetricIDs) {
|
||||
t.Fatalf("unexpected result from removeDuplicateMetricIDs:\ngot\n%d\nwant\n%d", a, expectedMetricIDs)
|
||||
}
|
||||
}
|
||||
f(nil, nil)
|
||||
f([]uint64{123}, []uint64{123})
|
||||
f([]uint64{123, 123}, []uint64{123})
|
||||
f([]uint64{123, 123, 123}, []uint64{123})
|
||||
f([]uint64{0, 1, 1, 2}, []uint64{0, 1, 2})
|
||||
f([]uint64{0, 0, 0, 1, 1, 2}, []uint64{0, 1, 2})
|
||||
f([]uint64{0, 1, 1, 2, 2}, []uint64{0, 1, 2})
|
||||
f([]uint64{0, 1, 2, 2}, []uint64{0, 1, 2})
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshalTSIDs(t *testing.T) {
|
||||
f := func(tsids []TSID) {
|
||||
t.Helper()
|
||||
|
Loading…
Reference in New Issue
Block a user