changes metricsFind api (#1137)

it should be able mitigate crash if label value contains *,[ or { symbols
This commit is contained in:
Nikolay 2021-03-18 17:12:02 +03:00 committed by Aliaksandr Valialkin
parent ff6fe0698b
commit 8807410a00

View File

@ -78,7 +78,7 @@ func MetricsFindHandler(startTime time.Time, at *auth.Token, w http.ResponseWrit
MaxTimestamp: until,
}
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
paths, isPartial, err := metricsFind(at, denyPartialResponse, tr, label, query, delimiter[0], false, deadline)
paths, isPartial, err := metricsFind(at, denyPartialResponse, tr, label, "", query, delimiter[0], false, deadline)
if err != nil {
return err
}
@ -159,7 +159,7 @@ func MetricsExpandHandler(startTime time.Time, at *auth.Token, w http.ResponseWr
isPartialResponse := false
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
for _, query := range queries {
paths, isPartial, err := metricsFind(at, denyPartialResponse, tr, label, query, delimiter[0], true, deadline)
paths, isPartial, err := metricsFind(at, denyPartialResponse, tr, label, "", query, delimiter[0], true, deadline)
if err != nil {
return err
}
@ -230,45 +230,51 @@ func MetricsIndexHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
return nil
}
// metricsFind searches for label values that match the given query.
func metricsFind(at *auth.Token, denyPartialResponse bool, tr storage.TimeRange, label, query string, delimiter byte,
// metricsFind searches for label values that match the given head and tail.
func metricsFind(at *auth.Token, denyPartialResponse bool, tr storage.TimeRange, label, head, tail string, delimiter byte,
isExpand bool, deadline searchutils.Deadline) ([]string, bool, error) {
n := strings.IndexAny(query, "*{[")
if n < 0 || n == len(query)-1 && strings.HasSuffix(query, "*") {
expandTail := n >= 0
if expandTail {
query = query[:len(query)-1]
n := strings.IndexAny(tail, "*{[")
// fast path.
if n < 0 {
res, isPartial, err := netstorage.GetTagValueSuffixes(at, denyPartialResponse, tr, label, head+tail, delimiter, deadline)
if err != nil {
return nil, false, err
}
suffixes, isPartial, err := netstorage.GetTagValueSuffixes(at, denyPartialResponse, tr, label, query, delimiter, deadline)
if len(res) == 0 {
return nil, false, nil
}
return []string{head + tail}, isPartial, nil
}
if strings.HasSuffix(head, "*") {
head = head[:len(head)-1]
suffixes, isPartial, err := netstorage.GetTagValueSuffixes(at, denyPartialResponse, tr, label, head, delimiter, deadline)
if err != nil {
return nil, false, err
}
if len(suffixes) == 0 {
return nil, false, nil
}
if !expandTail && len(query) > 0 && query[len(query)-1] == delimiter {
return []string{query}, false, nil
}
results := make([]string, 0, len(suffixes))
for _, suffix := range suffixes {
if expandTail || len(suffix) == 0 || len(suffix) == 1 && suffix[0] == delimiter {
results = append(results, query+suffix)
}
results = append(results, head+suffix)
}
return results, isPartial, nil
}
subquery := query[:n] + "*"
paths, isPartial, err := metricsFind(at, denyPartialResponse, tr, label, subquery, delimiter, isExpand, deadline)
head += tail[:n]
subquery := head + "*"
// execute subquery with the given head.
paths, isPartial, err := metricsFind(at, denyPartialResponse, tr, label, subquery, "*", delimiter, isExpand, deadline)
if err != nil {
return nil, false, err
}
tail := ""
suffix := query[n:]
tailNew := ""
suffix := tail[n:]
if m := strings.IndexByte(suffix, delimiter); m >= 0 {
tail = suffix[m+1:]
tailNew = suffix[m+1:]
suffix = suffix[:m+1]
}
qPrefix := query[:n] + suffix
qPrefix := head + suffix
rePrefix, err := getRegexpForQuery(qPrefix, delimiter)
if err != nil {
return nil, false, fmt.Errorf("cannot convert query %q to regexp: %w", qPrefix, err)
@ -278,12 +284,11 @@ func metricsFind(at *auth.Token, denyPartialResponse bool, tr storage.TimeRange,
if !rePrefix.MatchString(path) {
continue
}
if tail == "" {
if tailNew == "" {
results = append(results, path)
continue
}
subquery := path + tail
fullPaths, isPartialLocal, err := metricsFind(at, denyPartialResponse, tr, label, subquery, delimiter, isExpand, deadline)
fullPaths, isPartialLocal, err := metricsFind(at, denyPartialResponse, tr, label, path, tailNew, delimiter, isExpand, deadline)
if err != nil {
return nil, false, err
}