diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index bc211a9922..70ce87ccd3 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -171,28 +171,54 @@ type packedTimeseries struct { var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128) +type unpackWorkItem struct { + addr tmpBlockAddr + tr storage.TimeRange +} + type unpackWork struct { + ws []unpackWorkItem tbf *tmpBlocksFile - addr tmpBlockAddr - tr storage.TimeRange fetchData bool at *auth.Token - sb *sortBlock + sbs []*sortBlock doneCh chan error } func (upw *unpackWork) reset() { + ws := upw.ws + for i := range ws { + w := &ws[i] + w.addr = tmpBlockAddr{} + w.tr = storage.TimeRange{} + } + upw.ws = upw.ws[:0] upw.tbf = nil - upw.addr = tmpBlockAddr{} - upw.tr = storage.TimeRange{} upw.fetchData = false upw.at = nil - upw.sb = nil + sbs := upw.sbs + for i := range sbs { + sbs[i] = nil + } + upw.sbs = upw.sbs[:0] if n := len(upw.doneCh); n > 0 { logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n) } } +func (upw *unpackWork) unpack() { + for _, w := range upw.ws { + sb := getSortBlock() + if err := sb.unpackFrom(upw.tbf, w.addr, w.tr, upw.fetchData, upw.at); err != nil { + putSortBlock(sb) + upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err) + return + } + upw.sbs = append(upw.sbs, sb) + } + upw.doneCh <- nil +} + func getUnpackWork() *unpackWork { v := unpackWorkPool.Get() if v != nil { @@ -218,17 +244,15 @@ func init() { func unpackWorker() { for upw := range unpackWorkCh { - sb := getSortBlock() - if err := sb.unpackFrom(upw.tbf, upw.addr, upw.tr, upw.fetchData, upw.at); err != nil { - putSortBlock(sb) - upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err) - continue - } - upw.sb = sb - upw.doneCh <- nil + upw.unpack() } } +// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine. +// +// This batch is needed in order to reduce contention for upackWorkCh in multi-CPU system. +const unpackBatchSize = 16 + // Unpack unpacks pts to dst. func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token) error { dst.reset() @@ -238,17 +262,27 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage. } // Feed workers with work - upws := make([]*unpackWork, len(pts.addrs)) - for i, addr := range pts.addrs { - upw := getUnpackWork() - upw.tbf = tbf - upw.addr = addr - upw.tr = tr - upw.fetchData = fetchData - upw.at = at - unpackWorkCh <- upw - upws[i] = upw + upws := make([]*unpackWork, 0, 1+len(pts.addrs)/unpackBatchSize) + upw := getUnpackWork() + upw.tbf = tbf + upw.fetchData = fetchData + upw.at = at + for _, addr := range pts.addrs { + if len(upw.ws) >= unpackBatchSize { + unpackWorkCh <- upw + upws = append(upws, upw) + upw = getUnpackWork() + upw.tbf = tbf + upw.fetchData = fetchData + upw.at = at + } + upw.ws = append(upw.ws, unpackWorkItem{ + addr: addr, + tr: tr, + }) } + unpackWorkCh <- upw + upws = append(upws, upw) pts.addrs = pts.addrs[:0] // Wait until work is complete @@ -260,9 +294,11 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage. firstErr = err } if firstErr == nil { - sbs = append(sbs, upw.sb) - } else if upw.sb != nil { - putSortBlock(upw.sb) + sbs = append(sbs, upw.sbs...) + } else { + for _, sb := range upw.sbs { + putSortBlock(sb) + } } putUnpackWork(upw) }