diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 755826974e..ddd80d8fca 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -16,6 +16,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -28,7 +29,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
 	"github.com/VictoriaMetrics/metrics"
 	"github.com/cespare/xxhash/v2"
 	"github.com/valyala/fastrand"
@@ -655,9 +655,9 @@ func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadli
 	}
 
 	// Push mrs to storage nodes in parallel.
-	snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.registerMetricNamesRequests.Inc()
-		err := sn.registerMetricNames(qt, mrsPerNode[idx], deadline)
+		err := sn.registerMetricNames(qt, mrsPerNode[workerIdx], deadline)
 		if err != nil {
 			sn.registerMetricNamesErrors.Inc()
 		}
@@ -686,7 +686,7 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		deletedCount int
 		err          error
 	}
-	snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.deleteSeriesRequests.Inc()
 		deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
 		if err != nil {
@@ -727,7 +727,7 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 		labelNames []string
 		err        error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.labelNamesRequests.Inc()
 		labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
 		if err != nil {
@@ -829,7 +829,7 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
 		labelValues []string
 		err         error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.labelValuesRequests.Inc()
 		labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
 		if err != nil {
@@ -911,7 +911,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP
 		suffixes []string
 		err      error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.tagValueSuffixesRequests.Inc()
 		suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
 		if err != nil {
@@ -975,7 +975,7 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 		status *storage.TSDBStatus
 		err    error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.tsdbStatusRequests.Inc()
 		status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
 		if err != nil {
@@ -1080,7 +1080,7 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
 		n   uint64
 		err error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.seriesCountRequests.Inc()
 		n, err := sn.getSeriesCount(qt, accountID, projectID, deadline)
 		if err != nil {
@@ -1213,7 +1213,7 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
 		metricNames []string
 		err         error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.searchMetricNamesRequests.Inc()
 		metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
 		if err != nil {
@@ -1315,9 +1315,18 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
 	requestData := sq.Marshal(nil)
 
 	// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
+	// Use per-worker WaitGroup instead of a shared WaitGroup in order to avoid inter-CPU contention,
+	// which may significantly slow down the rate of processBlock calls on multi-CPU systems.
+	type wgWithPadding struct {
+		wg sync.WaitGroup
+		// The padding prevents false sharing on widespread platforms with
+		// 128 mod (cache line size) = 0.
+		pad [128 - unsafe.Sizeof(sync.WaitGroup{})%128]byte
+	}
+	wgs := make([]wgWithPadding, len(storageNodes))
 	var stopped uint32
-	var wg syncwg.WaitGroup
-	f := func(mb *storage.MetricBlock) error {
+	f := func(mb *storage.MetricBlock, workerIdx int) error {
+		wg := &wgs[workerIdx].wg
 		wg.Add(1)
 		defer wg.Done()
 		if atomic.LoadUint32(&stopped) != 0 {
@@ -1327,9 +1336,9 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
 	}
 
 	// Send the query to all the storage nodes in parallel.
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.searchRequests.Inc()
-		err := sn.processSearchQuery(qt, requestData, f, deadline)
+		err := sn.processSearchQuery(qt, requestData, f, workerIdx, deadline)
 		if err != nil {
 			sn.searchErrors.Inc()
 			err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
@@ -1344,7 +1353,9 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
 	})
 
 	// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
 	atomic.StoreUint32(&stopped, 1)
-	wg.Wait()
+	for i := range wgs {
+		wgs[i].wg.Wait()
+	}
 	if err != nil {
 		return isPartial, fmt.Errorf("cannot fetch query results from vmstorage nodes: %w", err)
 	}
@@ -1356,12 +1367,12 @@ type storageNodesRequest struct {
 	resultsCh chan interface{}
 }
 
-func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool, f func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{}) *storageNodesRequest {
+func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool, f func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{}) *storageNodesRequest {
 	resultsCh := make(chan interface{}, len(storageNodes))
 	for idx, sn := range storageNodes {
 		qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr())
-		go func(idx int, sn *storageNode) {
-			result := f(qtChild, idx, sn)
+		go func(workerIdx int, sn *storageNode) {
+			result := f(qtChild, workerIdx, sn)
 			resultsCh <- result
 			qtChild.Done()
 		}(idx, sn)
@@ -1631,9 +1642,10 @@ func (sn *storageNode) processSearchMetricNames(qt *querytracer.Tracer, requestD
 	return metricNames, nil
 }
 
-func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error {
+func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock, workerIdx int) error,
+	workerIdx int, deadline searchutils.Deadline) error {
 	f := func(bc *handshake.BufferedConn) error {
-		if err := sn.processSearchQueryOnConn(bc, requestData, processBlock); err != nil {
+		if err := sn.processSearchQueryOnConn(bc, requestData, processBlock, workerIdx); err != nil {
 			return err
 		}
 		return nil
@@ -2133,7 +2145,8 @@ func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn
 
 const maxMetricNameSize = 64 * 1024
 
-func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, processBlock func(mb *storage.MetricBlock) error) error {
+func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte,
+	processBlock func(mb *storage.MetricBlock, workerIdx int) error, workerIdx int) error {
 	// Send the request to sn.
 	if err := writeBytes(bc, requestData); err != nil {
 		return fmt.Errorf("cannot write requestData: %w", err)
@@ -2173,7 +2186,7 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ
 		blocksRead++
 		sn.metricBlocksRead.Inc()
 		sn.metricRowsRead.Add(mb.Block.RowsCount())
-		if err := processBlock(&mb); err != nil {
+		if err := processBlock(&mb, workerIdx); err != nil {
 			return fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err)
 		}
 	}
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 86c1108a4f..a8a12a0c95 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 ## tip
 
+* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve performance for heavy queries on systems with many CPU cores.
+
 * BUGFIX: prevent from excess CPU usage when the storage enters [read-only mode](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#readonly-mode).
 
 ## [v1.80.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.80.0)
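
Note on the `wgWithPadding` trick in `ProcessBlocks()` above: each worker goroutine now waits on its own `sync.WaitGroup`, and the struct is padded so that its total size is a multiple of 128 bytes. Since 128 is a multiple of the cache-line size on widespread amd64/arm64 CPUs, adjacent slots of the `wgs` slice never share a cache line, so the frequent `wg.Add(1)`/`wg.Done()` calls of one worker no longer invalidate the cache lines of the other workers. The sketch below is illustrative only and not part of this patch: it applies the same padding pattern to plain per-worker counters, and all names in it (`paddedCounter`, `run`) are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)

// paddedCounter occupies a whole number of 128-byte blocks, so counters that
// sit next to each other in a slice never share a CPU cache line.
type paddedCounter struct {
	n uint64
	// The same sizing trick as wgWithPadding in the patch: pad the struct so
	// its size is a multiple of 128 bytes (itself a multiple of the cache
	// line size on widespread platforms).
	pad [128 - unsafe.Sizeof(uint64(0))%128]byte
}

// run starts one goroutine per worker; each goroutine updates only its own
// padded slot, mirroring how each vmstorage worker touches only wgs[workerIdx].
func run(workers, iters int) []paddedCounter {
	counters := make([]paddedCounter, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(workerIdx int) {
			defer wg.Done()
			for j := 0; j < iters; j++ {
				atomic.AddUint64(&counters[workerIdx].n, 1)
			}
		}(i)
	}
	wg.Wait()
	return counters
}

func main() {
	for i, c := range run(4, 1_000_000) {
		fmt.Printf("worker %d: %d\n", i, c.n)
	}
}
```

Removing the `pad` field makes the counters adjacent in memory and restores the inter-CPU cache-line bouncing that the patch eliminates for the per-worker WaitGroups; this is also why `workerIdx` has to be plumbed through `processSearchQuery` and `processSearchQueryOnConn` down to `processBlock`, so every callback writes only to its own padded slot.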