app/vmselect/netstorage: move common code for requests execution on all the storage nodes to startStorageNodesRequest func

This commit is contained in:
Aliaksandr Valialkin 2020-11-23 10:51:40 +02:00
parent 25a57ced6c
commit 7987129baa

View File

@ -460,24 +460,21 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
deletedCount int deletedCount int
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(true, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.deleteSeriesRequests.Inc()
go func(sn *storageNode) { deletedCount, err := sn.deleteMetrics(requestData, deadline)
sn.deleteSeriesRequests.Inc() if err != nil {
deletedCount, err := sn.deleteMetrics(requestData, deadline) sn.deleteSeriesRequestErrors.Inc()
if err != nil { }
sn.deleteSeriesRequestErrors.Inc() return &nodeResult{
} deletedCount: deletedCount,
resultsCh <- &nodeResult{ err: err,
deletedCount: deletedCount, }
err: err, })
}
}(sn)
}
// Collect results // Collect results
deletedTotal := 0 deletedTotal := 0
_, err := collectResults(true, resultsCh, partialDeleteSeriesResults, func(result interface{}) error { err := snr.collectAllResults(partialDeleteSeriesResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -501,25 +498,22 @@ func GetLabelsOnTimeRange(at *auth.Token, denyPartialResponse bool, tr storage.T
labels []string labels []string
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.labelsOnTimeRangeRequests.Inc()
go func(sn *storageNode) { labels, err := sn.getLabelsOnTimeRange(at.AccountID, at.ProjectID, tr, deadline)
sn.labelsOnTimeRangeRequests.Inc() if err != nil {
labels, err := sn.getLabelsOnTimeRange(at.AccountID, at.ProjectID, tr, deadline) sn.labelsOnTimeRangeRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot get labels on time range from vmstorage %s: %w", sn.connPool.Addr(), err)
sn.labelsOnTimeRangeRequestErrors.Inc() }
err = fmt.Errorf("cannot get labels on time range from vmstorage %s: %w", sn.connPool.Addr(), err) return &nodeResult{
} labels: labels,
resultsCh <- &nodeResult{ err: err,
labels: labels, }
err: err, })
}
}(sn)
}
// Collect results // Collect results
var labels []string var labels []string
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelsOnTimeRangeResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialLabelsOnTimeRangeResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -583,25 +577,22 @@ func GetLabels(at *auth.Token, denyPartialResponse bool, deadline searchutils.De
labels []string labels []string
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.labelsRequests.Inc()
go func(sn *storageNode) { labels, err := sn.getLabels(at.AccountID, at.ProjectID, deadline)
sn.labelsRequests.Inc() if err != nil {
labels, err := sn.getLabels(at.AccountID, at.ProjectID, deadline) sn.labelsRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err)
sn.labelsRequestErrors.Inc() }
err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err) return &nodeResult{
} labels: labels,
resultsCh <- &nodeResult{ err: err,
labels: labels, }
err: err, })
}
}(sn)
}
// Collect results // Collect results
var labels []string var labels []string
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelsResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialLabelsResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -641,25 +632,22 @@ func GetLabelValuesOnTimeRange(at *auth.Token, denyPartialResponse bool, labelNa
labelValues []string labelValues []string
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.labelValuesOnTimeRangeRequests.Inc()
go func(sn *storageNode) { labelValues, err := sn.getLabelValuesOnTimeRange(at.AccountID, at.ProjectID, labelName, tr, deadline)
sn.labelValuesOnTimeRangeRequests.Inc() if err != nil {
labelValues, err := sn.getLabelValuesOnTimeRange(at.AccountID, at.ProjectID, labelName, tr, deadline) sn.labelValuesOnTimeRangeRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot get label values on time range from vmstorage %s: %w", sn.connPool.Addr(), err)
sn.labelValuesOnTimeRangeRequestErrors.Inc() }
err = fmt.Errorf("cannot get label values on time range from vmstorage %s: %w", sn.connPool.Addr(), err) return &nodeResult{
} labelValues: labelValues,
resultsCh <- &nodeResult{ err: err,
labelValues: labelValues, }
err: err, })
}
}(sn)
}
// Collect results // Collect results
var labelValues []string var labelValues []string
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelValuesOnTimeRangeResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialLabelValuesOnTimeRangeResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -717,25 +705,22 @@ func GetLabelValues(at *auth.Token, denyPartialResponse bool, labelName string,
labelValues []string labelValues []string
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.labelValuesRequests.Inc()
go func(sn *storageNode) { labelValues, err := sn.getLabelValues(at.AccountID, at.ProjectID, labelName, deadline)
sn.labelValuesRequests.Inc() if err != nil {
labelValues, err := sn.getLabelValues(at.AccountID, at.ProjectID, labelName, deadline) sn.labelValuesRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err)
sn.labelValuesRequestErrors.Inc() }
err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err) return &nodeResult{
} labelValues: labelValues,
resultsCh <- &nodeResult{ err: err,
labelValues: labelValues, }
err: err, })
}
}(sn)
}
// Collect results // Collect results
var labelValues []string var labelValues []string
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelValuesResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialLabelValuesResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -767,26 +752,23 @@ func GetTagValueSuffixes(at *auth.Token, denyPartialResponse bool, tr storage.Ti
suffixes []string suffixes []string
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.tagValueSuffixesRequests.Inc()
go func(sn *storageNode) { suffixes, err := sn.getTagValueSuffixes(at.AccountID, at.ProjectID, tr, tagKey, tagValuePrefix, delimiter, deadline)
sn.tagValueSuffixesRequests.Inc() if err != nil {
suffixes, err := sn.getTagValueSuffixes(at.AccountID, at.ProjectID, tr, tagKey, tagValuePrefix, delimiter, deadline) sn.tagValueSuffixesRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w",
sn.tagValueSuffixesRequestErrors.Inc() tr.String(), tagKey, tagValuePrefix, delimiter, sn.connPool.Addr(), err)
err = fmt.Errorf("cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w", }
tr.String(), tagKey, tagValuePrefix, delimiter, sn.connPool.Addr(), err) return &nodeResult{
} suffixes: suffixes,
resultsCh <- &nodeResult{ err: err,
suffixes: suffixes, }
err: err, })
}
}(sn)
}
// Collect results // Collect results
m := make(map[string]struct{}) m := make(map[string]struct{})
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialTagValueSuffixesResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialTagValueSuffixesResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -817,25 +799,22 @@ func GetLabelEntries(at *auth.Token, denyPartialResponse bool, deadline searchut
labelEntries []storage.TagEntry labelEntries []storage.TagEntry
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.labelEntriesRequests.Inc()
go func(sn *storageNode) { labelEntries, err := sn.getLabelEntries(at.AccountID, at.ProjectID, deadline)
sn.labelEntriesRequests.Inc() if err != nil {
labelEntries, err := sn.getLabelEntries(at.AccountID, at.ProjectID, deadline) sn.labelEntriesRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot get label entries from vmstorage %s: %w", sn.connPool.Addr(), err)
sn.labelEntriesRequestErrors.Inc() }
err = fmt.Errorf("cannot get label entries from vmstorage %s: %w", sn.connPool.Addr(), err) return &nodeResult{
} labelEntries: labelEntries,
resultsCh <- &nodeResult{ err: err,
labelEntries: labelEntries, }
err: err, })
}
}(sn)
}
// Collect results // Collect results
var labelEntries []storage.TagEntry var labelEntries []storage.TagEntry
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelEntriesResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialLabelEntriesResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -910,25 +889,22 @@ func GetTSDBStatusForDate(at *auth.Token, denyPartialResponse bool, deadline sea
status *storage.TSDBStatus status *storage.TSDBStatus
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.tsdbStatusRequests.Inc()
go func(sn *storageNode) { status, err := sn.getTSDBStatusForDate(at.AccountID, at.ProjectID, date, topN, deadline)
sn.tsdbStatusRequests.Inc() if err != nil {
status, err := sn.getTSDBStatusForDate(at.AccountID, at.ProjectID, date, topN, deadline) sn.tsdbStatusRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err)
sn.tsdbStatusRequestErrors.Inc() }
err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err) return &nodeResult{
} status: status,
resultsCh <- &nodeResult{ err: err,
status: status, }
err: err, })
}
}(sn)
}
// Collect results. // Collect results.
var statuses []*storage.TSDBStatus var statuses []*storage.TSDBStatus
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialTSDBStatusResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialTSDBStatusResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -1000,25 +976,22 @@ func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchuti
n uint64 n uint64
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.seriesCountRequests.Inc()
go func(sn *storageNode) { n, err := sn.getSeriesCount(at.AccountID, at.ProjectID, deadline)
sn.seriesCountRequests.Inc() if err != nil {
n, err := sn.getSeriesCount(at.AccountID, at.ProjectID, deadline) sn.seriesCountRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot get series count from vmstorage %s: %w", sn.connPool.Addr(), err)
sn.seriesCountRequestErrors.Inc() }
err = fmt.Errorf("cannot get series count from vmstorage %s: %w", sn.connPool.Addr(), err) return &nodeResult{
} n: n,
resultsCh <- &nodeResult{ err: err,
n: n, }
err: err, })
}
}(sn)
}
// Collect results // Collect results
var n uint64 var n uint64
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialSeriesCountResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialSeriesCountResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -1137,25 +1110,22 @@ func SearchMetricNames(at *auth.Token, denyPartialResponse bool, sq *storage.Sea
metricNames [][]byte metricNames [][]byte
err error err error
} }
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.searchMetricNamesRequests.Inc()
go func(sn *storageNode) { metricNames, err := sn.processSearchMetricNames(requestData, deadline)
sn.searchMetricNamesRequests.Inc() if err != nil {
metricNames, err := sn.processSearchMetricNames(requestData, deadline) sn.searchMetricNamesRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err)
sn.searchMetricNamesRequestErrors.Inc() }
err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err) return &nodeResult{
} metricNames: metricNames,
resultsCh <- &nodeResult{ err: err,
metricNames: metricNames, }
err: err, })
}
}(sn)
}
// Collect results. // Collect results.
metricNames := make(map[string]struct{}) metricNames := make(map[string]struct{})
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialSearchMetricNamesResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialSearchMetricNamesResults, func(result interface{}) error {
nr := result.(*nodeResult) nr := result.(*nodeResult)
if nr.err != nil { if nr.err != nil {
return nr.err return nr.err
@ -1251,21 +1221,18 @@ func processSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se
requestData := sq.Marshal(nil) requestData := sq.Marshal(nil)
// Send the query to all the storage nodes in parallel. // Send the query to all the storage nodes in parallel.
resultsCh := make(chan interface{}, len(storageNodes)) snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} {
for _, sn := range storageNodes { sn.searchRequests.Inc()
go func(sn *storageNode) { err := sn.processSearchQuery(requestData, fetchData, processBlock, deadline)
sn.searchRequests.Inc() if err != nil {
err := sn.processSearchQuery(requestData, fetchData, processBlock, deadline) sn.searchRequestErrors.Inc()
if err != nil { err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
sn.searchRequestErrors.Inc() }
err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err) return &err
} })
resultsCh <- &err
}(sn)
}
// Collect results. // Collect results.
isPartial, err := collectResults(denyPartialResponse, resultsCh, partialSearchResults, func(result interface{}) error { isPartial, err := snr.collectResults(partialSearchResults, func(result interface{}) error {
errP := result.(*error) errP := result.(*error)
return *errP return *errP
}) })
@ -1275,15 +1242,48 @@ func processSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se
return isPartial, nil return isPartial, nil
} }
func collectResults(denyPartialResponse bool, resultsCh <-chan interface{}, partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) { type storageNodesRequest struct {
denyPartialResponse bool
resultsCh chan interface{}
}
func startStorageNodesRequest(denyPartialResponse bool, f func(sn *storageNode) interface{}) *storageNodesRequest {
resultsCh := make(chan interface{}, len(storageNodes))
for _, sn := range storageNodes {
go func(sn *storageNode) {
result := f(sn)
resultsCh <- result
}(sn)
}
return &storageNodesRequest{
denyPartialResponse: denyPartialResponse,
resultsCh: resultsCh,
}
}
func (snr *storageNodesRequest) collectAllResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) error {
var errors []error
for i := 0; i < len(storageNodes); i++ {
result := <-snr.resultsCh
if err := f(result); err != nil {
errors = append(errors, err)
continue
}
}
if len(errors) > 0 {
return errors[0]
}
return nil
}
func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) {
var errors []error var errors []error
resultsCollected := 0 resultsCollected := 0
for i := 0; i < len(storageNodes); i++ { for i := 0; i < len(storageNodes); i++ {
// There is no need in timer here, since all the goroutines executing // There is no need in timer here, since all the goroutines executing
// the sn.process* function must be finished until the deadline. // the sn.process* function must be finished until the deadline.
result := <-resultsCh result := <-snr.resultsCh
err := f(result) if err := f(result); err != nil {
if err != nil {
errors = append(errors, err) errors = append(errors, err)
continue continue
} }
@ -1294,7 +1294,7 @@ func collectResults(denyPartialResponse bool, resultsCh <-chan interface{}, part
// This should speed up responses when a part of vmstorage nodes are slow and/or temporarily unavailable. // This should speed up responses when a part of vmstorage nodes are slow and/or temporarily unavailable.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
// //
// It is expected that cap(resultsCh) == len(storageNodes), otherwise goroutine leak is possible. // It is expected that cap(snr.resultsCh) == len(storageNodes), otherwise goroutine leak is possible.
return false, nil return false, nil
} }
} }
@ -1312,7 +1312,7 @@ func collectResults(denyPartialResponse bool, resultsCh <-chan interface{}, part
// Do not return the error, since it may spam logs on busy vmselect // Do not return the error, since it may spam logs on busy vmselect
// serving high amounts of requests. // serving high amounts of requests.
partialResultsCounter.Inc() partialResultsCounter.Inc()
if denyPartialResponse { if snr.denyPartialResponse {
return true, errors[0] return true, errors[0]
} }
isPartial = true isPartial = true