diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go index 8a45844264..11f824cbb4 100644 --- a/lib/storage/tag_filters.go +++ b/lib/storage/tag_filters.go @@ -19,58 +19,86 @@ import ( // // This converts `foo{bar="baz",x=~"a.+"}` to `{foo=bar="baz",foo=x=~"a.+"} filter. func convertToCompositeTagFilterss(tfss []*TagFilters) []*TagFilters { - tfssNew := make([]*TagFilters, len(tfss)) - for i, tfs := range tfss { - tfssNew[i] = convertToCompositeTagFilters(tfs) + tfssNew := make([]*TagFilters, 0, len(tfss)) + for _, tfs := range tfss { + tfssNew = append(tfss, convertToCompositeTagFilters(tfs)...) } return tfssNew } -func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters { - // Search for metric name filter, which must be used for creating composite filters. - var name []byte +func convertToCompositeTagFilters(tfs *TagFilters) []*TagFilters { + tfssCompiled := make([]*TagFilters, 0) + + // Search for filters on metric name, which will be used for creating composite filters. + var names [][]byte hasPositiveFilter := false for _, tf := range tfs.tfs { if len(tf.key) == 0 && !tf.isNegative && !tf.isRegexp { - name = tf.value + names = [][]byte{tf.value} + } else if len(tf.key) == 0 && !tf.isNegative && tf.isRegexp && len(tf.orSuffixes) > 0 { + // Split the filter {__name__=~"name1|...|nameN", other_filters} + // into `name1{other_filters}`, ..., `nameN{other_filters}` + // and generate composite filters for each of them + names = names[:0] + for _, orSuffix := range tf.orSuffixes { + names = append(names, []byte(orSuffix)) + } } else if !tf.isNegative && !tf.isEmptyMatch { hasPositiveFilter = true } } - if len(name) == 0 { + if len(names) == 0 { + tfssCompiled = append(tfssCompiled, tfs) atomic.AddUint64(&compositeFilterMissingConversions, 1) - return tfs + return tfssCompiled } - tfsNew := make([]tagFilter, 0, len(tfs.tfs)) + var compositeKey []byte compositeFilters := 0 - for _, tf := range tfs.tfs { - if len(tf.key) == 0 { - if !hasPositiveFilter || tf.isNegative || tf.isRegexp || string(tf.value) != string(name) { - tfsNew = append(tfsNew, tf) + for _, name := range names { + tfsNew := make([]tagFilter, 0, len(tfs.tfs)) + for _, tf := range tfs.tfs { + if len(tf.key) == 0 { + sameOrSuffixes := true + if len(names) != len(tf.orSuffixes) { + sameOrSuffixes = false + } else { + for i, orSuffix := range tf.orSuffixes { + if string(names[i]) != orSuffix { + sameOrSuffixes = false + break + } + } + } + if !hasPositiveFilter || tf.isNegative || tf.isRegexp && !sameOrSuffixes || !tf.isRegexp && string(tf.value) != string(name) { + tfsNew = append(tfsNew, tf) + } + continue } - continue + if string(tf.key) == "__graphite__" || bytes.Equal(tf.key, graphiteReverseTagKey) { + tfsNew = append(tfsNew, tf) + continue + } + + compositeKey = marshalCompositeTagKey(compositeKey[:0], name, tf.key) + var tfNew tagFilter + if err := tfNew.Init(tfs.commonPrefix, compositeKey, tf.value, tf.isNegative, tf.isRegexp); err != nil { + logger.Panicf("BUG: unexpected error when creating composite tag filter for name=%q and key=%q: %s", name, tf.key, err) + } + tfsNew = append(tfsNew, tfNew) + compositeFilters++ } - if string(tf.key) == "__graphite__" || bytes.Equal(tf.key, graphiteReverseTagKey) { - tfsNew = append(tfsNew, tf) - continue - } - compositeKey = marshalCompositeTagKey(compositeKey[:0], name, tf.key) - var tfNew tagFilter - if err := tfNew.Init(tfs.commonPrefix, compositeKey, tf.value, tf.isNegative, tf.isRegexp); err != nil { - logger.Panicf("BUG: unexpected error when creating composite tag filter for name=%q and key=%q: %s", name, tf.key, err) - } - tfsNew = append(tfsNew, tfNew) - compositeFilters++ + tfsCompiled := NewTagFilters(tfs.accountID, tfs.projectID) + tfsCompiled.tfs = tfsNew + tfssCompiled = append(tfssCompiled, tfsCompiled) } if compositeFilters == 0 { + tfssCompiled = append(tfssCompiled[:0], tfs) atomic.AddUint64(&compositeFilterMissingConversions, 1) - return tfs + return tfssCompiled } - tfsCompiled := NewTagFilters(tfs.accountID, tfs.projectID) - tfsCompiled.tfs = tfsNew atomic.AddUint64(&compositeFilterSuccessConversions, 1) - return tfsCompiled + return tfssCompiled } var ( diff --git a/lib/storage/tag_filters_test.go b/lib/storage/tag_filters_test.go index 7f3f0ba9be..01bd8d1f88 100644 --- a/lib/storage/tag_filters_test.go +++ b/lib/storage/tag_filters_test.go @@ -7,7 +7,7 @@ import ( ) func TestConvertToCompositeTagFilters(t *testing.T) { - f := func(tfs, resultExpected []TagFilter) { + f := func(tfs []TagFilter, resultExpected [][]TagFilter) { t.Helper() accountID := uint32(123) projectID := uint32(456) @@ -17,21 +17,25 @@ func TestConvertToCompositeTagFilters(t *testing.T) { t.Fatalf("cannot add tf=%s: %s", tf.String(), err) } } - resultCompiled := convertToCompositeTagFilters(tfsCompiled) - if resultCompiled.accountID != accountID { - t.Fatalf("unexpected accountID; got %d; want %d", resultCompiled.accountID, accountID) - } - if resultCompiled.projectID != projectID { - t.Fatalf("unexpected projectID; got %d; want %d", resultCompiled.projectID, projectID) - } - result := make([]TagFilter, len(resultCompiled.tfs)) - for i, tf := range resultCompiled.tfs { - result[i] = TagFilter{ - Key: tf.key, - Value: tf.value, - IsNegative: tf.isNegative, - IsRegexp: tf.isRegexp, + resultCompileds := convertToCompositeTagFilters(tfsCompiled) + result := make([][]TagFilter, len(resultCompileds)) + for i, resultCompiled := range resultCompileds { + if resultCompiled.accountID != accountID { + t.Fatalf("unexpected accountID; got %d; want %d", resultCompiled.accountID, accountID) } + if resultCompiled.projectID != projectID { + t.Fatalf("unexpected projectID; got %d; want %d", resultCompiled.projectID, projectID) + } + tfs := make([]TagFilter, len(resultCompiled.tfs)) + for i, tf := range resultCompiled.tfs { + tfs[i] = TagFilter{ + Key: tf.key, + Value: tf.value, + IsNegative: tf.isNegative, + IsRegexp: tf.isRegexp, + } + } + result[i] = tfs } if !reflect.DeepEqual(result, resultExpected) { t.Fatalf("unexpected result;\ngot\n%+v\nwant\n%+v", result, resultExpected) @@ -39,7 +43,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) { } // Empty filters - f(nil, []TagFilter{}) + f(nil, [][]TagFilter{{}}) // A single non-name filter f([]TagFilter{ @@ -49,12 +53,14 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: []byte("foo"), - Value: []byte("bar"), - IsNegative: false, - IsRegexp: false, + { + Key: []byte("foo"), + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, }, }) @@ -72,18 +78,20 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: true, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: []byte("foo"), - Value: []byte("bar"), - IsNegative: false, - IsRegexp: false, - }, - { - Key: []byte("x"), - Value: []byte("yy"), - IsNegative: true, - IsRegexp: false, + { + Key: []byte("foo"), + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, + { + Key: []byte("x"), + Value: []byte("yy"), + IsNegative: true, + IsRegexp: false, + }, }, }) @@ -95,12 +103,14 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: nil, - Value: []byte("bar"), - IsNegative: false, - IsRegexp: false, + { + Key: nil, + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, }, }) @@ -118,18 +128,20 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: nil, - Value: []byte("bar"), - IsNegative: false, - IsRegexp: false, - }, - { - Key: nil, - Value: []byte("baz"), - IsNegative: false, - IsRegexp: false, + { + Key: nil, + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, + { + Key: nil, + Value: []byte("baz"), + IsNegative: false, + IsRegexp: false, + }, }, }) @@ -147,12 +159,14 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: []byte("\xfe\x03barfoo"), - Value: []byte("abc"), - IsNegative: false, - IsRegexp: false, + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc"), + IsNegative: false, + IsRegexp: false, + }, }, }) @@ -170,18 +184,20 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: true, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: nil, - Value: []byte("bar"), - IsNegative: false, - IsRegexp: false, - }, - { - Key: []byte("\xfe\x03barfoo"), - Value: []byte("abc"), - IsNegative: true, - IsRegexp: false, + { + Key: nil, + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc"), + IsNegative: true, + IsRegexp: false, + }, }, }) @@ -205,18 +221,20 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: true, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: []byte("\xfe\x03barfoo"), - Value: []byte("abc"), - IsNegative: true, - IsRegexp: false, - }, - { - Key: []byte("\xfe\x03bara"), - Value: []byte("b.+"), - IsNegative: false, - IsRegexp: true, + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc"), + IsNegative: true, + IsRegexp: false, + }, + { + Key: []byte("\xfe\x03bara"), + Value: []byte("b.+"), + IsNegative: false, + IsRegexp: true, + }, }, }) @@ -240,18 +258,20 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: nil, - Value: []byte("bar"), - IsNegative: false, - IsRegexp: false, - }, - { - Key: []byte("\xfe\x03bazfoo"), - Value: []byte("abc"), - IsNegative: false, - IsRegexp: false, + { + Key: nil, + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, + { + Key: []byte("\xfe\x03bazfoo"), + Value: []byte("abc"), + IsNegative: false, + IsRegexp: false, + }, }, }) @@ -269,12 +289,14 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: true, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: []byte("\xfe\x03barfoo"), - Value: []byte("abc"), - IsNegative: false, - IsRegexp: false, + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc"), + IsNegative: false, + IsRegexp: false, + }, }, }) @@ -292,12 +314,14 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: true, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: []byte("\xfe\x03barfoo"), - Value: []byte("abc.+"), - IsNegative: false, - IsRegexp: true, + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc.+"), + IsNegative: false, + IsRegexp: true, + }, }, }) @@ -315,18 +339,20 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: nil, - Value: []byte("bar"), - IsNegative: false, - IsRegexp: false, - }, - { - Key: []byte("__graphite__"), - Value: []byte("foo.*.bar"), - IsNegative: false, - IsRegexp: false, + { + Key: nil, + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, + { + Key: []byte("__graphite__"), + Value: []byte("foo.*.bar"), + IsNegative: false, + IsRegexp: false, + }, }, }) @@ -350,18 +376,20 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: []byte("\xfe\x03barfoo"), - Value: []byte("abc"), - IsNegative: false, - IsRegexp: false, - }, - { - Key: []byte("__graphite__"), - Value: []byte("foo.*.bar"), - IsNegative: false, - IsRegexp: false, + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc"), + IsNegative: false, + IsRegexp: false, + }, + { + Key: []byte("__graphite__"), + Value: []byte("foo.*.bar"), + IsNegative: false, + IsRegexp: false, + }, }, }) @@ -379,13 +407,144 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: []byte("\xfe\x03barfoo"), + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc"), + IsNegative: false, + IsRegexp: false, + }, + }, + }) + + // Multiple values regexp filter, which can be converted to non-regexp, with non-name filter. + f([]TagFilter{ + { + Key: nil, + Value: []byte("bar|foo"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: []byte("foo"), Value: []byte("abc"), IsNegative: false, IsRegexp: false, }, + }, [][]TagFilter{ + { + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc"), + IsNegative: false, + IsRegexp: false, + }, + }, + { + { + Key: []byte("\xfe\x03foofoo"), + Value: []byte("abc"), + IsNegative: false, + IsRegexp: false, + }, + }, + }) + + // Two multiple values regexp filter, which can be converted to non-regexp, with non-name filter. + f([]TagFilter{ + { + Key: nil, + Value: []byte("bar|foo"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: nil, + Value: []byte("abc|def"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: []byte("face"), + Value: []byte("air"), + IsNegative: false, + IsRegexp: false, + }, + }, [][]TagFilter{ + { + { + Key: nil, + Value: []byte("bar|foo"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: []byte("\xfe\x03abcface"), + Value: []byte("air"), + IsNegative: false, + IsRegexp: false, + }, + }, + { + { + Key: nil, + Value: []byte("bar|foo"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: []byte("\xfe\x03defface"), + Value: []byte("air"), + IsNegative: false, + IsRegexp: false, + }, + }, + }) + + // Multiple values regexp filter with a single negative filter + f([]TagFilter{ + { + Key: nil, + Value: []byte("bar|foo"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: []byte("foo"), + Value: []byte("abc"), + IsNegative: true, + IsRegexp: false, + }, + }, [][]TagFilter{ + { + { + Key: nil, + Value: []byte("bar|foo"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc"), + IsNegative: true, + IsRegexp: false, + }, + }, + { + { + Key: nil, + Value: []byte("bar|foo"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: []byte("\xfe\x03foofoo"), + Value: []byte("abc"), + IsNegative: true, + IsRegexp: false, + }, + }, }) // Regexp name filter with non-name filter. @@ -402,18 +561,20 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: true, IsRegexp: false, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: nil, - Value: []byte("bar.+"), - IsNegative: false, - IsRegexp: true, - }, - { - Key: []byte("foo"), - Value: []byte("abc"), - IsNegative: true, - IsRegexp: false, + { + Key: nil, + Value: []byte("bar.+"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: []byte("foo"), + Value: []byte("abc"), + IsNegative: true, + IsRegexp: false, + }, }, }) @@ -431,12 +592,14 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: false, IsRegexp: true, }, - }, []TagFilter{ + }, [][]TagFilter{ { - Key: nil, - Value: []byte("bar"), - IsNegative: false, - IsRegexp: false, + { + Key: nil, + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, }, }) }