From 08da383eacfbd07b07d36f2b7d355cd37d251eb1 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 20 Mar 2023 20:23:30 -0700
Subject: [PATCH] app/vmselect/netstorage: reduce the number of calls to
 runtime.Gosched() at timeseriesWorker() and unpackWorker()

Call runtime.Gosched() only when there is work to steal from other workers.
Simplify the timeseriesWorker() and unpackWorker() code a bit by inlining
stealTimeseriesWork() and stealUnpackWork().

This should reduce CPU usage when processing queries on systems with
a big number of CPU cores.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
---
 app/vmselect/netstorage/netstorage.go | 80 ++++++++++-----------------
 1 file changed, 28 insertions(+), 52 deletions(-)

diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 180c3bfe90..e1612ad9f1 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -166,40 +166,28 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
 	// Then help others with the remaining work.
 	rowsProcessed = 0
 	seriesProcessed = 0
-	idx := int(workerID)
-	for {
-		tsw, idxNext := stealTimeseriesWork(workChs, idx)
-		if tsw == nil {
-			// There is no more work
-			break
+	for i := uint(1); i < uint(len(workChs)); i++ {
+		idx := (i + workerID) % uint(len(workChs))
+		ch := workChs[idx]
+		for len(ch) > 0 {
+			// Give a chance other goroutines to perform their work.
+			runtime.Gosched()
+			// It is expected that every channel in the workChs is already closed,
+			// so the next line should return immediately.
+			tsw, ok := <-ch
+			if !ok {
+				break
+			}
+			tsw.err = tsw.do(&tmpResult.rs, workerID)
+			rowsProcessed += tsw.rowsProcessed
+			seriesProcessed++
 		}
-		tsw.err = tsw.do(&tmpResult.rs, workerID)
-		rowsProcessed += tsw.rowsProcessed
-		seriesProcessed++
-		idx = idxNext
 	}
 	qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
 	putTmpResult(tmpResult)
 }
 
-func stealTimeseriesWork(workChs []chan *timeseriesWork, startIdx int) (*timeseriesWork, int) {
-	for i := startIdx; i < startIdx+len(workChs); i++ {
-		// Give a chance other goroutines to perform their work
-		runtime.Gosched()
-
-		idx := i % len(workChs)
-		ch := workChs[idx]
-		// It is expected that every channel in the workChs is already closed,
-		// so the next line should return immediately.
-		tsw, ok := <-ch
-		if ok {
-			return tsw, idx
-		}
-	}
-	return nil, startIdx
-}
-
 func getTmpResult() *result {
 	v := resultPool.Get()
 	if v == nil {
@@ -415,37 +403,25 @@ func unpackWorker(workChs []chan *unpackWork, workerID uint) {
 	}
 
 	// Then help others with their work.
-	idx := int(workerID)
-	for {
-		upw, idxNext := stealUnpackWork(workChs, idx)
-		if upw == nil {
-			// There is no more work
-			break
+	for i := uint(1); i < uint(len(workChs)); i++ {
+		idx := (i + workerID) % uint(len(workChs))
+		ch := workChs[idx]
+		for len(ch) > 0 {
+			// Give a chance other goroutines to perform their work
+			runtime.Gosched()
+			// It is expected that every channel in the workChs is already closed,
+			// so the next line should return immediately.
+			upw, ok := <-ch
+			if !ok {
+				break
+			}
+			upw.unpack(tmpBlock)
 		}
-		upw.unpack(tmpBlock)
-		idx = idxNext
 	}
 	putTmpStorageBlock(tmpBlock)
 }
 
-func stealUnpackWork(workChs []chan *unpackWork, startIdx int) (*unpackWork, int) {
-	for i := startIdx; i < startIdx+len(workChs); i++ {
-		// Give a chance other goroutines to perform their work
-		runtime.Gosched()
-
-		idx := i % len(workChs)
-		ch := workChs[idx]
-		// It is expected that every channel in the workChs is already closed,
-		// so the next line should return immediately.
-		upw, ok := <-ch
-		if ok {
-			return upw, idx
-		}
-	}
-	return nil, startIdx
-}
-
 func getTmpStorageBlock() *storage.Block {
 	v := tmpStorageBlockPool.Get()
 	if v == nil {
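
Below is a minimal, self-contained sketch of the work-stealing loop that this patch inlines
into timeseriesWorker() and unpackWorker(): each worker drains its own channel first, then
walks the other workers' channels starting right after its own index and calls
runtime.Gosched() only while a stolen channel still holds buffered work. The work type, the
process callback and the channel sizes below are hypothetical stand-ins for the real
timeseriesWork/unpackWork plumbing, not the VictoriaMetrics implementation itself.

	package main

	import (
		"fmt"
		"runtime"
		"sync"
		"sync/atomic"
	)

	// work is a hypothetical unit of work standing in for *timeseriesWork / *unpackWork.
	type work struct{ id int }

	// worker drains its own channel first, then helps the other workers.
	// Every channel in workChs is expected to be closed by the producer,
	// so the receive in the stealing loop returns quickly.
	func worker(workChs []chan *work, workerID uint, process func(*work)) {
		// Process own work at first.
		for w := range workChs[workerID] {
			process(w)
		}
		// Then steal the remaining work from the other workers.
		for i := uint(1); i < uint(len(workChs)); i++ {
			idx := (i + workerID) % uint(len(workChs))
			ch := workChs[idx]
			for len(ch) > 0 {
				// Yield the CPU only when there is actually something to steal.
				runtime.Gosched()
				w, ok := <-ch
				if !ok {
					break
				}
				process(w)
			}
		}
	}

	func main() {
		const workers = 4
		workChs := make([]chan *work, workers)
		for i := range workChs {
			workChs[i] = make(chan *work, 16)
		}
		var total atomic.Int64
		var wg sync.WaitGroup
		for i := uint(0); i < workers; i++ {
			wg.Add(1)
			go func(id uint) {
				defer wg.Done()
				worker(workChs, id, func(*work) { total.Add(1) })
			}(i)
		}
		// Skew the load towards the first two channels so the other workers
		// have something to steal, then close all channels so workers can exit.
		for i := 0; i < 100; i++ {
			workChs[i%2] <- &work{id: i}
		}
		for _, ch := range workChs {
			close(ch)
		}
		wg.Wait()
		fmt.Println("processed", total.Load(), "work items")
	}

The key difference from the pre-patch code is that runtime.Gosched() sits inside the
len(ch) > 0 loop, so a worker yields only when another queue is non-empty instead of once
per scanned channel.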