lib/streamaggr: use strings.Clone() instead of bytesutil.InternString() for creating series key in dedupAggr

Our internal testing shows that this reduces GC overhead when deduplicating tens of millions of active series.
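
Background on why this helps: bytesutil.InternString keeps every distinct key alive in a process-wide pool, so interned keys stay reachable from that pool on top of the per-shard dedup maps, and the GC has to keep tracing all of them. strings.Clone() produces a plain private copy owned by the shard map alone, which becomes garbage as soon as the entry is dropped. A rough sketch of the two approaches in isolation (the pool below is a simplified stand-in for bytesutil.InternString, not its actual implementation):

// Simplified sketch; internString is a stand-in for bytesutil.InternString,
// not its actual implementation.
package main

import (
	"strings"
	"sync"
)

var internPool sync.Map // string -> string

// internString returns a canonical copy shared via a process-wide pool.
// Every interned key stays reachable from the pool, so the GC must trace
// all of them on each cycle, even after the dedup map drops the entry.
func internString(s string) string {
	if v, ok := internPool.Load(s); ok {
		return v.(string)
	}
	c := strings.Clone(s)
	internPool.Store(c, c)
	return c
}

// cloneKey makes a private copy owned solely by the caller's map;
// it becomes collectable as soon as that map entry goes away.
func cloneKey(s string) string {
	return strings.Clone(s)
}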
Aliaksandr Valialkin 2024-06-10 16:05:49 +02:00
parent e8bb4359bb
commit 0b7c47a40c

@@ -1,6 +1,7 @@
 package streamaggr
 
 import (
+	"strings"
 	"sync"
 	"sync/atomic"
 	"unsafe"
@@ -26,7 +27,7 @@ type dedupAggrShard struct {
 
 type dedupAggrShardNopad struct {
 	mu sync.Mutex
-	m  map[string]dedupAggrSample
+	m  map[string]*dedupAggrSample
 
 	sizeBytes  atomic.Uint64
 	itemsCount atomic.Uint64
@@ -157,28 +158,25 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 
 	m := das.m
 	if m == nil {
-		m = make(map[string]dedupAggrSample, len(samples))
+		m = make(map[string]*dedupAggrSample, len(samples))
 		das.m = m
 	}
 	for _, sample := range samples {
 		s, ok := m[sample.key]
 		if !ok {
-			key := bytesutil.InternString(sample.key)
-			m[key] = dedupAggrSample{
+			key := strings.Clone(sample.key)
+			m[key] = &dedupAggrSample{
 				value:     sample.value,
 				timestamp: sample.timestamp,
 			}
 			das.itemsCount.Add(1)
-			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof("")+unsafe.Sizeof(dedupAggrSample{})))
+			das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)))
			continue
 		}
 		// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
 		if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
-			key := bytesutil.InternString(sample.key)
-			m[key] = dedupAggrSample{
-				value:     sample.value,
-				timestamp: sample.timestamp,
-			}
+			s.value = sample.value
+			s.timestamp = sample.timestamp
 		}
 	}
 }
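
Side note on the value-type change above: Go map entries are not addressable, so with map[string]dedupAggrSample the update branch had to build and re-insert a whole entry (which is why the old code interned the key again there). With map[string]*dedupAggrSample the looked-up pointer s can be mutated in place. A standalone illustration (the sample type is hypothetical, not the dedupAggrSample from this file):

package main

type sample struct {
	value     float64
	timestamp int64
}

func main() {
	byValue := map[string]sample{"k": {value: 1, timestamp: 10}}
	// byValue["k"].value = 2 // does not compile: map entries are not addressable
	v := byValue["k"]
	v.value = 2
	byValue["k"] = v // the whole struct must be written back

	byPtr := map[string]*sample{"k": {value: 1, timestamp: 10}}
	byPtr["k"].value = 2 // mutates in place through the pointer; no re-insert
}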
@@ -188,7 +186,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 
 	m := das.m
 	if len(m) > 0 {
-		das.m = make(map[string]dedupAggrSample, len(m))
+		das.m = make(map[string]*dedupAggrSample, len(m))
 		das.sizeBytes.Store(0)
 		das.itemsCount.Store(0)
 	}
@@ -208,7 +206,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 	})
 
 	// Limit the number of samples per each flush in order to limit memory usage.
-	if len(dstSamples) >= 100_000 {
+	if len(dstSamples) >= 10_000 {
 		f(dstSamples)
 		clear(dstSamples)
 		dstSamples = dstSamples[:0]
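
The lowered threshold bounds how large the scratch slice handed to f can grow between flushes. Reduced to its essentials, the pattern looks like this (a generic sketch, not the surrounding flush code):

// Generic chunked-flush sketch: emit items in bounded batches so the
// scratch slice never grows past limit. Requires Go 1.21+ for clear().
func flushChunked[T any](items []T, limit int, f func([]T)) {
	dst := make([]T, 0, limit)
	for _, item := range items {
		dst = append(dst, item)
		if len(dst) >= limit {
			f(dst)
			clear(dst) // zero the entries so the GC can reclaim what they reference
			dst = dst[:0]
		}
	}
	if len(dst) > 0 {
		f(dst)
	}
}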