From 553016ea99f3741fb5fb5020b1af88bb63d68be6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 10 Feb 2021 14:37:14 +0200 Subject: [PATCH] lib/storage: disable composite index usage when querying old data --- app/vmselect/netstorage/netstorage.go | 5 -- docs/CHANGELOG.md | 2 +- lib/fs/fs.go | 43 ++++++++++++ lib/storage/index_db.go | 65 ++++++++++++++---- lib/storage/index_db_test.go | 10 +-- lib/storage/index_db_timing_test.go | 6 +- lib/storage/storage.go | 78 +++++++++++++++++---- lib/storage/tag_filters.go | 45 ++++++++----- lib/storage/tag_filters_test.go | 97 +++++++++++++++++++++++++-- 9 files changed, 289 insertions(+), 62 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 409b8938a..46874de35 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -25,8 +25,6 @@ var ( maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned from /api/v1/label//values") maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find") maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series each search can scan") - disableCompositeTagFilters = flag.Bool("search.disableCompositeTagFilters", false, "Whether to disable composite tag filters. This option is useful "+ - "for querying old data, which is created before v1.54.0 release. Note that disabled composite tag filters may reduce query performance") ) // Result is a single timeseries result. 
@@ -931,9 +929,6 @@ type blockRef struct { func setupTfss(tr storage.TimeRange, tagFilterss [][]storage.TagFilter, deadline searchutils.Deadline) ([]*storage.TagFilters, error) { tfss := make([]*storage.TagFilters, 0, len(tagFilterss)) for _, tagFilters := range tagFilterss { - if !*disableCompositeTagFilters { - tagFilters = storage.ConvertToCompositeTagFilters(tagFilters) - } tfs := storage.NewTagFilters() for i := range tagFilters { tf := &tagFilters[i] diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e370bfeff..4accc62d9 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,7 +2,7 @@ # tip -* FEATURE: optimize searching for time series by label filters where individual filters match big number of time series (more than a million). For example, the query `up{job="foobar"}` should work faster if `{job="foobar"}` matches a million of time series, while `up{job="foobar"}` matches much lower number of time series. The optimization can be disabled by passing `-search.disableCompositeTagFilters` command-line flag to VictoriaMetrics. +* FEATURE: optimize searching for time series by label filters where individual filters match a big number of time series (more than a million). For example, the query `up{job="foobar"}` should work faster if `{job="foobar"}` matches a million time series, while `up{job="foobar"}` matches a much lower number of time series. * FEATURE: single-node VictoriaMetrics now accepts requests to handlers with `/prometheus` and `/graphite` prefixes such as `/prometheus/api/v1/query`. This improves compatibility with [handlers from VictoriaMetrics cluster](https://victoriametrics.github.io/Cluster-VictoriaMetrics.html#url-format). * FEATURE: expose `process_open_fds` and `process_max_fds` metrics. These metrics can be used for alerting when `process_open_fds` reaches `process_max_fds`. 
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/402 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1037 * FEATURE: vmalert: add `-datasource.appendTypePrefix` command-line option for querying both Prometheus and Graphite datasource in cluster version of VictoriaMetrics. See [these docs](https://victoriametrics.github.io/vmalert.html#graphite) for details. diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 9e0cefb13..229e0266b 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -3,6 +3,7 @@ package fs import ( "fmt" "io" + "io/ioutil" "os" "path/filepath" "regexp" @@ -257,6 +258,48 @@ func SymlinkRelative(srcPath, dstPath string) error { return os.Symlink(srcPathRel, dstPath) } +// CopyDirectory copies all the files in srcPath to dstPath. +func CopyDirectory(srcPath, dstPath string) error { + fis, err := ioutil.ReadDir(srcPath) + if err != nil { + return err + } + if err := MkdirAllIfNotExist(dstPath); err != nil { + return err + } + for _, fi := range fis { + if !fi.Mode().IsRegular() { + // Skip non-files + continue + } + src := filepath.Join(srcPath, fi.Name()) + dst := filepath.Join(dstPath, fi.Name()) + if err := copyFile(src, dst); err != nil { + return err + } + } + MustSyncPath(dstPath) + return nil +} + +func copyFile(srcPath, dstPath string) error { + src, err := os.Open(srcPath) + if err != nil { + return err + } + defer MustClose(src) + dst, err := os.Create(dstPath) + if err != nil { + return err + } + defer MustClose(dst) + if _, err := io.Copy(dst, src); err != nil { + return err + } + MustSyncPath(dstPath) + return nil +} + // ReadFullData reads len(data) bytes from r. func ReadFullData(r io.Reader, data []byte) error { n, err := io.ReadFull(r, data) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 4143563b3..4ac2196cc 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -117,10 +117,13 @@ type indexDB struct { // metricIDs, since it usually requires 1 bit per deleted metricID. 
deletedMetricIDs atomic.Value deletedMetricIDsUpdateLock sync.Mutex + + // The minimum timestamp when queries with composite index can be used. + minTimestampForCompositeIndex int64 } // openIndexDB opens index db from the given path with the given caches. -func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (*indexDB, error) { +func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (*indexDB, error) { if metricIDCache == nil { logger.Panicf("BUG: metricIDCache must be non-nil") } @@ -152,6 +155,8 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working tsidCache: tsidCache, uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour), metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour), + + minTimestampForCompositeIndex: minTimestampForCompositeIndex, } is := db.getIndexSearch(noDeadline) @@ -1628,6 +1633,9 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, if len(tfss) == 0 { return nil, nil } + if tr.MinTimestamp >= db.minTimestampForCompositeIndex { + tfss = convertToCompositeTagFilterss(tfss) + } tfKeyBuf := tagFiltersKeyBufPool.Get() defer tagFiltersKeyBufPool.Put(tfKeyBuf) @@ -1886,8 +1894,8 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs sortedMetricIDs := srcMetricIDs.AppendTo(nil) kb := &is.kb - kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) - tfs = fromCompositeTagFilters(tfs, kb.B) + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) + tfs = removeCompositeTagFilters(tfs, kb.B) metricName := kbPool.Get() defer kbPool.Put(metricName) @@ -2090,17 +2098,21 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet return nil, metricIDs, nil } -func fromCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter { - tfsNew := make([]*tagFilter, 0, len(tfs)) +func 
removeCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter { + if !hasCompositeTagFilters(tfs, prefix) { + return tfs + } + var tagKey []byte + var name []byte + tfsNew := make([]*tagFilter, 0, len(tfs)+1) for _, tf := range tfs { if !bytes.HasPrefix(tf.prefix, prefix) { tfsNew = append(tfsNew, tf) continue } suffix := tf.prefix[len(prefix):] - var tagKey, tail []byte var err error - tail, tagKey, err = unmarshalTagValue(tagKey[:0], suffix) + _, tagKey, err = unmarshalTagValue(tagKey[:0], suffix) if err != nil { logger.Panicf("BUG: cannot unmarshal tag key from suffix=%q: %s", suffix, err) } @@ -2114,20 +2126,49 @@ func fromCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter { if err != nil { logger.Panicf("BUG: cannot unmarshal nameLen from tagKey %q: %s", tagKey, err) } + if nameLen == 0 { + logger.Panicf("BUG: nameLen must be greater than 0") + } if uint64(len(tagKey)) < nameLen { logger.Panicf("BUG: expecting at %d bytes for name in tagKey=%q; got %d bytes", nameLen, tagKey, len(tagKey)) } + name = append(name[:0], tagKey[:nameLen]...) tagKey = tagKey[nameLen:] - tfNew := *tf - tfNew.key = append(tfNew.key[:0], tagKey...) - tfNew.prefix = append(tfNew.prefix[:0], prefix...) - tfNew.prefix = marshalTagValue(tfNew.prefix, tagKey) - tfNew.prefix = append(tfNew.prefix, tail...) 
+ var tfNew tagFilter + if err := tfNew.Init(prefix, tagKey, tf.value, tf.isNegative, tf.isRegexp); err != nil { + logger.Panicf("BUG: cannot initialize {%s=%q} filter: %s", tagKey, tf.value, err) + } + tfsNew = append(tfsNew, &tfNew) + } + if len(name) > 0 { + var tfNew tagFilter + if err := tfNew.Init(prefix, nil, name, false, false); err != nil { + logger.Panicf("BUG: unexpected error when initializing {__name__=%q} filter: %s", name, err) + } tfsNew = append(tfsNew, &tfNew) } return tfsNew } +func hasCompositeTagFilters(tfs []*tagFilter, prefix []byte) bool { + var tagKey []byte + for _, tf := range tfs { + if !bytes.HasPrefix(tf.prefix, prefix) { + continue + } + suffix := tf.prefix[len(prefix):] + var err error + _, tagKey, err = unmarshalTagValue(tagKey[:0], suffix) + if err != nil { + logger.Panicf("BUG: cannot unmarshal tag key from suffix=%q: %s", suffix, err) + } + if len(tagKey) > 0 && tagKey[0] == compositeTagKeyPrefix { + return true + } + } + return false +} + func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) { kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) for i, tf := range tfs { diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index eecf1b64f..c77d99e02 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -456,7 +456,7 @@ func TestIndexDBOpenClose(t *testing.T) { defer tsidCache.Stop() for i := 0; i < 5; i++ { - db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache) + db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, 0) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -479,7 +479,7 @@ func TestIndexDB(t *testing.T) { defer tsidCache.Stop() dbName := "test-index-db-serial" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0) if err != nil { t.Fatalf("cannot open 
indexDB: %s", err) } @@ -509,7 +509,7 @@ func TestIndexDB(t *testing.T) { // Re-open the db and verify it works as expected. db.MustClose() - db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) + db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -533,7 +533,7 @@ func TestIndexDB(t *testing.T) { defer tsidCache.Stop() dbName := "test-index-db-concurrent" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -1465,7 +1465,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { defer tsidCache.Stop() dbName := "test-index-db-ts-range" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 2b4bcf3b9..08aaf2463 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -50,7 +50,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { defer tsidCache.Stop() const dbName = "bench-index-db-add-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -115,7 +115,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { defer tsidCache.Stop() const dbName = "bench-head-posting-for-matchers" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -294,7 +294,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { 
defer tsidCache.Stop() const dbName = "bench-index-db-get-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index b52bd12c0..3aeccbd5b 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -108,6 +108,9 @@ type Storage struct { // which may be in the process of flushing to disk by concurrently running // snapshot process. snapshotLock sync.Mutex + + // The minimum timestamp when composite index search can be used. + minTimestampForCompositeIndex int64 } // OpenStorage opens storage on the given path with the given retentionMsecs. @@ -126,14 +129,9 @@ func OpenStorage(path string, retentionMsecs int64) (*Storage, error) { stop: make(chan struct{}), } - if err := fs.MkdirAllIfNotExist(path); err != nil { return nil, fmt.Errorf("cannot create a directory for the storage at %q: %w", path, err) } - snapshotsPath := path + "/snapshots" - if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err) - } // Protect from concurrent opens. flockF, err := fs.CreateFlockFile(path) @@ -142,6 +140,12 @@ func OpenStorage(path string, retentionMsecs int64) (*Storage, error) { } s.flockF = flockF + // Pre-create snapshots directory if it is missing. + snapshotsPath := path + "/snapshots" + if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil { + return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err) + } + // Load caches. 
mem := memory.Allowed() s.tsidCache = s.mustLoadCache("MetricName->TSID", "metricName_tsid", mem/3) @@ -166,10 +170,11 @@ func OpenStorage(path string, retentionMsecs int64) (*Storage, error) { // Load indexdb idbPath := path + "/indexdb" idbSnapshotsPath := idbPath + "/snapshots" + isEmptyDB := !fs.IsPathExist(idbPath) if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil { return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err) } - idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache) + idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex) if err != nil { return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err) } @@ -185,6 +190,13 @@ func OpenStorage(path string, retentionMsecs int64) (*Storage, error) { } s.tb = tb + // Load metadata + metadataDir := path + "/metadata" + if err := fs.MkdirAllIfNotExist(metadataDir); err != nil { + return nil, fmt.Errorf("cannot create %q: %w", metadataDir, err) + } + s.minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex(metadataDir, isEmptyDB) + s.startCurrHourMetricIDsUpdater() s.startNextDayMetricIDsUpdater() s.startRetentionWatcher() @@ -235,7 +247,7 @@ func (s *Storage) CreateSnapshot() (string, error) { } fs.MustSyncPath(dstDataDir) - idbSnapshot := fmt.Sprintf("%s/indexdb/snapshots/%s", s.path, snapshotName) + idbSnapshot := fmt.Sprintf("%s/indexdb/snapshots/%s", srcDir, snapshotName) idb := s.idb() currSnapshot := idbSnapshot + "/" + idb.name if err := idb.tb.CreateSnapshotAt(currSnapshot); err != nil { @@ -253,8 +265,13 @@ func (s *Storage) CreateSnapshot() (string, error) { return "", fmt.Errorf("cannot create symlink from %q to %q: %w", idbSnapshot, dstIdbDir, err) } + srcMetadataDir := srcDir + "/metadata" + dstMetadataDir := dstDir + "/metadata" + if err := fs.CopyDirectory(srcMetadataDir, dstMetadataDir); err != nil { + return "", 
fmt.Errorf("cannot copy metadata: %s", err) + } + fs.MustSyncPath(dstDir) - fs.MustSyncPath(srcDir + "/snapshots") logger.Infof("created Storage snapshot for %q at %q in %.3f seconds", srcDir, dstDir, time.Since(startTime).Seconds()) return snapshotName, nil @@ -537,7 +554,7 @@ func (s *Storage) mustRotateIndexDB() { // Create new indexdb table. newTableName := nextIndexDBTableName() idbNewPath := s.path + "/indexdb/" + newTableName - idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache) + idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex) if err != nil { logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err) } @@ -758,6 +775,43 @@ func marshalUint64Set(dst []byte, m *uint64set.Set) []byte { return dst } +func mustGetMinTimestampForCompositeIndex(metadataDir string, isEmptyDB bool) int64 { + path := metadataDir + "/minTimestampForCompositeIndex" + minTimestamp, err := loadMinTimestampForCompositeIndex(path) + if err == nil { + return minTimestamp + } + logger.Errorf("cannot read minTimestampForCompositeIndex, so trying to re-create it; error: %s", err) + date := time.Now().UnixNano() / 1e6 / msecPerDay + if !isEmptyDB { + // The current and the next day can already contain non-composite indexes, + // so they cannot be queried with composite indexes. 
+ date += 2 + } else { + date = 0 + } + minTimestamp = date * msecPerDay + dateBuf := encoding.MarshalInt64(nil, minTimestamp) + if err := os.RemoveAll(path); err != nil { + logger.Fatalf("cannot remove a file with minTimestampForCompositeIndex: %s", err) + } + if err := fs.WriteFileAtomically(path, dateBuf); err != nil { + logger.Fatalf("cannot store minTimestampForCompositeIndex: %s", err) + } + return minTimestamp +} + +func loadMinTimestampForCompositeIndex(path string) (int64, error) { + data, err := ioutil.ReadFile(path) + if err != nil { + return 0, err + } + if len(data) != 8 { + return 0, fmt.Errorf("unexpected length of %q; got %d bytes; want 8 bytes", path, len(data)) + } + return encoding.UnmarshalInt64(data), nil +} + func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache { path := s.cachePath + "/" + name logger.Infof("loading %s cache from %q...", info, path) @@ -1948,7 +2002,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) { s.tsidCache.Set(metricName, buf) } -func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (curr, prev *indexDB, err error) { +func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (curr, prev *indexDB, err error) { if err := fs.MkdirAllIfNotExist(path); err != nil { return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err) } @@ -2007,12 +2061,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w // Open the last two tables. 
currPath := path + "/" + tableNames[len(tableNames)-1] - curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache) + curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex) if err != nil { return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err) } prevPath := path + "/" + tableNames[len(tableNames)-2] - prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache) + prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex) if err != nil { curr.MustClose() return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err) diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go index aa748a397..9a368f45c 100644 --- a/lib/storage/tag_filters.go +++ b/lib/storage/tag_filters.go @@ -14,15 +14,23 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) -// ConvertToCompositeTagFilters converts tfs to composite filters. +// convertToCompositeTagFilterss converts tfss to composite filters. // // This converts `foo{bar="baz",x=~"a.+"}` to `{foo=bar="baz",foo=x=~"a.+"} filter. -func ConvertToCompositeTagFilters(tfs []TagFilter) []TagFilter { +func convertToCompositeTagFilterss(tfss []*TagFilters) []*TagFilters { + tfssNew := make([]*TagFilters, len(tfss)) + for i, tfs := range tfss { + tfssNew[i] = convertToCompositeTagFilters(tfs) + } + return tfssNew +} + +func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters { // Search for metric name filter, which must be used for creating composite filters. 
var name []byte - for _, tf := range tfs { - if len(tf.Key) == 0 && !tf.IsNegative && !tf.IsRegexp { - name = tf.Value + for _, tf := range tfs.tfs { + if len(tf.key) == 0 && !tf.isNegative && !tf.isRegexp { + name = tf.value break } } @@ -30,33 +38,34 @@ func ConvertToCompositeTagFilters(tfs []TagFilter) []TagFilter { // There is no metric name filter, so composite filters cannot be created. return tfs } - tfsNew := make([]TagFilter, 0, len(tfs)) + tfsNew := make([]tagFilter, 0, len(tfs.tfs)) var compositeKey []byte compositeFilters := 0 - for _, tf := range tfs { - if len(tf.Key) == 0 { - if tf.IsNegative || tf.IsRegexp || string(tf.Value) != string(name) { + for _, tf := range tfs.tfs { + if len(tf.key) == 0 { + if tf.isNegative || tf.isRegexp || string(tf.value) != string(name) { tfsNew = append(tfsNew, tf) } continue } - if string(tf.Key) == "__graphite__" { + if string(tf.key) == "__graphite__" { tfsNew = append(tfsNew, tf) continue } - compositeKey = marshalCompositeTagKey(compositeKey[:0], name, tf.Key) - tfsNew = append(tfsNew, TagFilter{ - Key: append([]byte{}, compositeKey...), - Value: append([]byte{}, tf.Value...), - IsNegative: tf.IsNegative, - IsRegexp: tf.IsRegexp, - }) + compositeKey = marshalCompositeTagKey(compositeKey[:0], name, tf.key) + var tfNew tagFilter + if err := tfNew.Init(tfs.commonPrefix, compositeKey, tf.value, tf.isNegative, tf.isRegexp); err != nil { + logger.Panicf("BUG: unexpected error when creating composite tag filter for name=%q and key=%q: %s", name, tf.key, err) + } + tfsNew = append(tfsNew, tfNew) compositeFilters++ } if compositeFilters == 0 { return tfs } - return tfsNew + tfsCompiled := NewTagFilters() + tfsCompiled.tfs = tfsNew + return tfsCompiled } // TagFilters represents filters used for filtering tags. 
diff --git a/lib/storage/tag_filters_test.go b/lib/storage/tag_filters_test.go index 0b60f9970..b434baf04 100644 --- a/lib/storage/tag_filters_test.go +++ b/lib/storage/tag_filters_test.go @@ -9,13 +9,29 @@ import ( func TestConvertToCompositeTagFilters(t *testing.T) { f := func(tfs, resultExpected []TagFilter) { t.Helper() - result := ConvertToCompositeTagFilters(tfs) + tfsCompiled := NewTagFilters() + for _, tf := range tfs { + if err := tfsCompiled.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil { + t.Fatalf("cannot add tf=%s: %s", tf.String(), err) + } + } + resultCompiled := convertToCompositeTagFilters(tfsCompiled) + result := make([]TagFilter, len(resultCompiled.tfs)) + for i, tf := range resultCompiled.tfs { + result[i] = TagFilter{ + Key: tf.key, + Value: tf.value, + IsNegative: tf.isNegative, + IsRegexp: tf.isRegexp, + } + } if !reflect.DeepEqual(result, resultExpected) { t.Fatalf("unexpected result;\ngot\n%+v\nwant\n%+v", result, resultExpected) } } + // Empty filters - f(nil, nil) + f(nil, []TagFilter{}) // A single non-name filter f([]TagFilter{ @@ -167,7 +183,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) { }, }) - // A name filter with negative regexp non-name filter. + // A name filter with negative regexp non-name filter, which can be converted to non-regexp. f([]TagFilter{ { Key: nil, @@ -186,6 +202,29 @@ func TestConvertToCompositeTagFilters(t *testing.T) { Key: []byte("\xfe\x03barfoo"), Value: []byte("abc"), IsNegative: true, + IsRegexp: false, + }, + }) + + // A name filter with negative regexp non-name filter. 
+ f([]TagFilter{ + { + Key: nil, + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, + { + Key: []byte("foo"), + Value: []byte("abc.+"), + IsNegative: true, + IsRegexp: true, + }, + }, []TagFilter{ + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc.+"), + IsNegative: true, IsRegexp: true, }, }) @@ -244,7 +283,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) { Key: []byte("\xfe\x03barfoo"), Value: []byte("abc"), IsNegative: true, - IsRegexp: true, + IsRegexp: false, }, { Key: []byte("__graphite__"), @@ -254,7 +293,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) { }, }) - // Regexp name filter with non-name filter. + // Regexp name filter, which can be converted to non-regexp, with non-name filter. f([]TagFilter{ { Key: nil, @@ -269,9 +308,19 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsRegexp: false, }, }, []TagFilter{ + { + Key: []byte("\xfe\x03barfoo"), + Value: []byte("abc"), + IsNegative: true, + IsRegexp: false, + }, + }) + + // Regexp name filter with non-name filter. + f([]TagFilter{ { Key: nil, - Value: []byte("bar"), + Value: []byte("bar.+"), IsNegative: false, IsRegexp: true, }, @@ -281,6 +330,42 @@ func TestConvertToCompositeTagFilters(t *testing.T) { IsNegative: true, IsRegexp: false, }, + }, []TagFilter{ + { + Key: nil, + Value: []byte("bar.+"), + IsNegative: false, + IsRegexp: true, + }, + { + Key: []byte("foo"), + Value: []byte("abc"), + IsNegative: true, + IsRegexp: false, + }, + }) + + // Regexp non-name filter, which matches anything. + f([]TagFilter{ + { + Key: nil, + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, + { + Key: []byte("foo"), + Value: []byte(".*"), + IsNegative: false, + IsRegexp: true, + }, + }, []TagFilter{ + { + Key: nil, + Value: []byte("bar"), + IsNegative: false, + IsRegexp: false, + }, }) }