lib/storage: optimize convert multiple values regexp filter to composite tag filter (#1610)

* lib/storage: optimize convert multiple values regexp filter to composite tag filter

* Apply suggestions from code review

Co-authored-by: Aliaksandr Valialkin <valyala@gmail.com>
This commit is contained in:
faceair 2021-09-14 17:47:07 +08:00 committed by Aliaksandr Valialkin
parent 5e5ce27df7
commit 61a51f7c15
2 changed files with 357 additions and 166 deletions

View File

@ -19,34 +19,58 @@ import (
//
// This converts `foo{bar="baz",x=~"a.+"}` to `{foo=bar="baz",foo=x=~"a.+"} filter.
func convertToCompositeTagFilterss(tfss []*TagFilters) []*TagFilters {
tfssNew := make([]*TagFilters, len(tfss))
for i, tfs := range tfss {
tfssNew[i] = convertToCompositeTagFilters(tfs)
tfssNew := make([]*TagFilters, 0, len(tfss))
for _, tfs := range tfss {
tfssNew = append(tfss, convertToCompositeTagFilters(tfs)...)
}
return tfssNew
}
func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters {
// Search for metric name filter, which must be used for creating composite filters.
var name []byte
func convertToCompositeTagFilters(tfs *TagFilters) []*TagFilters {
tfssCompiled := make([]*TagFilters, 0)
// Search for filters on metric name, which will be used for creating composite filters.
var names [][]byte
hasPositiveFilter := false
for _, tf := range tfs.tfs {
if len(tf.key) == 0 && !tf.isNegative && !tf.isRegexp {
name = tf.value
names = [][]byte{tf.value}
} else if len(tf.key) == 0 && !tf.isNegative && tf.isRegexp && len(tf.orSuffixes) > 0 {
// Split the filter {__name__=~"name1|...|nameN", other_filters}
// into `name1{other_filters}`, ..., `nameN{other_filters}`
// and generate composite filters for each of them
names = names[:0]
for _, orSuffix := range tf.orSuffixes {
names = append(names, []byte(orSuffix))
}
} else if !tf.isNegative && !tf.isEmptyMatch {
hasPositiveFilter = true
}
}
if len(name) == 0 {
if len(names) == 0 {
tfssCompiled = append(tfssCompiled, tfs)
atomic.AddUint64(&compositeFilterMissingConversions, 1)
return tfs
return tfssCompiled
}
tfsNew := make([]tagFilter, 0, len(tfs.tfs))
var compositeKey []byte
compositeFilters := 0
for _, name := range names {
tfsNew := make([]tagFilter, 0, len(tfs.tfs))
for _, tf := range tfs.tfs {
if len(tf.key) == 0 {
if !hasPositiveFilter || tf.isNegative || tf.isRegexp || string(tf.value) != string(name) {
sameOrSuffixes := true
if len(names) != len(tf.orSuffixes) {
sameOrSuffixes = false
} else {
for i, orSuffix := range tf.orSuffixes {
if string(names[i]) != orSuffix {
sameOrSuffixes = false
break
}
}
}
if !hasPositiveFilter || tf.isNegative || tf.isRegexp && !sameOrSuffixes || !tf.isRegexp && string(tf.value) != string(name) {
tfsNew = append(tfsNew, tf)
}
continue
@ -55,6 +79,7 @@ func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters {
tfsNew = append(tfsNew, tf)
continue
}
compositeKey = marshalCompositeTagKey(compositeKey[:0], name, tf.key)
var tfNew tagFilter
if err := tfNew.Init(tfs.commonPrefix, compositeKey, tf.value, tf.isNegative, tf.isRegexp); err != nil {
@ -63,14 +88,17 @@ func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters {
tfsNew = append(tfsNew, tfNew)
compositeFilters++
}
if compositeFilters == 0 {
atomic.AddUint64(&compositeFilterMissingConversions, 1)
return tfs
}
tfsCompiled := NewTagFilters(tfs.accountID, tfs.projectID)
tfsCompiled.tfs = tfsNew
tfssCompiled = append(tfssCompiled, tfsCompiled)
}
if compositeFilters == 0 {
tfssCompiled = append(tfssCompiled[:0], tfs)
atomic.AddUint64(&compositeFilterMissingConversions, 1)
return tfssCompiled
}
atomic.AddUint64(&compositeFilterSuccessConversions, 1)
return tfsCompiled
return tfssCompiled
}
var (

View File

@ -7,7 +7,7 @@ import (
)
func TestConvertToCompositeTagFilters(t *testing.T) {
f := func(tfs, resultExpected []TagFilter) {
f := func(tfs []TagFilter, resultExpected [][]TagFilter) {
t.Helper()
accountID := uint32(123)
projectID := uint32(456)
@ -17,29 +17,33 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
t.Fatalf("cannot add tf=%s: %s", tf.String(), err)
}
}
resultCompiled := convertToCompositeTagFilters(tfsCompiled)
resultCompileds := convertToCompositeTagFilters(tfsCompiled)
result := make([][]TagFilter, len(resultCompileds))
for i, resultCompiled := range resultCompileds {
if resultCompiled.accountID != accountID {
t.Fatalf("unexpected accountID; got %d; want %d", resultCompiled.accountID, accountID)
}
if resultCompiled.projectID != projectID {
t.Fatalf("unexpected projectID; got %d; want %d", resultCompiled.projectID, projectID)
}
result := make([]TagFilter, len(resultCompiled.tfs))
tfs := make([]TagFilter, len(resultCompiled.tfs))
for i, tf := range resultCompiled.tfs {
result[i] = TagFilter{
tfs[i] = TagFilter{
Key: tf.key,
Value: tf.value,
IsNegative: tf.isNegative,
IsRegexp: tf.isRegexp,
}
}
result[i] = tfs
}
if !reflect.DeepEqual(result, resultExpected) {
t.Fatalf("unexpected result;\ngot\n%+v\nwant\n%+v", result, resultExpected)
}
}
// Empty filters
f(nil, []TagFilter{})
f(nil, [][]TagFilter{{}})
// A single non-name filter
f([]TagFilter{
@ -49,13 +53,15 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: []byte("foo"),
Value: []byte("bar"),
IsNegative: false,
IsRegexp: false,
},
},
})
// Multiple non-name filters
@ -72,7 +78,8 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: true,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: []byte("foo"),
Value: []byte("bar"),
@ -85,6 +92,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: true,
IsRegexp: false,
},
},
})
// A single name filter
@ -95,13 +103,15 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: nil,
Value: []byte("bar"),
IsNegative: false,
IsRegexp: false,
},
},
})
// Two name filters
@ -118,7 +128,8 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: nil,
Value: []byte("bar"),
@ -131,6 +142,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
},
})
// A name filter with non-name filter.
@ -147,13 +159,15 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: []byte("\xfe\x03barfoo"),
Value: []byte("abc"),
IsNegative: false,
IsRegexp: false,
},
},
})
// A name filter with a single negative filter
@ -170,7 +184,8 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: true,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: nil,
Value: []byte("bar"),
@ -183,6 +198,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: true,
IsRegexp: false,
},
},
})
// A name filter with a negative and a positive filter
@ -205,7 +221,8 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: true,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: []byte("\xfe\x03barfoo"),
Value: []byte("abc"),
@ -218,6 +235,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: true,
},
},
})
// Two name filters with non-name filter.
@ -240,7 +258,8 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: nil,
Value: []byte("bar"),
@ -253,6 +272,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
},
})
// A name filter with regexp non-name filter, which can be converted to non-regexp.
@ -269,13 +289,15 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: true,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: []byte("\xfe\x03barfoo"),
Value: []byte("abc"),
IsNegative: false,
IsRegexp: false,
},
},
})
// A name filter with regexp non-name filter.
@ -292,13 +314,15 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: true,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: []byte("\xfe\x03barfoo"),
Value: []byte("abc.+"),
IsNegative: false,
IsRegexp: true,
},
},
})
// A name filter with graphite filter.
@ -315,7 +339,8 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: nil,
Value: []byte("bar"),
@ -328,6 +353,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
},
})
// A name filter with non-name filter and a graphite filter.
@ -350,7 +376,8 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: []byte("\xfe\x03barfoo"),
Value: []byte("abc"),
@ -363,6 +390,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
},
})
// Regexp name filter, which can be converted to non-regexp, with non-name filter.
@ -379,13 +407,144 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: []byte("\xfe\x03barfoo"),
Value: []byte("abc"),
IsNegative: false,
IsRegexp: false,
},
},
})
// Multiple values regexp filter, which can be converted to non-regexp, with non-name filter.
f([]TagFilter{
{
Key: nil,
Value: []byte("bar|foo"),
IsNegative: false,
IsRegexp: true,
},
{
Key: []byte("foo"),
Value: []byte("abc"),
IsNegative: false,
IsRegexp: false,
},
}, [][]TagFilter{
{
{
Key: []byte("\xfe\x03barfoo"),
Value: []byte("abc"),
IsNegative: false,
IsRegexp: false,
},
},
{
{
Key: []byte("\xfe\x03foofoo"),
Value: []byte("abc"),
IsNegative: false,
IsRegexp: false,
},
},
})
// Two multiple values regexp filter, which can be converted to non-regexp, with non-name filter.
f([]TagFilter{
{
Key: nil,
Value: []byte("bar|foo"),
IsNegative: false,
IsRegexp: true,
},
{
Key: nil,
Value: []byte("abc|def"),
IsNegative: false,
IsRegexp: true,
},
{
Key: []byte("face"),
Value: []byte("air"),
IsNegative: false,
IsRegexp: false,
},
}, [][]TagFilter{
{
{
Key: nil,
Value: []byte("bar|foo"),
IsNegative: false,
IsRegexp: true,
},
{
Key: []byte("\xfe\x03abcface"),
Value: []byte("air"),
IsNegative: false,
IsRegexp: false,
},
},
{
{
Key: nil,
Value: []byte("bar|foo"),
IsNegative: false,
IsRegexp: true,
},
{
Key: []byte("\xfe\x03defface"),
Value: []byte("air"),
IsNegative: false,
IsRegexp: false,
},
},
})
// Multiple values regexp filter with a single negative filter
f([]TagFilter{
{
Key: nil,
Value: []byte("bar|foo"),
IsNegative: false,
IsRegexp: true,
},
{
Key: []byte("foo"),
Value: []byte("abc"),
IsNegative: true,
IsRegexp: false,
},
}, [][]TagFilter{
{
{
Key: nil,
Value: []byte("bar|foo"),
IsNegative: false,
IsRegexp: true,
},
{
Key: []byte("\xfe\x03barfoo"),
Value: []byte("abc"),
IsNegative: true,
IsRegexp: false,
},
},
{
{
Key: nil,
Value: []byte("bar|foo"),
IsNegative: false,
IsRegexp: true,
},
{
Key: []byte("\xfe\x03foofoo"),
Value: []byte("abc"),
IsNegative: true,
IsRegexp: false,
},
},
})
// Regexp name filter with non-name filter.
@ -402,7 +561,8 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: true,
IsRegexp: false,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: nil,
Value: []byte("bar.+"),
@ -415,6 +575,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: true,
IsRegexp: false,
},
},
})
// Regexp non-name filter, which matches anything.
@ -431,13 +592,15 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
IsNegative: false,
IsRegexp: true,
},
}, []TagFilter{
}, [][]TagFilter{
{
{
Key: nil,
Value: []byte("bar"),
IsNegative: false,
IsRegexp: false,
},
},
})
}