Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
lib/storage: further tune filters sorting logic
commit f669531506
parent 133b288681
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -2386,7 +2386,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set
 	}
 
 	// Slow path - scan for all the rows with the given prefix.
-	maxLoopsCount := uint64(maxMetrics) * maxIndexScanSlowLoopsPerMetric
+	maxLoopsCount := uint64(maxMetrics) * maxIndexScanLoopsPerMetric
 	loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, metricIDs.Add)
 	if err != nil {
 		if err == errFallbackToMetricNameMatch {
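This hunk widens the slow-path budget: maxIndexScanSlowLoopsPerMetric (20, deleted further down in this diff) is replaced by the shared maxIndexScanLoopsPerMetric (100). The budget bounds how much index scanning a filter may do before the search gives up on it. Below is a minimal, self-contained Go sketch of that budget pattern; scanRows, errTooManyLoops and the row slice are illustrative stand-ins, not code from the repository.

package main

import (
	"errors"
	"fmt"
)

var errTooManyLoops = errors.New("too many loops; fall back to a different strategy")

// scanRows visits up to maxLoopsCount rows and reports how many it inspected.
// Exceeding the budget aborts the scan so a slow filter cannot stall the query.
func scanRows(rows []int, maxLoopsCount uint64, visit func(int)) (uint64, error) {
	var loopsCount uint64
	for _, r := range rows {
		loopsCount++
		if loopsCount > maxLoopsCount {
			return loopsCount, errTooManyLoops
		}
		visit(r)
	}
	return loopsCount, nil
}

func main() {
	const maxIndexScanLoopsPerMetric = 100 // mirrors the constant kept by the diff
	maxMetrics := 2
	budget := uint64(maxMetrics) * maxIndexScanLoopsPerMetric

	rows := make([]int, 500)
	n, err := scanRows(rows, budget, func(int) {})
	fmt.Println(n, err) // 201 too many loops; fall back to a different strategy
}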
@@ -2725,7 +2725,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
 	if maxDate < minDate {
 		// Per-day inverted index doesn't cover the selected date range.
-		return errFallbackToMetricNameMatch
+		return fmt.Errorf("maxDate=%d cannot be smaller than minDate=%d", maxDate, minDate)
 	}
 	if maxDate-minDate > maxDaysForDateMetricIDs {
 		// Too much dates must be covered. Give up, since it may be slow.
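The hunk above converts an impossible condition into a loud failure: tr.MinTimestamp never exceeds tr.MaxTimestamp, so maxDate < minDate indicates a bug, and reporting it is more useful than silently falling back to slower metric-name matching. A minimal sketch of the guard follows; checkDateRange is a hypothetical wrapper, not a function from the repository.

package main

import "fmt"

const msecPerDay = 24 * 3600 * 1000

// checkDateRange converts a millisecond time range to day indexes and
// rejects an inverted range with a descriptive error.
func checkDateRange(minTimestamp, maxTimestamp int64) error {
	minDate := uint64(minTimestamp) / msecPerDay
	maxDate := uint64(maxTimestamp) / msecPerDay
	if maxDate < minDate {
		return fmt.Errorf("maxDate=%d cannot be smaller than minDate=%d", maxDate, minDate)
	}
	return nil
}

func main() {
	fmt.Println(checkDateRange(2*msecPerDay, 1*msecPerDay))
}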
@@ -2788,9 +2788,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 	// This stats is usually collected from the previous queries.
 	// This way we limit the amount of work below by applying fast filters at first.
 	type tagFilterWithWeight struct {
-		tf                 *tagFilter
-		loopsCount         uint64
-		lastQueryTimestamp uint64
+		tf         *tagFilter
+		loopsCount uint64
 	}
 	tfws := make([]tagFilterWithWeight, len(tfs.tfs))
 	currentTime := fasttime.UnixTimestamp()
@@ -2798,26 +2797,29 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 		tf := &tfs.tfs[i]
 		loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
 		origLoopsCount := loopsCount
-		if currentTime > lastQueryTimestamp+3*3600 {
-			// Update stats once per 3 hours only for relatively fast tag filters.
-			// There is no need in spending CPU resources on updating stats for slow tag filters.
+		if loopsCount == 0 && tf.looksLikeHeavy() {
+			// Set high loopsCount for heavy tag filters instead of spending CPU time on their execution.
+			loopsCount = 100e6
+			is.storeLoopsCountForDateFilter(date, tf, loopsCount)
+		}
+		if currentTime > lastQueryTimestamp+3600 {
+			// Update stats once per hour for relatively fast tag filters.
+			// There is no need in spending CPU resources on updating stats for heavy tag filters.
 			if loopsCount <= 10e6 {
 				loopsCount = 0
 			}
 		}
 		if loopsCount == 0 {
-			// Prevent from possible thundering herd issue when heavy tf is executed from multiple concurrent queries
+			// Prevent from possible thundering herd issue when potentially heavy tf is executed from multiple concurrent queries
 			// by temporary persisting its position in the tag filters list.
 			if origLoopsCount == 0 {
-				origLoopsCount = 10e6
+				origLoopsCount = 50e6
 			}
-			lastQueryTimestamp = 0
-			is.storeLoopsCountForDateFilter(date, tf, origLoopsCount, lastQueryTimestamp)
+			is.storeLoopsCountForDateFilter(date, tf, origLoopsCount)
 		}
 		tfws[i] = tagFilterWithWeight{
-			tf:                 tf,
-			loopsCount:         loopsCount,
-			lastQueryTimestamp: lastQueryTimestamp,
+			tf:         tf,
+			loopsCount: loopsCount,
 		}
 	}
 	sort.Slice(tfws, func(i, j int) bool {
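The reworked loop above is the core of the commit. Each filter is weighted by the loops count previously recorded for it on the given date; a regexp filter that looks heavy (see the looksLikeHeavy hunk at the end of the diff) immediately gets a 100e6 placeholder, and a filter of unknown cost gets a persisted 50e6 placeholder, so concurrent queries do not all probe the same potentially expensive filter at once (the thundering-herd case named in the comments). Below is a reduced, runnable sketch of the weight-then-sort idea; the filter names and counts are invented for illustration.

package main

import (
	"fmt"
	"sort"
)

type tagFilter struct{ name string }

// tagFilterWithWeight is reduced to the two fields this commit keeps.
type tagFilterWithWeight struct {
	tf         *tagFilter
	loopsCount uint64
}

func main() {
	tfws := []tagFilterWithWeight{
		{&tagFilter{"heavy-regexp"}, 0},      // cost unknown yet
		{&tagFilter{"cheap-exact"}, 1500},    // measured as cheap earlier
		{&tagFilter{"medium-prefix"}, 90000}, // measured as moderate
	}

	// Unknown cost is treated as potentially heavy: assign the 50e6
	// placeholder used by the commit so the filter sorts to the back of
	// the list until a real measurement replaces it.
	for i := range tfws {
		if tfws[i].loopsCount == 0 {
			tfws[i].loopsCount = 50e6
		}
	}

	// Cheapest filters first: they shrink the candidate set quickly, so
	// the expensive filters later run against far fewer metricIDs.
	sort.Slice(tfws, func(i, j int) bool {
		return tfws[i].loopsCount < tfws[j].loopsCount
	})
	for _, tfw := range tfws {
		fmt.Println(tfw.tf.name, tfw.loopsCount)
	}
}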
@@ -2841,8 +2843,12 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 			continue
 		}
 		m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics)
-		is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp)
+		is.storeLoopsCountForDateFilter(date, tf, loopsCount)
 		if err != nil {
+			if err == errFallbackToMetricNameMatch {
+				tfsPostponed = append(tfsPostponed, tf)
+				continue
+			}
 			return nil, err
 		}
 		if m.Len() >= maxDateMetrics {
@@ -2896,12 +2902,16 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 			// instead of scanning big number of entries in the inverted index for these filters.
 			tfsPostponed = append(tfsPostponed, tf)
 			// Store stats for non-executed tf, since it could be updated during protection from thundered herd.
-			is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount, tfw.lastQueryTimestamp)
+			is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount)
 			continue
 		}
 		m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
-		is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp)
+		is.storeLoopsCountForDateFilter(date, tf, loopsCount)
 		if err != nil {
+			if err == errFallbackToMetricNameMatch {
+				tfsPostponed = append(tfsPostponed, tf)
+				continue
+			}
 			return nil, err
 		}
 		if m.Len() >= maxDateMetrics {
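Both hunks above gain the same error-handling shape: a filter that reports errFallbackToMetricNameMatch is pushed onto tfsPostponed and retried later against the already-narrowed set of metricIDs, and its loops count is stored either way so the next query sorts it correctly. A compact sketch of this postpone-on-fallback pattern follows; applyFilter stands in for getMetricIDsForDateTagFilter and is not a repository function.

package main

import (
	"errors"
	"fmt"
)

var errFallbackToMetricNameMatch = errors.New("fall back to metric name match")

type tagFilter struct{ costly bool }

// applyFilter pretends to run a filter against the inverted index and asks
// for a fallback when the index scan would be too expensive.
func applyFilter(tf *tagFilter) error {
	if tf.costly {
		return errFallbackToMetricNameMatch
	}
	return nil
}

func main() {
	tfs := []*tagFilter{{false}, {true}, {false}}
	var tfsPostponed []*tagFilter
	for _, tf := range tfs {
		if err := applyFilter(tf); err != nil {
			if errors.Is(err, errFallbackToMetricNameMatch) {
				// Defer the expensive filter: once cheap filters have
				// shrunk the candidate set, matching it per metric name
				// is cheaper than a wide index scan.
				tfsPostponed = append(tfsPostponed, tf)
				continue
			}
			panic(err)
		}
	}
	fmt.Println("postponed:", len(tfsPostponed)) // postponed: 1
}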
@@ -3115,13 +3125,8 @@ func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *t
 	return loopsCount, timestamp
 }
 
-func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount, prevTimestamp uint64) {
+func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount uint64) {
 	currentTimestamp := fasttime.UnixTimestamp()
-	if currentTimestamp < prevTimestamp+5 {
-		// The cache already contains quite fresh entry for the current (date, tf).
-		// Do not update it too frequently.
-		return
-	}
 	is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
 	kb := kbPool.Get()
 	kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount)
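With the prevTimestamp parameter gone, storeLoopsCountForDateFilter no longer suppresses writes made within five seconds of the previous one; the once-per-hour gating in the selection loop above now controls how often stats are refreshed. Judging by the paired getLoopsCountAndTimestampForDateFilter, the cache value appears to hold the loops count followed by the store timestamp, but that layout is an inference from this diff. The sketch below assumes it, and uses encoding/binary instead of VictoriaMetrics' lib/encoding helpers to stay self-contained; marshalEntry and unmarshalEntry are hypothetical names.

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// marshalEntry encodes the assumed cache-value layout: loopsCount followed
// by the store timestamp, each as 8 big-endian bytes.
func marshalEntry(loopsCount, timestamp uint64) []byte {
	b := make([]byte, 0, 16)
	b = binary.BigEndian.AppendUint64(b, loopsCount)
	b = binary.BigEndian.AppendUint64(b, timestamp)
	return b
}

// unmarshalEntry reverses marshalEntry.
func unmarshalEntry(b []byte) (loopsCount, timestamp uint64) {
	return binary.BigEndian.Uint64(b[:8]), binary.BigEndian.Uint64(b[8:16])
}

func main() {
	b := marshalEntry(100e6, uint64(time.Now().Unix()))
	loopsCount, ts := unmarshalEntry(b)
	fmt.Println(loopsCount, ts)
}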
@@ -3201,11 +3206,6 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
 // over the found metrics.
 const maxIndexScanLoopsPerMetric = 100
 
-// The maximum number of slow index scan loops.
-// Bigger number of loops is slower than updateMetricIDsByMetricNameMatch
-// over the found metrics.
-const maxIndexScanSlowLoopsPerMetric = 20
-
 func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) {
 	if filter.Len() == 0 {
 		return nil, nil
@@ -3251,7 +3251,7 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil
 	}
 
 	// Slow path - scan for all the rows with the given prefix.
-	maxLoopsCount := uint64(filter.Len()) * maxIndexScanSlowLoopsPerMetric
+	maxLoopsCount := uint64(filter.Len()) * maxIndexScanLoopsPerMetric
 	_, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, func(metricID uint64) {
 		if tf.isNegative {
 			// filter must be equal to metricIDs
--- a/lib/storage/tagfilters.go
+++ b/lib/storage/tagfilters.go
@@ -248,6 +248,10 @@ type tagFilter struct {
 	graphiteReverseSuffix []byte
 }
 
+func (tf *tagFilter) looksLikeHeavy() bool {
+	return tf.isRegexp && len(tf.orSuffixes) == 0
+}
+
 func (tf *tagFilter) isComposite() bool {
 	k := tf.key
 	return len(k) > 0 && k[0] == compositeTagKeyPrefix
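looksLikeHeavy is the heuristic behind the 100e6 placeholder earlier in the diff: a regexp filter whose pattern could not be unrolled into a finite list of literal or-suffixes has no cheap index path, so it is presumed expensive until actually measured. A toy illustration follows; the orSuffixes contents are invented for the example.

package main

import "fmt"

type tagFilter struct {
	isRegexp   bool
	orSuffixes []string
}

// looksLikeHeavy mirrors the method added by this commit: a regexp with no
// unrolled or-suffixes is assumed to require a wide index scan.
func (tf *tagFilter) looksLikeHeavy() bool {
	return tf.isRegexp && len(tf.orSuffixes) == 0
}

func main() {
	unrolled := &tagFilter{isRegexp: true, orSuffixes: []string{"a", "b", "c"}} // e.g. (a|b|c)
	opaque := &tagFilter{isRegexp: true}                                        // e.g. .*error.*: no finite suffix set
	fmt.Println(unrolled.looksLikeHeavy(), opaque.looksLikeHeavy())             // false true
}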