app/vmselect/netstorage: improve scalability of series unpacking on multi-CPU systems

This commit is contained in:
Aliaksandr Valialkin 2021-07-15 15:40:41 +03:00
parent 171d44acd8
commit e6ef97a5ee

View File

@ -79,8 +79,6 @@ func (rss *Results) mustClose() {
rss.tbf = nil
}
var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16)
type timeseriesWork struct {
mustStop *uint32
rss *Results
@ -119,16 +117,38 @@ func putTimeseriesWork(tsw *timeseriesWork) {
var tswPool sync.Pool
var timeseriesWorkChs []chan *timeseriesWork
var timeseriesWorkIdx uint32
func init() {
for i := 0; i < gomaxprocs; i++ {
go timeseriesWorker(uint(i))
timeseriesWorkChs = make([]chan *timeseriesWork, gomaxprocs)
for i := range timeseriesWorkChs {
timeseriesWorkChs[i] = make(chan *timeseriesWork, 16)
go timeseriesWorker(timeseriesWorkChs[i], uint(i))
}
}
func timeseriesWorker(workerID uint) {
// scheduleTimeseriesWork distributes tsw among worker channels in round-robin
// fashion for better scalability on multi-CPU systems. It first tries a
// non-blocking send; once every channel has been tried without success, it
// falls back to a blocking send on the last selected channel.
func scheduleTimeseriesWork(tsw *timeseriesWork) {
	failedAttempts := 0
	for {
		// Advance the shared round-robin counter and pick the next channel.
		n := atomic.AddUint32(&timeseriesWorkIdx, 1) % uint32(len(timeseriesWorkChs))
		ch := timeseriesWorkChs[n]
		select {
		case ch <- tsw:
			return
		default:
		}
		failedAttempts++
		if failedAttempts >= len(timeseriesWorkChs) {
			// Every worker channel appears busy - block until the current
			// channel accepts the work.
			ch <- tsw
			return
		}
	}
}
func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
var rs Result
var rsLastResetTime uint64
for tsw := range timeseriesWorkCh {
for tsw := range ch {
if atomic.LoadUint32(tsw.mustStop) != 0 {
tsw.doneCh <- nil
continue
@ -181,7 +201,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
tsw.pts = &rss.packedTimeseries[i]
tsw.f = f
tsw.mustStop = &mustStop
timeseriesWorkCh <- tsw
scheduleTimeseriesWork(tsw)
tsws[i] = tsw
}
seriesProcessedTotal := len(rss.packedTimeseries)
@ -214,8 +234,6 @@ type packedTimeseries struct {
brs []blockRef
}
var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128)
type unpackWorkItem struct {
br blockRef
tr storage.TimeRange
@ -277,15 +295,37 @@ func putUnpackWork(upw *unpackWork) {
var unpackWorkPool sync.Pool
var unpackWorkChs []chan *unpackWork
var unpackWorkIdx uint32
func init() {
for i := 0; i < gomaxprocs; i++ {
go unpackWorker()
unpackWorkChs = make([]chan *unpackWork, gomaxprocs)
for i := range unpackWorkChs {
unpackWorkChs[i] = make(chan *unpackWork, 128)
go unpackWorker(unpackWorkChs[i])
}
}
func unpackWorker() {
// scheduleUnpackWork distributes uw among unpack worker channels in
// round-robin fashion for better scalability on multi-CPU systems. It first
// tries a non-blocking send; once every channel has been tried without
// success, it falls back to a blocking send on the last selected channel.
func scheduleUnpackWork(uw *unpackWork) {
	failedAttempts := 0
	for {
		// Advance the shared round-robin counter and pick the next channel.
		n := atomic.AddUint32(&unpackWorkIdx, 1) % uint32(len(unpackWorkChs))
		ch := unpackWorkChs[n]
		select {
		case ch <- uw:
			return
		default:
		}
		failedAttempts++
		if failedAttempts >= len(unpackWorkChs) {
			// Every worker channel appears busy - block until the current
			// channel accepts the work.
			ch <- uw
			return
		}
	}
}
func unpackWorker(ch <-chan *unpackWork) {
var tmpBlock storage.Block
for upw := range unpackWorkCh {
for upw := range ch {
upw.unpack(&tmpBlock)
}
}
@ -313,7 +353,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
upw.tbf = tbf
for _, br := range pts.brs {
if len(upw.ws) >= unpackBatchSize {
unpackWorkCh <- upw
scheduleUnpackWork(upw)
upws = append(upws, upw)
upw = getUnpackWork()
upw.tbf = tbf
@ -323,7 +363,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
tr: tr,
})
}
unpackWorkCh <- upw
scheduleUnpackWork(upw)
upws = append(upws, upw)
pts.brs = pts.brs[:0]