Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2024-11-23 20:37:12 +01:00
lib/storage: disable composite index usage when querying old data

commit 553016ea99
parent fcb7655d1e
@@ -25,8 +25,6 @@ var (
 	maxTagValuesPerSearch        = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned from /api/v1/label/<label_name>/values")
 	maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
 	maxMetricsPerSearch          = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series each search can scan")
-	disableCompositeTagFilters = flag.Bool("search.disableCompositeTagFilters", false, "Whether to disable composite tag filters. This option is useful "+
-		"for querying old data, which is created before v1.54.0 release. Note that disabled composite tag filters may reduce query performance")
 )

 // Result is a single timeseries result.
@@ -931,9 +929,6 @@ type blockRef struct {
 func setupTfss(tr storage.TimeRange, tagFilterss [][]storage.TagFilter, deadline searchutils.Deadline) ([]*storage.TagFilters, error) {
 	tfss := make([]*storage.TagFilters, 0, len(tagFilterss))
 	for _, tagFilters := range tagFilterss {
-		if !*disableCompositeTagFilters {
-			tagFilters = storage.ConvertToCompositeTagFilters(tagFilters)
-		}
 		tfs := storage.NewTagFilters()
 		for i := range tagFilters {
 			tf := &tagFilters[i]
@@ -2,7 +2,7 @@

 # tip

-* FEATURE: optimize searching for time series by label filters where individual filters match big number of time series (more than a million). For example, the query `up{job="foobar"}` should work faster if `{job="foobar"}` matches a million of time series, while `up{job="foobar"}` matches much lower number of time series. The optimization can be disabled by passing `-search.disableCompositeTagFilters` command-line flag to VictoriaMetrics.
+* FEATURE: optimize searching for time series by label filters where individual filters match big number of time series (more than a million). For example, the query `up{job="foobar"}` should work faster if `{job="foobar"}` matches a million of time series, while `up{job="foobar"}` matches much lower number of time series.
 * FEATURE: single-node VictoriaMetrics now accepts requests to handlers with `/prometheus` and `/graphite` prefixes such as `/prometheus/api/v1/query`. This improves compatibility with [handlers from VictoriaMetrics cluster](https://victoriametrics.github.io/Cluster-VictoriaMetrics.html#url-format).
 * FEATURE: expose `process_open_fds` and `process_max_fds` metrics. These metrics can be used for alerting when `process_open_fds` reaches `process_max_fds`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/402 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1037
 * FEATURE: vmalert: add `-datasource.appendTypePrefix` command-line option for querying both Prometheus and Graphite datasource in cluster version of VictoriaMetrics. See [these docs](https://victoriametrics.github.io/vmalert.html#graphite) for details.
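The composite-filter optimization in the first changelog entry merges the metric name into each label filter, so the index can answer `up{job="foobar"}` from a single posting list for the (name, label) pair instead of intersecting the huge `{job="foobar"}` list with the `up` name list. A minimal standalone sketch of the composite key layout, inferred from the `\xfe\x03barfoo` test fixtures further below; the 0xfe marker byte and the varint-encoded name length are assumptions based on this diff, not a documented format:

package main

import (
	"encoding/binary"
	"fmt"
)

// compositeTagKeyPrefix marks a composite (metric name, label key) tag key.
// The 0xfe value is inferred from the `\xfe\x03barfoo` fixtures in this diff.
const compositeTagKeyPrefix = byte(0xfe)

// compositeTagKey sketches the key layout: marker byte, varint-encoded
// name length, the metric name, then the label key.
func compositeTagKey(name, key string) []byte {
	dst := []byte{compositeTagKeyPrefix}
	dst = binary.AppendUvarint(dst, uint64(len(name)))
	dst = append(dst, name...)
	dst = append(dst, key...)
	return dst
}

func main() {
	// For `up{job="foobar"}`, the filter {job="foobar"} becomes a filter
	// on the single composite key "\xfe\x02upjob" with value "foobar".
	fmt.Printf("%q\n", compositeTagKey("up", "job"))
}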
lib/fs/fs.go (43 lines changed)
@@ -3,6 +3,7 @@ package fs
 import (
 	"fmt"
 	"io"
+	"io/ioutil"
 	"os"
 	"path/filepath"
 	"regexp"
@@ -257,6 +258,48 @@ func SymlinkRelative(srcPath, dstPath string) error {
 	return os.Symlink(srcPathRel, dstPath)
 }

+// CopyDirectory copies all the files in srcPath to dstPath.
+func CopyDirectory(srcPath, dstPath string) error {
+	fis, err := ioutil.ReadDir(srcPath)
+	if err != nil {
+		return err
+	}
+	if err := MkdirAllIfNotExist(dstPath); err != nil {
+		return err
+	}
+	for _, fi := range fis {
+		if !fi.Mode().IsRegular() {
+			// Skip non-files
+			continue
+		}
+		src := filepath.Join(srcPath, fi.Name())
+		dst := filepath.Join(dstPath, fi.Name())
+		if err := copyFile(src, dst); err != nil {
+			return err
+		}
+	}
+	MustSyncPath(dstPath)
+	return nil
+}
+
+func copyFile(srcPath, dstPath string) error {
+	src, err := os.Open(srcPath)
+	if err != nil {
+		return err
+	}
+	defer MustClose(src)
+	dst, err := os.Create(dstPath)
+	if err != nil {
+		return err
+	}
+	defer MustClose(dst)
+	if _, err := io.Copy(dst, src); err != nil {
+		return err
+	}
+	MustSyncPath(dstPath)
+	return nil
+}
+
 // ReadFullData reads len(data) bytes from r.
 func ReadFullData(r io.Reader, data []byte) error {
 	n, err := io.ReadFull(r, data)
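A minimal usage sketch for the new helper, assuming it is called from another package via this repository's lib/fs import path; the two paths are hypothetical:

package main

import (
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)

func main() {
	// CopyDirectory copies only regular files (subdirectories are skipped by
	// the fi.Mode().IsRegular() check above) and fsyncs the destination dir.
	if err := fs.CopyDirectory("/tmp/src-metadata", "/tmp/dst-metadata"); err != nil {
		log.Fatalf("cannot copy directory: %s", err)
	}
}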
@@ -117,10 +117,13 @@ type indexDB struct {
 	// metricIDs, since it usually requires 1 bit per deleted metricID.
 	deletedMetricIDs           atomic.Value
 	deletedMetricIDsUpdateLock sync.Mutex
+
+	// The minimum timestamp when queries with composite index can be used.
+	minTimestampForCompositeIndex int64
 }

 // openIndexDB opens index db from the given path with the given caches.
-func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (*indexDB, error) {
+func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (*indexDB, error) {
 	if metricIDCache == nil {
 		logger.Panicf("BUG: metricIDCache must be non-nil")
 	}
@@ -152,6 +155,8 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
 		tsidCache:                      tsidCache,
 		uselessTagFiltersCache:         workingsetcache.New(mem/128, time.Hour),
 		metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
+
+		minTimestampForCompositeIndex: minTimestampForCompositeIndex,
 	}

 	is := db.getIndexSearch(noDeadline)
@@ -1628,6 +1633,9 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
 	if len(tfss) == 0 {
 		return nil, nil
 	}
+	if tr.MinTimestamp >= db.minTimestampForCompositeIndex {
+		tfss = convertToCompositeTagFilterss(tfss)
+	}

 	tfKeyBuf := tagFiltersKeyBufPool.Get()
 	defer tagFiltersKeyBufPool.Put(tfKeyBuf)
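This hunk is the heart of the commit: composite filters are applied only when the whole requested time range starts at or after minTimestampForCompositeIndex, since older data was indexed before composite entries existed and a composite lookup over it would silently match nothing. A standalone sketch of the same decision with made-up timestamps:

package main

import "fmt"

func main() {
	// Composite index entries exist only from this timestamp on (milliseconds);
	// the value is illustrative, e.g. two days after upgrading to v1.54.0.
	const minTimestampForCompositeIndex = int64(1611273600000)

	ranges := []struct{ minTs, maxTs int64 }{
		{1611360000000, 1611446400000}, // entirely after the cutoff: use composite filters
		{1611100800000, 1611446400000}, // starts before the cutoff: keep plain filters
	}
	for _, r := range ranges {
		useComposite := r.minTs >= minTimestampForCompositeIndex
		fmt.Printf("range [%d, %d]: composite=%v\n", r.minTs, r.maxTs, useComposite)
	}
}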
@@ -1886,8 +1894,8 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
 	sortedMetricIDs := srcMetricIDs.AppendTo(nil)

 	kb := &is.kb
-	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
-	tfs = fromCompositeTagFilters(tfs, kb.B)
+	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
+	tfs = removeCompositeTagFilters(tfs, kb.B)

 	metricName := kbPool.Get()
 	defer kbPool.Put(metricName)
@@ -2090,17 +2098,21 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
 	return nil, metricIDs, nil
 }

-func fromCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter {
-	tfsNew := make([]*tagFilter, 0, len(tfs))
+func removeCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter {
+	if !hasCompositeTagFilters(tfs, prefix) {
+		return tfs
+	}
+	var tagKey []byte
+	var name []byte
+	tfsNew := make([]*tagFilter, 0, len(tfs)+1)
 	for _, tf := range tfs {
 		if !bytes.HasPrefix(tf.prefix, prefix) {
 			tfsNew = append(tfsNew, tf)
 			continue
 		}
 		suffix := tf.prefix[len(prefix):]
-		var tagKey, tail []byte
 		var err error
-		tail, tagKey, err = unmarshalTagValue(tagKey[:0], suffix)
+		_, tagKey, err = unmarshalTagValue(tagKey[:0], suffix)
 		if err != nil {
 			logger.Panicf("BUG: cannot unmarshal tag key from suffix=%q: %s", suffix, err)
 		}
@@ -2114,20 +2126,49 @@ func fromCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter {
 		if err != nil {
 			logger.Panicf("BUG: cannot unmarshal nameLen from tagKey %q: %s", tagKey, err)
 		}
+		if nameLen == 0 {
+			logger.Panicf("BUG: nameLen must be greater than 0")
+		}
 		if uint64(len(tagKey)) < nameLen {
 			logger.Panicf("BUG: expecting at %d bytes for name in tagKey=%q; got %d bytes", nameLen, tagKey, len(tagKey))
 		}
+		name = append(name[:0], tagKey[:nameLen]...)
 		tagKey = tagKey[nameLen:]
-		tfNew := *tf
-		tfNew.key = append(tfNew.key[:0], tagKey...)
-		tfNew.prefix = append(tfNew.prefix[:0], prefix...)
-		tfNew.prefix = marshalTagValue(tfNew.prefix, tagKey)
-		tfNew.prefix = append(tfNew.prefix, tail...)
+		var tfNew tagFilter
+		if err := tfNew.Init(prefix, tagKey, tf.value, tf.isNegative, tf.isRegexp); err != nil {
+			logger.Panicf("BUG: cannot initialize {%s=%q} filter: %s", tagKey, tf.value, err)
+		}
+		tfsNew = append(tfsNew, &tfNew)
+	}
+	if len(name) > 0 {
+		var tfNew tagFilter
+		if err := tfNew.Init(prefix, nil, name, false, false); err != nil {
+			logger.Panicf("BUG: unexpected error when initializing {__name__=%q} filter: %s", name, err)
+		}
 		tfsNew = append(tfsNew, &tfNew)
 	}
 	return tfsNew
 }

+func hasCompositeTagFilters(tfs []*tagFilter, prefix []byte) bool {
+	var tagKey []byte
+	for _, tf := range tfs {
+		if !bytes.HasPrefix(tf.prefix, prefix) {
+			continue
+		}
+		suffix := tf.prefix[len(prefix):]
+		var err error
+		_, tagKey, err = unmarshalTagValue(tagKey[:0], suffix)
+		if err != nil {
+			logger.Panicf("BUG: cannot unmarshal tag key from suffix=%q: %s", suffix, err)
+		}
+		if len(tagKey) > 0 && tagKey[0] == compositeTagKeyPrefix {
+			return true
+		}
+	}
+	return false
+}
+
 func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) {
 	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
 	for i, tf := range tfs {
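removeCompositeTagFilters (renamed from fromCompositeTagFilters) undoes the conversion for the slow path that matches metric names directly: each composite filter is rebuilt as a plain filter on the inner label key, and a single `__name__` filter is re-added for the extracted metric name. hasCompositeTagFilters spots composite entries by the marker byte at the start of the unmarshaled tag key; a self-contained sketch of that detection on raw keys (the 0xfe marker value is inferred from the test fixtures):

package main

import "fmt"

const compositeTagKeyPrefix = byte(0xfe) // inferred from the `\xfe...` test fixtures

// hasCompositeTagKey mirrors the hasCompositeTagFilters loop above on raw
// tag keys: a composite key is recognized by its leading marker byte.
func hasCompositeTagKey(tagKeys [][]byte) bool {
	for _, k := range tagKeys {
		if len(k) > 0 && k[0] == compositeTagKeyPrefix {
			return true
		}
	}
	return false
}

func main() {
	plain := [][]byte{[]byte("job"), []byte("instance")}
	composite := [][]byte{append([]byte{compositeTagKeyPrefix, 0x02}, "upjob"...)}
	fmt.Println(hasCompositeTagKey(plain))     // false
	fmt.Println(hasCompositeTagKey(composite)) // true
}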
@@ -456,7 +456,7 @@ func TestIndexDBOpenClose(t *testing.T) {
 	defer tsidCache.Stop()

 	for i := 0; i < 5; i++ {
-		db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache)
+		db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, 0)
 		if err != nil {
 			t.Fatalf("cannot open indexDB: %s", err)
 		}
@@ -479,7 +479,7 @@ func TestIndexDB(t *testing.T) {
 	defer tsidCache.Stop()

 	dbName := "test-index-db-serial"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
 	if err != nil {
 		t.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -509,7 +509,7 @@ func TestIndexDB(t *testing.T) {

 	// Re-open the db and verify it works as expected.
 	db.MustClose()
-	db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
+	db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
 	if err != nil {
 		t.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -533,7 +533,7 @@ func TestIndexDB(t *testing.T) {
 	defer tsidCache.Stop()

 	dbName := "test-index-db-concurrent"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
 	if err != nil {
 		t.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -1465,7 +1465,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 	defer tsidCache.Stop()

 	dbName := "test-index-db-ts-range"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
 	if err != nil {
 		t.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -50,7 +50,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
 	defer tsidCache.Stop()

 	const dbName = "bench-index-db-add-tsids"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
 	if err != nil {
 		b.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -115,7 +115,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
 	defer tsidCache.Stop()

 	const dbName = "bench-head-posting-for-matchers"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
 	if err != nil {
 		b.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -294,7 +294,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
 	defer tsidCache.Stop()

 	const dbName = "bench-index-db-get-tsids"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
 	if err != nil {
 		b.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -108,6 +108,9 @@ type Storage struct {
 	// which may be in the process of flushing to disk by concurrently running
 	// snapshot process.
 	snapshotLock sync.Mutex
+
+	// The minimum timestamp when composite index search can be used.
+	minTimestampForCompositeIndex int64
 }

 // OpenStorage opens storage on the given path with the given retentionMsecs.
@@ -126,14 +129,9 @@ func OpenStorage(path string, retentionMsecs int64) (*Storage, error) {

 		stop: make(chan struct{}),
 	}

 	if err := fs.MkdirAllIfNotExist(path); err != nil {
 		return nil, fmt.Errorf("cannot create a directory for the storage at %q: %w", path, err)
 	}
-	snapshotsPath := path + "/snapshots"
-	if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil {
-		return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err)
-	}
-
 	// Protect from concurrent opens.
 	flockF, err := fs.CreateFlockFile(path)
@@ -142,6 +140,12 @@ func OpenStorage(path string, retentionMsecs int64) (*Storage, error) {
 	}
 	s.flockF = flockF

+	// Pre-create snapshots directory if it is missing.
+	snapshotsPath := path + "/snapshots"
+	if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil {
+		return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err)
+	}
+
 	// Load caches.
 	mem := memory.Allowed()
 	s.tsidCache = s.mustLoadCache("MetricName->TSID", "metricName_tsid", mem/3)
@@ -166,10 +170,11 @@ func OpenStorage(path string, retentionMsecs int64) (*Storage, error) {
 	// Load indexdb
 	idbPath := path + "/indexdb"
 	idbSnapshotsPath := idbPath + "/snapshots"
+	isEmptyDB := !fs.IsPathExist(idbPath)
 	if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
 	}
-	idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache)
+	idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err)
 	}
@@ -185,6 +190,13 @@ func OpenStorage(path string, retentionMsecs int64) (*Storage, error) {
 	}
 	s.tb = tb

+	// Load metadata
+	metadataDir := path + "/metadata"
+	if err := fs.MkdirAllIfNotExist(metadataDir); err != nil {
+		return nil, fmt.Errorf("cannot create %q: %w", metadataDir, err)
+	}
+	s.minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex(metadataDir, isEmptyDB)
+
 	s.startCurrHourMetricIDsUpdater()
 	s.startNextDayMetricIDsUpdater()
 	s.startRetentionWatcher()
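One thing worth flagging in the reconstructed OpenStorage flow: openIndexDBTables receives s.minTimestampForCompositeIndex in the earlier hunk, but the field is only assigned here, after the tables are already open, so the initial indexDB instances see the zero value; only indexDBs opened later (for example by mustRotateIndexDB) get the loaded timestamp. A minimal sketch of the ordering hazard, with hypothetical types:

package main

import "fmt"

type indexDB struct{ minTimestampForCompositeIndex int64 }

type storage struct{ minTimestampForCompositeIndex int64 }

func openIndexDB(minTs int64) *indexDB {
	return &indexDB{minTimestampForCompositeIndex: minTs}
}

func main() {
	var s storage
	// Mirrors the order above: the indexDB is opened first...
	idb := openIndexDB(s.minTimestampForCompositeIndex)
	// ...and the field is loaded from metadata afterwards.
	s.minTimestampForCompositeIndex = 1611273600000
	// The already-open indexDB keeps the zero value it was constructed with.
	fmt.Println(idb.minTimestampForCompositeIndex, s.minTimestampForCompositeIndex) // 0 1611273600000
}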
@@ -235,7 +247,7 @@ func (s *Storage) CreateSnapshot() (string, error) {
 	}
 	fs.MustSyncPath(dstDataDir)

-	idbSnapshot := fmt.Sprintf("%s/indexdb/snapshots/%s", s.path, snapshotName)
+	idbSnapshot := fmt.Sprintf("%s/indexdb/snapshots/%s", srcDir, snapshotName)
 	idb := s.idb()
 	currSnapshot := idbSnapshot + "/" + idb.name
 	if err := idb.tb.CreateSnapshotAt(currSnapshot); err != nil {
@@ -253,8 +265,13 @@ func (s *Storage) CreateSnapshot() (string, error) {
 		return "", fmt.Errorf("cannot create symlink from %q to %q: %w", idbSnapshot, dstIdbDir, err)
 	}

+	srcMetadataDir := srcDir + "/metadata"
+	dstMetadataDir := dstDir + "/metadata"
+	if err := fs.CopyDirectory(srcMetadataDir, dstMetadataDir); err != nil {
+		return "", fmt.Errorf("cannot copy metadata: %s", err)
+	}
+
 	fs.MustSyncPath(dstDir)
-	fs.MustSyncPath(srcDir + "/snapshots")

 	logger.Infof("created Storage snapshot for %q at %q in %.3f seconds", srcDir, dstDir, time.Since(startTime).Seconds())
 	return snapshotName, nil
@@ -537,7 +554,7 @@ func (s *Storage) mustRotateIndexDB() {
 	// Create new indexdb table.
 	newTableName := nextIndexDBTableName()
 	idbNewPath := s.path + "/indexdb/" + newTableName
-	idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache)
+	idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex)
 	if err != nil {
 		logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
 	}
@@ -758,6 +775,43 @@ func marshalUint64Set(dst []byte, m *uint64set.Set) []byte {
 	return dst
 }

+func mustGetMinTimestampForCompositeIndex(metadataDir string, isEmptyDB bool) int64 {
+	path := metadataDir + "/minTimestampForCompositeIndex"
+	minTimestamp, err := loadMinTimestampForCompositeIndex(path)
+	if err == nil {
+		return minTimestamp
+	}
+	logger.Errorf("cannot read minTimestampForCompositeIndex, so trying to re-create it; error: %s", err)
+	date := time.Now().UnixNano() / 1e6 / msecPerDay
+	if !isEmptyDB {
+		// The current and the next day can already contain non-composite indexes,
+		// so they cannot be queried with composite indexes.
+		date += 2
+	} else {
+		date = 0
+	}
+	minTimestamp = date * msecPerDay
+	dateBuf := encoding.MarshalInt64(nil, minTimestamp)
+	if err := os.RemoveAll(path); err != nil {
+		logger.Fatalf("cannot remove a file with minTimestampForCompositeIndex: %s", err)
+	}
+	if err := fs.WriteFileAtomically(path, dateBuf); err != nil {
+		logger.Fatalf("cannot store minTimestampForCompositeIndex: %s", err)
+	}
+	return minTimestamp
+}
+
+func loadMinTimestampForCompositeIndex(path string) (int64, error) {
+	data, err := ioutil.ReadFile(path)
+	if err != nil {
+		return 0, err
+	}
+	if len(data) != 8 {
+		return 0, fmt.Errorf("unexpected length of %q; got %d bytes; want 8 bytes", path, len(data))
+	}
+	return encoding.UnmarshalInt64(data), nil
+}
+
 func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache {
 	path := s.cachePath + "/" + name
 	logger.Infof("loading %s cache from %q...", info, path)
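mustGetMinTimestampForCompositeIndex rounds the current time down to a day boundary and, for a non-empty database, skips two more days, since the current and next day may already hold non-composite index entries. A worked sketch of that arithmetic; msecPerDay is assumed to be 24*3600*1000 to match the day-granularity math here, and the sample timestamp is arbitrary:

package main

import "fmt"

const msecPerDay = int64(24 * 3600 * 1000)

func main() {
	nowMsecs := int64(1611150000000) // 2021-01-20T13:40:00Z in milliseconds
	date := nowMsecs / msecPerDay    // 18647 whole days since the Unix epoch
	isEmptyDB := false
	if !isEmptyDB {
		// The current and the next day can already contain non-composite indexes.
		date += 2
	} else {
		// Fresh DB: composite index is usable for all timestamps.
		date = 0
	}
	minTimestamp := date * msecPerDay
	fmt.Println(minTimestamp) // 1611273600000, i.e. 2021-01-22T00:00:00Z
}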
@@ -1948,7 +2002,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
 	s.tsidCache.Set(metricName, buf)
 }

-func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (curr, prev *indexDB, err error) {
+func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (curr, prev *indexDB, err error) {
 	if err := fs.MkdirAllIfNotExist(path); err != nil {
 		return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
 	}
@@ -2007,12 +2061,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w
 	// Open the last two tables.
 	currPath := path + "/" + tableNames[len(tableNames)-1]

-	curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache)
+	curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex)
 	if err != nil {
 		return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
 	}
 	prevPath := path + "/" + tableNames[len(tableNames)-2]
-	prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache)
+	prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex)
 	if err != nil {
 		curr.MustClose()
 		return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)
@@ -14,15 +14,23 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 )

-// ConvertToCompositeTagFilters converts tfs to composite filters.
+// convertToCompositeTagFilterss converts tfss to composite filters.
 //
 // This converts `foo{bar="baz",x=~"a.+"}` to `{foo=bar="baz",foo=x=~"a.+"} filter.
-func ConvertToCompositeTagFilters(tfs []TagFilter) []TagFilter {
+func convertToCompositeTagFilterss(tfss []*TagFilters) []*TagFilters {
+	tfssNew := make([]*TagFilters, len(tfss))
+	for i, tfs := range tfss {
+		tfssNew[i] = convertToCompositeTagFilters(tfs)
+	}
+	return tfssNew
+}
+
+func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters {
 	// Search for metric name filter, which must be used for creating composite filters.
 	var name []byte
-	for _, tf := range tfs {
-		if len(tf.Key) == 0 && !tf.IsNegative && !tf.IsRegexp {
-			name = tf.Value
+	for _, tf := range tfs.tfs {
+		if len(tf.key) == 0 && !tf.isNegative && !tf.isRegexp {
+			name = tf.value
 			break
 		}
 	}
@@ -30,33 +38,34 @@ func ConvertToCompositeTagFilters(tfs []TagFilter) []TagFilter {
 		// There is no metric name filter, so composite filters cannot be created.
 		return tfs
 	}
-	tfsNew := make([]TagFilter, 0, len(tfs))
+	tfsNew := make([]tagFilter, 0, len(tfs.tfs))
 	var compositeKey []byte
 	compositeFilters := 0
-	for _, tf := range tfs {
-		if len(tf.Key) == 0 {
-			if tf.IsNegative || tf.IsRegexp || string(tf.Value) != string(name) {
+	for _, tf := range tfs.tfs {
+		if len(tf.key) == 0 {
+			if tf.isNegative || tf.isRegexp || string(tf.value) != string(name) {
 				tfsNew = append(tfsNew, tf)
 			}
 			continue
 		}
-		if string(tf.Key) == "__graphite__" {
+		if string(tf.key) == "__graphite__" {
 			tfsNew = append(tfsNew, tf)
 			continue
 		}
-		compositeKey = marshalCompositeTagKey(compositeKey[:0], name, tf.Key)
-		tfsNew = append(tfsNew, TagFilter{
-			Key:        append([]byte{}, compositeKey...),
-			Value:      append([]byte{}, tf.Value...),
-			IsNegative: tf.IsNegative,
-			IsRegexp:   tf.IsRegexp,
-		})
+		compositeKey = marshalCompositeTagKey(compositeKey[:0], name, tf.key)
+		var tfNew tagFilter
+		if err := tfNew.Init(tfs.commonPrefix, compositeKey, tf.value, tf.isNegative, tf.isRegexp); err != nil {
+			logger.Panicf("BUG: unexpected error when creating composite tag filter for name=%q and key=%q: %s", name, tf.key, err)
+		}
+		tfsNew = append(tfsNew, tfNew)
 		compositeFilters++
 	}
 	if compositeFilters == 0 {
 		return tfs
 	}
-	return tfsNew
+	tfsCompiled := NewTagFilters()
+	tfsCompiled.tfs = tfsNew
+	return tfsCompiled
 }

 // TagFilters represents filters used for filtering tags.
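Per the doc comment above, `foo{bar="baz",x=~"a.+"}` becomes `{foo=bar="baz", foo=x=~"a.+"}`: every non-name filter is re-keyed onto a composite (name, key) pair via marshalCompositeTagKey and the standalone name filter is dropped, while `__graphite__` filters pass through unchanged. A sketch of the resulting filter keys, reusing the layout inferred from the test fixtures (the 0xfe marker and varint length are assumptions):

package main

import (
	"encoding/binary"
	"fmt"
)

const compositeTagKeyPrefix = byte(0xfe) // inferred from the `\xfe...` test fixtures

func compositeTagKey(name, key string) []byte {
	dst := []byte{compositeTagKeyPrefix}
	dst = binary.AppendUvarint(dst, uint64(len(name)))
	dst = append(dst, name...)
	dst = append(dst, key...)
	return dst
}

func main() {
	// foo{bar="baz",x=~"a.+"} -> two composite filters, no separate name filter:
	fmt.Printf("%q = %q\n", compositeTagKey("foo", "bar"), "baz") // "\xfe\x03foobar" = "baz"
	fmt.Printf("%q =~ %q\n", compositeTagKey("foo", "x"), "a.+")  // "\xfe\x03foox" =~ "a.+"
}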
@@ -9,13 +9,29 @@ import (
 func TestConvertToCompositeTagFilters(t *testing.T) {
 	f := func(tfs, resultExpected []TagFilter) {
 		t.Helper()
-		result := ConvertToCompositeTagFilters(tfs)
+		tfsCompiled := NewTagFilters()
+		for _, tf := range tfs {
+			if err := tfsCompiled.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
+				t.Fatalf("cannot add tf=%s: %s", tf.String(), err)
+			}
+		}
+		resultCompiled := convertToCompositeTagFilters(tfsCompiled)
+		result := make([]TagFilter, len(resultCompiled.tfs))
+		for i, tf := range resultCompiled.tfs {
+			result[i] = TagFilter{
+				Key:        tf.key,
+				Value:      tf.value,
+				IsNegative: tf.isNegative,
+				IsRegexp:   tf.isRegexp,
+			}
+		}
 		if !reflect.DeepEqual(result, resultExpected) {
 			t.Fatalf("unexpected result;\ngot\n%+v\nwant\n%+v", result, resultExpected)
 		}
 	}

 	// Empty filters
-	f(nil, nil)
+	f(nil, []TagFilter{})

 	// A single non-name filter
 	f([]TagFilter{
@@ -167,7 +183,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 		},
 	})

-	// A name filter with negative regexp non-name filter.
+	// A name filter with negative regexp non-name filter, which can be converted to non-regexp.
 	f([]TagFilter{
 		{
 			Key: nil,
@@ -186,6 +202,29 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 			Key:        []byte("\xfe\x03barfoo"),
 			Value:      []byte("abc"),
 			IsNegative: true,
+			IsRegexp:   false,
+		},
+	})
+
+	// A name filter with negative regexp non-name filter.
+	f([]TagFilter{
+		{
+			Key:        nil,
+			Value:      []byte("bar"),
+			IsNegative: false,
+			IsRegexp:   false,
+		},
+		{
+			Key:        []byte("foo"),
+			Value:      []byte("abc.+"),
+			IsNegative: true,
+			IsRegexp:   true,
+		},
+	}, []TagFilter{
+		{
+			Key:        []byte("\xfe\x03barfoo"),
+			Value:      []byte("abc.+"),
+			IsNegative: true,
 			IsRegexp:   true,
 		},
 	})
@@ -244,7 +283,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 			Key:        []byte("\xfe\x03barfoo"),
 			Value:      []byte("abc"),
 			IsNegative: true,
-			IsRegexp:   true,
+			IsRegexp:   false,
 		},
 		{
 			Key: []byte("__graphite__"),
@@ -254,7 +293,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 		},
 	})

-	// Regexp name filter with non-name filter.
+	// Regexp name filter, which can be converted to non-regexp, with non-name filter.
 	f([]TagFilter{
 		{
 			Key: nil,
@@ -269,9 +308,19 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 			IsRegexp:   false,
 		},
 	}, []TagFilter{
+		{
+			Key:        []byte("\xfe\x03barfoo"),
+			Value:      []byte("abc"),
+			IsNegative: true,
+			IsRegexp:   false,
+		},
+	})
+
+	// Regexp name filter with non-name filter.
+	f([]TagFilter{
 		{
 			Key:        nil,
-			Value:      []byte("bar"),
+			Value:      []byte("bar.+"),
 			IsNegative: false,
 			IsRegexp:   true,
 		},
@@ -281,6 +330,42 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 			IsNegative: true,
 			IsRegexp:   false,
 		},
+	}, []TagFilter{
+		{
+			Key:        nil,
+			Value:      []byte("bar.+"),
+			IsNegative: false,
+			IsRegexp:   true,
+		},
+		{
+			Key:        []byte("foo"),
+			Value:      []byte("abc"),
+			IsNegative: true,
+			IsRegexp:   false,
+		},
+	})
+
+	// Regexp non-name filter, which matches anything.
+	f([]TagFilter{
+		{
+			Key:        nil,
+			Value:      []byte("bar"),
+			IsNegative: false,
+			IsRegexp:   false,
+		},
+		{
+			Key:        []byte("foo"),
+			Value:      []byte(".*"),
+			IsNegative: false,
+			IsRegexp:   true,
+		},
+	}, []TagFilter{
+		{
+			Key:        nil,
+			Value:      []byte("bar"),
+			IsNegative: false,
+			IsRegexp:   false,
+		},
+	})
 }