app/vmselect/netstorage: prevent the processBlock callback from being called after ProcessBlocks() returns

This should prevent a panic in a multi-level vmselect setup
when the top-level vmselect is configured with -replicationFactor > 1
Aliaksandr Valialkin, 2022-08-08 12:54:55 +03:00
commit 1996e36cf0 (parent 3c583c16a1)
2 changed files with 18 additions and 26 deletions
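The fix moves the stop-flag/WaitGroup guard out of each caller (ExportBlocks, ProcessSearchQuery) and into ProcessBlocks itself, so the guarantee "the callback is never invoked after the function returns" holds for every caller. Below is a minimal, self-contained sketch of that guard pattern; wrapCallback and stop are hypothetical names, and the explicit mutex stands in for the internal lib/syncwg.WaitGroup used by the real code, which likewise allows Add to be called concurrently with Wait.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// wrapCallback returns a guarded version of cb plus a stop function.
// After stop() returns, cb is guaranteed not to be running and not to
// be called again, even if stray goroutines still invoke the wrapper.
func wrapCallback(cb func(v int)) (wrapped func(v int), stop func()) {
	var stopped uint32
	var mu sync.Mutex // serializes Add against Wait, as lib/syncwg does
	var wg sync.WaitGroup
	wrapped = func(v int) {
		mu.Lock()
		wg.Add(1)
		mu.Unlock()
		defer wg.Done()
		if atomic.LoadUint32(&stopped) != 0 {
			return // too late: the coordinating function has already returned
		}
		cb(v)
	}
	stop = func() {
		atomic.StoreUint32(&stopped, 1)
		mu.Lock()
		wg.Wait() // wait for in-flight callback invocations to finish
		mu.Unlock()
	}
	return wrapped, stop
}

func main() {
	f, stop := wrapCallback(func(v int) { fmt.Println("processed", v) })
	for i := 0; i < 3; i++ {
		go f(i)
	}
	time.Sleep(10 * time.Millisecond)
	stop()   // from here on, the callback cannot run anymore
	go f(99) // a late call hits the stopped flag and becomes a no-op
	time.Sleep(10 * time.Millisecond)
}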

app/vmselect/netstorage/netstorage.go

@@ -1173,16 +1173,9 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		MinTimestamp: sq.MinTimestamp,
 		MaxTimestamp: sq.MaxTimestamp,
 	}
-	var wg syncwg.WaitGroup
-	var stopped uint32
 	var blocksRead uint64
 	var samples uint64
 	processBlock := func(mb *storage.MetricBlock) error {
-		wg.Add(1)
-		defer wg.Done()
-		if atomic.LoadUint32(&stopped) != 0 {
-			return nil
-		}
 		mn := metricNamePool.Get().(*storage.MetricName)
 		if err := mn.Unmarshal(mb.MetricName); err != nil {
 			return fmt.Errorf("cannot unmarshal metricName: %w", err)
@@ -1197,12 +1190,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		return nil
 	}
 	_, err := ProcessBlocks(qt, true, sq, processBlock, deadline)
-	// Make sure processBlock isn't called anymore in order to prevent from data races.
-	atomic.StoreUint32(&stopped, 1)
-	wg.Wait()
-	qt.Printf("export blocks=%d, samples=%d", blocksRead, samples)
+	qt.Printf("export blocks=%d, samples=%d, err=%v", blocksRead, samples, err)
 	if err != nil {
 		return fmt.Errorf("error occured during export: %w", err)
 	}
@@ -1282,16 +1270,9 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 		tbf: getTmpBlocksFile(),
 		m:   make(map[string][]tmpBlockAddr),
 	}
-	var wg syncwg.WaitGroup
-	var stopped uint32
 	var blocksRead uint64
 	var samples uint64
 	processBlock := func(mb *storage.MetricBlock) error {
-		wg.Add(1)
-		defer wg.Done()
-		if atomic.LoadUint32(&stopped) != 0 {
-			return nil
-		}
 		atomic.AddUint64(&blocksRead, 1)
 		n := atomic.AddUint64(&samples, uint64(mb.Block.RowsCount()))
 		if *maxSamplesPerQuery > 0 && n > uint64(*maxSamplesPerQuery) {
@@ -1303,11 +1284,6 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 		return nil
 	}
 	isPartial, err := ProcessBlocks(qt, denyPartialResponse, sq, processBlock, deadline)
-	// Make sure processBlock isn't called anymore in order to protect from data races.
-	atomic.StoreUint32(&stopped, 1)
-	wg.Wait()
 	if err != nil {
 		putTmpBlocksFile(tbfw.tbf)
 		return nil, false, fmt.Errorf("error occured during search: %w", err)
@@ -1338,10 +1314,22 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
 	processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) {
 	requestData := sq.Marshal(nil)
+	// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
+	var stopped uint32
+	var wg syncwg.WaitGroup
+	f := func(mb *storage.MetricBlock) error {
+		wg.Add(1)
+		defer wg.Done()
+		if atomic.LoadUint32(&stopped) != 0 {
+			return nil
+		}
+		return processBlock(mb)
+	}
 	// Send the query to all the storage nodes in parallel.
 	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
 		sn.searchRequests.Inc()
-		err := sn.processSearchQuery(qt, requestData, processBlock, deadline)
+		err := sn.processSearchQuery(qt, requestData, f, deadline)
 		if err != nil {
 			sn.searchErrors.Inc()
 			err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
@@ -1354,6 +1342,9 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
 		errP := result.(*error)
 		return *errP
 	})
+	// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
+	atomic.StoreUint32(&stopped, 1)
+	wg.Wait()
 	if err != nil {
 		return isPartial, fmt.Errorf("cannot fetch query results from vmstorage nodes: %w", err)
 	}
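Why the guard has to live inside ProcessBlocks: when the top-level vmselect runs with -replicationFactor > 1, it can stop waiting once enough replicas have answered, while the slower storage-node goroutines keep invoking the callback. The hypothetical collectFirstN below reproduces that shape in a self-contained sketch; without a guard, late callback invocations race with whatever the caller does after the function returns (in the real code, presumably reading blocksRead/samples or recycling the tmpBlocksFile, the kind of use-after-return that can panic).

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// collectFirstN fires m workers and returns after the first n results,
// leaving the remaining workers running; they still call cb after the
// function has returned. This mimics a replicated read that is satisfied
// by the fastest n replicas.
func collectFirstN(n, m int, cb func(replica int)) {
	done := make(chan struct{}, m)
	for i := 0; i < m; i++ {
		go func(i int) {
			time.Sleep(time.Duration(i+1) * time.Millisecond) // simulated replica latency
			cb(i)
			done <- struct{}{}
		}(i)
	}
	for i := 0; i < n; i++ {
		<-done // return as soon as n replicas have answered
	}
}

func main() {
	var blocks uint64
	collectFirstN(2, 4, func(int) { atomic.AddUint64(&blocks, 1) })
	// The two slow replicas may still call the callback after this point,
	// so this read races with them; the wg.Wait() guard in ProcessBlocks
	// is what rules such late calls out for its callers.
	fmt.Println("blocks seen so far:", atomic.LoadUint64(&blocks))
	time.Sleep(10 * time.Millisecond)
}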

docs/CHANGELOG.md

@@ -24,6 +24,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery for [Yandex Cloud](https://cloud.yandex.com/en/). See [these docs](https://docs.victoriametrics.com/sd_configs.html#yandexcloud_sd_configs) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1386).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui). Zoom in the graph by selecting the needed time range in the same way Grafana does. Hold `ctrl` (or `cmd` on MacOS) in order to move the graph to the left/right. Hold `ctrl` (or `cmd` on MacOS) and scroll up/down in order to zoom in/out the area under the cursor. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2812).
+* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): fix potential panic in [multi-level cluster setup](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup) when top-level `vmselect` is configured with `-replicationFactor` bigger than 1.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly handle custom `endpoint` value in [ec2_sd_configs](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config). It was ignored since [v1.77.0](https://docs.victoriametrics.com/CHANGELOG.html#v1770) because of a bug in the implementation of [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1287). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2917).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): add missing `__meta_kubernetes_ingress_class_name` meta-label for `role: ingress` service discovery in Kubernetes. See [this commit from Prometheus](https://github.com/prometheus/prometheus/commit/7e65ad3e432bd2837c17e3e63e85dcbcc30f4a8a).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow stale responses from Consul service discovery (aka [consul_sd_configs](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)) by default in the same way as Prometheus does. This should reduce load on Consul when discovering big number of targets. Stale responses can be disabled by specifying `allow_stale: false` option in `consul_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2940).