diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index f97075208..6be6a6ba7 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -426,7 +426,12 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage. // Wait until work is complete samples := 0 - sbs := make([]*sortBlock, 0, brsLen) + sbh := getSortBlocksHeap() + sbs := sbh.sbs + if n := brsLen - cap(sbs); n > 0 { + sbs = append(sbs[:cap(sbs)], make([]*sortBlock, n)...) + } + sbs = sbs[:0] var firstErr error for _, upw := range upws { if err := <-upw.doneCh; err != nil && firstErr == nil { @@ -451,6 +456,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage. } putUnpackWork(upw) } + sbh.sbs = sbs // Shut down local workers for _, workCh := range workChs { @@ -462,7 +468,8 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage. return firstErr } dedupInterval := storage.GetDedupInterval() - mergeSortBlocks(dst, sbs, dedupInterval) + mergeSortBlocks(dst, sbh, dedupInterval) + putSortBlocksHeap(sbh) return nil } @@ -483,24 +490,25 @@ var sbPool sync.Pool var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`) -func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) { +func mergeSortBlocks(dst *Result, sbh *sortBlocksHeap, dedupInterval int64) { // Skip empty sort blocks, since they cannot be passed to heap.Init. - src := sbh - sbh = sbh[:0] - for _, sb := range src { + sbs := sbh.sbs[:0] + for _, sb := range sbh.sbs { if len(sb.Timestamps) == 0 { putSortBlock(sb) continue } - sbh = append(sbh, sb) + sbs = append(sbs, sb) } - if len(sbh) == 0 { + sbh.sbs = sbs + if sbh.Len() == 0 { return } - heap.Init(&sbh) + heap.Init(sbh) for { - top := sbh[0] - if len(sbh) == 1 { + sbs := sbh.sbs + top := sbs[0] + if len(sbs) == 1 { dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:]...) dst.Values = append(dst.Values, top.Values[top.NextIdx:]...) putSortBlock(top) @@ -519,9 +527,9 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) { dst.Values = append(dst.Values, top.Values[topNextIdx:top.NextIdx]...) } if top.NextIdx < len(top.Timestamps) { - heap.Fix(&sbh, 0) + heap.Fix(sbh, 0) } else { - heap.Pop(&sbh) + heap.Pop(sbh) putSortBlock(top) } } @@ -604,48 +612,73 @@ func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, br return nil } -type sortBlocksHeap []*sortBlock +type sortBlocksHeap struct { + sbs []*sortBlock +} -func (sbh sortBlocksHeap) getNextBlock() *sortBlock { - if len(sbh) < 2 { +func (sbh *sortBlocksHeap) getNextBlock() *sortBlock { + sbs := sbh.sbs + if len(sbs) < 2 { return nil } - if len(sbh) < 3 { - return sbh[1] + if len(sbs) < 3 { + return sbs[1] } - a := sbh[1] - b := sbh[2] + a := sbs[1] + b := sbs[2] if a.Timestamps[a.NextIdx] <= b.Timestamps[b.NextIdx] { return a } return b } -func (sbh sortBlocksHeap) Len() int { - return len(sbh) +func (sbh *sortBlocksHeap) Len() int { + return len(sbh.sbs) } -func (sbh sortBlocksHeap) Less(i, j int) bool { - a := sbh[i] - b := sbh[j] +func (sbh *sortBlocksHeap) Less(i, j int) bool { + sbs := sbh.sbs + a := sbs[i] + b := sbs[j] return a.Timestamps[a.NextIdx] < b.Timestamps[b.NextIdx] } -func (sbh sortBlocksHeap) Swap(i, j int) { - sbh[i], sbh[j] = sbh[j], sbh[i] +func (sbh *sortBlocksHeap) Swap(i, j int) { + sbs := sbh.sbs + sbs[i], sbs[j] = sbs[j], sbs[i] } func (sbh *sortBlocksHeap) Push(x interface{}) { - *sbh = append(*sbh, x.(*sortBlock)) + sbh.sbs = append(sbh.sbs, x.(*sortBlock)) } func (sbh *sortBlocksHeap) Pop() interface{} { - a := *sbh - v := a[len(a)-1] - *sbh = a[:len(a)-1] + sbs := sbh.sbs + v := sbs[len(sbs)-1] + sbs[len(sbs)-1] = nil + sbh.sbs = sbs[:len(sbs)-1] return v } +func getSortBlocksHeap() *sortBlocksHeap { + v := sbhPool.Get() + if v == nil { + return &sortBlocksHeap{} + } + return v.(*sortBlocksHeap) +} + +func putSortBlocksHeap(sbh *sortBlocksHeap) { + sbs := sbh.sbs + for i := range sbs { + sbs[i] = nil + } + sbh.sbs = sbs[:0] + sbhPool.Put(sbh) +} + +var sbhPool sync.Pool + // DeleteSeries deletes time series matching the given tagFilterss. func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) { qt = qt.NewChild("delete series: %s", sq) diff --git a/app/vmselect/netstorage/netstorage_test.go b/app/vmselect/netstorage/netstorage_test.go index a6ea47892..1f1d5f22b 100644 --- a/app/vmselect/netstorage/netstorage_test.go +++ b/app/vmselect/netstorage/netstorage_test.go @@ -9,7 +9,10 @@ func TestMergeSortBlocks(t *testing.T) { f := func(blocks []*sortBlock, dedupInterval int64, expectedResult *Result) { t.Helper() var result Result - mergeSortBlocks(&result, blocks, dedupInterval) + sbh := getSortBlocksHeap() + sbh.sbs = append(sbh.sbs[:0], blocks...) + mergeSortBlocks(&result, sbh, dedupInterval) + putSortBlocksHeap(sbh) if !reflect.DeepEqual(result.Values, expectedResult.Values) { t.Fatalf("unexpected values;\ngot\n%v\nwant\n%v", result.Values, expectedResult.Values) } diff --git a/app/vmselect/netstorage/netstorage_timing_test.go b/app/vmselect/netstorage/netstorage_timing_test.go index 8345e5bf4..2410b9666 100644 --- a/app/vmselect/netstorage/netstorage_timing_test.go +++ b/app/vmselect/netstorage/netstorage_timing_test.go @@ -90,16 +90,18 @@ func benchmarkMergeSortBlocks(b *testing.B, blocks []*sortBlock) { b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { var result Result - sbs := make(sortBlocksHeap, len(blocks)) + sbh := getSortBlocksHeap() for pb.Next() { result.reset() - for i, b := range blocks { + sbs := sbh.sbs[:0] + for _, b := range blocks { sb := getSortBlock() sb.Timestamps = b.Timestamps sb.Values = b.Values - sbs[i] = sb + sbs = append(sbs, sb) } - mergeSortBlocks(&result, sbs, dedupInterval) + sbh.sbs = sbs + mergeSortBlocks(&result, sbh, dedupInterval) } }) }