app/vmselect/netstorage: reduce the number of calls to runtime.Gosched() at timeseriesWorker() and unpackWorker()

Call runtime.Gosched() only when there is a work to steal from other workers.
Simplify the timeseriesWorker() and unpackWroker() code a bit by inlining stealTimeseriesWork() and stealUnpackWork().

This should reduce CPU usage when processing queries on systems with big number of CPU cores.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
This commit is contained in:
Aliaksandr Valialkin 2023-03-20 20:23:30 -07:00
parent 18af01c387
commit 08da383eac
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1

View File

@ -166,40 +166,28 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
// Then help others with the remaining work. // Then help others with the remaining work.
rowsProcessed = 0 rowsProcessed = 0
seriesProcessed = 0 seriesProcessed = 0
idx := int(workerID) for i := uint(1); i < uint(len(workChs)); i++ {
for { idx := (i + workerID) % uint(len(workChs))
tsw, idxNext := stealTimeseriesWork(workChs, idx) ch := workChs[idx]
if tsw == nil { for len(ch) > 0 {
// There is no more work // Give a chance other goroutines to perform their work.
break runtime.Gosched()
// It is expected that every channel in the workChs is already closed,
// so the next line should return immediately.
tsw, ok := <-ch
if !ok {
break
}
tsw.err = tsw.do(&tmpResult.rs, workerID)
rowsProcessed += tsw.rowsProcessed
seriesProcessed++
} }
tsw.err = tsw.do(&tmpResult.rs, workerID)
rowsProcessed += tsw.rowsProcessed
seriesProcessed++
idx = idxNext
} }
qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed) qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
putTmpResult(tmpResult) putTmpResult(tmpResult)
} }
func stealTimeseriesWork(workChs []chan *timeseriesWork, startIdx int) (*timeseriesWork, int) {
for i := startIdx; i < startIdx+len(workChs); i++ {
// Give a chance other goroutines to perform their work
runtime.Gosched()
idx := i % len(workChs)
ch := workChs[idx]
// It is expected that every channel in the workChs is already closed,
// so the next line should return immediately.
tsw, ok := <-ch
if ok {
return tsw, idx
}
}
return nil, startIdx
}
func getTmpResult() *result { func getTmpResult() *result {
v := resultPool.Get() v := resultPool.Get()
if v == nil { if v == nil {
@ -415,37 +403,25 @@ func unpackWorker(workChs []chan *unpackWork, workerID uint) {
} }
// Then help others with their work. // Then help others with their work.
idx := int(workerID) for i := uint(1); i < uint(len(workChs)); i++ {
for { idx := (i + workerID) % uint(len(workChs))
upw, idxNext := stealUnpackWork(workChs, idx) ch := workChs[idx]
if upw == nil { for len(ch) > 0 {
// There is no more work // Give a chance other goroutines to perform their work
break runtime.Gosched()
// It is expected that every channel in the workChs is already closed,
// so the next line should return immediately.
upw, ok := <-ch
if !ok {
break
}
upw.unpack(tmpBlock)
} }
upw.unpack(tmpBlock)
idx = idxNext
} }
putTmpStorageBlock(tmpBlock) putTmpStorageBlock(tmpBlock)
} }
func stealUnpackWork(workChs []chan *unpackWork, startIdx int) (*unpackWork, int) {
for i := startIdx; i < startIdx+len(workChs); i++ {
// Give a chance other goroutines to perform their work
runtime.Gosched()
idx := i % len(workChs)
ch := workChs[idx]
// It is expected that every channel in the workChs is already closed,
// so the next line should return immediately.
upw, ok := <-ch
if ok {
return upw, idx
}
}
return nil, startIdx
}
func getTmpStorageBlock() *storage.Block { func getTmpStorageBlock() *storage.Block {
v := tmpStorageBlockPool.Get() v := tmpStorageBlockPool.Get()
if v == nil { if v == nil {