app/vmselect/netstorage: increase concurrency when processing a small number of time series with a big number of data points per time series

Previously VictoriaMetrics processed up to 32 time series in a single goroutine.
This could be slow when each time series contains a big number of data points (10M or more),
since only a single CPU core was loaded with work while the remaining CPU cores sat idle.
Fix this by launching GOMAXPROCS workers for time series processing.

This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/572
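
In outline: a fixed, process-wide pool of GOMAXPROCS workers drains a shared channel, and every work item carries its own buffered done channel so completion can be collected per item. The snippet below is a minimal, self-contained sketch of that pattern, not code from this commit; the work/startWorkers names are illustrative only.

package main

import (
	"fmt"
	"runtime"
)

// work is an illustrative work item: each item carries its own buffered
// doneCh so a worker can report completion without blocking on the sender.
type work struct {
	n      int
	result int
	doneCh chan error
}

// workCh is shared by all workers, mirroring timeseriesWorkCh below.
var workCh = make(chan *work, runtime.GOMAXPROCS(-1))

func startWorkers() {
	// One worker per CPU core keeps every core busy even when only a
	// handful of large work items are submitted.
	for i := 0; i < runtime.GOMAXPROCS(-1); i++ {
		go func() {
			for w := range workCh {
				w.result = w.n * w.n // stand-in for heavy per-item processing
				w.doneCh <- nil
			}
		}()
	}
}

func main() {
	startWorkers()
	items := make([]*work, 8)
	for i := range items {
		w := &work{n: i, doneCh: make(chan error, 1)}
		workCh <- w
		items[i] = w
	}
	// Collect results in submission order, keeping only the first error.
	var firstErr error
	for _, w := range items {
		if err := <-w.doneCh; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	fmt.Println("processed", len(items), "items, firstErr =", firstErr)
}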
commit 0fdbe5de25
parent 3a444bb7bb
Author: Aliaksandr Valialkin
Date:   2020-06-23 20:29:19 +03:00

@@ -7,7 +7,6 @@ import (
 	"runtime"
 	"sort"
 	"sync"
-	"sync/atomic"
 	"time"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@@ -67,6 +66,47 @@ func (rss *Results) Cancel() {
 	rss.tbf = nil
 }

+var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs)
+
+type timeseriesWork struct {
+	rss    *Results
+	pts    *packedTimeseries
+	f      func(rs *Result, workerID uint)
+	doneCh chan error
+
+	rowsProcessed int
+}
+
+func init() {
+	for i := 0; i < gomaxprocs; i++ {
+		go timeseriesWorker(uint(i))
+	}
+}
+
+func timeseriesWorker(workerID uint) {
+	var rs Result
+	for tsw := range timeseriesWorkCh {
+		rss := tsw.rss
+		if time.Until(rss.deadline.Deadline) < 0 {
+			tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
+			continue
+		}
+		if err := tsw.pts.Unpack(rss.tbf, &rs, rss.tr, rss.fetchData, rss.at); err != nil {
+			tsw.doneCh <- fmt.Errorf("error during time series unpacking: %s", err)
+			continue
+		}
+		if len(rs.Timestamps) > 0 || !rss.fetchData {
+			tsw.f(&rs, workerID)
+		}
+		tsw.rowsProcessed = len(rs.Values)
+		tsw.doneCh <- nil
+		if cap(rs.Values) > 1024*1024 && 4*len(rs.Values) < cap(rs.Values) {
+			// Reset rs in order to release memory after processing a big time series with millions of rows.
+			rs = Result{}
+		}
+	}
+}
+
 // RunParallel runs in parallel f for all the results from rss.
 //
 // f shouldn't hold references to rs after returning.
@@ -79,72 +119,36 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 		rss.tbf = nil
 	}()

-	workersCount := 1 + len(rss.packedTimeseries)/32
-	if workersCount > gomaxprocs {
-		workersCount = gomaxprocs
-	}
-	if workersCount == 0 {
-		logger.Panicf("BUG: workersCount cannot be zero")
-	}
-	workCh := make(chan *packedTimeseries, workersCount)
-	doneCh := make(chan error)
-
-	// Start workers.
-	rowsProcessedTotal := uint64(0)
-	for i := 0; i < workersCount; i++ {
-		go func(workerID uint) {
-			rs := getResult()
-			defer putResult(rs)
-			maxWorkersCount := gomaxprocs / workersCount
-			var err error
-			rowsProcessed := 0
-			for pts := range workCh {
-				if time.Until(rss.deadline.Deadline) < 0 {
-					err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
-					break
-				}
-				if err = pts.Unpack(rss.tbf, rs, rss.tr, rss.fetchData, rss.at, maxWorkersCount); err != nil {
-					break
-				}
-				if len(rs.Timestamps) == 0 && rss.fetchData {
-					// Skip empty blocks.
-					continue
-				}
-				rowsProcessed += len(rs.Values)
-				f(rs, workerID)
-			}
-			atomic.AddUint64(&rowsProcessedTotal, uint64(rowsProcessed))
-
-			// Drain the remaining work
-			for range workCh {
-			}
-			doneCh <- err
-		}(uint(i))
-	}
-
 	// Feed workers with work.
+	tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
 	for i := range rss.packedTimeseries {
-		workCh <- &rss.packedTimeseries[i]
+		tsw := &timeseriesWork{
+			rss:    rss,
+			pts:    &rss.packedTimeseries[i],
+			f:      f,
+			doneCh: make(chan error, 1),
+		}
+		timeseriesWorkCh <- tsw
+		tsws[i] = tsw
 	}
 	seriesProcessedTotal := len(rss.packedTimeseries)
 	rss.packedTimeseries = rss.packedTimeseries[:0]
-	close(workCh)

-	// Wait until workers finish.
-	var errors []error
-	for i := 0; i < workersCount; i++ {
-		if err := <-doneCh; err != nil {
-			errors = append(errors, err)
-		}
-	}
+	// Wait until work is complete.
+	var firstErr error
+	rowsProcessedTotal := 0
+	for _, tsw := range tsws {
+		if err := <-tsw.doneCh; err != nil && firstErr == nil {
+			// Return just the first error, since other errors
+			// likely duplicate the first one.
+			firstErr = err
+		}
+		rowsProcessedTotal += tsw.rowsProcessed
+	}
 	perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
 	perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
-	if len(errors) > 0 {
-		// Return just the first error, since other errors
-		// is likely duplicate the first error.
-		return errors[0]
-	}
-	return nil
+	return firstErr
 }

 var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
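
For orientation, a caller drives RunParallel roughly as in the sketch below. This is illustrative, not code from the repository: sumValues is a hypothetical helper, while the callback signature and the rule that f must not retain rs come from the code above. Because the pool invokes f from at most gomaxprocs workers with distinct workerID values, the callback can use lock-free per-worker accumulators.

package example

import (
	"runtime"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
)

// sumValues counts all data points in rss without locking: each worker
// goroutine gets its own accumulator slot, indexed by workerID. It
// assumes runtime.GOMAXPROCS(-1) still matches the pool size captured
// by the netstorage package at startup.
func sumValues(rss *netstorage.Results) (int, error) {
	totals := make([]int, runtime.GOMAXPROCS(-1))
	err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
		// Per the contract above, rs must not be retained after returning.
		totals[workerID] += len(rs.Values)
	})
	total := 0
	for _, n := range totals {
		total += n
	}
	return total, err
}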
@@ -157,70 +161,78 @@ type packedTimeseries struct {
 	addrs []tmpBlockAddr
 }

+var unpackWorkCh = make(chan *unpackWork, gomaxprocs)
+
+type unpackWork struct {
+	tbf       *tmpBlocksFile
+	addr      tmpBlockAddr
+	tr        storage.TimeRange
+	fetchData bool
+	at        *auth.Token
+	doneCh    chan error
+	sb        *sortBlock
+}
+
+func init() {
+	for i := 0; i < gomaxprocs; i++ {
+		go unpackWorker()
+	}
+}
+
+func unpackWorker() {
+	for upw := range unpackWorkCh {
+		sb := getSortBlock()
+		if err := sb.unpackFrom(upw.tbf, upw.addr, upw.tr, upw.fetchData, upw.at); err != nil {
+			putSortBlock(sb)
+			upw.doneCh <- fmt.Errorf("cannot unpack block: %s", err)
+			continue
+		}
+		upw.sb = sb
+		upw.doneCh <- nil
+	}
+}
+
 // Unpack unpacks pts to dst.
-func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token, maxWorkersCount int) error {
+func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
 	dst.reset()
 	if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
 		return fmt.Errorf("cannot unmarshal metricName %q: %s", pts.metricName, err)
 	}
-	workersCount := 1 + len(pts.addrs)/32
-	if workersCount > maxWorkersCount {
-		workersCount = maxWorkersCount
-	}
-	if workersCount == 0 {
-		logger.Panicf("BUG: workersCount cannot be zero")
-	}
-	sbs := make([]*sortBlock, 0, len(pts.addrs))
-	var sbsLock sync.Mutex
-	workCh := make(chan tmpBlockAddr, workersCount)
-	doneCh := make(chan error)
-
-	// Start workers
-	for i := 0; i < workersCount; i++ {
-		go func() {
-			var err error
-			for addr := range workCh {
-				sb := getSortBlock()
-				if err = sb.unpackFrom(tbf, addr, tr, fetchData, at); err != nil {
-					break
-				}
-				sbsLock.Lock()
-				sbs = append(sbs, sb)
-				sbsLock.Unlock()
-			}
-
-			// Drain the remaining work
-			for range workCh {
-			}
-			doneCh <- err
-		}()
-	}

 	// Feed workers with work
-	for _, addr := range pts.addrs {
-		workCh <- addr
+	upws := make([]*unpackWork, len(pts.addrs))
+	for i, addr := range pts.addrs {
+		upw := &unpackWork{
+			tbf:       tbf,
+			addr:      addr,
+			tr:        tr,
+			fetchData: fetchData,
+			at:        at,
+			doneCh:    make(chan error, 1),
+		}
+		unpackWorkCh <- upw
+		upws[i] = upw
 	}
 	pts.addrs = pts.addrs[:0]

-	// Wait until workers finish
-	var errors []error
-	for i := 0; i < workersCount; i++ {
-		if err := <-doneCh; err != nil {
-			errors = append(errors, err)
-		}
-	}
-	if len(errors) > 0 {
-		// Return the first error only, since other errors are likely the same.
-		return errors[0]
+	// Wait until work is complete
+	sbs := make([]*sortBlock, 0, len(upws))
+	var firstErr error
+	for _, upw := range upws {
+		if err := <-upw.doneCh; err != nil && firstErr == nil {
+			// Return the first error only, since other errors are likely the same.
+			firstErr = err
+		}
+		if firstErr == nil {
+			sbs = append(sbs, upw.sb)
+		} else {
+			putSortBlock(upw.sb)
+		}
+	}
+	if firstErr != nil {
+		return firstErr
 	}

 	// Merge blocks
 	mergeSortBlocks(dst, sbs)
 	return nil
 }
@@ -1590,25 +1602,6 @@ var (
 // The maximum number of concurrent queries per storageNode.
 const maxConcurrentQueriesPerStorageNode = 100

-func getResult() *Result {
-	v := rsPool.Get()
-	if v == nil {
-		return &Result{}
-	}
-	return v.(*Result)
-}
-
-func putResult(rs *Result) {
-	if len(rs.Values) > 8192 {
-		// Do not pool big results, since they may occupy too much memory.
-		return
-	}
-	rs.reset()
-	rsPool.Put(rs)
-}
-
-var rsPool sync.Pool
-
 // Deadline contains deadline with the corresponding timeout for pretty error messages.
 type Deadline struct {
 	Deadline time.Time