diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index fadd1694ea..8ab2503b63 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -460,24 +460,21 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils. deletedCount int err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.deleteSeriesRequests.Inc() - deletedCount, err := sn.deleteMetrics(requestData, deadline) - if err != nil { - sn.deleteSeriesRequestErrors.Inc() - } - resultsCh <- &nodeResult{ - deletedCount: deletedCount, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(true, func(sn *storageNode) interface{} { + sn.deleteSeriesRequests.Inc() + deletedCount, err := sn.deleteMetrics(requestData, deadline) + if err != nil { + sn.deleteSeriesRequestErrors.Inc() + } + return &nodeResult{ + deletedCount: deletedCount, + err: err, + } + }) // Collect results deletedTotal := 0 - _, err := collectResults(true, resultsCh, partialDeleteSeriesResults, func(result interface{}) error { + err := snr.collectAllResults(partialDeleteSeriesResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -501,25 +498,22 @@ func GetLabelsOnTimeRange(at *auth.Token, denyPartialResponse bool, tr storage.T labels []string err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.labelsOnTimeRangeRequests.Inc() - labels, err := sn.getLabelsOnTimeRange(at.AccountID, at.ProjectID, tr, deadline) - if err != nil { - sn.labelsOnTimeRangeRequestErrors.Inc() - err = fmt.Errorf("cannot get labels on time range from vmstorage %s: %w", sn.connPool.Addr(), err) - } - resultsCh <- &nodeResult{ - labels: labels, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.labelsOnTimeRangeRequests.Inc() + labels, err := sn.getLabelsOnTimeRange(at.AccountID, at.ProjectID, tr, deadline) + if err != nil { + sn.labelsOnTimeRangeRequestErrors.Inc() + err = fmt.Errorf("cannot get labels on time range from vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &nodeResult{ + labels: labels, + err: err, + } + }) // Collect results var labels []string - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelsOnTimeRangeResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialLabelsOnTimeRangeResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -583,25 +577,22 @@ func GetLabels(at *auth.Token, denyPartialResponse bool, deadline searchutils.De labels []string err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.labelsRequests.Inc() - labels, err := sn.getLabels(at.AccountID, at.ProjectID, deadline) - if err != nil { - sn.labelsRequestErrors.Inc() - err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err) - } - resultsCh <- &nodeResult{ - labels: labels, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.labelsRequests.Inc() + labels, err := sn.getLabels(at.AccountID, at.ProjectID, deadline) + if err != nil { + sn.labelsRequestErrors.Inc() + err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &nodeResult{ + labels: labels, + err: err, + } + }) // Collect results var labels []string - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelsResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialLabelsResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -641,25 +632,22 @@ func GetLabelValuesOnTimeRange(at *auth.Token, denyPartialResponse bool, labelNa labelValues []string err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.labelValuesOnTimeRangeRequests.Inc() - labelValues, err := sn.getLabelValuesOnTimeRange(at.AccountID, at.ProjectID, labelName, tr, deadline) - if err != nil { - sn.labelValuesOnTimeRangeRequestErrors.Inc() - err = fmt.Errorf("cannot get label values on time range from vmstorage %s: %w", sn.connPool.Addr(), err) - } - resultsCh <- &nodeResult{ - labelValues: labelValues, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.labelValuesOnTimeRangeRequests.Inc() + labelValues, err := sn.getLabelValuesOnTimeRange(at.AccountID, at.ProjectID, labelName, tr, deadline) + if err != nil { + sn.labelValuesOnTimeRangeRequestErrors.Inc() + err = fmt.Errorf("cannot get label values on time range from vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &nodeResult{ + labelValues: labelValues, + err: err, + } + }) // Collect results var labelValues []string - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelValuesOnTimeRangeResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialLabelValuesOnTimeRangeResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -717,25 +705,22 @@ func GetLabelValues(at *auth.Token, denyPartialResponse bool, labelName string, labelValues []string err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.labelValuesRequests.Inc() - labelValues, err := sn.getLabelValues(at.AccountID, at.ProjectID, labelName, deadline) - if err != nil { - sn.labelValuesRequestErrors.Inc() - err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err) - } - resultsCh <- &nodeResult{ - labelValues: labelValues, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.labelValuesRequests.Inc() + labelValues, err := sn.getLabelValues(at.AccountID, at.ProjectID, labelName, deadline) + if err != nil { + sn.labelValuesRequestErrors.Inc() + err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &nodeResult{ + labelValues: labelValues, + err: err, + } + }) // Collect results var labelValues []string - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelValuesResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialLabelValuesResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -767,26 +752,23 @@ func GetTagValueSuffixes(at *auth.Token, denyPartialResponse bool, tr storage.Ti suffixes []string err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.tagValueSuffixesRequests.Inc() - suffixes, err := sn.getTagValueSuffixes(at.AccountID, at.ProjectID, tr, tagKey, tagValuePrefix, delimiter, deadline) - if err != nil { - sn.tagValueSuffixesRequestErrors.Inc() - err = fmt.Errorf("cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w", - tr.String(), tagKey, tagValuePrefix, delimiter, sn.connPool.Addr(), err) - } - resultsCh <- &nodeResult{ - suffixes: suffixes, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.tagValueSuffixesRequests.Inc() + suffixes, err := sn.getTagValueSuffixes(at.AccountID, at.ProjectID, tr, tagKey, tagValuePrefix, delimiter, deadline) + if err != nil { + sn.tagValueSuffixesRequestErrors.Inc() + err = fmt.Errorf("cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w", + tr.String(), tagKey, tagValuePrefix, delimiter, sn.connPool.Addr(), err) + } + return &nodeResult{ + suffixes: suffixes, + err: err, + } + }) // Collect results m := make(map[string]struct{}) - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialTagValueSuffixesResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialTagValueSuffixesResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -817,25 +799,22 @@ func GetLabelEntries(at *auth.Token, denyPartialResponse bool, deadline searchut labelEntries []storage.TagEntry err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.labelEntriesRequests.Inc() - labelEntries, err := sn.getLabelEntries(at.AccountID, at.ProjectID, deadline) - if err != nil { - sn.labelEntriesRequestErrors.Inc() - err = fmt.Errorf("cannot get label entries from vmstorage %s: %w", sn.connPool.Addr(), err) - } - resultsCh <- &nodeResult{ - labelEntries: labelEntries, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.labelEntriesRequests.Inc() + labelEntries, err := sn.getLabelEntries(at.AccountID, at.ProjectID, deadline) + if err != nil { + sn.labelEntriesRequestErrors.Inc() + err = fmt.Errorf("cannot get label entries from vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &nodeResult{ + labelEntries: labelEntries, + err: err, + } + }) // Collect results var labelEntries []storage.TagEntry - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelEntriesResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialLabelEntriesResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -910,25 +889,22 @@ func GetTSDBStatusForDate(at *auth.Token, denyPartialResponse bool, deadline sea status *storage.TSDBStatus err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.tsdbStatusRequests.Inc() - status, err := sn.getTSDBStatusForDate(at.AccountID, at.ProjectID, date, topN, deadline) - if err != nil { - sn.tsdbStatusRequestErrors.Inc() - err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err) - } - resultsCh <- &nodeResult{ - status: status, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.tsdbStatusRequests.Inc() + status, err := sn.getTSDBStatusForDate(at.AccountID, at.ProjectID, date, topN, deadline) + if err != nil { + sn.tsdbStatusRequestErrors.Inc() + err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &nodeResult{ + status: status, + err: err, + } + }) // Collect results. var statuses []*storage.TSDBStatus - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialTSDBStatusResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialTSDBStatusResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -1000,25 +976,22 @@ func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchuti n uint64 err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.seriesCountRequests.Inc() - n, err := sn.getSeriesCount(at.AccountID, at.ProjectID, deadline) - if err != nil { - sn.seriesCountRequestErrors.Inc() - err = fmt.Errorf("cannot get series count from vmstorage %s: %w", sn.connPool.Addr(), err) - } - resultsCh <- &nodeResult{ - n: n, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.seriesCountRequests.Inc() + n, err := sn.getSeriesCount(at.AccountID, at.ProjectID, deadline) + if err != nil { + sn.seriesCountRequestErrors.Inc() + err = fmt.Errorf("cannot get series count from vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &nodeResult{ + n: n, + err: err, + } + }) // Collect results var n uint64 - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialSeriesCountResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialSeriesCountResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -1137,25 +1110,22 @@ func SearchMetricNames(at *auth.Token, denyPartialResponse bool, sq *storage.Sea metricNames [][]byte err error } - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.searchMetricNamesRequests.Inc() - metricNames, err := sn.processSearchMetricNames(requestData, deadline) - if err != nil { - sn.searchMetricNamesRequestErrors.Inc() - err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err) - } - resultsCh <- &nodeResult{ - metricNames: metricNames, - err: err, - } - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.searchMetricNamesRequests.Inc() + metricNames, err := sn.processSearchMetricNames(requestData, deadline) + if err != nil { + sn.searchMetricNamesRequestErrors.Inc() + err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &nodeResult{ + metricNames: metricNames, + err: err, + } + }) // Collect results. metricNames := make(map[string]struct{}) - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialSearchMetricNamesResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialSearchMetricNamesResults, func(result interface{}) error { nr := result.(*nodeResult) if nr.err != nil { return nr.err @@ -1251,21 +1221,18 @@ func processSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se requestData := sq.Marshal(nil) // Send the query to all the storage nodes in parallel. - resultsCh := make(chan interface{}, len(storageNodes)) - for _, sn := range storageNodes { - go func(sn *storageNode) { - sn.searchRequests.Inc() - err := sn.processSearchQuery(requestData, fetchData, processBlock, deadline) - if err != nil { - sn.searchRequestErrors.Inc() - err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err) - } - resultsCh <- &err - }(sn) - } + snr := startStorageNodesRequest(denyPartialResponse, func(sn *storageNode) interface{} { + sn.searchRequests.Inc() + err := sn.processSearchQuery(requestData, fetchData, processBlock, deadline) + if err != nil { + sn.searchRequestErrors.Inc() + err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &err + }) // Collect results. - isPartial, err := collectResults(denyPartialResponse, resultsCh, partialSearchResults, func(result interface{}) error { + isPartial, err := snr.collectResults(partialSearchResults, func(result interface{}) error { errP := result.(*error) return *errP }) @@ -1275,15 +1242,48 @@ func processSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se return isPartial, nil } -func collectResults(denyPartialResponse bool, resultsCh <-chan interface{}, partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) { +type storageNodesRequest struct { + denyPartialResponse bool + resultsCh chan interface{} +} + +func startStorageNodesRequest(denyPartialResponse bool, f func(sn *storageNode) interface{}) *storageNodesRequest { + resultsCh := make(chan interface{}, len(storageNodes)) + for _, sn := range storageNodes { + go func(sn *storageNode) { + result := f(sn) + resultsCh <- result + }(sn) + } + return &storageNodesRequest{ + denyPartialResponse: denyPartialResponse, + resultsCh: resultsCh, + } +} + +func (snr *storageNodesRequest) collectAllResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) error { + var errors []error + for i := 0; i < len(storageNodes); i++ { + result := <-snr.resultsCh + if err := f(result); err != nil { + errors = append(errors, err) + continue + } + } + if len(errors) > 0 { + return errors[0] + } + return nil +} + +func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) { var errors []error resultsCollected := 0 for i := 0; i < len(storageNodes); i++ { // There is no need in timer here, since all the goroutines executing // the sn.process* function must be finished until the deadline. - result := <-resultsCh - err := f(result) - if err != nil { + result := <-snr.resultsCh + if err := f(result); err != nil { errors = append(errors, err) continue } @@ -1294,7 +1294,7 @@ func collectResults(denyPartialResponse bool, resultsCh <-chan interface{}, part // This should speed up responses when a part of vmstorage nodes are slow and/or temporarily unavailable. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711 // - // It is expected that cap(resultsCh) == len(storageNodes), otherwise goroutine leak is possible. + // It is expected that cap(snr.resultsCh) == len(storageNodes), otherwise goroutine leak is possible. return false, nil } } @@ -1312,7 +1312,7 @@ func collectResults(denyPartialResponse bool, resultsCh <-chan interface{}, part // Do not return the error, since it may spam logs on busy vmselect // serving high amounts of requests. partialResultsCounter.Inc() - if denyPartialResponse { + if snr.denyPartialResponse { return true, errors[0] } isPartial = true