mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 16:30:55 +01:00
add filter to getMetricIDs (#783)
* add getMetricIDs filter * check nil filter before use
This commit is contained in:
parent
ed473c94ff
commit
ad41e39350
@ -1962,7 +1962,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
metricIDs, err := is.getMetricIDsForTagFilter(tf, maxMetrics)
|
metricIDs, err := is.getMetricIDsForTagFilter(tf, maxMetrics, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errFallbackToMetricNameMatch {
|
if err == errFallbackToMetricNameMatch {
|
||||||
// Skip tag filters requiring to scan for too many metrics.
|
// Skip tag filters requiring to scan for too many metrics.
|
||||||
@ -2225,7 +2225,7 @@ const (
|
|||||||
|
|
||||||
var uselessTagFilterCacheValue = []byte("1")
|
var uselessTagFilterCacheValue = []byte("1")
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (*uint64set.Set, error) {
|
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, filter *uint64set.Set) (*uint64set.Set, error) {
|
||||||
if tf.isNegative {
|
if tf.isNegative {
|
||||||
logger.Panicf("BUG: isNegative must be false")
|
logger.Panicf("BUG: isNegative must be false")
|
||||||
}
|
}
|
||||||
@ -2243,7 +2243,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
|
|||||||
|
|
||||||
// Slow path - scan for all the rows with the given prefix.
|
// Slow path - scan for all the rows with the given prefix.
|
||||||
maxLoops := maxMetrics * maxIndexScanSlowLoopsPerMetric
|
maxLoops := maxMetrics * maxIndexScanSlowLoopsPerMetric
|
||||||
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
|
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, filter, func(metricID uint64) bool {
|
||||||
metricIDs.Add(metricID)
|
metricIDs.Add(metricID)
|
||||||
return metricIDs.Len() < maxMetrics
|
return metricIDs.Len() < maxMetrics
|
||||||
})
|
})
|
||||||
@ -2256,7 +2256,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
|
|||||||
return metricIDs, nil
|
return metricIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, f func(metricID uint64) bool) error {
|
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, filter *uint64set.Set, f func(metricID uint64) bool) error {
|
||||||
if len(tf.orSuffixes) > 0 {
|
if len(tf.orSuffixes) > 0 {
|
||||||
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
|
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
|
||||||
}
|
}
|
||||||
@ -2293,6 +2293,24 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
|
|||||||
if err := mp.InitOnlyTail(item, tail); err != nil {
|
if err := mp.InitOnlyTail(item, tail); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mp.ParseMetricIDs()
|
||||||
|
|
||||||
|
// Fast path: use the filter to skip items that don't need to be matched.
|
||||||
|
if filter != nil {
|
||||||
|
hasCommonMetric := false
|
||||||
|
for _, metricID := range mp.MetricIDs {
|
||||||
|
if filter.Has(metricID) {
|
||||||
|
hasCommonMetric = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// For both complement and intersection calculations, the result and filter need to have elements in common.
|
||||||
|
if !hasCommonMetric {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
|
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
|
||||||
// Fast path: the same tag value found.
|
// Fast path: the same tag value found.
|
||||||
// There is no need in checking it again with potentially
|
// There is no need in checking it again with potentially
|
||||||
@ -2301,7 +2319,6 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
|
|||||||
if loops > maxLoops {
|
if loops > maxLoops {
|
||||||
return errFallbackToMetricNameMatch
|
return errFallbackToMetricNameMatch
|
||||||
}
|
}
|
||||||
mp.ParseMetricIDs()
|
|
||||||
for _, metricID := range mp.MetricIDs {
|
for _, metricID := range mp.MetricIDs {
|
||||||
if !f(metricID) {
|
if !f(metricID) {
|
||||||
return nil
|
return nil
|
||||||
@ -2340,7 +2357,6 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
|
|||||||
if loops > maxLoops {
|
if loops > maxLoops {
|
||||||
return errFallbackToMetricNameMatch
|
return errFallbackToMetricNameMatch
|
||||||
}
|
}
|
||||||
mp.ParseMetricIDs()
|
|
||||||
for _, metricID := range mp.MetricIDs {
|
for _, metricID := range mp.MetricIDs {
|
||||||
if !f(metricID) {
|
if !f(metricID) {
|
||||||
return nil
|
return nil
|
||||||
@ -2684,7 +2700,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||||||
tfsRemainingWithCount = append(tfsRemainingWithCount, tfsWithCount[i])
|
tfsRemainingWithCount = append(tfsRemainingWithCount, tfsWithCount[i])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
|
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -2735,7 +2751,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
tf := tfWithCount.tf
|
tf := tfWithCount.tf
|
||||||
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
|
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics, metricIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -2921,7 +2937,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
|
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int, filter *uint64set.Set) (*uint64set.Set, error) {
|
||||||
// Augument tag filter prefix for per-date search instead of global search.
|
// Augument tag filter prefix for per-date search instead of global search.
|
||||||
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
|
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
|
||||||
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
|
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
|
||||||
@ -2935,17 +2951,21 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
|
|||||||
tfNew := *tf
|
tfNew := *tf
|
||||||
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
|
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
|
||||||
tfNew.prefix = kb.B
|
tfNew.prefix = kb.B
|
||||||
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
|
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics, filter)
|
||||||
|
|
||||||
// Store the number of matching metricIDs in the cache in order to sort tag filters
|
// Store the number of matching metricIDs in the cache in order to sort tag filters
|
||||||
// in ascending number of matching metricIDs on the next search.
|
// in ascending number of matching metricIDs on the next search.
|
||||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
|
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
|
||||||
metricIDsLen := uint64(metricIDs.Len())
|
|
||||||
if err != nil {
|
var metricIDsLen uint64
|
||||||
|
if err == nil {
|
||||||
|
metricIDsLen = uint64(metricIDs.Len())
|
||||||
|
} else {
|
||||||
// Set metricIDsLen to maxMetrics, so the given entry will be moved to the end
|
// Set metricIDsLen to maxMetrics, so the given entry will be moved to the end
|
||||||
// of tag filters on the next search.
|
// of tag filters on the next search.
|
||||||
metricIDsLen = uint64(maxMetrics)
|
metricIDsLen = uint64(maxMetrics)
|
||||||
}
|
}
|
||||||
|
|
||||||
kb.B = encoding.MarshalUint64(kb.B[:0], metricIDsLen)
|
kb.B = encoding.MarshalUint64(kb.B[:0], metricIDsLen)
|
||||||
is.db.metricIDsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
is.db.metricIDsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
||||||
return metricIDs, err
|
return metricIDs, err
|
||||||
@ -3078,7 +3098,7 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil
|
|||||||
|
|
||||||
// Slow path - scan for all the rows with the given prefix.
|
// Slow path - scan for all the rows with the given prefix.
|
||||||
maxLoops := filter.Len() * maxIndexScanSlowLoopsPerMetric
|
maxLoops := filter.Len() * maxIndexScanSlowLoopsPerMetric
|
||||||
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
|
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, filter, func(metricID uint64) bool {
|
||||||
if tf.isNegative {
|
if tf.isNegative {
|
||||||
// filter must be equal to metricIDs
|
// filter must be equal to metricIDs
|
||||||
metricIDs.Del(metricID)
|
metricIDs.Del(metricID)
|
||||||
|
Loading…
Reference in New Issue
Block a user