lib/storage: sort tag filters by actual execution time instead of by the number of matching time series

This should improve query speed for queries with regexp filters matching small number of time series
on a label with big number of unique values.
This commit is contained in:
Aliaksandr Valialkin 2021-02-15 00:16:37 +02:00
parent c727d2219b
commit 71c417427c
2 changed files with 37 additions and 26 deletions

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"math"
"path/filepath"
"sort"
"sync"
@ -103,9 +104,9 @@ type indexDB struct {
// matching low number of metrics.
uselessTagFiltersCache *workingsetcache.Cache
// Cache for (date, tagFilter) -> metricIDsLen, which is used for reducing
// Cache for (date, tagFilter) -> filterDuration, which is used for reducing
// the amount of work when matching a set of filters.
metricIDsPerDateTagFilterCache *workingsetcache.Cache
durationsPerDateTagFilterCache *workingsetcache.Cache
indexSearchPool sync.Pool
@ -154,7 +155,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
metricNameCache: metricNameCache,
tsidCache: tsidCache,
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
durationsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
minTimestampForCompositeIndex: minTimestampForCompositeIndex,
}
@ -315,14 +316,14 @@ func (db *indexDB) decRef() {
// Free space occupied by caches owned by db.
db.tagCache.Stop()
db.uselessTagFiltersCache.Stop()
db.metricIDsPerDateTagFilterCache.Stop()
db.durationsPerDateTagFilterCache.Stop()
db.tagCache = nil
db.metricIDCache = nil
db.metricNameCache = nil
db.tsidCache = nil
db.uselessTagFiltersCache = nil
db.metricIDsPerDateTagFilterCache = nil
db.durationsPerDateTagFilterCache = nil
if atomic.LoadUint64(&db.mustDrop) == 0 {
return
@ -2778,9 +2779,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// Sort tfs by the number of matching filters from previous queries.
// This way we limit the amount of work below by applying more specific filters at first.
type tagFilterWithCount struct {
tf *tagFilter
cost uint64
count uint64
tf *tagFilter
seconds float64
}
tfsWithCount := make([]tagFilterWithCount, len(tfs.tfs))
kb := &is.kb
@ -2788,23 +2788,23 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
for i := range tfs.tfs {
tf := &tfs.tfs[i]
kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf)
buf = is.db.metricIDsPerDateTagFilterCache.Get(buf[:0], kb.B)
count := uint64(0)
buf = is.db.durationsPerDateTagFilterCache.Get(buf[:0], kb.B)
seconds := float64(0)
if len(buf) == 8 {
count = encoding.UnmarshalUint64(buf)
n := encoding.UnmarshalUint64(buf)
seconds = math.Float64frombits(n)
}
tfsWithCount[i] = tagFilterWithCount{
tf: tf,
cost: count * tf.matchCost,
count: count,
tf: tf,
seconds: seconds,
}
}
sort.Slice(tfsWithCount, func(i, j int) bool {
a, b := &tfsWithCount[i], &tfsWithCount[j]
if a.cost != b.cost {
return a.cost < b.cost
if a.seconds != b.seconds {
return a.seconds < b.seconds
}
return a.tf.Less(b.tf)
return a.tf.matchCost < b.tf.matchCost
})
// Populate metricIDs for the first non-negative filter.
@ -2868,7 +2868,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// Short circuit - there is no need in applying the remaining filters to an empty set.
break
}
if n := uint64(metricIDsLen); n < 1000 || (n < tfWithCount.count/maxIndexScanLoopsPerMetric && n < uint64(maxMetrics)/10) {
if float64(metricIDsLen)/metricNameMatchesPerSecond < tfWithCount.seconds {
// It should be faster performing metricName match on the remaining filters
// instead of scanning big number of entries in the inverted index for these filters.
tfsPostponed = append(tfsPostponed, tf)
@ -2909,6 +2909,12 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
return metricIDs, nil
}
// The estimated number of per-second loops inside updateMetricIDsByMetricNameMatch
//
// This value is used for determining when matching by metric name must be perfromed instead of matching
// by the remaining tag filters.
const metricNameMatchesPerSecond = 10000
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
ii := getIndexItems()
defer putIndexItems(ii)
@ -3069,18 +3075,21 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
tfNew := *tf
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
tfNew.prefix = kb.B
startTime := time.Now()
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
// Store the number of matching metricIDs in the cache in order to sort tag filters
// in ascending number of matching metricIDs on the next search.
duration := time.Since(startTime)
// Store the duration for tag filter execution in the cache in order to sort tag filters
// in ascending durations on the next search.
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
metricIDsLen := uint64(metricIDs.Len())
if err != nil {
// Set metricIDsLen to maxMetrics, so the given entry will be moved to the end
// Set duration to big value, so the given tag filter will be moved to the end
// of tag filters on the next search.
metricIDsLen = uint64(maxMetrics)
duration = time.Hour
}
kb.B = encoding.MarshalUint64(kb.B[:0], metricIDsLen)
is.db.metricIDsPerDateTagFilterCache.Set(is.kb.B, kb.B)
seconds := duration.Seconds()
n := math.Float64bits(seconds)
kb.B = encoding.MarshalUint64(kb.B[:0], n)
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
return metricIDs, err
}

View File

@ -547,7 +547,9 @@ func getOptimizedReMatchFunc(reMatch func(b []byte) bool, expr string) (func(b [
return reMatch, "", reMatchCost
}
// The following & default cost values are returned from BenchmarkOptimizedReMatchCost
// These cost values are used for sorting tag filters in ascending order or the required CPU time for execution.
//
// These values are obtained from BenchmarkOptimizedReMatchCost benchmark.
const (
fullMatchCost = 1
prefixMatchCost = 2