From 973df0968647e84774847e63855cc403715c19e9 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 24 Sep 2020 20:16:19 +0300
Subject: [PATCH] app/vmselect/netstorage: do not spend CPU time on unpacking
 empty blocks during `/api/v1/series` calls

---
 app/vmselect/netstorage/netstorage.go | 49 +++++++++++++++++----------
 1 file changed, 31 insertions(+), 18 deletions(-)

diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index fc9c7f7bb2..c54cfc057f 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -178,12 +178,11 @@ type unpackWorkItem struct {
 }
 
 type unpackWork struct {
-	ws        []unpackWorkItem
-	tbf       *tmpBlocksFile
-	fetchData bool
-	at        *auth.Token
-	sbs       []*sortBlock
-	doneCh    chan error
+	ws     []unpackWorkItem
+	tbf    *tmpBlocksFile
+	at     *auth.Token
+	sbs    []*sortBlock
+	doneCh chan error
 }
 
 func (upw *unpackWork) reset() {
@@ -195,7 +194,6 @@
 	}
 	upw.ws = upw.ws[:0]
 	upw.tbf = nil
-	upw.fetchData = false
 	upw.at = nil
 	sbs := upw.sbs
 	for i := range sbs {
@@ -210,7 +208,7 @@
 func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
 	for _, w := range upw.ws {
 		sb := getSortBlock()
-		if err := sb.unpackFrom(tmpBlock, upw.tbf, w.addr, w.tr, upw.fetchData, upw.at); err != nil {
+		if err := sb.unpackFrom(tmpBlock, upw.tbf, w.addr, w.tr, upw.at); err != nil {
 			putSortBlock(sb)
 			upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
 			return
@@ -258,17 +256,19 @@ var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1)
 // Unpack unpacks pts to dst.
 func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
 	dst.reset()
-
 	if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
 		return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
 	}
+	if !fetchData {
+		// Do not spend resources on data reading and unpacking.
+		return nil
+	}
 
 	// Feed workers with work
 	addrsLen := len(pts.addrs)
 	upws := make([]*unpackWork, 0, 1+addrsLen/unpackBatchSize)
 	upw := getUnpackWork()
 	upw.tbf = tbf
-	upw.fetchData = fetchData
 	upw.at = at
 	for _, addr := range pts.addrs {
 		if len(upw.ws) >= unpackBatchSize {
@@ -276,7 +276,6 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
 			upws = append(upws, upw)
 			upw = getUnpackWork()
 			upw.tbf = tbf
-			upw.fetchData = fetchData
 			upw.at = at
 		}
 		upw.ws = append(upw.ws, unpackWorkItem{
@@ -394,13 +393,11 @@ func (sb *sortBlock) reset() {
 	sb.NextIdx = 0
 }
 
-func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
+func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, at *auth.Token) error {
 	tmpBlock.Reset()
 	tbf.MustReadBlockAt(tmpBlock, addr)
-	if fetchData {
-		if err := tmpBlock.UnmarshalData(); err != nil {
-			return fmt.Errorf("cannot unmarshal block: %w", err)
-		}
+	if err := tmpBlock.UnmarshalData(); err != nil {
+		return fmt.Errorf("cannot unmarshal block: %w", err)
 	}
 
 	timestamps := tmpBlock.Timestamps()
@@ -982,7 +979,19 @@ type tmpBlocksFileWrapper struct {
 	orderedMetricNames []string
 }
 
-func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
+func (tbfw *tmpBlocksFileWrapper) RegisterEmptyBlock(mb *storage.MetricBlock) {
+	metricName := mb.MetricName
+	tbfw.mu.Lock()
+	if addrs := tbfw.m[string(metricName)]; addrs == nil {
+		// An optimization for big number of time series with long names: store only a single copy of metricNameStr
+		// in both tbfw.orderedMetricNames and tbfw.m.
+		tbfw.orderedMetricNames = append(tbfw.orderedMetricNames, string(metricName))
+		tbfw.m[tbfw.orderedMetricNames[len(tbfw.orderedMetricNames)-1]] = []tmpBlockAddr{{}}
+	}
+	tbfw.mu.Unlock()
+}
+
+func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock) error {
 	bb := tmpBufPool.Get()
 	bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
 	tbfw.mu.Lock()
@@ -1754,7 +1763,11 @@ func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *
 		blocksRead++
 		sn.metricBlocksRead.Inc()
 		sn.metricRowsRead.Add(mb.Block.RowsCount())
-		if err := tbfw.WriteBlock(&mb); err != nil {
+		if !fetchData {
+			tbfw.RegisterEmptyBlock(&mb)
+			continue
+		}
+		if err := tbfw.RegisterAndWriteBlock(&mb); err != nil {
 			return blocksRead, fmt.Errorf("cannot write MetricBlock #%d to temporary blocks file: %w", blocksRead, err)
 		}
 	}
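
A minimal standalone sketch of the idea behind this patch, using hypothetical
names (series, blockRef, unpack) rather than the real netstorage types:
/api/v1/series only needs metric names, so when fetchData is false the block
data is never read or unmarshaled.

	// Sketch only; not the actual VictoriaMetrics API.
	package main

	import "fmt"

	type blockRef struct{ addr int }

	type series struct {
		metricName string
		blocks     []blockRef
	}

	// unpack returns the unpacked rows; for /api/v1/series-style requests
	// (fetchData == false) it returns early without touching the blocks.
	func (s *series) unpack(fetchData bool) []string {
		if !fetchData {
			// Do not spend resources on data reading and unpacking.
			return nil
		}
		rows := make([]string, 0, len(s.blocks))
		for _, b := range s.blocks {
			rows = append(rows, fmt.Sprintf("rows from block at addr %d", b.addr))
		}
		return rows
	}

	func main() {
		s := &series{metricName: "http_requests_total", blocks: []blockRef{{addr: 1}, {addr: 2}}}
		fmt.Println(s.metricName, len(s.unpack(false))) // http_requests_total 0
		fmt.Println(s.metricName, len(s.unpack(true)))  // http_requests_total 2
	}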