add filter to getMetricIDs (#783)

* add getMetricIDs filter

* check nil filter before use
This commit is contained in:
faceair 2020-09-21 16:33:43 -05:00 committed by GitHub
parent 94f7d00537
commit be5e1222f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1927,7 +1927,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
continue
}
metricIDs, err := is.getMetricIDsForTagFilter(tf, maxMetrics)
metricIDs, err := is.getMetricIDsForTagFilter(tf, maxMetrics, nil)
if err != nil {
if err == errFallbackToMetricNameMatch {
// Skip tag filters requiring to scan for too many metrics.
@ -2190,7 +2190,7 @@ const (
var uselessTagFilterCacheValue = []byte("1")
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (*uint64set.Set, error) {
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, filter *uint64set.Set) (*uint64set.Set, error) {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
}
@ -2208,7 +2208,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
// Slow path - scan for all the rows with the given prefix.
maxLoops := maxMetrics * maxIndexScanSlowLoopsPerMetric
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, filter, func(metricID uint64) bool {
metricIDs.Add(metricID)
return metricIDs.Len() < maxMetrics
})
@ -2221,7 +2221,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
return metricIDs, nil
}
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, f func(metricID uint64) bool) error {
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, filter *uint64set.Set, f func(metricID uint64) bool) error {
if len(tf.orSuffixes) > 0 {
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
}
@ -2258,6 +2258,24 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
if err := mp.InitOnlyTail(item, tail); err != nil {
return err
}
mp.ParseMetricIDs()
// Fast path: use the filter to skip items that don't need to be matched.
if filter != nil {
hasCommonMetric := false
for _, metricID := range mp.MetricIDs {
if filter.Has(metricID) {
hasCommonMetric = true
break
}
}
// For both complement and intersection calculations, the result and filter need to have elements in common.
if !hasCommonMetric {
continue
}
}
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
// Fast path: the same tag value found.
// There is no need in checking it again with potentially
@ -2266,7 +2284,6 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
if !f(metricID) {
return nil
@ -2305,7 +2322,6 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
if !f(metricID) {
return nil
@ -2649,7 +2665,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
tfsRemainingWithCount = append(tfsRemainingWithCount, tfsWithCount[i])
continue
}
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics, nil)
if err != nil {
return nil, err
}
@ -2700,7 +2716,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
break
}
tf := tfWithCount.tf
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics, metricIDs)
if err != nil {
return nil, err
}
@ -2875,7 +2891,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
return true, nil
}
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int, filter *uint64set.Set) (*uint64set.Set, error) {
// Augument tag filter prefix for per-date search instead of global search.
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
@ -2889,17 +2905,21 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
tfNew := *tf
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
tfNew.prefix = kb.B
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics, filter)
// Store the number of matching metricIDs in the cache in order to sort tag filters
// in ascending number of matching metricIDs on the next search.
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
metricIDsLen := uint64(metricIDs.Len())
if err != nil {
var metricIDsLen uint64
if err == nil {
metricIDsLen = uint64(metricIDs.Len())
} else {
// Set metricIDsLen to maxMetrics, so the given entry will be moved to the end
// of tag filters on the next search.
metricIDsLen = uint64(maxMetrics)
}
kb.B = encoding.MarshalUint64(kb.B[:0], metricIDsLen)
is.db.metricIDsPerDateTagFilterCache.Set(is.kb.B, kb.B)
return metricIDs, err
@ -3032,7 +3052,7 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil
// Slow path - scan for all the rows with the given prefix.
maxLoops := filter.Len() * maxIndexScanSlowLoopsPerMetric
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, filter, func(metricID uint64) bool {
if tf.isNegative {
// filter must be equal to metricIDs
metricIDs.Del(metricID)