diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 07c7f7a59..86ab59a4a 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -79,8 +79,6 @@ func (rss *Results) mustClose() { rss.tbf = nil } -var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16) - type timeseriesWork struct { mustStop *uint32 rss *Results @@ -119,16 +117,38 @@ func putTimeseriesWork(tsw *timeseriesWork) { var tswPool sync.Pool +var timeseriesWorkChs []chan *timeseriesWork +var timeseriesWorkIdx uint32 + func init() { - for i := 0; i < gomaxprocs; i++ { - go timeseriesWorker(uint(i)) + timeseriesWorkChs = make([]chan *timeseriesWork, gomaxprocs) + for i := range timeseriesWorkChs { + timeseriesWorkChs[i] = make(chan *timeseriesWork, 16) + go timeseriesWorker(timeseriesWorkChs[i], uint(i)) } } -func timeseriesWorker(workerID uint) { +func scheduleTimeseriesWork(tsw *timeseriesWork) { + attempts := 0 + for { + idx := atomic.AddUint32(×eriesWorkIdx, 1) % uint32(len(timeseriesWorkChs)) + select { + case timeseriesWorkChs[idx] <- tsw: + return + default: + attempts++ + if attempts >= len(timeseriesWorkChs) { + timeseriesWorkChs[idx] <- tsw + return + } + } + } +} + +func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) { var rs Result var rsLastResetTime uint64 - for tsw := range timeseriesWorkCh { + for tsw := range ch { if atomic.LoadUint32(tsw.mustStop) != 0 { tsw.doneCh <- nil continue @@ -181,7 +201,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { tsw.pts = &rss.packedTimeseries[i] tsw.f = f tsw.mustStop = &mustStop - timeseriesWorkCh <- tsw + scheduleTimeseriesWork(tsw) tsws[i] = tsw } seriesProcessedTotal := len(rss.packedTimeseries) @@ -214,8 +234,6 @@ type packedTimeseries struct { brs []blockRef } -var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128) - type unpackWorkItem struct { br blockRef tr storage.TimeRange @@ -277,15 +295,37 @@ func putUnpackWork(upw *unpackWork) { var unpackWorkPool sync.Pool +var unpackWorkChs []chan *unpackWork +var unpackWorkIdx uint32 + func init() { - for i := 0; i < gomaxprocs; i++ { - go unpackWorker() + unpackWorkChs = make([]chan *unpackWork, gomaxprocs) + for i := range unpackWorkChs { + unpackWorkChs[i] = make(chan *unpackWork, 128) + go unpackWorker(unpackWorkChs[i]) } } -func unpackWorker() { +func scheduleUnpackWork(uw *unpackWork) { + attempts := 0 + for { + idx := atomic.AddUint32(&unpackWorkIdx, 1) % uint32(len(unpackWorkChs)) + select { + case unpackWorkChs[idx] <- uw: + return + default: + attempts++ + if attempts >= len(unpackWorkChs) { + unpackWorkChs[idx] <- uw + return + } + } + } +} + +func unpackWorker(ch <-chan *unpackWork) { var tmpBlock storage.Block - for upw := range unpackWorkCh { + for upw := range ch { upw.unpack(&tmpBlock) } } @@ -313,7 +353,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage. upw.tbf = tbf for _, br := range pts.brs { if len(upw.ws) >= unpackBatchSize { - unpackWorkCh <- upw + scheduleUnpackWork(upw) upws = append(upws, upw) upw = getUnpackWork() upw.tbf = tbf @@ -323,7 +363,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage. tr: tr, }) } - unpackWorkCh <- upw + scheduleUnpackWork(upw) upws = append(upws, upw) pts.brs = pts.brs[:0]