Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2025-01-06 08:02:17 +01:00)
app/vmselect/netstorage: move common code for collecting query results from vmstorage nodes to collectResults function
This commit is contained in:
parent d892d63204
commit 990eb29a9b
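
Before this change, every exported function in this file repeated the same result-collection loop: receive one value per vmstorage node from resultsCh, accumulate errors, and then decide between a full response, a partial response, or a hard failure. The commit moves that loop into a single collectResults helper and leaves only a small per-call callback at each call site. The snippet below is a condensed sketch of the resulting pattern, not the verbatim committed code; it assumes the package-level storageNodes slice and the metrics.Counter values that already exist in this package.

// Condensed sketch of the helper introduced by this commit (the committed version appears in the diff below).
func collectResults(denyPartialResponse bool, resultsCh <-chan interface{},
	partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) {
	var errs []error
	for i := 0; i < len(storageNodes); i++ {
		// Every sending goroutine finishes before the deadline, so no timer is needed here.
		if err := f(<-resultsCh); err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) == 0 {
		return false, nil
	}
	if len(errs) == len(storageNodes) {
		// All the vmstorage nodes failed; return only the first error.
		return false, errs[0]
	}
	// Only some nodes failed: count the partial result and either reject it or degrade gracefully.
	partialResultsCounter.Inc()
	if denyPartialResponse {
		return true, errs[0]
	}
	return true, nil
}

Each caller now passes a callback that type-asserts the generic result back to its local nodeResult struct and merges it, so the per-function code shrinks to the merge logic itself, as the hunks below show.
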
@@ -455,7 +455,7 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
 		deletedCount int
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.deleteSeriesRequests.Inc()
@@ -463,7 +463,7 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
 			if err != nil {
 				sn.deleteSeriesRequestErrors.Inc()
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				deletedCount: deletedCount,
 				err: err,
 			}
@@ -472,20 +472,16 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
 
 	// Collect results
 	deletedTotal := 0
-	var errors []error
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.deleteMetrics must be finished until the deadline.
-		nr := <-resultsCh
+	_, err := collectResults(true, resultsCh, partialDeleteSeriesResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		deletedTotal += nr.deletedCount
-	}
-	if len(errors) > 0 {
-		// Return only the first error, since it has no sense in returning all errors.
-		return deletedTotal, fmt.Errorf("error occured during deleting time series: %w", errors[0])
+		return nil
+	})
+	if err != nil {
+		return deletedTotal, fmt.Errorf("cannot delete time series on all the vmstorage nodes: %w", err)
 	}
 	return deletedTotal, nil
 }
@@ -500,7 +496,7 @@ func GetLabelsOnTimeRange(at *auth.Token, denyPartialResponse bool, tr storage.T
 		labels []string
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.labelsOnTimeRangeRequests.Inc()
@@ -509,7 +505,7 @@ func GetLabelsOnTimeRange(at *auth.Token, denyPartialResponse bool, tr storage.T
 				sn.labelsOnTimeRangeRequestErrors.Inc()
 				err = fmt.Errorf("cannot get labels on time range from vmstorage %s: %w", sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				labels: labels,
 				err: err,
 			}
@@ -518,33 +514,18 @@ func GetLabelsOnTimeRange(at *auth.Token, denyPartialResponse bool, tr storage.T
 
 	// Collect results
 	var labels []string
-	var errors []error
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.getLabelsOnTimeRange must be finished until the deadline.
-		nr := <-resultsCh
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelsOnTimeRangeResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		labels = append(labels, nr.labels...)
+		return nil
+	})
+	if err != nil {
+		return nil, isPartial, fmt.Errorf("cannot fetch labels on time range from vmstorage nodes: %w", err)
 	}
-	isPartial := false
-	if len(errors) > 0 {
-		if len(errors) == len(storageNodes) {
-			// Return only the first error, since it has no sense in returning all errors.
-			return nil, false, fmt.Errorf("error occured during fetching labels on time range: %w", errors[0])
-		}
 
-		// Just log errors and return partial results.
-		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
-		partialLabelsOnTimeRangeResults.Inc()
-		if denyPartialResponse {
-			return nil, true, errors[0]
-		}
-		isPartial = true
-	}
 	// Deduplicate labels
 	labels = deduplicateStrings(labels)
 	// Substitute "" with "__name__"
@@ -597,7 +578,7 @@ func GetLabels(at *auth.Token, denyPartialResponse bool, deadline searchutils.De
 		labels []string
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.labelsRequests.Inc()
@@ -606,7 +587,7 @@ func GetLabels(at *auth.Token, denyPartialResponse bool, deadline searchutils.De
 				sn.labelsRequestErrors.Inc()
 				err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				labels: labels,
 				err: err,
 			}
@@ -615,33 +596,18 @@ func GetLabels(at *auth.Token, denyPartialResponse bool, deadline searchutils.De
 
 	// Collect results
 	var labels []string
-	var errors []error
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.getLabels must be finished until the deadline.
-		nr := <-resultsCh
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelsResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		labels = append(labels, nr.labels...)
+		return nil
+	})
+	if err != nil {
+		return nil, isPartial, fmt.Errorf("cannot fetch labels from vmstorage nodes: %w", err)
 	}
-	isPartial := false
-	if len(errors) > 0 {
-		if len(errors) == len(storageNodes) {
-			// Return only the first error, since it has no sense in returning all errors.
-			return nil, false, fmt.Errorf("error occured during fetching labels: %w", errors[0])
-		}
 
-		// Just log errors and return partial results.
-		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
-		partialLabelsResults.Inc()
-		if denyPartialResponse {
-			return nil, true, errors[0]
-		}
-		isPartial = true
-	}
 	// Deduplicate labels
 	labels = deduplicateStrings(labels)
 	// Substitute "" with "__name__"
@@ -670,7 +636,7 @@ func GetLabelValuesOnTimeRange(at *auth.Token, denyPartialResponse bool, labelNa
 		labelValues []string
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.labelValuesOnTimeRangeRequests.Inc()
@@ -679,7 +645,7 @@ func GetLabelValuesOnTimeRange(at *auth.Token, denyPartialResponse bool, labelNa
 				sn.labelValuesOnTimeRangeRequestErrors.Inc()
 				err = fmt.Errorf("cannot get label values on time range from vmstorage %s: %w", sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				labelValues: labelValues,
 				err: err,
 			}
@@ -688,32 +654,16 @@ func GetLabelValuesOnTimeRange(at *auth.Token, denyPartialResponse bool, labelNa
 
 	// Collect results
 	var labelValues []string
-	var errors []error
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.getLabelValuesOnTimeRange must be finished until the deadline.
-		nr := <-resultsCh
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelValuesOnTimeRangeResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		labelValues = append(labelValues, nr.labelValues...)
-	}
-	isPartial := false
-	if len(errors) > 0 {
-		if len(errors) == len(storageNodes) {
-			// Return only the first error, since it has no sense in returning all errors.
-			return nil, false, fmt.Errorf("error occured during fetching label values on time range: %w", errors[0])
-		}
-
-		// Just log errors and return partial results.
-		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
-		partialLabelValuesOnTimeRangeResults.Inc()
-		if denyPartialResponse {
-			return nil, true, errors[0]
-		}
-		isPartial = true
+		return nil
+	})
+	if err != nil {
+		return nil, isPartial, fmt.Errorf("cannot fetch label values on time range from vmstorage nodes: %w", err)
 	}
 
 	// Deduplicate label values
@@ -762,7 +712,7 @@ func GetLabelValues(at *auth.Token, denyPartialResponse bool, labelName string,
 		labelValues []string
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.labelValuesRequests.Inc()
@@ -771,7 +721,7 @@ func GetLabelValues(at *auth.Token, denyPartialResponse bool, labelName string,
 				sn.labelValuesRequestErrors.Inc()
 				err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				labelValues: labelValues,
 				err: err,
 			}
@@ -780,32 +730,16 @@ func GetLabelValues(at *auth.Token, denyPartialResponse bool, labelName string,
 
 	// Collect results
 	var labelValues []string
-	var errors []error
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.getLabelValues must be finished until the deadline.
-		nr := <-resultsCh
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelValuesResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		labelValues = append(labelValues, nr.labelValues...)
-	}
-	isPartial := false
-	if len(errors) > 0 {
-		if len(errors) == len(storageNodes) {
-			// Return only the first error, since it has no sense in returning all errors.
-			return nil, false, fmt.Errorf("error occured during fetching label values: %w", errors[0])
-		}
-
-		// Just log errors and return partial results.
-		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
-		partialLabelValuesResults.Inc()
-		if denyPartialResponse {
-			return nil, true, errors[0]
-		}
-		isPartial = true
+		return nil
+	})
+	if err != nil {
+		return nil, isPartial, fmt.Errorf("cannot fetch label values from vmstorage nodes: %w", err)
 	}
 
 	// Deduplicate label values
@@ -828,7 +762,7 @@ func GetTagValueSuffixes(at *auth.Token, denyPartialResponse bool, tr storage.Ti
 		suffixes []string
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.tagValueSuffixesRequests.Inc()
@@ -838,7 +772,7 @@ func GetTagValueSuffixes(at *auth.Token, denyPartialResponse bool, tr storage.Ti
 				err = fmt.Errorf("cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w",
 					tr.String(), tagKey, tagValuePrefix, delimiter, sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				suffixes: suffixes,
 				err: err,
 			}
@@ -847,36 +781,20 @@ func GetTagValueSuffixes(at *auth.Token, denyPartialResponse bool, tr storage.Ti
 
 	// Collect results
 	m := make(map[string]struct{})
-	var errors []error
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.getTagValueSuffixes must be finished until the deadline.
-		nr := <-resultsCh
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialTagValueSuffixesResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		for _, suffix := range nr.suffixes {
 			m[suffix] = struct{}{}
 		}
+		return nil
+	})
+	if err != nil {
+		return nil, isPartial, fmt.Errorf("cannot fetch tag value suffixes from vmstorage nodes: %w", err)
 	}
-	isPartial := false
-	if len(errors) > 0 {
-		if len(errors) == len(storageNodes) {
-			// Return only the first error, since it has no sense in returning all errors.
-			return nil, false, fmt.Errorf("error occured during fetching tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c: %w",
-				tr.String(), tagKey, tagValuePrefix, delimiter, errors[0])
-		}
 
-		// Just log errors and return partial results.
-		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
-		partialLabelEntriesResults.Inc()
-		if denyPartialResponse {
-			return nil, true, errors[0]
-		}
-		isPartial = true
-	}
 	suffixes := make([]string, 0, len(m))
 	for suffix := range m {
 		suffixes = append(suffixes, suffix)
@@ -894,7 +812,7 @@ func GetLabelEntries(at *auth.Token, denyPartialResponse bool, deadline searchut
 		labelEntries []storage.TagEntry
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.labelEntriesRequests.Inc()
@@ -903,7 +821,7 @@ func GetLabelEntries(at *auth.Token, denyPartialResponse bool, deadline searchut
 				sn.labelEntriesRequestErrors.Inc()
 				err = fmt.Errorf("cannot get label entries from vmstorage %s: %w", sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				labelEntries: labelEntries,
 				err: err,
 			}
@@ -912,32 +830,16 @@ func GetLabelEntries(at *auth.Token, denyPartialResponse bool, deadline searchut
 
 	// Collect results
 	var labelEntries []storage.TagEntry
-	var errors []error
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.getLabelEntries must be finished until the deadline.
-		nr := <-resultsCh
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialLabelEntriesResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		labelEntries = append(labelEntries, nr.labelEntries...)
-	}
-	isPartial := false
-	if len(errors) > 0 {
-		if len(errors) == len(storageNodes) {
-			// Return only the first error, since it has no sense in returning all errors.
-			return nil, false, fmt.Errorf("error occured during fetching label entries: %w", errors[0])
-		}
-
-		// Just log errors and return partial results.
-		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
-		partialLabelEntriesResults.Inc()
-		if denyPartialResponse {
-			return nil, true, errors[0]
-		}
-		isPartial = true
+		return nil
+	})
+	if err != nil {
+		return nil, isPartial, fmt.Errorf("cannot featch label etnries from vmstorage nodes: %w", err)
 	}
 
 	// Substitute "" with "__name__"
@@ -1003,7 +905,7 @@ func GetTSDBStatusForDate(at *auth.Token, denyPartialResponse bool, deadline sea
 		status *storage.TSDBStatus
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.tsdbStatusRequests.Inc()
@@ -1012,7 +914,7 @@ func GetTSDBStatusForDate(at *auth.Token, denyPartialResponse bool, deadline sea
 				sn.tsdbStatusRequestErrors.Inc()
 				err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				status: status,
 				err: err,
 			}
@@ -1021,31 +923,16 @@ func GetTSDBStatusForDate(at *auth.Token, denyPartialResponse bool, deadline sea
 
 	// Collect results.
 	var statuses []*storage.TSDBStatus
-	var errors []error
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.getTSDBStatusForDate must be finished until the deadline.
-		nr := <-resultsCh
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialTSDBStatusResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		statuses = append(statuses, nr.status)
-	}
-	isPartial := false
-	if len(errors) > 0 {
-		if len(errors) == len(storageNodes) {
-			// Return only the first error, since it has no sense in returning all errors.
-			return nil, false, fmt.Errorf("error occured during fetching tsdb stats: %w", errors[0])
-		}
-		// Just log errors and return partial results.
-		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
-		partialTSDBStatusResults.Inc()
-		if denyPartialResponse {
-			return nil, true, errors[0]
-		}
-		isPartial = true
+		return nil
+	})
+	if err != nil {
+		return nil, isPartial, fmt.Errorf("cannot fetch tsdb status from vmstorage nodes: %w", err)
 	}
 
 	status := mergeTSDBStatuses(statuses, topN)
@@ -1108,7 +995,7 @@ func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchuti
 		n uint64
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.seriesCountRequests.Inc()
@@ -1117,7 +1004,7 @@ func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchuti
 				sn.seriesCountRequestErrors.Inc()
 				err = fmt.Errorf("cannot get series count from vmstorage %s: %w", sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				n: n,
 				err: err,
 			}
@@ -1126,31 +1013,16 @@ func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchuti
 
 	// Collect results
 	var n uint64
-	var errors []error
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.getSeriesCount must be finished until the deadline.
-		nr := <-resultsCh
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialSeriesCountResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		n += nr.n
-	}
-	isPartial := false
-	if len(errors) > 0 {
-		if len(errors) == len(storageNodes) {
-			// Return only the first error, since it has no sense in returning all errors.
-			return 0, false, fmt.Errorf("error occured during fetching series count: %w", errors[0])
-		}
-		// Just log errors and return partial results.
-		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
-		partialSeriesCountResults.Inc()
-		if denyPartialResponse {
-			return 0, true, errors[0]
-		}
-		isPartial = true
+		return nil
+	})
+	if err != nil {
+		return 0, isPartial, fmt.Errorf("cannot fetch series count from vmstorage nodes: %w", err)
 	}
 	return n, isPartial, nil
 }
@@ -1249,7 +1121,7 @@ func SearchMetricNames(at *auth.Token, denyPartialResponse bool, sq *storage.Sea
 		metricNames [][]byte
 		err error
 	}
-	resultsCh := make(chan nodeResult, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.searchMetricNamesRequests.Inc()
@@ -1258,7 +1130,7 @@ func SearchMetricNames(at *auth.Token, denyPartialResponse bool, sq *storage.Sea
 				sn.searchMetricNamesRequestErrors.Inc()
 				err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err)
 			}
-			resultsCh <- nodeResult{
+			resultsCh <- &nodeResult{
 				metricNames: metricNames,
 				err: err,
 			}
@@ -1266,37 +1138,19 @@ func SearchMetricNames(at *auth.Token, denyPartialResponse bool, sq *storage.Sea
 	}
 
 	// Collect results.
-	var errors []error
 	metricNames := make(map[string]struct{})
-	for i := 0; i < len(storageNodes); i++ {
-		// There is no need in timer here, since all the goroutines executing
-		// sn.processSearchMetricNames must be finished until the deadline.
-		nr := <-resultsCh
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialSearchMetricNamesResults, func(result interface{}) error {
+		nr := result.(*nodeResult)
 		if nr.err != nil {
-			errors = append(errors, nr.err)
-			continue
+			return nr.err
 		}
 		for _, metricName := range nr.metricNames {
 			metricNames[string(metricName)] = struct{}{}
 		}
-	}
-	isPartial := false
-	if len(errors) > 0 {
-		if len(errors) == len(storageNodes) {
-			// Return only the first error, since it has no sense in returning all errors.
-			return nil, false, errors[0]
-		}
-
-		// Just return partial results.
-		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
-		// Do not return the error, since it may spam logs on busy vmselect
-		// serving high amounts of requests.
-		partialSearchResults.Inc()
-		if denyPartialResponse {
-			return nil, true, errors[0]
-		}
-		isPartial = true
+		return nil
+	})
+	if err != nil {
+		return nil, isPartial, fmt.Errorf("cannot fetch metric names from vmstorage nodes: %w", err)
 	}
 
 	// Unmarshal metricNames
@@ -1369,7 +1223,7 @@ func processSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se
 	requestData := sq.Marshal(nil)
 
 	// Send the query to all the storage nodes in parallel.
-	resultsCh := make(chan error, len(storageNodes))
+	resultsCh := make(chan interface{}, len(storageNodes))
 	for _, sn := range storageNodes {
 		go func(sn *storageNode) {
 			sn.searchRequests.Inc()
@@ -1378,16 +1232,28 @@ func processSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se
 			sn.searchRequestErrors.Inc()
 				err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
 			}
-			resultsCh <- err
+			resultsCh <- &err
 		}(sn)
 	}
 
 	// Collect results.
+	isPartial, err := collectResults(denyPartialResponse, resultsCh, partialSearchResults, func(result interface{}) error {
+		errP := result.(*error)
+		return *errP
+	})
+	if err != nil {
+		return isPartial, fmt.Errorf("cannot fetch query results from vmstorage nodes: %w", err)
+	}
+	return isPartial, nil
+}
+
+func collectResults(denyPartialResponse bool, resultsCh <-chan interface{}, partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) {
 	var errors []error
 	for i := 0; i < len(storageNodes); i++ {
 		// There is no need in timer here, since all the goroutines executing
-		// sn.processSearchQuery must be finished until the deadline.
-		err := <-resultsCh
+		// the sn.process* function must be finished until the deadline.
+		result := <-resultsCh
+		err := f(result)
 		if err != nil {
 			errors = append(errors, err)
 			continue
@@ -1396,16 +1262,17 @@ func processSearchQuery(at *auth.Token, denyPartialResponse bool, sq *storage.Se
 	isPartial := false
 	if len(errors) > 0 {
 		if len(errors) == len(storageNodes) {
+			// All the vmstorage nodes returned error.
 			// Return only the first error, since it has no sense in returning all errors.
 			return false, errors[0]
 		}
 
-		// Just return partial results.
+		// Return partial results.
 		// This allows gracefully degrade vmselect in the case
-		// if certain storageNodes are temporarily unavailable.
+		// if a part of storageNodes are temporarily unavailable.
 		// Do not return the error, since it may spam logs on busy vmselect
 		// serving high amounts of requests.
-		partialSearchResults.Inc()
+		partialResultsCounter.Inc()
 		if denyPartialResponse {
 			return true, errors[0]
 		}
@@ -2401,14 +2268,17 @@ func Stop() {
 }
 
 var (
-	partialLabelsOnTimeRangeResults = metrics.NewCounter(`vm_partial_labels_on_time_range_results_total{name="vmselect"}`)
-	partialLabelsResults = metrics.NewCounter(`vm_partial_labels_results_total{name="vmselect"}`)
-	partialLabelValuesOnTimeRangeResults = metrics.NewCounter(`vm_partial_label_values_on_time_range_results_total{name="vmselect"}`)
-	partialLabelValuesResults = metrics.NewCounter(`vm_partial_label_values_results_total{name="vmselect"}`)
-	partialLabelEntriesResults = metrics.NewCounter(`vm_partial_label_entries_results_total{name="vmselect"}`)
-	partialTSDBStatusResults = metrics.NewCounter(`vm_partial_tsdb_status_results_total{name="vmselect"}`)
-	partialSeriesCountResults = metrics.NewCounter(`vm_partial_series_count_results_total{name="vmselect"}`)
-	partialSearchResults = metrics.NewCounter(`vm_partial_search_results_total{name="vmselect"}`)
+	partialDeleteSeriesResults = metrics.NewCounter(`vm_partial_results_total{type="delete_series", name="vmselect"}`)
+	partialLabelsOnTimeRangeResults = metrics.NewCounter(`vm_partial_results_total{type="labels_on_time_range", name="vmselect"}`)
+	partialLabelsResults = metrics.NewCounter(`vm_partial_results_total{type="labels", name="vmselect"}`)
+	partialLabelValuesOnTimeRangeResults = metrics.NewCounter(`vm_partial_results_total{type="label_values_on_time_range", name="vmselect"}`)
+	partialLabelValuesResults = metrics.NewCounter(`vm_partial_results_total{type="label_values", name="vmselect"}`)
+	partialTagValueSuffixesResults = metrics.NewCounter(`vm_partial_results_total{type="tag_value_suffixes", name="vmselect"}`)
+	partialLabelEntriesResults = metrics.NewCounter(`vm_partial_results_total{type="label_entries", name="vmselect"}`)
+	partialTSDBStatusResults = metrics.NewCounter(`vm_partial_results_total{type="tsdb_status", name="vmselect"}`)
+	partialSeriesCountResults = metrics.NewCounter(`vm_partial_results_total{type="series_count" name="vmselect"}`)
+	partialSearchMetricNamesResults = metrics.NewCounter(`vm_partial_results_total{type="search_metric_names", name="vmselect"}`)
+	partialSearchResults = metrics.NewCounter(`vm_partial_results_total{type="search", name="vmselect"}`)
 )
 
 // The maximum number of concurrent queries per storageNode.