Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-11-23 20:37:12 +01:00)
lib/streamaggr: remove flushState arg at dedupAggr.flush(), since it is always set to true in production
This commit is contained in:
parent: 8d95522529
commit: f45d02a243
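
At a glance, the commit drops the trailing boolean (named resetState in the code) from dedupAggr.flush() and dedupAggrShard.flush(), because every production call site already passed true. A minimal before/after view of the call sites, taken from the hunks below:

	// before
	a.da.flush(a.pushSamples, true)
	shard.flush(ctx, f, resetState)

	// after
	a.da.flush(a.pushSamples)
	shard.flush(ctx, f)
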
@@ -4,8 +4,9 @@ import (
 	"sync"
 	"unsafe"
 
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/cespare/xxhash/v2"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )
 
 const dedupAggrShardsCount = 128
@@ -113,7 +114,7 @@ func (ctx *dedupFlushCtx) reset() {
 	ctx.samples = ctx.samples[:0]
 }
 
-func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) {
+func (da *dedupAggr) flush(f func(samples []pushSample)) {
 	var wg sync.WaitGroup
 	for i := range da.shards {
 		flushConcurrencyCh <- struct{}{}
@@ -125,7 +126,7 @@ func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) {
 			}()
 
 			ctx := getDedupFlushCtx()
-			shard.flush(ctx, f, resetState)
+			shard.flush(ctx, f)
 			putDedupFlushCtx(ctx)
 		}(&da.shards[i])
 	}
@@ -193,11 +194,11 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 	}
 }
 
-func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample), resetState bool) {
+func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
 	das.mu.Lock()
 
 	m := das.m
-	if resetState && len(m) > 0 {
+	if len(m) > 0 {
 		das.m = make(map[string]dedupAggrSample, len(m))
 	}
 
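
To make the resulting behavior concrete, here is a small standalone Go model of the pattern after this change: each shard swaps out its map under the lock unconditionally and hands the collected samples to the callback, while the parent fans flushes out across shards through a bounded-concurrency channel. All names and types in this sketch are illustrative stand-ins, not the VictoriaMetrics implementation.

	// Standalone model of the flush fan-out after resetState was removed.
	package main

	import (
		"fmt"
		"sync"
	)

	type pushSample struct {
		key   string
		value float64
	}

	type shard struct {
		mu sync.Mutex
		m  map[string]pushSample
	}

	// flush hands the shard's samples to f and unconditionally resets the map,
	// mirroring the now-unconditional state reset.
	func (s *shard) flush(f func([]pushSample)) {
		s.mu.Lock()
		m := s.m
		if len(m) > 0 {
			s.m = make(map[string]pushSample, len(m))
		}
		s.mu.Unlock()

		samples := make([]pushSample, 0, len(m))
		for _, ps := range m {
			samples = append(samples, ps)
		}
		f(samples)
	}

	// flushConcurrencyCh bounds how many shards are flushed in parallel.
	var flushConcurrencyCh = make(chan struct{}, 4)

	func flushAll(shards []shard, f func([]pushSample)) {
		var wg sync.WaitGroup
		for i := range shards {
			flushConcurrencyCh <- struct{}{}
			wg.Add(1)
			go func(s *shard) {
				defer func() {
					<-flushConcurrencyCh
					wg.Done()
				}()
				s.flush(f)
			}(&shards[i])
		}
		wg.Wait()
	}

	func main() {
		shards := make([]shard, 8)
		for i := range shards {
			key := fmt.Sprintf("series_%d", i)
			shards[i].m = map[string]pushSample{key: {key: key, value: float64(i)}}
		}
		var mu sync.Mutex
		total := 0
		flushAll(shards, func(samples []pushSample) {
			mu.Lock()
			total += len(samples)
			mu.Unlock()
		})
		fmt.Println("flushed samples:", total) // flushed samples: 8
	}

Because flush always resets the per-shard maps now, a second flush immediately after the first delivers no samples.
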
@@ -39,7 +39,7 @@ func TestDedupAggrSerial(t *testing.T) {
 		}
 		mu.Unlock()
 	}
-	da.flush(flushSamples, true)
+	da.flush(flushSamples)
 
 	if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
 		t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"sync/atomic"
 	"testing"
-	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 )
@@ -17,20 +16,6 @@ func BenchmarkDedupAggr(b *testing.B) {
 	}
 }
 
-func BenchmarkDedupAggrFlushSerial(b *testing.B) {
-	as := newTotalAggrState(time.Hour, true, true)
-	benchSamples := newBenchSamples(100_000)
-	da := newDedupAggr()
-	da.pushSamples(benchSamples)
-
-	b.ResetTimer()
-	b.ReportAllocs()
-	b.SetBytes(int64(len(benchSamples)))
-	for i := 0; i < b.N; i++ {
-		da.flush(as.pushSamples, false)
-	}
-}
-
 func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
 	const loops = 100
 	benchSamples := newBenchSamples(samplesPerPush)
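
The deleted BenchmarkDedupAggrFlushSerial was the only call site in this diff that passed false for the flag. If a serial flush benchmark were still wanted under the new signature, it could look roughly like the hypothetical sketch below (not part of this commit); since flush now always clears the dedup state, the samples have to be re-pushed on every iteration:

	func BenchmarkDedupAggrFlushSerial(b *testing.B) {
		benchSamples := newBenchSamples(100_000)
		da := newDedupAggr()

		b.ReportAllocs()
		b.SetBytes(int64(len(benchSamples)))
		for i := 0; i < b.N; i++ {
			// Re-populate the dedup state, since flush() now always resets it.
			da.pushSamples(benchSamples)
			da.flush(func(_ []pushSample) {})
		}
	}
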
@@ -166,7 +166,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 		ctx.labels = labels
 		ctx.samples = samples
 		putDeduplicatorFlushCtx(ctx)
-	}, true)
+	})
 
 	duration := time.Since(startTime)
 	d.dedupFlushDuration.Update(duration.Seconds())
@@ -747,7 +747,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
 
 	startTime := time.Now()
 
-	a.da.flush(a.pushSamples, true)
+	a.da.flush(a.pushSamples)
 
 	d := time.Since(startTime)
 	a.dedupFlushDuration.Update(d.Seconds())
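
The dedupFlush hunk above only measures how long da.flush takes; a periodic driver that invokes such a flush once per interval and records the duration could look like this hypothetical sketch (names are illustrative assumptions, not VictoriaMetrics code; assumes import "time"):

	// runDedupFlusher calls flushFn once per interval and reports each flush
	// duration in seconds via record, until stopCh is closed.
	func runDedupFlusher(stopCh <-chan struct{}, interval time.Duration, flushFn func(), record func(seconds float64)) {
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-stopCh:
				return
			case <-t.C:
				start := time.Now()
				flushFn()
				record(time.Since(start).Seconds())
			}
		}
	}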