lib/streamaggr: reduce memory allocations when registering new series in deduplication and aggregation structs

This commit is contained in:
Aliaksandr Valialkin 2024-03-04 16:44:31 +02:00
parent 925f60841f
commit 946814afee
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
16 changed files with 13 additions and 47 deletions

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -36,7 +35,6 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
 				sum:   s.value,
 				count: 1,
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if !loaded {
 				// The entry has been successfully stored

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -34,7 +33,6 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
 			v = &countSamplesStateValue{
 				n: 1,
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if !loaded {
 				// The new entry has been successfully created.

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -42,7 +41,6 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
 					h: {},
 				},
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if !loaded {
 				// The entry has been added to the map.

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"
 	"unsafe"
@@ -25,7 +24,7 @@ type dedupAggrShard struct {
 type dedupAggrShardNopad struct {
 	mu sync.Mutex
-	m  map[string]*dedupAggrSample
+	m  map[string]dedupAggrSample
 }

 type dedupAggrSample struct {
@@ -59,7 +58,7 @@ func (das *dedupAggrShard) sizeBytes() uint64 {
 	das.mu.Lock()
 	n := uint64(unsafe.Sizeof(*das))
 	for k, s := range das.m {
-		n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s)+unsafe.Sizeof(*s))
+		n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s))
 	}
 	das.mu.Unlock()
 	return n
@@ -169,18 +168,12 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 	m := das.m
 	if m == nil {
-		m = make(map[string]*dedupAggrSample, len(samples))
+		m = make(map[string]dedupAggrSample, len(samples))
 		das.m = m
 	}
 	for _, sample := range samples {
-		s, ok := m[sample.key]
-		if ok {
-			s.value = sample.value
-		} else {
-			key := strings.Clone(sample.key)
-			m[key] = &dedupAggrSample{
-				value: sample.value,
-			}
+		m[sample.key] = dedupAggrSample{
+			value: sample.value,
 		}
 	}
 }
@@ -189,7 +182,9 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
 	das.mu.Lock()

 	m := das.m
-	das.m = nil
+	if len(m) != 0 {
+		das.m = make(map[string]dedupAggrSample, len(m))
+	}

 	das.mu.Unlock()

View File

@@ -3,7 +3,6 @@ package streamaggr
 import (
 	"fmt"
 	"reflect"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -25,8 +24,8 @@ func TestDedupAggrSerial(t *testing.T) {
 		da.pushSamples(samples)
 	}

-	if n := da.sizeBytes(); n > 4_200_000 {
-		t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 4_200_000 bytes", n)
+	if n := da.sizeBytes(); n > 3_400_000 {
+		t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 3_400_000 bytes", n)
 	}
 	if n := da.itemsCount(); n != seriesCount {
 		t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
@@ -37,7 +36,6 @@ func TestDedupAggrSerial(t *testing.T) {
 	flushSamples := func(samples []pushSample) {
 		mu.Lock()
 		for _, sample := range samples {
-			sample.key = strings.Clone(sample.key)
 			flushedSamplesMap[sample.key] = sample
 		}
 		mu.Unlock()

View File

@@ -2,7 +2,6 @@ package streamaggr
 import (
 	"math"
-	"strings"
 	"sync"
 	"time"
@@ -43,7 +42,6 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
 		if !ok {
 			// The entry is missing in the map. Try creating it.
 			v = &histogramBucketStateValue{}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if loaded {
 				// Use the entry created by a concurrent goroutine.

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -34,7 +33,6 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
 			v = &lastStateValue{
 				last: s.value,
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if !loaded {
 				// The new entry has been successfully created.

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -34,7 +33,6 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
 			v = &maxStateValue{
 				max: s.value,
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if !loaded {
 				// The new entry has been successfully created.

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -34,7 +33,6 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
 			v = &minStateValue{
 				min: s.value,
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if !loaded {
 				// The new entry has been successfully created.

View File

@@ -2,7 +2,6 @@ package streamaggr
 import (
 	"strconv"
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -42,7 +41,6 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
 			v = &quantilesStateValue{
 				h: h,
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if loaded {
 				// Use the entry created by a concurrent goroutine.

View File

@@ -2,7 +2,6 @@ package streamaggr
 import (
 	"math"
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -35,7 +34,6 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
 		if !ok {
 			// The entry is missing in the map. Try creating it.
 			v = &stddevStateValue{}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if loaded {
 				// Use the entry created by a concurrent goroutine.

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -34,7 +33,6 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
 		if !ok {
 			// The entry is missing in the map. Try creating it.
 			v = &stdvarStateValue{}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if loaded {
 				// Use the entry created by a concurrent goroutine.

View File

@@ -757,15 +757,15 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 			outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
 		}

-		bufLen := len(buf)
-		buf = a.compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
+		buf = a.compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels)
+		key := bytesutil.InternBytes(buf)
 		for _, sample := range ts.Samples {
 			if math.IsNaN(sample.Value) {
 				// Skip NaN values
 				continue
 			}
 			samples = append(samples, pushSample{
-				key:   bytesutil.ToUnsafeString(buf[bufLen:]),
+				key:   key,
 				value: sample.Value,
 			})
 		}

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -34,7 +33,6 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
 			v = &sumSamplesStateValue{
 				sum: s.value,
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if !loaded {
 				// The new entry has been successfully created.

View File

@@ -2,7 +2,6 @@ package streamaggr
 import (
 	"math"
-	"strings"
 	"sync"
 	"time"
@@ -77,7 +76,6 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
 			v = &totalStateValue{
 				lastValues: make(map[string]*lastValueState),
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if loaded {
 				// Use the entry created by a concurrent goroutine.
@@ -91,7 +89,6 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
 			lv, ok := sv.lastValues[inputKey]
 			if !ok {
 				lv = &lastValueState{}
-				inputKey = strings.Clone(inputKey)
 				sv.lastValues[inputKey] = lv
 			}
 			if ok || keepFirstSample {

View File

@@ -1,7 +1,6 @@
 package streamaggr

 import (
-	"strings"
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -36,7 +35,6 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
 					s.value: {},
 				},
 			}
-			outputKey = strings.Clone(outputKey)
 			vNew, loaded := as.m.LoadOrStore(outputKey, v)
 			if !loaded {
 				// The new entry has been successfully created.