diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index c629af8d9..079e3b5c8 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -36,7 +35,6 @@ func (as *avgAggrState) pushSamples(samples []pushSample) { sum: s.value, count: 1, } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The entry has been successfully stored diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index 807fbdfba..ac0431deb 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -34,7 +33,6 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) { v = &countSamplesStateValue{ n: 1, } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index e78ce9273..5bfbd4c34 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -42,7 +41,6 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) { h: {}, }, } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The entry has been added to the map. diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index e4570f065..35e520ced 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "unsafe" @@ -25,7 +24,7 @@ type dedupAggrShard struct { type dedupAggrShardNopad struct { mu sync.Mutex - m map[string]*dedupAggrSample + m map[string]dedupAggrSample } type dedupAggrSample struct { @@ -59,7 +58,7 @@ func (das *dedupAggrShard) sizeBytes() uint64 { das.mu.Lock() n := uint64(unsafe.Sizeof(*das)) for k, s := range das.m { - n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)) + n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s)) } das.mu.Unlock() return n @@ -169,18 +168,12 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { m := das.m if m == nil { - m = make(map[string]*dedupAggrSample, len(samples)) + m = make(map[string]dedupAggrSample, len(samples)) das.m = m } for _, sample := range samples { - s, ok := m[sample.key] - if ok { - s.value = sample.value - } else { - key := strings.Clone(sample.key) - m[key] = &dedupAggrSample{ - value: sample.value, - } + m[sample.key] = dedupAggrSample{ + value: sample.value, } } } @@ -189,7 +182,9 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample das.mu.Lock() m := das.m - das.m = nil + if len(m) != 0 { + das.m = make(map[string]dedupAggrSample, len(m)) + } das.mu.Unlock() diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go index c3892a5f8..663bf6563 100644 --- a/lib/streamaggr/dedup_test.go +++ b/lib/streamaggr/dedup_test.go @@ -3,7 +3,6 @@ package streamaggr import ( "fmt" "reflect" - "strings" "sync" "sync/atomic" "testing" @@ -25,8 +24,8 @@ func TestDedupAggrSerial(t *testing.T) { da.pushSamples(samples) } - if n := da.sizeBytes(); n > 4_200_000 { - t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 4_200_000 bytes", n) + if n := da.sizeBytes(); n > 3_400_000 { + t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 3_400_000 bytes", n) } if n := da.itemsCount(); n != seriesCount { t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount) @@ -37,7 +36,6 @@ func TestDedupAggrSerial(t *testing.T) { flushSamples := func(samples []pushSample) { mu.Lock() for _, sample := range samples { - sample.key = strings.Clone(sample.key) flushedSamplesMap[sample.key] = sample } mu.Unlock() diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 1430b2e58..0d073abf3 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -2,7 +2,6 @@ package streamaggr import ( "math" - "strings" "sync" "time" @@ -43,7 +42,6 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &histogramBucketStateValue{} - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index 667251076..6ba46b349 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -34,7 +33,6 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { v = &lastStateValue{ last: s.value, } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index 4450e210a..76320a090 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -34,7 +33,6 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { v = &maxStateValue{ max: s.value, } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index 42d8a2314..cde503312 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -34,7 +33,6 @@ func (as *minAggrState) pushSamples(samples []pushSample) { v = &minStateValue{ min: s.value, } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index 4e605cc91..f9ec321d1 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -2,7 +2,6 @@ package streamaggr import ( "strconv" - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -42,7 +41,6 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) { v = &quantilesStateValue{ h: h, } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 777573ea3..2a5b3e3d4 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -2,7 +2,6 @@ package streamaggr import ( "math" - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -35,7 +34,6 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &stddevStateValue{} - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index c6f5cfed9..450b2feef 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -34,7 +33,6 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &stdvarStateValue{} - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index c941073fb..8d98a7071 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -757,15 +757,15 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { outputLabels.Labels = append(outputLabels.Labels, labels.Labels...) } - bufLen := len(buf) - buf = a.compressLabels(buf, inputLabels.Labels, outputLabels.Labels) + buf = a.compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels) + key := bytesutil.InternBytes(buf) for _, sample := range ts.Samples { if math.IsNaN(sample.Value) { // Skip NaN values continue } samples = append(samples, pushSample{ - key: bytesutil.ToUnsafeString(buf[bufLen:]), + key: key, value: sample.Value, }) } diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index 080df9347..4d88dc4f6 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -34,7 +33,6 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { v = &sumSamplesStateValue{ sum: s.value, } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created. diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index 516694ea6..78f8bd36f 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -2,7 +2,6 @@ package streamaggr import ( "math" - "strings" "sync" "time" @@ -77,7 +76,6 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { v = &totalStateValue{ lastValues: make(map[string]*lastValueState), } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if loaded { // Use the entry created by a concurrent goroutine. @@ -91,7 +89,6 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { lv, ok := sv.lastValues[inputKey] if !ok { lv = &lastValueState{} - inputKey = strings.Clone(inputKey) sv.lastValues[inputKey] = lv } if ok || keepFirstSample { diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index 60cd3317d..2c5e97828 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -1,7 +1,6 @@ package streamaggr import ( - "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -36,7 +35,6 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { s.value: {}, }, } - outputKey = strings.Clone(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { // The new entry has been successfully created.