app/vmselect: prevent from possible deadlock when f callback blocks inside RunParallel

This commit is contained in:
Aliaksandr Valialkin 2021-07-26 15:38:51 +03:00
parent c857e05604
commit 3921d8afae
2 changed files with 62 additions and 29 deletions

View File

@ -122,32 +122,22 @@ func putTimeseriesWork(tsw *timeseriesWork) {
var tswPool sync.Pool var tswPool sync.Pool
var timeseriesWorkChs []chan *timeseriesWork func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork) {
if len(workChs) == 1 {
func init() {
timeseriesWorkChs = make([]chan *timeseriesWork, gomaxprocs)
for i := range timeseriesWorkChs {
timeseriesWorkChs[i] = make(chan *timeseriesWork, 16)
go timeseriesWorker(timeseriesWorkChs[i], uint(i))
}
}
func scheduleTimeseriesWork(tsw *timeseriesWork) {
if len(timeseriesWorkChs) == 1 {
// Fast path for a single CPU core // Fast path for a single CPU core
timeseriesWorkChs[0] <- tsw workChs[0] <- tsw
return return
} }
attempts := 0 attempts := 0
for { for {
idx := fastrand.Uint32n(uint32(len(timeseriesWorkChs))) idx := fastrand.Uint32n(uint32(len(workChs)))
select { select {
case timeseriesWorkChs[idx] <- tsw: case workChs[idx] <- tsw:
return return
default: default:
attempts++ attempts++
if attempts >= len(timeseriesWorkChs) { if attempts >= len(workChs) {
timeseriesWorkChs[idx] <- tsw workChs[idx] <- tsw
return return
} }
} }
@ -155,8 +145,11 @@ func scheduleTimeseriesWork(tsw *timeseriesWork) {
} }
func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) { func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
var rs Result v := resultPool.Get()
var rsLastResetTime uint64 if v == nil {
v = &result{}
}
r := v.(*result)
for tsw := range ch { for tsw := range ch {
if atomic.LoadUint32(tsw.mustStop) != 0 { if atomic.LoadUint32(tsw.mustStop) != 0 {
tsw.doneCh <- nil tsw.doneCh <- nil
@ -168,29 +161,38 @@ func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String()) tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
continue continue
} }
if err := tsw.pts.Unpack(rss.tbf, &rs, rss.tr, rss.fetchData, rss.at); err != nil { if err := tsw.pts.Unpack(rss.tbf, &r.rs, rss.tr, rss.fetchData, rss.at); err != nil {
atomic.StoreUint32(tsw.mustStop, 1) atomic.StoreUint32(tsw.mustStop, 1)
tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err) tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
continue continue
} }
if len(rs.Timestamps) > 0 || !rss.fetchData { if len(r.rs.Timestamps) > 0 || !rss.fetchData {
if err := tsw.f(&rs, workerID); err != nil { if err := tsw.f(&r.rs, workerID); err != nil {
atomic.StoreUint32(tsw.mustStop, 1) atomic.StoreUint32(tsw.mustStop, 1)
tsw.doneCh <- err tsw.doneCh <- err
continue continue
} }
} }
tsw.rowsProcessed = len(rs.Values) tsw.rowsProcessed = len(r.rs.Values)
tsw.doneCh <- nil tsw.doneCh <- nil
currentTime := fasttime.UnixTimestamp() currentTime := fasttime.UnixTimestamp()
if cap(rs.Values) > 1024*1024 && 4*len(rs.Values) < cap(rs.Values) && currentTime-rsLastResetTime > 10 { if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
// Reset rs in order to preseve memory usage after processing big time series with millions of rows. // Reset r.rs in order to preseve memory usage after processing big time series with millions of rows.
rs = Result{} r.rs = Result{}
rsLastResetTime = currentTime r.lastResetTime = currentTime
} }
} }
r.rs.reset()
resultPool.Put(r)
} }
type result struct {
rs Result
lastResetTime uint64
}
var resultPool sync.Pool
// RunParallel runs f in parallel for all the results from rss. // RunParallel runs f in parallel for all the results from rss.
// //
// f shouldn't hold references to rs after returning. // f shouldn't hold references to rs after returning.
@ -204,6 +206,30 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
rss.tbf = nil rss.tbf = nil
}() }()
// Spin up local workers.
//
// Do not use a global workChs with a global pool of workers, since it may lead to a deadlock in the following case:
// - RunParallel is called with f, which blocks without forward progress.
// - All the workers in the global pool became blocked in f.
// - workChs is filled up, so it cannot accept new work items from other RunParallel calls.
workers := len(rss.packedTimeseries)
if workers > gomaxprocs {
workers = gomaxprocs
}
if workers < 1 {
workers = 1
}
workChs := make([]chan *timeseriesWork, workers)
var workChsWG sync.WaitGroup
for i := 0; i < workers; i++ {
workChs[i] = make(chan *timeseriesWork, 16)
workChsWG.Add(1)
go func(workerID int) {
defer workChsWG.Done()
timeseriesWorker(workChs[workerID], uint(workerID))
}(i)
}
// Feed workers with work. // Feed workers with work.
tsws := make([]*timeseriesWork, len(rss.packedTimeseries)) tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
var mustStop uint32 var mustStop uint32
@ -213,7 +239,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
tsw.pts = &rss.packedTimeseries[i] tsw.pts = &rss.packedTimeseries[i]
tsw.f = f tsw.f = f
tsw.mustStop = &mustStop tsw.mustStop = &mustStop
scheduleTimeseriesWork(tsw) scheduleTimeseriesWork(workChs, tsw)
tsws[i] = tsw tsws[i] = tsw
} }
seriesProcessedTotal := len(rss.packedTimeseries) seriesProcessedTotal := len(rss.packedTimeseries)
@ -233,6 +259,13 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
perQueryRowsProcessed.Update(float64(rowsProcessedTotal)) perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
perQuerySeriesProcessed.Update(float64(seriesProcessedTotal)) perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
// Shut down local workers
for _, workCh := range workChs {
close(workCh)
}
workChsWG.Wait()
return firstErr return firstErr
} }
@ -310,7 +343,6 @@ func putUnpackWork(upw *unpackWork) {
var unpackWorkPool sync.Pool var unpackWorkPool sync.Pool
var unpackWorkChs []chan *unpackWork var unpackWorkChs []chan *unpackWork
var unpackWorkIdx uint32
func init() { func init() {
unpackWorkChs = make([]chan *unpackWork, gomaxprocs) unpackWorkChs = make([]chan *unpackWork, gomaxprocs)

View File

@ -8,6 +8,7 @@ sort: 15
* FEATURE: add `-search.maxSamplesPerSeries` command-line flag for limiting the number of raw samples a single query could process per each time series. This option can prevent from out of memory errors when a query processes tens of millions of raw samples per series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1067). * FEATURE: add `-search.maxSamplesPerSeries` command-line flag for limiting the number of raw samples a single query could process per each time series. This option can prevent from out of memory errors when a query processes tens of millions of raw samples per series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1067).
* BUGFIX: vmselect: prevent from possible deadlock when multiple `target` query args are passed to [Graphite Render API](https://docs.victoriametrics.com/#graphite-render-api-usage).
* BUGFIX: return series with `a op b` labels and `N` values for `(a op b) default N` if `(a op b)` returns series with all NaN values. Previously such series were removed. * BUGFIX: return series with `a op b` labels and `N` values for `(a op b) default N` if `(a op b)` returns series with all NaN values. Previously such series were removed.