From 55952f8f2e57b3be6b93f0296aa749f40cbc3cec Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 16 Feb 2021 13:03:58 +0200 Subject: [PATCH] lib/storage: tune sorting for tag filters --- lib/storage/index_db.go | 92 ++++++++++++++++++++++---------------- lib/storage/tag_filters.go | 6 +-- 2 files changed, 56 insertions(+), 42 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index eef9039d4c..ba2e895485 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -2801,33 +2802,38 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set } func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) { - // Sort tfs by the number of matching filters from previous queries. + // Sort tfs by the duration from previous queries. // This way we limit the amount of work below by applying more specific filters at first. 
- type tagFilterWithCount struct { - tf *tagFilter - seconds float64 + type tagFilterWithWeight struct { + tf *tagFilter + durationSeconds float64 + lastQueryTimestamp uint64 } - tfsWithCount := make([]tagFilterWithCount, len(tfs.tfs)) + tfsWithWeight := make([]tagFilterWithWeight, len(tfs.tfs)) kb := &is.kb var buf []byte for i := range tfs.tfs { tf := &tfs.tfs[i] kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf, is.accountID, is.projectID) buf = is.db.durationsPerDateTagFilterCache.Get(buf[:0], kb.B) - seconds := float64(0) - if len(buf) == 8 { - n := encoding.UnmarshalUint64(buf) - seconds = math.Float64frombits(n) + var lastQueryTimestamp uint64 + // Assume unknown tag filters may take up to a second for execution. + durationSeconds := float64(1) + if len(buf) == 16 { + lastQueryTimestamp = encoding.UnmarshalUint64(buf) + n := encoding.UnmarshalUint64(buf[8:]) + durationSeconds = math.Float64frombits(n) } - tfsWithCount[i] = tagFilterWithCount{ - tf: tf, - seconds: seconds, + tfsWithWeight[i] = tagFilterWithWeight{ + tf: tf, + durationSeconds: durationSeconds, + lastQueryTimestamp: lastQueryTimestamp, } } - sort.Slice(tfsWithCount, func(i, j int) bool { - a, b := &tfsWithCount[i], &tfsWithCount[j] - if a.seconds != b.seconds { - return a.seconds < b.seconds + sort.Slice(tfsWithWeight, func(i, j int) bool { + a, b := &tfsWithWeight[i], &tfsWithWeight[j] + if a.durationSeconds != b.durationSeconds { + return a.durationSeconds < b.durationSeconds } return a.tf.Less(b.tf) }) @@ -2836,14 +2842,15 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter var tfsPostponed []*tagFilter var metricIDs *uint64set.Set maxDateMetrics := maxMetrics * 50 - tfsRemainingWithCount := tfsWithCount[:0] - for i := range tfsWithCount { - tf := tfsWithCount[i].tf + tfsRemainingWithWeight := tfsWithWeight[:0] + for i := range tfsWithWeight { + tfw := tfsWithWeight[i] + tf := tfw.tf if tf.isNegative { - tfsRemainingWithCount = 
append(tfsRemainingWithCount, tfsWithCount[i]) + tfsRemainingWithWeight = append(tfsRemainingWithWeight, tfw) continue } - m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics) + m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics) if err != nil { return nil, err } @@ -2854,8 +2861,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter } metricIDs = m i++ - for i < len(tfsWithCount) { - tfsRemainingWithCount = append(tfsRemainingWithCount, tfsWithCount[i]) + for i < len(tfsWithWeight) { + tfsRemainingWithWeight = append(tfsRemainingWithWeight, tfsWithWeight[i]) i++ } break @@ -2885,21 +2892,21 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter // when the intial tag filters significantly reduce the number of found metricIDs, // so the remaining filters could be performed via much faster metricName matching instead // of slow selecting of matching metricIDs. - for i := range tfsRemainingWithCount { - tfWithCount := tfsRemainingWithCount[i] - tf := tfWithCount.tf + for i := range tfsRemainingWithWeight { + tfw := tfsRemainingWithWeight[i] + tf := tfw.tf metricIDsLen := metricIDs.Len() if metricIDsLen == 0 { // Short circuit - there is no need in applying the remaining filters to an empty set. break } - if float64(metricIDsLen)/metricNameMatchesPerSecond < tfWithCount.seconds { + if float64(metricIDsLen)/metricNameMatchesPerSecond < tfw.durationSeconds { // It should be faster performing metricName match on the remaining filters // instead of scanning big number of entries in the inverted index for these filters. 
tfsPostponed = append(tfsPostponed, tf) continue } - m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics) + m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics) if err != nil { return nil, err } @@ -2919,11 +2926,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter return nil, nil } if len(tfsPostponed) > 0 { - if n := metricIDs.Len(); n > 50000 && n > maxMetrics/10 { - // It will be slow to perform metricName match on this number of time series. - // Fall back to global search. - return nil, errFallbackToMetricNameMatch - } // Apply the postponed filters via metricName match. var m uint64set.Set if err := is.updateMetricIDsByMetricNameMatch(&m, metricIDs, tfsPostponed); err != nil { @@ -2938,7 +2940,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter // // This value is used for determining when matching by metric name must be perfromed instead of matching // by the remaining tag filters. -const metricNameMatchesPerSecond = 10000 +const metricNameMatchesPerSecond = 50000 func (is *indexSearch) storeDateMetricID(date, metricID uint64) error { ii := getIndexItems() @@ -3086,7 +3088,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) { return true, nil } -func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) { +func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTimestamp, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) { // Augument tag filter prefix for per-date search instead of global search. 
if !bytes.HasPrefix(tf.prefix, commonPrefix) { logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix) @@ -3102,18 +3104,30 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, tfNew.prefix = kb.B startTime := time.Now() metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics) - duration := time.Since(startTime) + currentTimestamp := fasttime.UnixTimestamp() + if currentTimestamp == lastQueryTimestamp { + // The cache already contains a quite fresh entry for the current (date, tf). + // Do not update it too frequently. + return metricIDs, err + } // Store the duration for tag filter execution in the cache in order to sort tag filters // in ascending durations on the next search. + duration := time.Since(startTime) is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID) if err != nil { // Set duration to big value, so the given tag filter will be moved to the end // of tag filters on the next search. duration = time.Hour } - seconds := duration.Seconds() - n := math.Float64bits(seconds) - kb.B = encoding.MarshalUint64(kb.B[:0], n) + durationSeconds := duration.Seconds() + if metricIDs.Len() >= maxMetrics { + // Increase the duration for tag filter matching too many metrics, + // so next time it will be applied after filters matching a lower number of metrics. 
+ durationSeconds *= 2 + } + n := math.Float64bits(durationSeconds) + kb.B = encoding.MarshalUint64(kb.B[:0], currentTimestamp) + kb.B = encoding.MarshalUint64(kb.B, n) is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B) return metricIDs, err } diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go index 3e6e8c3fcb..b09c64487c 100644 --- a/lib/storage/tag_filters.go +++ b/lib/storage/tag_filters.go @@ -255,12 +255,12 @@ func (tf *tagFilter) Less(other *tagFilter) bool { if tf.matchCost != other.matchCost { return tf.matchCost < other.matchCost } - if tf.isRegexp != other.isRegexp { - return !tf.isRegexp - } if tf.isNegative != other.isNegative { return !tf.isNegative } + if tf.isRegexp != other.isRegexp { + return !tf.isRegexp + } if len(tf.orSuffixes) != len(other.orSuffixes) { return len(tf.orSuffixes) < len(other.orSuffixes) }