mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
app/vmselect/netstorage: reduce CPU contention when upacking time series blocks by unpacking batches of such blocks instead of a single block
This should improve query performance on systems with big number of CPU cores (16 and more)
This commit is contained in:
parent
46c98cd97a
commit
14ddb8a34e
@ -171,28 +171,54 @@ type packedTimeseries struct {
|
||||
|
||||
var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128)
|
||||
|
||||
type unpackWorkItem struct {
|
||||
addr tmpBlockAddr
|
||||
tr storage.TimeRange
|
||||
}
|
||||
|
||||
type unpackWork struct {
|
||||
ws []unpackWorkItem
|
||||
tbf *tmpBlocksFile
|
||||
addr tmpBlockAddr
|
||||
tr storage.TimeRange
|
||||
fetchData bool
|
||||
at *auth.Token
|
||||
sb *sortBlock
|
||||
sbs []*sortBlock
|
||||
doneCh chan error
|
||||
}
|
||||
|
||||
func (upw *unpackWork) reset() {
|
||||
ws := upw.ws
|
||||
for i := range ws {
|
||||
w := &ws[i]
|
||||
w.addr = tmpBlockAddr{}
|
||||
w.tr = storage.TimeRange{}
|
||||
}
|
||||
upw.ws = upw.ws[:0]
|
||||
upw.tbf = nil
|
||||
upw.addr = tmpBlockAddr{}
|
||||
upw.tr = storage.TimeRange{}
|
||||
upw.fetchData = false
|
||||
upw.at = nil
|
||||
upw.sb = nil
|
||||
sbs := upw.sbs
|
||||
for i := range sbs {
|
||||
sbs[i] = nil
|
||||
}
|
||||
upw.sbs = upw.sbs[:0]
|
||||
if n := len(upw.doneCh); n > 0 {
|
||||
logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n)
|
||||
}
|
||||
}
|
||||
|
||||
func (upw *unpackWork) unpack() {
|
||||
for _, w := range upw.ws {
|
||||
sb := getSortBlock()
|
||||
if err := sb.unpackFrom(upw.tbf, w.addr, w.tr, upw.fetchData, upw.at); err != nil {
|
||||
putSortBlock(sb)
|
||||
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
|
||||
return
|
||||
}
|
||||
upw.sbs = append(upw.sbs, sb)
|
||||
}
|
||||
upw.doneCh <- nil
|
||||
}
|
||||
|
||||
func getUnpackWork() *unpackWork {
|
||||
v := unpackWorkPool.Get()
|
||||
if v != nil {
|
||||
@ -218,17 +244,15 @@ func init() {
|
||||
|
||||
func unpackWorker() {
|
||||
for upw := range unpackWorkCh {
|
||||
sb := getSortBlock()
|
||||
if err := sb.unpackFrom(upw.tbf, upw.addr, upw.tr, upw.fetchData, upw.at); err != nil {
|
||||
putSortBlock(sb)
|
||||
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
|
||||
continue
|
||||
}
|
||||
upw.sb = sb
|
||||
upw.doneCh <- nil
|
||||
upw.unpack()
|
||||
}
|
||||
}
|
||||
|
||||
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
|
||||
//
|
||||
// This batch is needed in order to reduce contention for upackWorkCh in multi-CPU system.
|
||||
const unpackBatchSize = 16
|
||||
|
||||
// Unpack unpacks pts to dst.
|
||||
func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
|
||||
dst.reset()
|
||||
@ -238,17 +262,27 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
|
||||
}
|
||||
|
||||
// Feed workers with work
|
||||
upws := make([]*unpackWork, len(pts.addrs))
|
||||
for i, addr := range pts.addrs {
|
||||
upw := getUnpackWork()
|
||||
upw.tbf = tbf
|
||||
upw.addr = addr
|
||||
upw.tr = tr
|
||||
upw.fetchData = fetchData
|
||||
upw.at = at
|
||||
unpackWorkCh <- upw
|
||||
upws[i] = upw
|
||||
upws := make([]*unpackWork, 0, 1+len(pts.addrs)/unpackBatchSize)
|
||||
upw := getUnpackWork()
|
||||
upw.tbf = tbf
|
||||
upw.fetchData = fetchData
|
||||
upw.at = at
|
||||
for _, addr := range pts.addrs {
|
||||
if len(upw.ws) >= unpackBatchSize {
|
||||
unpackWorkCh <- upw
|
||||
upws = append(upws, upw)
|
||||
upw = getUnpackWork()
|
||||
upw.tbf = tbf
|
||||
upw.fetchData = fetchData
|
||||
upw.at = at
|
||||
}
|
||||
upw.ws = append(upw.ws, unpackWorkItem{
|
||||
addr: addr,
|
||||
tr: tr,
|
||||
})
|
||||
}
|
||||
unpackWorkCh <- upw
|
||||
upws = append(upws, upw)
|
||||
pts.addrs = pts.addrs[:0]
|
||||
|
||||
// Wait until work is complete
|
||||
@ -260,9 +294,11 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
|
||||
firstErr = err
|
||||
}
|
||||
if firstErr == nil {
|
||||
sbs = append(sbs, upw.sb)
|
||||
} else if upw.sb != nil {
|
||||
putSortBlock(upw.sb)
|
||||
sbs = append(sbs, upw.sbs...)
|
||||
} else {
|
||||
for _, sb := range upw.sbs {
|
||||
putSortBlock(sb)
|
||||
}
|
||||
}
|
||||
putUnpackWork(upw)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user