lib/storage: optimize time series search by regexp filter

This should improve search speed on label filters like `{foo=~"bar.+baz"}`
This commit is contained in:
Aliaksandr Valialkin 2019-06-27 16:15:25 +03:00
parent 0c8d463307
commit c1be1e4342

View File

@ -1507,17 +1507,35 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
return metricIDs, nil return metricIDs, nil
} }
// Slow path - scan all the rows with tf.prefix // Slow path - scan for all the rows with the given prefix.
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
metricIDs[metricID] = struct{}{}
return len(metricIDs) < maxMetrics
})
if err != nil {
return nil, err
}
return metricIDs, nil
}
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, f func(metricID uint64) bool) error {
if len(tf.orSuffixes) > 0 {
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
}
// Scan all the rows with tf.prefix and call f on every tf match.
loops := 0 loops := 0
ts := &is.ts ts := &is.ts
kb := &is.kb
var prevMatchingK []byte
var prevMatch bool
ts.Seek(tf.prefix) ts.Seek(tf.prefix)
for len(metricIDs) < maxMetrics && ts.NextItem() { for ts.NextItem() {
loops++ loops++
if loops > maxLoops { if loops > maxLoops {
return nil, errFallbackToMetricNameMatch return errFallbackToMetricNameMatch
} }
k := ts.Item k := ts.Item
if !bytes.HasPrefix(k, tf.prefix) { if !bytes.HasPrefix(k, tf.prefix) {
break break
@ -1526,25 +1544,49 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
// Get MetricID from k (the last 8 bytes). // Get MetricID from k (the last 8 bytes).
k = k[len(tf.prefix):] k = k[len(tf.prefix):]
if len(k) < 8 { if len(k) < 8 {
return nil, fmt.Errorf("invald key suffix size; want at least %d bytes; got %d bytes", 8, len(k)) return fmt.Errorf("invald key suffix size; want at least %d bytes; got %d bytes", 8, len(k))
} }
v := k[len(k)-8:] v := k[len(k)-8:]
k = k[:len(k)-8] k = k[:len(k)-8]
metricID := encoding.UnmarshalUint64(v)
if prevMatch && string(k) == string(prevMatchingK) {
// Fast path: the same tag value found.
// There is no need in checking it again with potentially
// slow tf.matchSuffix, which may call regexp.
if !f(metricID) {
break
}
continue
}
ok, err := tf.matchSuffix(k) ok, err := tf.matchSuffix(k)
if err != nil { if err != nil {
return nil, fmt.Errorf("error when matching %s: %s", tf, err) return fmt.Errorf("error when matching %s: %s", tf, err)
} }
if !ok { if !ok {
prevMatch = false
// Optimization: skip all the metricIDs for the given tag value
kb.B = append(kb.B[:0], ts.Item[:len(ts.Item)-8]...)
// The last char in kb.B must be tagSeparatorChar. Just increment it
// in order to jump to the next tag value.
if len(kb.B) == 0 || kb.B[len(kb.B)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff {
return fmt.Errorf("data corruption: the last char in k=%X must be %X", kb.B, tagSeparatorChar)
}
kb.B[len(kb.B)-1]++
ts.Seek(kb.B)
continue continue
} }
metricID := encoding.UnmarshalUint64(v) prevMatch = true
metricIDs[metricID] = struct{}{} prevMatchingK = append(prevMatchingK[:0], k...)
if !f(metricID) {
break
}
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
return nil, fmt.Errorf("error when searching for tag filter prefix %q: %s", tf.prefix, err) return fmt.Errorf("error when searching for tag filter prefix %q: %s", tf.prefix, err)
} }
return metricIDs, nil return nil
} }
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs map[uint64]struct{}) error { func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs map[uint64]struct{}) error {
@ -1874,47 +1916,19 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map
// Slow path - scan for all the rows with the given prefix. // Slow path - scan for all the rows with the given prefix.
maxLoops := len(filter) * maxIndexScanLoopsPerMetric maxLoops := len(filter) * maxIndexScanLoopsPerMetric
loops := 0 err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
ts := &is.ts
ts.Seek(tf.prefix)
for ts.NextItem() {
loops++
if loops > maxLoops {
return nil, errFallbackToMetricNameMatch
}
k := ts.Item
if !bytes.HasPrefix(k, tf.prefix) {
break
}
// Extract MetricID from k (the last 8 bytes).
k = k[len(tf.prefix):]
if len(k) < 8 {
return nil, fmt.Errorf("cannot extract metricID from k; want at least %d bytes; got %d bytes", 8, len(k))
}
v := k[len(k)-8:]
k = k[:len(k)-8]
ok, err := tf.matchSuffix(k)
if err != nil {
return nil, fmt.Errorf("error when matching %s: %s", tf, err)
}
if !ok {
continue
}
metricID := encoding.UnmarshalUint64(v)
if tf.isNegative { if tf.isNegative {
// filter must be equal to metricIDs // filter must be equal to metricIDs
delete(metricIDs, metricID) delete(metricIDs, metricID)
continue return true
} }
if _, ok := filter[metricID]; ok { if _, ok := filter[metricID]; ok {
metricIDs[metricID] = struct{}{} metricIDs[metricID] = struct{}{}
} }
} return true
if err := ts.Error(); err != nil { })
return nil, fmt.Errorf("error searching %q: %s", tf.prefix, err) if err != nil {
return nil, err
} }
return metricIDs, nil return metricIDs, nil
} }