diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index acd7ce35b3..f793316443 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -906,13 +906,10 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName return nil, err } - seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers()) - for i := 0; i < netstorage.MaxWorkers(); i++ { - seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded()) - } - var samplesScannedTotal uint64 keepMetricNames := getKeepMetricNames(expr) + tsw := getTimeseriesByWorkerID() + seriesByWorkerID := tsw.byWorkerID doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) { values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps) preFunc(values, timestamps) @@ -933,8 +930,8 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) for i := range seriesByWorkerID { tss = append(tss, seriesByWorkerID[i].tss...) - putTimeseriesPadded(seriesByWorkerID[i]) } + putTimeseriesByWorkerID(tsw) rowsScannedPerQuery.Update(float64(samplesScannedTotal)) qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal) @@ -1206,40 +1203,15 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, return tss, nil } -var tspPool sync.Pool - -func getTimeseriesPadded() *timeseriesWithPadding { - v := tspPool.Get() - if v == nil { - return ×eriesWithPadding{} - } - return v.(*timeseriesWithPadding) -} - -func putTimeseriesPadded(tsp *timeseriesWithPadding) { - tsp.tss = tsp.tss[:0] - tspPool.Put(tsp) -} - -type timeseriesWithPadding struct { - tss []*timeseries - - // The padding prevents false sharing on widespread platforms with - // 128 mod (cache line size) = 0 . - _ [128 - unsafe.Sizeof(timeseries{})%128]byte -} - func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs) defer qt.Done() - seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers()) - for i := 0; i < netstorage.MaxWorkers(); i++ { - seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded()) - } - var samplesScannedTotal uint64 + tsw := getTimeseriesByWorkerID() + seriesByWorkerID := tsw.byWorkerID + seriesLen := rss.Len() err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error { rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps) @@ -1260,11 +1232,11 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k if err != nil { return nil, err } - tss := make([]*timeseries, 0, rss.Len()*len(rcs)) + tss := make([]*timeseries, 0, seriesLen*len(rcs)) for i := range seriesByWorkerID { tss = append(tss, seriesByWorkerID[i].tss...) - putTimeseriesPadded(seriesByWorkerID[i]) } + putTimeseriesByWorkerID(tsw) rowsScannedPerQuery.Update(float64(samplesScannedTotal)) qt.Printf("samplesScanned=%d", samplesScannedTotal) @@ -1287,6 +1259,42 @@ func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConf return samplesScanned } +type timeseriesWithPadding struct { + tss []*timeseries + + // The padding prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof([]*timeseries{})%128]byte +} + +type timeseriesByWorkerID struct { + byWorkerID []timeseriesWithPadding +} + +func (tsw *timeseriesByWorkerID) reset() { + byWorkerID := tsw.byWorkerID + for i := range byWorkerID { + tsw.byWorkerID[i].tss = nil + } +} + +func getTimeseriesByWorkerID() *timeseriesByWorkerID { + v := timeseriesByWorkerIDPool.Get() + if v == nil { + return ×eriesByWorkerID{ + byWorkerID: make([]timeseriesWithPadding, netstorage.MaxWorkers()), + } + } + return v.(*timeseriesByWorkerID) +} + +func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) { + tsw.reset() + timeseriesByWorkerIDPool.Put(tsw) +} + +var timeseriesByWorkerIDPool sync.Pool + var bbPool bytesutil.ByteBufferPool func evalNumber(ec *EvalConfig, n float64) []*timeseries { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 1ac5007759..cb2dfba6cb 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -37,6 +37,8 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix displaying errors for each query. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3987). * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055). * BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999). +* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966). + ## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)