mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/storage: optimize per-day inverted index search for tag filters matching big number of time series
- Sort tag filters in the ascending number of matching time series in order to apply the most specific filters first. - Fall back to metricName search for filters matching big number of time series (usually this are negative filters or regexp filters).
This commit is contained in:
parent
318326c309
commit
5d99ca6cfc
@ -126,10 +126,14 @@ type indexDB struct {
|
||||
// Cache for fast MetricID -> MetricName lookup.
|
||||
metricNameCache *workingsetcache.Cache
|
||||
|
||||
// Cache holding useless TagFilters entries, which have no tag filters
|
||||
// Cache for useless TagFilters entries, which have no tag filters
|
||||
// matching low number of metrics.
|
||||
uselessTagFiltersCache *workingsetcache.Cache
|
||||
|
||||
// Cache for (date, tagFilter) -> metricIDsLen, which is used for reducing
|
||||
// the amount of work when matching a set of filters.
|
||||
metricIDsPerDateTagFilterCache *workingsetcache.Cache
|
||||
|
||||
indexSearchPool sync.Pool
|
||||
|
||||
// An inmemory set of deleted metricIDs.
|
||||
@ -178,10 +182,11 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
|
||||
tb: tb,
|
||||
name: name,
|
||||
|
||||
tagCache: workingsetcache.New(mem/32, time.Hour),
|
||||
metricIDCache: metricIDCache,
|
||||
metricNameCache: metricNameCache,
|
||||
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
|
||||
tagCache: workingsetcache.New(mem/32, time.Hour),
|
||||
metricIDCache: metricIDCache,
|
||||
metricNameCache: metricNameCache,
|
||||
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
|
||||
metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
|
||||
|
||||
currHourMetricIDs: currHourMetricIDs,
|
||||
prevHourMetricIDs: prevHourMetricIDs,
|
||||
@ -348,11 +353,13 @@ func (db *indexDB) decRef() {
|
||||
// Free space occupied by caches owned by db.
|
||||
db.tagCache.Stop()
|
||||
db.uselessTagFiltersCache.Stop()
|
||||
db.metricIDsPerDateTagFilterCache.Stop()
|
||||
|
||||
db.tagCache = nil
|
||||
db.metricIDCache = nil
|
||||
db.metricNameCache = nil
|
||||
db.uselessTagFiltersCache = nil
|
||||
db.metricIDsPerDateTagFilterCache = nil
|
||||
|
||||
if atomic.LoadUint64(&db.mustDrop) == 0 {
|
||||
return
|
||||
@ -1014,8 +1021,7 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
|
||||
item := ts.Item
|
||||
if !bytes.HasPrefix(item, prefix) {
|
||||
// The databse doesn't contain per-day inverted index yet.
|
||||
// Return the next date, since the current date may contain unindexed data.
|
||||
return minDate + 1, nil
|
||||
return minDate, nil
|
||||
}
|
||||
suffix := item[len(prefix):]
|
||||
|
||||
@ -1024,15 +1030,13 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
|
||||
return 0, fmt.Errorf("unexpected (date, tag)->metricIDs row len; must be at least 8 bytes; got %d bytes", len(suffix))
|
||||
}
|
||||
minDate = encoding.UnmarshalUint64(suffix)
|
||||
// The minDate can contain incomplete inverted index, so increment it.
|
||||
return minDate + 1, nil
|
||||
return minDate, nil
|
||||
}
|
||||
if err := ts.Error(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// There are no (date,tag)->metricIDs entries in the database yet.
|
||||
// Return the next date, since the current date may contain unindexed data.
|
||||
return minDate + 1, nil
|
||||
return minDate, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
|
||||
@ -1675,22 +1679,8 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
|
||||
func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
|
||||
// Sort tag filters for faster ts.Seek below.
|
||||
sort.Slice(tfs.tfs, func(i, j int) bool {
|
||||
// Move regexp and negative filters to the end, since they require scanning
|
||||
// all the entries for the given label.
|
||||
a := &tfs.tfs[i]
|
||||
b := &tfs.tfs[j]
|
||||
if a.isRegexp != b.isRegexp {
|
||||
return !a.isRegexp
|
||||
}
|
||||
if a.isNegative != b.isNegative {
|
||||
return !a.isNegative
|
||||
}
|
||||
if len(a.orSuffixes) != len(b.orSuffixes) {
|
||||
return len(a.orSuffixes) < len(b.orSuffixes)
|
||||
}
|
||||
return bytes.Compare(a.prefix, b.prefix) < 0
|
||||
return tfs.tfs[i].Less(&tfs.tfs[j])
|
||||
})
|
||||
|
||||
err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
|
||||
if err == nil {
|
||||
// Fast path: found metricIDs by date range.
|
||||
@ -2152,10 +2142,40 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
||||
}
|
||||
|
||||
func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
|
||||
// Populate metricIDs with the first non-negative filter.
|
||||
var tfFirst *tagFilter
|
||||
// Sort tfs by the number of matching filters from previous queries.
|
||||
// This way we limit the amount of work below by applying more specific filters at first.
|
||||
type tagFilterWithCount struct {
|
||||
tf *tagFilter
|
||||
count uint64
|
||||
}
|
||||
tfsWithCount := make([]tagFilterWithCount, len(tfs.tfs))
|
||||
kb := &is.kb
|
||||
var buf []byte
|
||||
for i := range tfs.tfs {
|
||||
tf := &tfs.tfs[i]
|
||||
kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf)
|
||||
buf = is.db.metricIDsPerDateTagFilterCache.Get(buf[:0], kb.B)
|
||||
count := uint64(0)
|
||||
if len(buf) == 8 {
|
||||
count = encoding.UnmarshalUint64(buf)
|
||||
}
|
||||
tfsWithCount[i] = tagFilterWithCount{
|
||||
tf: tf,
|
||||
count: count,
|
||||
}
|
||||
}
|
||||
sort.Slice(tfsWithCount, func(i, j int) bool {
|
||||
a, b := &tfsWithCount[i], &tfsWithCount[j]
|
||||
if a.count != b.count {
|
||||
return a.count < b.count
|
||||
}
|
||||
return a.tf.Less(b.tf)
|
||||
})
|
||||
|
||||
// Populate metricIDs with the first non-negative filter.
|
||||
var tfFirst *tagFilter
|
||||
for i := range tfsWithCount {
|
||||
tf := tfsWithCount[i].tf
|
||||
if tf.isNegative {
|
||||
continue
|
||||
}
|
||||
@ -2192,11 +2212,30 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||
}
|
||||
|
||||
// Intersect metricIDs with the rest of filters.
|
||||
for i := range tfs.tfs {
|
||||
tf := &tfs.tfs[i]
|
||||
for i := range tfsWithCount {
|
||||
tfWithCount := &tfsWithCount[i]
|
||||
tf := tfWithCount.tf
|
||||
if tf == tfFirst {
|
||||
continue
|
||||
}
|
||||
if n := uint64(metricIDs.Len()); n < 1000 || n < tfWithCount.count/maxIndexScanLoopsPerMetric {
|
||||
// It should be faster performing metricName match on the remaining filters
|
||||
// instead of scanning big number of entries in the inverted index for these filters.
|
||||
tfsRemaining := tfsWithCount[i:]
|
||||
tfsPostponed := make([]*tagFilter, 0, len(tfsRemaining))
|
||||
for j := range tfsRemaining {
|
||||
tf := tfsRemaining[j].tf
|
||||
if tf == tfFirst {
|
||||
continue
|
||||
}
|
||||
tfsPostponed = append(tfsPostponed, tf)
|
||||
}
|
||||
var m uint64set.Set
|
||||
if err := is.updateMetricIDsByMetricNameMatch(&m, metricIDs, tfsPostponed); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &m, nil
|
||||
}
|
||||
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -2340,7 +2379,26 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
|
||||
tfNew := *tf
|
||||
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
|
||||
tfNew.prefix = kb.B
|
||||
return is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
|
||||
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
|
||||
|
||||
// Store the number of matching metricIDs in the cache in order to sort tag filters
|
||||
// in ascending number of matching metricIDs on the next search.
|
||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
|
||||
metricIDsLen := uint64(metricIDs.Len())
|
||||
if err != nil {
|
||||
// Set metricIDsLen to maxMetrics, so the given entry will be moved to the end
|
||||
// of tag filters on the next search.
|
||||
metricIDsLen = uint64(maxMetrics)
|
||||
}
|
||||
kb.B = encoding.MarshalUint64(kb.B[:0], metricIDsLen)
|
||||
is.db.metricIDsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
||||
return metricIDs, err
|
||||
}
|
||||
|
||||
func appendDateTagFilterCacheKey(dst []byte, date uint64, tf *tagFilter) []byte {
|
||||
dst = encoding.MarshalUint64(dst, date)
|
||||
dst = tf.Marshal(dst)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64set.Set, error) {
|
||||
@ -2408,7 +2466,7 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
|
||||
// over the found metrics.
|
||||
const maxIndexScanLoopsPerMetric = 100
|
||||
|
||||
// The maximum number of slow index scan loops per.
|
||||
// The maximum number of slow index scan loops.
|
||||
// Bigger number of loops is slower than updateMetricIDsByMetricNameMatch
|
||||
// over the found metrics.
|
||||
const maxIndexScanSlowLoopsPerMetric = 20
|
||||
|
@ -148,6 +148,21 @@ type tagFilter struct {
|
||||
matchesEmptyValue bool
|
||||
}
|
||||
|
||||
func (tf *tagFilter) Less(other *tagFilter) bool {
|
||||
// Move regexp and negative filters to the end, since they require scanning
|
||||
// all the entries for the given label.
|
||||
if tf.isRegexp != other.isRegexp {
|
||||
return !tf.isRegexp
|
||||
}
|
||||
if tf.isNegative != other.isNegative {
|
||||
return !tf.isNegative
|
||||
}
|
||||
if len(tf.orSuffixes) != len(other.orSuffixes) {
|
||||
return len(tf.orSuffixes) < len(other.orSuffixes)
|
||||
}
|
||||
return bytes.Compare(tf.prefix, other.prefix) < 0
|
||||
}
|
||||
|
||||
// String returns human-readable tf value.
|
||||
func (tf *tagFilter) String() string {
|
||||
op := "="
|
||||
|
Loading…
Reference in New Issue
Block a user