From 628b8eb55e12f26162bddd7fbf3e73030a406ac7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 18 Feb 2021 13:56:50 +0200 Subject: [PATCH] lib/storage: prevent from running identical heavy tag filters in concurrent queries when measuring the number of loops for such tag filter. This should reduce CPU usage spikes when measuring the number of loops needed for heavy tag filters --- lib/storage/index_db.go | 44 +++++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index d667ac75a9..792a8e1165 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -2821,10 +2821,20 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter for i := range tfs.tfs { tf := &tfs.tfs[i] loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf) + origLoopsCount := loopsCount if currentTime > lastQueryTimestamp+60*60 { // Reset loopsCount to 0 every hour for collecting updated stats for the tf. loopsCount = 0 } + if loopsCount == 0 { + // Prevent from possible thundering herd issue when heavy tf is executed from multiple concurrent queries + // by temporary persisting its position in the tag filters list. + if origLoopsCount == 0 { + origLoopsCount = 10e6 + } + lastQueryTimestamp = 0 + is.storeLoopsCountForDateFilter(date, tf, origLoopsCount, lastQueryTimestamp) + } tfws[i] = tagFilterWithWeight{ tf: tf, loopsCount: loopsCount, @@ -2851,7 +2861,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter tfwsRemaining = append(tfwsRemaining, tfw) continue } - m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, nil, tfs.commonPrefix, maxDateMetrics) + m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics) + is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp) if err != nil { return nil, err } @@ -2905,9 +2916,12 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter // It should be faster performing metricName match on the remaining filters // instead of scanning big number of entries in the inverted index for these filters. tfsPostponed = append(tfsPostponed, tf) + // Store stats for non-executed tf, since it could be updated during protection from thundered herd. + is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount, tfw.lastQueryTimestamp) continue } - m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, metricIDs, tfs.commonPrefix, maxDateMetrics) + m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, maxDateMetrics) + is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp) if err != nil { return nil, err } @@ -3083,8 +3097,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) { return true, nil } -func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTimestamp, date uint64, - filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) { +func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, uint64, error) { // Augument tag filter prefix for per-date search instead of global search. if !bytes.HasPrefix(tf.prefix, commonPrefix) { logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix) @@ -3098,25 +3111,16 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime tfNew.prefix = kb.B metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics) kbPool.Put(kb) - currentTimestamp := fasttime.UnixTimestamp() - if currentTimestamp > lastQueryTimestamp+5 { - // The cache already contains quite fresh entry for the current (date, tf). - // Do not update it too frequently. - return metricIDs, err - } - // Store the loopsCount for tag filter in the cache in order to sort tag filters - // in ascending durations on the next search. if err != nil { // Set high loopsCount for failing filter, so it is moved to the end of filter list. - loopsCount = 1 << 30 + loopsCount = 1e9 } if metricIDs.Len() >= maxMetrics { // Increase loopsCount for tag filter matching too many metrics, // So next time it is moved to the end of filter list. loopsCount *= 2 } - is.storeLoopsCountAndTimestampForDateFilter(date, tf, loopsCount, currentTimestamp) - return metricIDs, err + return metricIDs, loopsCount, err } func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (uint64, uint64) { @@ -3132,11 +3136,17 @@ func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *t return loopsCount, timestamp } -func (is *indexSearch) storeLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter, loopsCount, timestamp uint64) { +func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount, prevTimestamp uint64) { + currentTimestamp := fasttime.UnixTimestamp() + if currentTimestamp < prevTimestamp+5 { + // The cache already contains quite fresh entry for the current (date, tf). + // Do not update it too frequently. + return + } is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID) kb := kbPool.Get() kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount) - kb.B = encoding.MarshalUint64(kb.B, timestamp) + kb.B = encoding.MarshalUint64(kb.B, currentTimestamp) is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B) kbPool.Put(kb) }