app/vmselect/netstorage: prevent from data races in ProcessSearchQuery and in Export funcs when -replicationFactor > 1

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
This commit is contained in:
Aliaksandr Valialkin 2020-11-23 10:25:28 +02:00
parent f4fd917e4f
commit 25a57ced6c

View File

@ -25,6 +25,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/metrics"
)
@ -1083,7 +1084,6 @@ var metricNamePool = &sync.Pool{
// ExportBlocks searches for time series matching sq and calls f for each found block.
//
// f is called in parallel from multiple goroutines.
// Data processing is immediately stopped if f returns non-nil error.
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
// It is the responsibility of f to filter blocks according to the given tr.
func ExportBlocks(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.Deadline, f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error {
@ -1094,7 +1094,14 @@ func ExportBlocks(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
var wg syncwg.WaitGroup
var stopped uint32
processBlock := func(mb *storage.MetricBlock) error {
wg.Add(1)
defer wg.Done()
if atomic.LoadUint32(&stopped) != 0 {
return nil
}
mn := metricNamePool.Get().(*storage.MetricName)
if err := mn.Unmarshal(mb.MetricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName: %w", err)
@ -1107,6 +1114,11 @@ func ExportBlocks(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
return nil
}
_, err := processSearchQuery(at, true, sq, true, processBlock, deadline)
// Make sure processBlock isn't called anymore in order to prevent from data races.
atomic.StoreUint32(&stopped, 1)
wg.Wait()
if err != nil {
return fmt.Errorf("error occured during export: %w", err)
}
@ -1185,7 +1197,14 @@ func ProcessSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se
tbf: getTmpBlocksFile(),
m: make(map[string][]tmpBlockAddr),
}
var wg syncwg.WaitGroup
var stopped uint32
processBlock := func(mb *storage.MetricBlock) error {
wg.Add(1)
defer wg.Done()
if atomic.LoadUint32(&stopped) != 0 {
return nil
}
if !fetchData {
tbfw.RegisterEmptyBlock(mb)
return nil
@ -1196,6 +1215,11 @@ func ProcessSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se
return nil
}
isPartial, err := processSearchQuery(at, denyPartialResponse, sq, fetchData, processBlock, deadline)
// Make sure processBlock isn't called anymore in order to protect from data races.
atomic.StoreUint32(&stopped, 1)
wg.Wait()
if err != nil {
putTmpBlocksFile(tbfw.tbf)
return nil, false, fmt.Errorf("error occured during search: %w", err)