mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
app/vmselect/netstorage: emit more useful information in query traces when some of vmstorage nodes return errors or if there is no need to wait for their responses
This commit is contained in:
parent
60aaf4b2c2
commit
eb784ff399
@ -1435,37 +1435,66 @@ func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialRespon
|
||||
|
||||
type storageNodesRequest struct {
|
||||
denyPartialResponse bool
|
||||
resultsCh chan interface{}
|
||||
resultsCh chan rpcResult
|
||||
qts map[*querytracer.Tracer]struct{}
|
||||
sns []*storageNode
|
||||
}
|
||||
|
||||
type rpcResult struct {
|
||||
data interface{}
|
||||
qt *querytracer.Tracer
|
||||
}
|
||||
|
||||
func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool,
|
||||
f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{}) *storageNodesRequest {
|
||||
resultsCh := make(chan interface{}, len(sns))
|
||||
resultsCh := make(chan rpcResult, len(sns))
|
||||
qts := make(map[*querytracer.Tracer]struct{}, len(sns))
|
||||
for idx, sn := range sns {
|
||||
qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr())
|
||||
qts[qtChild] = struct{}{}
|
||||
go func(workerID uint, sn *storageNode) {
|
||||
result := f(qtChild, workerID, sn)
|
||||
resultsCh <- result
|
||||
qtChild.Done()
|
||||
data := f(qtChild, workerID, sn)
|
||||
resultsCh <- rpcResult{
|
||||
data: data,
|
||||
qt: qtChild,
|
||||
}
|
||||
}(uint(idx), sn)
|
||||
}
|
||||
return &storageNodesRequest{
|
||||
denyPartialResponse: denyPartialResponse,
|
||||
resultsCh: resultsCh,
|
||||
qts: qts,
|
||||
sns: sns,
|
||||
}
|
||||
}
|
||||
|
||||
func (snr *storageNodesRequest) finishQueryTracers(msg string) {
|
||||
for qt := range snr.qts {
|
||||
snr.finishQueryTracer(qt, msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (snr *storageNodesRequest) finishQueryTracer(qt *querytracer.Tracer, msg string) {
|
||||
if msg == "" {
|
||||
qt.Done()
|
||||
} else {
|
||||
qt.Donef("%s", msg)
|
||||
}
|
||||
delete(snr.qts, qt)
|
||||
}
|
||||
|
||||
func (snr *storageNodesRequest) collectAllResults(f func(result interface{}) error) error {
|
||||
sns := snr.sns
|
||||
for i := 0; i < len(sns); i++ {
|
||||
result := <-snr.resultsCh
|
||||
if err := f(result); err != nil {
|
||||
if err := f(result.data); err != nil {
|
||||
snr.finishQueryTracer(result.qt, fmt.Sprintf("error: %s", err))
|
||||
// Immediately return the error to the caller without waiting for responses from other vmstorage nodes -
|
||||
// they will be processed in brackground.
|
||||
snr.finishQueryTracers("cancel request because of error in other vmstorage nodes")
|
||||
return err
|
||||
}
|
||||
snr.finishQueryTracer(result.qt, "")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -1478,12 +1507,14 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
|
||||
// There is no need in timer here, since all the goroutines executing the f function
|
||||
// passed to startStorageNodesRequest must be finished until the deadline.
|
||||
result := <-snr.resultsCh
|
||||
if err := f(result); err != nil {
|
||||
if err := f(result.data); err != nil {
|
||||
snr.finishQueryTracer(result.qt, fmt.Sprintf("error: %s", err))
|
||||
var er *errRemote
|
||||
if errors.As(err, &er) {
|
||||
// Immediately return the error reported by vmstorage to the caller,
|
||||
// since such errors usually mean misconfiguration at vmstorage.
|
||||
// The misconfiguration must be known by the caller, so it is fixed ASAP.
|
||||
snr.finishQueryTracers("cancel request because of error in other vmstorage nodes")
|
||||
return false, err
|
||||
}
|
||||
errsPartial = append(errsPartial, err)
|
||||
@ -1491,10 +1522,12 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
|
||||
// Return the error to the caller if partial responses are denied
|
||||
// and the number of partial responses reach -replicationFactor,
|
||||
// since this means that the response is partial.
|
||||
snr.finishQueryTracers("cancel request because partial responses are denied and some vmstorage nodes failed to return response")
|
||||
return false, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
snr.finishQueryTracer(result.qt, "")
|
||||
resultsCollected++
|
||||
if resultsCollected > len(sns)-*replicationFactor {
|
||||
// There is no need in waiting for the remaining results,
|
||||
@ -1503,6 +1536,8 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
|
||||
//
|
||||
// It is expected that cap(snr.resultsCh) == len(sns), otherwise goroutine leak is possible.
|
||||
snr.finishQueryTracers(fmt.Sprintf("cancel request because %d out of %d nodes already returned response according to -replicationFactor=%d",
|
||||
resultsCollected, len(sns), *replicationFactor))
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user