lib/storage: more tuning for tag filters sorting according the time they take

This commit is contained in:
Aliaksandr Valialkin 2021-02-16 21:22:10 +02:00
parent 35a23234ca
commit 1ab7a8dfd5

View File

@ -1147,8 +1147,8 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
defer wg.Done()
tvssLocal := make(map[string]struct{})
isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline)
defer is.db.putIndexSearch(isLocal)
err := isLocal.searchTagValueSuffixesForDate(tvssLocal, date, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
is.db.putIndexSearch(isLocal)
mu.Lock()
defer mu.Unlock()
if errGlobal != nil {
@ -2335,7 +2335,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
// Slow path - try searching over the whole inverted index.
// Sort tag filters for faster ts.Seek below.
sort.Slice(tfs.tfs, func(i, j int) bool {
sort.SliceStable(tfs.tfs, func(i, j int) bool {
return tfs.tfs[i].Less(&tfs.tfs[j])
})
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
@ -2710,8 +2710,8 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
go func(date uint64) {
defer wg.Done()
isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline)
defer is.db.putIndexSearch(isLocal)
m, err := isLocal.getMetricIDsForDate(date, maxMetrics)
is.db.putIndexSearch(isLocal)
mu.Lock()
defer mu.Unlock()
if errGlobal != nil {
@ -2769,8 +2769,8 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
go func(date uint64) {
defer wg.Done()
isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline)
defer is.db.putIndexSearch(isLocal)
m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics)
is.db.putIndexSearch(isLocal)
mu.Lock()
defer mu.Unlock()
if errGlobal != nil {
@ -2803,35 +2803,40 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
// Sort tfs by the duration from previous queries.
// This way we limit the amount of work below by applying more specific filters at first.
// This way we limit the amount of work below by applying fast filters at first.
type tagFilterWithWeight struct {
tf *tagFilter
durationSeconds float64
lastQueryTimestamp uint64
}
tfsWithWeight := make([]tagFilterWithWeight, len(tfs.tfs))
kb := &is.kb
var buf []byte
tfws := make([]tagFilterWithWeight, len(tfs.tfs))
ct := fasttime.UnixTimestamp()
for i := range tfs.tfs {
tf := &tfs.tfs[i]
kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf, is.accountID, is.projectID)
buf = is.db.durationsPerDateTagFilterCache.Get(buf[:0], kb.B)
var lastQueryTimestamp uint64
// Assume unknwon tag filters may take up to a second for execution.
durationSeconds := float64(1)
if len(buf) == 16 {
lastQueryTimestamp = encoding.UnmarshalUint64(buf)
n := encoding.UnmarshalUint64(buf[8:])
durationSeconds = math.Float64frombits(n)
durationSeconds, lastQueryTimestamp := is.getDurationAndTimestampForDateFilter(date, tf)
if ct > lastQueryTimestamp+60 {
// It is time to update filter duration stats.
if tf.isNegative || tf.isRegexp && len(tf.orSuffixes) == 0 {
// Negative and regexp filters usually take the most time, so move them to the end of filters
// in the hope they won't be executed at all.
if durationSeconds == 0 {
durationSeconds = 10
}
tfsWithWeight[i] = tagFilterWithWeight{
} else {
// Reset duration stats for relatively fast {key="value"} and {key=~"foo|bar|baz"} filters, so it is re-populated below.
if durationSeconds < 0.5 {
durationSeconds = 0
}
}
}
tfws[i] = tagFilterWithWeight{
tf: tf,
durationSeconds: durationSeconds,
lastQueryTimestamp: lastQueryTimestamp,
}
}
sort.Slice(tfsWithWeight, func(i, j int) bool {
a, b := &tfsWithWeight[i], &tfsWithWeight[j]
sort.SliceStable(tfws, func(i, j int) bool {
a, b := &tfws[i], &tfws[j]
if a.durationSeconds != b.durationSeconds {
return a.durationSeconds < b.durationSeconds
}
@ -2842,12 +2847,12 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
var tfsPostponed []*tagFilter
var metricIDs *uint64set.Set
maxDateMetrics := maxMetrics * 50
tfsRemainingWithWeight := tfsWithWeight[:0]
for i := range tfsWithWeight {
tfw := tfsWithWeight[i]
tfwsRemaining := tfws[:0]
for i := range tfws {
tfw := tfws[i]
tf := tfw.tf
if tf.isNegative {
tfsRemainingWithWeight = append(tfsRemainingWithWeight, tfw)
tfwsRemaining = append(tfwsRemaining, tfw)
continue
}
m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics)
@ -2861,8 +2866,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
}
metricIDs = m
i++
for i < len(tfsWithWeight) {
tfsRemainingWithWeight = append(tfsRemainingWithWeight, tfsWithWeight[i])
for i < len(tfws) {
tfwsRemaining = append(tfwsRemaining, tfws[i])
i++
}
break
@ -2892,8 +2897,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// when the intial tag filters significantly reduce the number of found metricIDs,
// so the remaining filters could be performed via much faster metricName matching instead
// of slow selecting of matching metricIDs.
for i := range tfsRemainingWithWeight {
tfw := tfsRemainingWithWeight[i]
for i := range tfwsRemaining {
tfw := tfwsRemaining[i]
tf := tfw.tf
metricIDsLen := metricIDs.Len()
if metricIDsLen == 0 {
@ -3094,44 +3099,57 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
}
kb := kbPool.Get()
defer kbPool.Put(kb)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = append(kb.B, tf.prefix[len(commonPrefix):]...)
tfNew := *tf
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
tfNew.prefix = kb.B
startTime := time.Now()
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
kbPool.Put(kb)
currentTimestamp := fasttime.UnixTimestamp()
if currentTimestamp == lastQueryTimestamp {
if currentTimestamp > lastQueryTimestamp+5 {
// The cache already contains quite fresh entry for the current (date, tf).
// Do not update it too frequently.
return metricIDs, err
}
// Store the duration for tag filter execution in the cache in order to sort tag filters
// in ascending durations on the next search.
duration := time.Since(startTime)
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
if err != nil {
// Set duration to big value, so the given tag filter will be moved to the end
// of tag filters on the next search.
duration = time.Hour
}
durationSeconds := duration.Seconds()
durationSeconds := time.Since(startTime).Seconds()
if metricIDs.Len() >= maxMetrics {
// Increase the duration for tag filter matching too many metrics,
// So next time it will be applied after filters matching lower number of metrics.
durationSeconds *= 2
}
n := math.Float64bits(durationSeconds)
kb.B = encoding.MarshalUint64(kb.B[:0], currentTimestamp)
kb.B = encoding.MarshalUint64(kb.B, n)
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
is.storeDurationAndTimestampForDateFilter(date, tf, durationSeconds, currentTimestamp)
return metricIDs, err
}
func (is *indexSearch) getDurationAndTimestampForDateFilter(date uint64, tf *tagFilter) (float64, uint64) {
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
kb := kbPool.Get()
defer kbPool.Put(kb)
kb.B = is.db.durationsPerDateTagFilterCache.Get(kb.B[:0], is.kb.B)
if len(kb.B) != 16 {
return 0, 0
}
n := encoding.UnmarshalUint64(kb.B)
durationSeconds := math.Float64frombits(n)
timestamp := encoding.UnmarshalUint64(kb.B[8:])
return durationSeconds, timestamp
}
func (is *indexSearch) storeDurationAndTimestampForDateFilter(date uint64, tf *tagFilter, durationSeconds float64, timestamp uint64) {
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
n := math.Float64bits(durationSeconds)
kb := kbPool.Get()
kb.B = encoding.MarshalUint64(kb.B[:0], n)
kb.B = encoding.MarshalUint64(kb.B, timestamp)
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
kbPool.Put(kb)
}
func appendDateTagFilterCacheKey(dst []byte, date uint64, tf *tagFilter, accountID, projectID uint32) []byte {
dst = encoding.MarshalUint64(dst, date)
dst = tf.Marshal(dst, accountID, projectID)