app/vmselect/promql: pass workerID to the callback inside doParallel()

This makes it possible to remove tssLock from evalRollupFuncWithSubquery()
in a follow-up commit from @zekker6, in order to speed up the code
on systems with many CPU cores.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
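
To make the motivation concrete, here is a minimal, hypothetical sketch (none of these names come from the repository, and this is not the actual follow-up change): once each callback invocation knows its workerID, results can be accumulated into per-worker slots and merged once at the end, instead of serializing every append through a shared mutex such as tssLock.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 4
	items := []int{1, 2, 3, 4, 5, 6, 7, 8}

	// Per-worker result slots indexed by workerID. Each goroutine writes
	// only to its own slot, so no mutex (the role tssLock plays today)
	// is needed during the parallel phase.
	perWorker := make([][]int, workers)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(workerID int) {
			defer wg.Done()
			// Static sharding: worker w handles items w, w+workers, ...
			for j := workerID; j < len(items); j += workers {
				perWorker[workerID] = append(perWorker[workerID], items[j]*items[j])
			}
		}(i)
	}
	wg.Wait()

	// Single-threaded merge after all workers have finished.
	var merged []int
	for _, slot := range perWorker {
		merged = append(merged, slot...)
	}
	fmt.Println(merged)
}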
Author: Aliaksandr Valialkin
Date:   2023-03-20 20:54:54 -07:00
Commit: 79d8f0e7c6
Parent: e749a015a9


@@ -925,7 +925,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
 	var tssLock sync.Mutex
 	var samplesScannedTotal uint64
 	keepMetricNames := getKeepMetricNames(expr)
-	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) {
+	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
 		values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
 		preFunc(values, timestamps)
 		for _, rc := range rcs {
@@ -969,28 +969,36 @@ func getKeepMetricNames(expr metricsql.Expr) bool {
 	return false
 }
 
-func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) {
-	concurrency := cgroup.AvailableCPUs()
-	if concurrency > len(tss) {
-		concurrency = len(tss)
+func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64)) {
+	workers := netstorage.MaxWorkers()
+	if workers > len(tss) {
+		workers = len(tss)
 	}
-	workCh := make(chan *timeseries, concurrency)
+	seriesPerWorker := (len(tss) + workers - 1) / workers
+	workChs := make([]chan *timeseries, workers)
+	for i := range workChs {
+		workChs[i] = make(chan *timeseries, seriesPerWorker)
+	}
+	for i, ts := range tss {
+		idx := i % len(workChs)
+		workChs[idx] <- ts
+	}
+	for _, workCh := range workChs {
+		close(workCh)
+	}
 	var wg sync.WaitGroup
-	wg.Add(concurrency)
-	for i := 0; i < concurrency; i++ {
-		go func() {
+	wg.Add(workers)
+	for i := 0; i < workers; i++ {
+		go func(workerID uint) {
 			defer wg.Done()
 			var tmpValues []float64
 			var tmpTimestamps []int64
-			for ts := range workCh {
-				tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps)
+			for ts := range workChs[workerID] {
+				tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, workerID)
 			}
-		}()
+		}(uint(i))
 	}
-	for _, ts := range tss {
-		workCh <- ts
-	}
-	close(workCh)
 	wg.Wait()
 }
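
The reworked scheduling is easy to exercise in isolation. The following self-contained sketch (doParallelSketch is an illustrative name, not code from the repository) mirrors the pattern above on plain strings: items are dealt round-robin into buffered per-worker channels, every channel is closed before the workers start, and each worker drains only its own channel, so the workerID seen by the callback is stable for the goroutine's lifetime.

package main

import (
	"fmt"
	"sync"
)

// doParallelSketch mirrors the reworked doParallel: round-robin
// pre-distribution into buffered per-worker channels.
func doParallelSketch(items []string, workers int, f func(item string, workerID uint)) {
	if workers > len(items) {
		workers = len(items)
	}
	// Ceiling division guarantees each buffered channel can hold its full
	// share, so all sends below complete even though no worker runs yet.
	perWorker := (len(items) + workers - 1) / workers
	chs := make([]chan string, workers)
	for i := range chs {
		chs[i] = make(chan string, perWorker)
	}
	for i, item := range items {
		chs[i%len(chs)] <- item
	}
	for _, ch := range chs {
		close(ch) // each worker exits its range loop once its channel drains
	}
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(workerID uint) {
			defer wg.Done()
			for item := range chs[workerID] {
				f(item, workerID)
			}
		}(uint(i))
	}
	wg.Wait()
}

func main() {
	items := []string{"a", "b", "c", "d", "e"}
	doParallelSketch(items, 2, func(item string, workerID uint) {
		fmt.Printf("worker %d got %q\n", workerID, item)
	})
}

The up-front distribution is what keeps this deadlock-free: thanks to the ceiling-division capacity, every send succeeds before any worker starts. Compared with the old shared channel, the trade-off is static load balancing, which is the price paid for a deterministic workerID that later allows lock-free per-worker state.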