diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 336a0ce0aa..5ba2f7a875 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -7,7 +7,6 @@ import (
 	"runtime"
 	"sort"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@@ -67,6 +66,47 @@ func (rss *Results) Cancel() {
 	rss.tbf = nil
 }
 
+var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs)
+
+type timeseriesWork struct {
+	rss    *Results
+	pts    *packedTimeseries
+	f      func(rs *Result, workerID uint)
+	doneCh chan error
+
+	rowsProcessed int
+}
+
+func init() {
+	for i := 0; i < gomaxprocs; i++ {
+		go timeseriesWorker(uint(i))
+	}
+}
+
+func timeseriesWorker(workerID uint) {
+	var rs Result
+	for tsw := range timeseriesWorkCh {
+		rss := tsw.rss
+		if time.Until(rss.deadline.Deadline) < 0 {
+			tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
+			continue
+		}
+		if err := tsw.pts.Unpack(rss.tbf, &rs, rss.tr, rss.fetchData, rss.at); err != nil {
+			tsw.doneCh <- fmt.Errorf("error during time series unpacking: %s", err)
+			continue
+		}
+		if len(rs.Timestamps) > 0 || !rss.fetchData {
+			tsw.f(&rs, workerID)
+		}
+		tsw.rowsProcessed = len(rs.Values)
+		tsw.doneCh <- nil
+		if cap(rs.Values) > 1024*1024 && 4*len(rs.Values) < cap(rs.Values) {
+			// Reset rs in order to preserve memory usage after processing big time series with millions of rows.
+			rs = Result{}
+		}
+	}
+}
+
 // RunParallel runs in parallel f for all the results from rss.
 //
 // f shouldn't hold references to rs after returning.
@@ -79,72 +119,36 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 		rss.tbf = nil
 	}()
 
-	workersCount := 1 + len(rss.packedTimeseries)/32
-	if workersCount > gomaxprocs {
-		workersCount = gomaxprocs
-	}
-	if workersCount == 0 {
-		logger.Panicf("BUG: workersCount cannot be zero")
-	}
-	workCh := make(chan *packedTimeseries, workersCount)
-	doneCh := make(chan error)
-
-	// Start workers.
-	rowsProcessedTotal := uint64(0)
-	for i := 0; i < workersCount; i++ {
-		go func(workerID uint) {
-			rs := getResult()
-			defer putResult(rs)
-			maxWorkersCount := gomaxprocs / workersCount
-
-			var err error
-			rowsProcessed := 0
-			for pts := range workCh {
-				if time.Until(rss.deadline.Deadline) < 0 {
-					err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
-					break
-				}
-				if err = pts.Unpack(rss.tbf, rs, rss.tr, rss.fetchData, rss.at, maxWorkersCount); err != nil {
-					break
-				}
-				if len(rs.Timestamps) == 0 && rss.fetchData {
-					// Skip empty blocks.
-					continue
-				}
-				rowsProcessed += len(rs.Values)
-				f(rs, workerID)
-			}
-			atomic.AddUint64(&rowsProcessedTotal, uint64(rowsProcessed))
-			// Drain the remaining work
-			for range workCh {
-			}
-			doneCh <- err
-		}(uint(i))
-	}
-
-	// Feed workers with work.
+	tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
 	for i := range rss.packedTimeseries {
-		workCh <- &rss.packedTimeseries[i]
+		tsw := &timeseriesWork{
+			rss:    rss,
+			pts:    &rss.packedTimeseries[i],
+			f:      f,
+			doneCh: make(chan error, 1),
+		}
+		timeseriesWorkCh <- tsw
+		tsws[i] = tsw
 	}
 	seriesProcessedTotal := len(rss.packedTimeseries)
 	rss.packedTimeseries = rss.packedTimeseries[:0]
-	close(workCh)
-
-	// Wait until workers finish.
-	var errors []error
-	for i := 0; i < workersCount; i++ {
-		if err := <-doneCh; err != nil {
-			errors = append(errors, err)
+	// Wait until work is complete.
+	var firstErr error
+	rowsProcessedTotal := 0
+	for _, tsw := range tsws {
+		if err := <-tsw.doneCh; err != nil && firstErr == nil {
+			// Return just the first error, since other errors
+			// are likely duplicate the first error.
+			firstErr = err
 		}
+		rowsProcessedTotal += tsw.rowsProcessed
 	}
+
 	perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
 	perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
-	if len(errors) > 0 {
-		// Return just the first error, since other errors
-		// is likely duplicate the first error.
-		return errors[0]
-	}
-	return nil
+	return firstErr
 }
 
 var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
@@ -157,70 +161,78 @@ type packedTimeseries struct {
 	addrs      []tmpBlockAddr
 }
 
+var unpackWorkCh = make(chan *unpackWork, gomaxprocs)
+
+type unpackWork struct {
+	tbf       *tmpBlocksFile
+	addr      tmpBlockAddr
+	tr        storage.TimeRange
+	fetchData bool
+	at        *auth.Token
+	doneCh    chan error
+	sb        *sortBlock
+}
+
+func init() {
+	for i := 0; i < gomaxprocs; i++ {
+		go unpackWorker()
+	}
+}
+
+func unpackWorker() {
+	for upw := range unpackWorkCh {
+		sb := getSortBlock()
+		if err := sb.unpackFrom(upw.tbf, upw.addr, upw.tr, upw.fetchData, upw.at); err != nil {
+			putSortBlock(sb)
+			upw.doneCh <- fmt.Errorf("cannot unpack block: %s", err)
+			continue
+		}
+		upw.sb = sb
+		upw.doneCh <- nil
+	}
+}
+
 // Unpack unpacks pts to dst.
-func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token, maxWorkersCount int) error {
+func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
 	dst.reset()
 
 	if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
 		return fmt.Errorf("cannot unmarshal metricName %q: %s", pts.metricName, err)
 	}
 
-	workersCount := 1 + len(pts.addrs)/32
-	if workersCount > maxWorkersCount {
-		workersCount = maxWorkersCount
-	}
-	if workersCount == 0 {
-		logger.Panicf("BUG: workersCount cannot be zero")
-	}
-
-	sbs := make([]*sortBlock, 0, len(pts.addrs))
-	var sbsLock sync.Mutex
-
-	workCh := make(chan tmpBlockAddr, workersCount)
-	doneCh := make(chan error)
-
-	// Start workers
-	for i := 0; i < workersCount; i++ {
-		go func() {
-			var err error
-			for addr := range workCh {
-				sb := getSortBlock()
-				if err = sb.unpackFrom(tbf, addr, tr, fetchData, at); err != nil {
-					break
-				}
-
-				sbsLock.Lock()
-				sbs = append(sbs, sb)
-				sbsLock.Unlock()
-			}
-
-			// Drain the remaining work
-			for range workCh {
-			}
-			doneCh <- err
-		}()
-	}
-
-	// Feed workers with work
-	for _, addr := range pts.addrs {
-		workCh <- addr
+	upws := make([]*unpackWork, len(pts.addrs))
+	for i, addr := range pts.addrs {
+		upw := &unpackWork{
+			tbf:       tbf,
+			addr:      addr,
+			tr:        tr,
+			fetchData: fetchData,
+			at:        at,
+			doneCh:    make(chan error, 1),
+		}
+		unpackWorkCh <- upw
+		upws[i] = upw
 	}
 	pts.addrs = pts.addrs[:0]
-	close(workCh)
-
-	// Wait until workers finish
-	var errors []error
-	for i := 0; i < workersCount; i++ {
-		if err := <-doneCh; err != nil {
-			errors = append(errors, err)
+	// Wait until work is complete
+	sbs := make([]*sortBlock, 0, len(pts.addrs))
+	var firstErr error
+	for _, upw := range upws {
+		if err := <-upw.doneCh; err != nil && firstErr == nil {
+			// Return the first error only, since other errors are likely the same.
+			firstErr = err
+		}
+		if firstErr == nil {
+			sbs = append(sbs, upw.sb)
+		} else {
+			putSortBlock(upw.sb)
 		}
 	}
-	if len(errors) > 0 {
-		// Return the first error only, since other errors are likely the same.
-		return errors[0]
+	if firstErr != nil {
+		return firstErr
 	}
-
-	// Merge blocks
 	mergeSortBlocks(dst, sbs)
 	return nil
 }
@@ -1590,25 +1602,6 @@ var (
 // The maximum number of concurrent queries per storageNode.
 const maxConcurrentQueriesPerStorageNode = 100
 
-func getResult() *Result {
-	v := rsPool.Get()
-	if v == nil {
-		return &Result{}
-	}
-	return v.(*Result)
-}
-
-func putResult(rs *Result) {
-	if len(rs.Values) > 8192 {
-		// Do not pool big results, since they may occupy too much memory.
-		return
-	}
-	rs.reset()
-	rsPool.Put(rs)
-}
-
-var rsPool sync.Pool
-
 // Deadline contains deadline with the corresponding timeout for pretty error messages.
 type Deadline struct {
 	Deadline time.Time
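
For readers skimming the patch, below is a minimal, self-contained sketch of the concurrency pattern it switches to: a fixed pool of long-lived worker goroutines fed through a shared buffered channel, where every submitted item carries its own buffered doneCh so the submitter can wait for each item and keep only the first error. The names here (workItem, workCh, startWorkers, runParallel) are illustrative only and do not appear in the patch.

package main

import (
	"fmt"
	"runtime"
)

// workItem mirrors the role of timeseriesWork/unpackWork: one unit of work
// plus a buffered channel on which exactly one result is reported.
type workItem struct {
	input  int
	result int
	doneCh chan error
}

// workCh is shared by all workers; its buffer roughly matches the worker count.
var workCh = make(chan *workItem, runtime.GOMAXPROCS(-1))

// startWorkers launches a fixed pool of workers, as the patch does from init().
func startWorkers() {
	for i := 0; i < runtime.GOMAXPROCS(-1); i++ {
		go func() {
			for w := range workCh {
				// Stand-in for the real per-item processing (block unpacking).
				if w.input < 0 {
					w.doneCh <- fmt.Errorf("negative input: %d", w.input)
					continue
				}
				w.result = w.input * 2
				w.doneCh <- nil
			}
		}()
	}
}

// runParallel submits all items, then waits for each one and keeps only the
// first error, the same shape as RunParallel and Unpack after the patch.
func runParallel(inputs []int) (int, error) {
	items := make([]*workItem, len(inputs))
	for i, in := range inputs {
		w := &workItem{input: in, doneCh: make(chan error, 1)}
		workCh <- w
		items[i] = w
	}
	var firstErr error
	total := 0
	for _, w := range items {
		if err := <-w.doneCh; err != nil && firstErr == nil {
			firstErr = err
		}
		total += w.result
	}
	return total, firstErr
}

func main() {
	startWorkers()
	sum, err := runParallel([]int{1, 2, 3})
	fmt.Println(sum, err)
}

Buffering each doneCh with capacity 1 lets a worker report a result and move on without waiting for the submitter, which mirrors the make(chan error, 1) calls in the patch and avoids the per-call goroutine startup and work-draining logic of the old code.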