lib/storage: skip adaptive searching for tag filter matching the minimum number of metrics if the identical previous search didn't found such filter

This should improve speed for searching metrics among high number of time series
with high churn rate like in big Kubernetes clusters with frequent deployments.
This commit is contained in:
Aliaksandr Valialkin 2019-06-10 14:02:44 +03:00
parent 27e50e86f4
commit d54f5fec0b
3 changed files with 57 additions and 4 deletions

View File

@ -327,6 +327,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSize())
})
@ -346,6 +349,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagCacheBytesSize)
})
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheBytesSize)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheRequests)
@ -374,6 +380,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheRequests())
})
@ -405,6 +414,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheMisses())
})

View File

@ -63,6 +63,10 @@ type indexDB struct {
tagCachePrefixes map[accountProjectKey]uint64
tagCachePrefixesLock sync.RWMutex
// Cache holding useless TagFilters entries, which have no tag filters
// matching low number of metrics.
uselessTagFiltersCache *fastcache.Cache
indexSearchPool sync.Pool
// An inmemory map[uint64]struct{} of deleted metricIDs.
@ -129,6 +133,8 @@ func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache, c
tagCachePrefixes: make(map[accountProjectKey]uint64),
uselessTagFiltersCache: fastcache.New(mem / 128),
currHourMetricIDs: currHourMetricIDs,
prevHourMetricIDs: prevHourMetricIDs,
}
@ -151,6 +157,11 @@ type IndexDBMetrics struct {
TagCacheRequests uint64
TagCacheMisses uint64
UselessTagFiltersCacheSize uint64
UselessTagFiltersCacheBytesSize uint64
UselessTagFiltersCacheRequests uint64
UselessTagFiltersCacheMisses uint64
DeletedMetricsCount uint64
IndexDBRefCount uint64
@ -172,12 +183,21 @@ func (db *indexDB) scheduleToDrop() {
// UpdateMetrics updates m with metrics from the db.
func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
var cs fastcache.Stats
cs.Reset()
db.tagCache.UpdateStats(&cs)
m.TagCacheSize += cs.EntriesCount
m.TagCacheBytesSize += cs.BytesSize
m.TagCacheRequests += cs.GetBigCalls
m.TagCacheMisses += cs.Misses
cs.Reset()
db.uselessTagFiltersCache.UpdateStats(&cs)
m.UselessTagFiltersCacheSize += cs.EntriesCount
m.UselessTagFiltersCacheBytesSize += cs.BytesSize
m.UselessTagFiltersCacheRequests += cs.GetBigCalls
m.UselessTagFiltersCacheMisses += cs.Misses
m.DeletedMetricsCount += uint64(len(db.getDeletedMetricIDs()))
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
@ -322,7 +342,7 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
db.metricNameCache.Set(key[:], metricName)
}
func (db *indexDB) marshalTagFiltersKey(dst []byte, tfss []*TagFilters) []byte {
func (db *indexDB) marshalTagFiltersKeyVersioned(dst []byte, tfss []*TagFilters) []byte {
if len(tfss) == 0 {
return nil
}
@ -335,7 +355,7 @@ func (db *indexDB) marshalTagFiltersKey(dst []byte, tfss []*TagFilters) []byte {
db.tagCachePrefixesLock.RUnlock()
if prefix == 0 {
// Create missing prefix.
// It is if multiple concurrent goroutines call invalidateTagCache
// It is OK if multiple concurrent goroutines call invalidateTagCache
// for the same (accountID, projectID).
prefix = db.invalidateTagCache(k.AccountID, k.ProjectID)
}
@ -949,7 +969,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
tfKeyBuf := tagFiltersKeyBufPool.Get()
defer tagFiltersKeyBufPool.Put(tfKeyBuf)
tfKeyBuf.B = db.marshalTagFiltersKey(tfKeyBuf.B[:0], tfss)
tfKeyBuf.B = db.marshalTagFiltersKeyVersioned(tfKeyBuf.B[:0], tfss)
tsids, ok := db.getFromTagCache(tfKeyBuf.B)
if ok {
// Fast path - tsids found in the cache.
@ -1214,6 +1234,14 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
}
func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) {
kb := &is.kb
kb.B = tfs.marshal(kb.B[:0])
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
// Skip useless work below, since the tfs doesn't contain tag filters matching less than maxMetrics metrics.
return nil, nil, errTooManyMetrics
}
// Iteratively increase maxAllowedMetrics up to maxMetrics in order to limit
// the time required for founding the tag filter with minimum matching metrics.
maxAllowedMetrics := 16
@ -1232,7 +1260,10 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters
// Too many metrics matched.
if maxAllowedMetrics >= maxMetrics {
// The tag filter with minimum matching metrics matches at least maxMetrics.
// The tag filter with minimum matching metrics matches at least maxMetrics metrics.
kb.B = tfs.marshal(kb.B[:0])
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
is.db.uselessTagFiltersCache.Set(kb.B, []byte("1"))
return nil, nil, errTooManyMetrics
}

View File

@ -10,6 +10,7 @@ import (
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
@ -89,6 +90,15 @@ func (tfs *TagFilters) Reset(accountID, projectID uint32) {
tfs.commonPrefix = marshalCommonPrefix(tfs.commonPrefix[:0], nsPrefixTagToMetricID, accountID, projectID)
}
func (tfs *TagFilters) marshal(dst []byte) []byte {
dst = encoding.MarshalUint32(dst, tfs.accountID)
dst = encoding.MarshalUint32(dst, tfs.projectID)
for i := range tfs.tfs {
dst = tfs.tfs[i].MarshalNoAccountIDProjectID(dst)
}
return dst
}
// tagFilter represents a filter used for filtering tags.
type tagFilter struct {
key []byte