lib/storage: optimize per-day inverted index search for tag filters matching a big number of time series

- Sort tag filters in ascending order of the number of matching time series
  in order to apply the most specific filters first.
- Fall back to metricName search for filters matching a big number of time series
  (usually these are negative filters or regexp filters).
This commit is contained in:
Aliaksandr Valialkin 2020-03-31 00:44:41 +03:00
parent d450249955
commit 7e755b4bac
2 changed files with 103 additions and 28 deletions

View File

@ -126,10 +126,14 @@ type indexDB struct {
// Cache for fast MetricID -> MetricName lookup. // Cache for fast MetricID -> MetricName lookup.
metricNameCache *workingsetcache.Cache metricNameCache *workingsetcache.Cache
// Cache holding useless TagFilters entries, which have no tag filters // Cache for useless TagFilters entries, which have no tag filters
// matching low number of metrics. // matching low number of metrics.
uselessTagFiltersCache *workingsetcache.Cache uselessTagFiltersCache *workingsetcache.Cache
// Cache for (date, tagFilter) -> metricIDsLen, which is used for reducing
// the amount of work when matching a set of filters.
metricIDsPerDateTagFilterCache *workingsetcache.Cache
indexSearchPool sync.Pool indexSearchPool sync.Pool
// An inmemory set of deleted metricIDs. // An inmemory set of deleted metricIDs.
@ -178,10 +182,11 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
tb: tb, tb: tb,
name: name, name: name,
tagCache: workingsetcache.New(mem/32, time.Hour), tagCache: workingsetcache.New(mem/32, time.Hour),
metricIDCache: metricIDCache, metricIDCache: metricIDCache,
metricNameCache: metricNameCache, metricNameCache: metricNameCache,
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour), uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
currHourMetricIDs: currHourMetricIDs, currHourMetricIDs: currHourMetricIDs,
prevHourMetricIDs: prevHourMetricIDs, prevHourMetricIDs: prevHourMetricIDs,
@ -348,11 +353,13 @@ func (db *indexDB) decRef() {
// Free space occupied by caches owned by db. // Free space occupied by caches owned by db.
db.tagCache.Stop() db.tagCache.Stop()
db.uselessTagFiltersCache.Stop() db.uselessTagFiltersCache.Stop()
db.metricIDsPerDateTagFilterCache.Stop()
db.tagCache = nil db.tagCache = nil
db.metricIDCache = nil db.metricIDCache = nil
db.metricNameCache = nil db.metricNameCache = nil
db.uselessTagFiltersCache = nil db.uselessTagFiltersCache = nil
db.metricIDsPerDateTagFilterCache = nil
if atomic.LoadUint64(&db.mustDrop) == 0 { if atomic.LoadUint64(&db.mustDrop) == 0 {
return return
@ -1053,8 +1060,7 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
return 0, err return 0, err
} }
// The minDate can contain incomplete inverted index, so increment it. // There are no (date,tag)->metricIDs entries in the database yet.
minDate++
return minDate, nil return minDate, nil
} }
@ -1700,22 +1706,8 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error { func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
// Sort tag filters for faster ts.Seek below. // Sort tag filters for faster ts.Seek below.
sort.Slice(tfs.tfs, func(i, j int) bool { sort.Slice(tfs.tfs, func(i, j int) bool {
// Move regexp and negative filters to the end, since they require scanning return tfs.tfs[i].Less(&tfs.tfs[j])
// all the entries for the given label.
a := &tfs.tfs[i]
b := &tfs.tfs[j]
if a.isRegexp != b.isRegexp {
return !a.isRegexp
}
if a.isNegative != b.isNegative {
return !a.isNegative
}
if len(a.orSuffixes) != len(b.orSuffixes) {
return len(a.orSuffixes) < len(b.orSuffixes)
}
return bytes.Compare(a.prefix, b.prefix) < 0
}) })
err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics) err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
if err == nil { if err == nil {
// Fast path: found metricIDs by date range. // Fast path: found metricIDs by date range.
@ -2177,10 +2169,40 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
} }
func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) { func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
// Populate metricIDs with the first non-negative filter. // Sort tfs by the number of matching filters from previous queries.
var tfFirst *tagFilter // This way we limit the amount of work below by applying more specific filters at first.
type tagFilterWithCount struct {
tf *tagFilter
count uint64
}
tfsWithCount := make([]tagFilterWithCount, len(tfs.tfs))
kb := &is.kb
var buf []byte
for i := range tfs.tfs { for i := range tfs.tfs {
tf := &tfs.tfs[i] tf := &tfs.tfs[i]
kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf, tfs.accountID, tfs.projectID)
buf = is.db.metricIDsPerDateTagFilterCache.Get(buf[:0], kb.B)
count := uint64(0)
if len(buf) == 8 {
count = encoding.UnmarshalUint64(buf)
}
tfsWithCount[i] = tagFilterWithCount{
tf: tf,
count: count,
}
}
sort.Slice(tfsWithCount, func(i, j int) bool {
a, b := &tfsWithCount[i], &tfsWithCount[j]
if a.count != b.count {
return a.count < b.count
}
return a.tf.Less(b.tf)
})
// Populate metricIDs with the first non-negative filter.
var tfFirst *tagFilter
for i := range tfsWithCount {
tf := tfsWithCount[i].tf
if tf.isNegative { if tf.isNegative {
continue continue
} }
@ -2217,11 +2239,30 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
} }
// Intersect metricIDs with the rest of filters. // Intersect metricIDs with the rest of filters.
for i := range tfs.tfs { for i := range tfsWithCount {
tf := &tfs.tfs[i] tfWithCount := &tfsWithCount[i]
tf := tfWithCount.tf
if tf == tfFirst { if tf == tfFirst {
continue continue
} }
if n := uint64(metricIDs.Len()); n < 1000 || n < tfWithCount.count/maxIndexScanLoopsPerMetric {
// It should be faster performing metricName match on the remaining filters
// instead of scanning big number of entries in the inverted index for these filters.
tfsRemaining := tfsWithCount[i:]
tfsPostponed := make([]*tagFilter, 0, len(tfsRemaining))
for j := range tfsRemaining {
tf := tfsRemaining[j].tf
if tf == tfFirst {
continue
}
tfsPostponed = append(tfsPostponed, tf)
}
var m uint64set.Set
if err := is.updateMetricIDsByMetricNameMatch(&m, metricIDs, tfsPostponed, tfs.accountID, tfs.projectID); err != nil {
return nil, err
}
return &m, nil
}
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, tfs.accountID, tfs.projectID, maxDateMetrics) m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, tfs.accountID, tfs.projectID, maxDateMetrics)
if err != nil { if err != nil {
return nil, err return nil, err
@ -2376,7 +2417,26 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
tfNew := *tf tfNew := *tf
tfNew.isNegative = false // isNegative for the original tf is handled by the caller. tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
tfNew.prefix = kb.B tfNew.prefix = kb.B
return is.getMetricIDsForTagFilter(&tfNew, maxMetrics) metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
// Store the number of matching metricIDs in the cache in order to sort tag filters
// in ascending number of matching metricIDs on the next search.
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, accountID, projectID)
metricIDsLen := uint64(metricIDs.Len())
if err != nil {
// Set metricIDsLen to maxMetrics, so the given entry will be moved to the end
// of tag filters on the next search.
metricIDsLen = uint64(maxMetrics)
}
kb.B = encoding.MarshalUint64(kb.B[:0], metricIDsLen)
is.db.metricIDsPerDateTagFilterCache.Set(is.kb.B, kb.B)
return metricIDs, err
}
func appendDateTagFilterCacheKey(dst []byte, date uint64, tf *tagFilter, accountID, projectID uint32) []byte {
dst = encoding.MarshalUint64(dst, date)
dst = tf.Marshal(dst, accountID, projectID)
return dst
} }
func (is *indexSearch) getMetricIDsForDate(date uint64, accountID, projectID uint32, maxMetrics int) (*uint64set.Set, error) { func (is *indexSearch) getMetricIDsForDate(date uint64, accountID, projectID uint32, maxMetrics int) (*uint64set.Set, error) {
@ -2444,7 +2504,7 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
// over the found metrics. // over the found metrics.
const maxIndexScanLoopsPerMetric = 100 const maxIndexScanLoopsPerMetric = 100
// The maximum number of slow index scan loops per. // The maximum number of slow index scan loops.
// Bigger number of loops is slower than updateMetricIDsByMetricNameMatch // Bigger number of loops is slower than updateMetricIDsByMetricNameMatch
// over the found metrics. // over the found metrics.
const maxIndexScanSlowLoopsPerMetric = 20 const maxIndexScanSlowLoopsPerMetric = 20

View File

@ -160,6 +160,21 @@ type tagFilter struct {
matchesEmptyValue bool matchesEmptyValue bool
} }
// Less reports whether tf must be placed before other when sorting tag filters.
//
// The ordering criteria are applied in the following priority:
//
//   - non-regexp filters go before regexp filters;
//   - non-negative filters go before negative filters;
//   - filters with fewer orSuffixes go before filters with more orSuffixes;
//   - the remaining ties are resolved by bytewise comparison of prefix.
//
// Regexp and negative filters are pushed to the end, since they require
// scanning all the entries for the given label.
func (tf *tagFilter) Less(other *tagFilter) bool {
	switch {
	case tf.isRegexp != other.isRegexp:
		// Exactly one of the filters is a regexp; tf goes first only if it is other.
		return other.isRegexp
	case tf.isNegative != other.isNegative:
		// Exactly one of the filters is negative; tf goes first only if it is other.
		return other.isNegative
	case len(tf.orSuffixes) != len(other.orSuffixes):
		return len(tf.orSuffixes) < len(other.orSuffixes)
	default:
		return bytes.Compare(tf.prefix, other.prefix) < 0
	}
}
// String returns human-readable tf value. // String returns human-readable tf value.
func (tf *tagFilter) String() string { func (tf *tagFilter) String() string {
op := "=" op := "="