app/vmselect/netstorage: remove tswPool, since it isnt efficient

This commit is contained in:
Aliaksandr Valialkin 2024-01-23 01:36:57 +02:00
parent 72a838a2a1
commit e0399ec29a
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB

View File

@ -90,30 +90,6 @@ type timeseriesWork struct {
rowsProcessed int
}
func (tsw *timeseriesWork) reset() {
tsw.mustStop = nil
tsw.rss = nil
tsw.pts = nil
tsw.f = nil
tsw.err = nil
tsw.rowsProcessed = 0
}
func getTimeseriesWork() *timeseriesWork {
v := tswPool.Get()
if v == nil {
v = &timeseriesWork{}
}
return v.(*timeseriesWork)
}
func putTimeseriesWork(tsw *timeseriesWork) {
tsw.reset()
tswPool.Put(tsw)
}
var tswPool sync.Pool
func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
if atomic.LoadUint32(tsw.mustStop) != 0 {
return nil
@ -272,22 +248,20 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
maxWorkers := MaxWorkers()
if maxWorkers == 1 || tswsLen == 1 {
// It is faster to process time series in the current goroutine.
tsw := getTimeseriesWork()
var tsw timeseriesWork
tmpResult := getTmpResult()
rowsProcessedTotal := 0
var err error
for i := range rss.packedTimeseries {
initTimeseriesWork(tsw, &rss.packedTimeseries[i])
initTimeseriesWork(&tsw, &rss.packedTimeseries[i])
err = tsw.do(&tmpResult.rs, 0)
rowsReadPerSeries.Update(float64(tsw.rowsProcessed))
rowsProcessedTotal += tsw.rowsProcessed
if err != nil {
break
}
tsw.reset()
}
putTmpResult(tmpResult)
putTimeseriesWork(tsw)
return rowsProcessedTotal, err
}
@ -297,11 +271,9 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
// which reduces the scalability on systems with many CPU cores.
// Prepare the work for workers.
tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
tsws := make([]timeseriesWork, len(rss.packedTimeseries))
for i := range rss.packedTimeseries {
tsw := getTimeseriesWork()
initTimeseriesWork(tsw, &rss.packedTimeseries[i])
tsws[i] = tsw
initTimeseriesWork(&tsws[i], &rss.packedTimeseries[i])
}
// Prepare worker channels.
@ -316,9 +288,9 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
}
// Spread work among workers.
for i, tsw := range tsws {
for i := range tsws {
idx := i % len(workChs)
workChs[idx] <- tsw
workChs[idx] <- &tsws[i]
}
// Mark worker channels as closed.
for _, workCh := range workChs {
@ -341,14 +313,14 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
// Collect results.
var firstErr error
rowsProcessedTotal := 0
for _, tsw := range tsws {
for i := range tsws {
tsw := &tsws[i]
if tsw.err != nil && firstErr == nil {
// Return just the first error, since other errors are likely duplicate the first error.
firstErr = tsw.err
}
rowsReadPerSeries.Update(float64(tsw.rowsProcessed))
rowsProcessedTotal += tsw.rowsProcessed
putTimeseriesWork(tsw)
}
return rowsProcessedTotal, firstErr
}