From e0399ec29af5908ef68beb0a0d17721f8e7fd0c3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 23 Jan 2024 01:36:57 +0200 Subject: [PATCH] app/vmselect/netstorage: remove tswPool, since it isn't efficient --- app/vmselect/netstorage/netstorage.go | 44 +++++---------------------- 1 file changed, 8 insertions(+), 36 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 3def82fe7e..5b1d6531fd 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -90,30 +90,6 @@ type timeseriesWork struct { rowsProcessed int } -func (tsw *timeseriesWork) reset() { - tsw.mustStop = nil - tsw.rss = nil - tsw.pts = nil - tsw.f = nil - tsw.err = nil - tsw.rowsProcessed = 0 -} - -func getTimeseriesWork() *timeseriesWork { - v := tswPool.Get() - if v == nil { - v = &timeseriesWork{} - } - return v.(*timeseriesWork) -} - -func putTimeseriesWork(tsw *timeseriesWork) { - tsw.reset() - tswPool.Put(tsw) -} - -var tswPool sync.Pool - func (tsw *timeseriesWork) do(r *Result, workerID uint) error { if atomic.LoadUint32(tsw.mustStop) != 0 { return nil } @@ -272,22 +248,20 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke maxWorkers := MaxWorkers() if maxWorkers == 1 || tswsLen == 1 { // It is faster to process time series in the current goroutine. 
- tsw := getTimeseriesWork() + var tsw timeseriesWork tmpResult := getTmpResult() rowsProcessedTotal := 0 var err error for i := range rss.packedTimeseries { - initTimeseriesWork(tsw, &rss.packedTimeseries[i]) + initTimeseriesWork(&tsw, &rss.packedTimeseries[i]) err = tsw.do(&tmpResult.rs, 0) rowsReadPerSeries.Update(float64(tsw.rowsProcessed)) rowsProcessedTotal += tsw.rowsProcessed if err != nil { break } - tsw.reset() } putTmpResult(tmpResult) - putTimeseriesWork(tsw) return rowsProcessedTotal, err } @@ -297,11 +271,9 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke // which reduces the scalability on systems with many CPU cores. // Prepare the work for workers. - tsws := make([]*timeseriesWork, len(rss.packedTimeseries)) + tsws := make([]timeseriesWork, len(rss.packedTimeseries)) for i := range rss.packedTimeseries { - tsw := getTimeseriesWork() - initTimeseriesWork(tsw, &rss.packedTimeseries[i]) - tsws[i] = tsw + initTimeseriesWork(&tsws[i], &rss.packedTimeseries[i]) } // Prepare worker channels. @@ -316,9 +288,9 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke } // Spread work among workers. - for i, tsw := range tsws { + for i := range tsws { idx := i % len(workChs) - workChs[idx] <- tsw + workChs[idx] <- &tsws[i] } // Mark worker channels as closed. for _, workCh := range workChs { @@ -341,14 +313,14 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke // Collect results. var firstErr error rowsProcessedTotal := 0 - for _, tsw := range tsws { + for i := range tsws { + tsw := &tsws[i] if tsw.err != nil && firstErr == nil { // Return just the first error, since other errors are likely duplicate the first error. firstErr = tsw.err } rowsReadPerSeries.Update(float64(tsw.rowsProcessed)) rowsProcessedTotal += tsw.rowsProcessed - putTimeseriesWork(tsw) } return rowsProcessedTotal, firstErr }