From 1fec47a289d79c247871045f33b47ee383387cd4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 15 Sep 2020 21:06:04 +0300 Subject: [PATCH] app/vmselect/netstorage: reduce memory usage when the time range from query touches big number of samples per each time series --- app/vmselect/netstorage/netstorage.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 63fa24ae1c..8d4e447c75 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -202,10 +202,10 @@ func (upw *unpackWork) reset() { } } -func (upw *unpackWork) unpack() { +func (upw *unpackWork) unpack(tmpBlock *storage.Block) { for _, w := range upw.ws { sb := getSortBlock() - if err := sb.unpackFrom(w.br, w.tr, upw.fetchData); err != nil { + if err := sb.unpackFrom(tmpBlock, w.br, w.tr, upw.fetchData); err != nil { putSortBlock(sb) upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err) return @@ -239,8 +239,9 @@ func init() { } func unpackWorker() { + var tmpBlock storage.Block for upw := range unpackWorkCh { - upw.unpack() + upw.unpack(&tmpBlock) } } @@ -372,30 +373,26 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) { var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`) type sortBlock struct { - // b is used as a temporary storage for unpacked rows before they - // go to Timestamps and Values. - b storage.Block - Timestamps []int64 Values []float64 NextIdx int } func (sb *sortBlock) reset() { - sb.b.Reset() sb.Timestamps = sb.Timestamps[:0] sb.Values = sb.Values[:0] sb.NextIdx = 0 } -func (sb *sortBlock) unpackFrom(br storage.BlockRef, tr storage.TimeRange, fetchData bool) error { - br.MustReadBlock(&sb.b, fetchData) +func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr storage.TimeRange, fetchData bool) error { + tmpBlock.Reset() + br.MustReadBlock(tmpBlock, fetchData) if fetchData { - if err := sb.b.UnmarshalData(); err != nil { + if err := tmpBlock.UnmarshalData(); err != nil { return fmt.Errorf("cannot unmarshal block: %w", err) } } - timestamps := sb.b.Timestamps() + timestamps := tmpBlock.Timestamps() // Skip timestamps smaller than tr.MinTimestamp. i := 0 @@ -408,16 +405,16 @@ func (sb *sortBlock) unpackFrom(br storage.BlockRef, tr storage.TimeRange, fetch for j > i && timestamps[j-1] > tr.MaxTimestamp { j-- } - skippedRows := sb.b.RowsCount() - (j - i) + skippedRows := tmpBlock.RowsCount() - (j - i) metricRowsSkipped.Add(skippedRows) // Copy the remaining values. if i == j { return nil } - values := sb.b.Values() + values := tmpBlock.Values() sb.Timestamps = append(sb.Timestamps, timestamps[i:j]...) - sb.Values = decimal.AppendDecimalToFloat(sb.Values, values[i:j], sb.b.Scale()) + sb.Values = decimal.AppendDecimalToFloat(sb.Values, values[i:j], tmpBlock.Scale()) return nil }