diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index ea8bf0aef8..fadd1694ea 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -25,6 +25,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" "github.com/VictoriaMetrics/metrics" ) @@ -1083,7 +1084,6 @@ var metricNamePool = &sync.Pool{ // ExportBlocks searches for time series matching sq and calls f for each found block. // // f is called in parallel from multiple goroutines. -// Data processing is immediately stopped if f returns non-nil error. // It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block. // It is the responsibility of f to filter blocks according to the given tr. func ExportBlocks(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.Deadline, f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error { @@ -1094,7 +1094,14 @@ func ExportBlocks(at *auth.Token, sq *storage.SearchQuery, deadline searchutils. MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, } + var wg syncwg.WaitGroup + var stopped uint32 processBlock := func(mb *storage.MetricBlock) error { + wg.Add(1) + defer wg.Done() + if atomic.LoadUint32(&stopped) != 0 { + return nil + } mn := metricNamePool.Get().(*storage.MetricName) if err := mn.Unmarshal(mb.MetricName); err != nil { return fmt.Errorf("cannot unmarshal metricName: %w", err) @@ -1107,6 +1114,11 @@ func ExportBlocks(at *auth.Token, sq *storage.SearchQuery, deadline searchutils. return nil } _, err := processSearchQuery(at, true, sq, true, processBlock, deadline) + + // Make sure processBlock isn't called anymore in order to prevent from data races. + atomic.StoreUint32(&stopped, 1) + wg.Wait() + if err != nil { return fmt.Errorf("error occured during export: %w", err) } @@ -1185,7 +1197,14 @@ func ProcessSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se tbf: getTmpBlocksFile(), m: make(map[string][]tmpBlockAddr), } + var wg syncwg.WaitGroup + var stopped uint32 processBlock := func(mb *storage.MetricBlock) error { + wg.Add(1) + defer wg.Done() + if atomic.LoadUint32(&stopped) != 0 { + return nil + } if !fetchData { tbfw.RegisterEmptyBlock(mb) return nil @@ -1196,6 +1215,11 @@ func ProcessSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se return nil } isPartial, err := processSearchQuery(at, denyPartialResponse, sq, fetchData, processBlock, deadline) + + // Make sure processBlock isn't called anymore in order to protect from data races. + atomic.StoreUint32(&stopped, 1) + wg.Wait() + if err != nil { putTmpBlocksFile(tbfw.tbf) return nil, false, fmt.Errorf("error occured during search: %w", err)