lib/storage: properly handle {label=~"foo|"} filters as Prometheus does

Such filters must match all the time series with `label="foo"` plus all the time series without `label`

Previously only time series with `label="foo"` were matched.
This commit is contained in:
Aliaksandr Valialkin 2020-03-30 18:34:51 +03:00
parent b47444e69d
commit d450249955
2 changed files with 40 additions and 1 deletions

View File

@ -782,6 +782,7 @@ func (ctx *vmselectRequestCtx) setupTfss() error {
return fmt.Errorf("cannot parse tag filter %s: %s", tf, err) return fmt.Errorf("cannot parse tag filter %s: %s", tf, err)
} }
} }
tfss = append(tfss, tfs.Finalize()...)
} }
ctx.tfss = tfss ctx.tfss = tfss
return nil return nil

View File

@ -39,6 +39,8 @@ func NewTagFilters(accountID, projectID uint32) *TagFilters {
// Add adds the given tag filter to tfs. // Add adds the given tag filter to tfs.
// //
// MetricGroup must be encoded with nil key. // MetricGroup must be encoded with nil key.
//
// Finalize must be called after tfs is constructed.
func (tfs *TagFilters) Add(key, value []byte, isNegative, isRegexp bool) error { func (tfs *TagFilters) Add(key, value []byte, isNegative, isRegexp bool) error {
// Verify whether tag filter is empty. // Verify whether tag filter is empty.
if len(value) == 0 { if len(value) == 0 {
@ -72,6 +74,34 @@ func (tfs *TagFilters) Add(key, value []byte, isNegative, isRegexp bool) error {
return nil return nil
} }
// Finalize finalizes tfs and may return complementary TagFilters,
// which must be added to the resulting set of tag filters.
func (tfs *TagFilters) Finalize() []*TagFilters {
var tfssNew []*TagFilters
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf.matchesEmptyValue {
// tf matches empty value, so it must be accompanied with `key!~".+"` tag filter
// in order to match time series without the given label.
tfssNew = append(tfssNew, tfs.cloneWithNegativeFilter(tf))
}
}
return tfssNew
}
func (tfs *TagFilters) cloneWithNegativeFilter(tfNegative *tagFilter) *TagFilters {
tfsNew := NewTagFilters(tfs.accountID, tfs.projectID)
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf == tfNegative {
tfsNew.Add(tf.key, []byte(".+"), true, true)
} else {
tfsNew.Add(tf.key, tf.value, tf.isNegative, tf.isRegexp)
}
}
return tfsNew
}
// String returns human-readable value for tfs. // String returns human-readable value for tfs.
func (tfs *TagFilters) String() string { func (tfs *TagFilters) String() string {
var bb bytes.Buffer var bb bytes.Buffer
@ -114,7 +144,7 @@ type tagFilter struct {
// Prefix always contains {nsPrefixTagToMetricIDs, AccountID, ProjectID, key}. // Prefix always contains {nsPrefixTagToMetricIDs, AccountID, ProjectID, key}.
// Additionally it contains: // Additionally it contains:
// - value ending with tagSeparatorChar if !isRegexp. // - value if !isRegexp.
// - non-regexp prefix if isRegexp. // - non-regexp prefix if isRegexp.
prefix []byte prefix []byte
@ -123,6 +153,11 @@ type tagFilter struct {
// Matches regexp suffix. // Matches regexp suffix.
reSuffixMatch func(b []byte) bool reSuffixMatch func(b []byte) bool
// Set to true for filter that matches empty value, i.e. "", "|foo" or ".*"
//
// Such a filter must be applied directly to metricNames.
matchesEmptyValue bool
} }
// String returns human-readable tf value. // String returns human-readable tf value.
@ -214,6 +249,9 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
} }
tf.orSuffixes = append(tf.orSuffixes[:0], rcv.orValues...) tf.orSuffixes = append(tf.orSuffixes[:0], rcv.orValues...)
tf.reSuffixMatch = rcv.reMatch tf.reSuffixMatch = rcv.reMatch
if len(prefix) == 0 && !tf.isNegative && tf.reSuffixMatch(nil) {
tf.matchesEmptyValue = true
}
return nil return nil
} }