From d05cac6c9859631b4f90de8a949d22ba10d6fe9f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 30 Jul 2021 08:37:10 +0300 Subject: [PATCH] lib/storage: re-use the per-day inverted index search code for searching in global index This allows removing a big pile of outdated code for global index search. This may help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1486 --- app/vmstorage/main.go | 21 +- lib/storage/index_db.go | 513 +++-------------------------------- lib/storage/index_db_test.go | 8 +- lib/storage/tag_filters.go | 7 - 4 files changed, 38 insertions(+), 511 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index b04b721ff3..b80a2d2ea5 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -448,12 +448,6 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 { return float64(idbm().MissingTSIDsForMetricID) }) - metrics.NewGauge(`vm_date_metric_ids_search_calls_total`, func() float64 { - return float64(idbm().DateMetricIDsSearchCalls) - }) - metrics.NewGauge(`vm_date_metric_ids_search_hits_total`, func() float64 { - return float64(idbm().DateMetricIDsSearchHits) - }) metrics.NewGauge(`vm_index_blocks_with_metric_ids_processed_total`, func() float64 { return float64(idbm().IndexBlocksWithMetricIDsProcessed) }) @@ -609,6 +603,9 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_date_range_hits_total`, func() float64 { return float64(idbm().DateRangeSearchHits) }) + metrics.NewGauge(`vm_global_search_calls_total`, func() float64 { + return float64(idbm().GlobalSearchCalls) + }) metrics.NewGauge(`vm_missing_metric_names_for_metric_id_total`, func() float64 { return float64(idbm().MissingMetricNamesForMetricID) @@ -654,9 +651,6 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagFiltersCacheSize) }) - 
metrics.NewGauge(`vm_cache_entries{type="indexdb/uselessTagFilters"}`, func() float64 { - return float64(idbm().UselessTagFiltersCacheSize) - }) metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheSize()) }) @@ -697,9 +691,6 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagFiltersCacheSizeBytes) }) - metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 { - return float64(idbm().UselessTagFiltersCacheSizeBytes) - }) metrics.NewGauge(`vm_cache_size_bytes{type="storage/prefetchedMetricIDs"}`, func() float64 { return float64(m().PrefetchedMetricIDsSizeBytes) }) @@ -728,9 +719,6 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagFiltersCacheRequests) }) - metrics.NewGauge(`vm_cache_requests_total{type="indexdb/uselessTagFilters"}`, func() float64 { - return float64(idbm().UselessTagFiltersCacheRequests) - }) metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheRequests()) }) @@ -759,9 +747,6 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagFiltersCacheMisses) }) - metrics.NewGauge(`vm_cache_misses_total{type="indexdb/uselessTagFilters"}`, func() float64 { - return float64(idbm().UselessTagFiltersCacheMisses) - }) metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheMisses()) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 00af9b9297..7cbeae94dd 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -63,18 +63,15 @@ type indexDB struct { // High rate for this value means corrupted indexDB. 
missingTSIDsForMetricID uint64 - // The number of searches for metric ids by days. - dateMetricIDsSearchCalls uint64 - - // The number of successful searches for metric ids by days. - dateMetricIDsSearchHits uint64 - // The number of calls for date range searches. dateRangeSearchCalls uint64 // The number of hits for date range searches. dateRangeSearchHits uint64 + // The number of calls for global search. + globalSearchCalls uint64 + // missingMetricNamesForMetricID is a counter of missing MetricID -> MetricName entries. // High rate may mean corrupted indexDB due to unclean shutdown. // The db must be automatically recovered after that. @@ -94,10 +91,6 @@ type indexDB struct { // The parent storage. s *Storage - // Cache for useless TagFilters entries, which have no tag filters - // matching low number of metrics. - uselessTagFiltersCache *workingsetcache.Cache - // Cache for (date, tagFilter) -> loopsCount, which is used for reducing // the amount of work when matching a set of filters. 
loopsPerDateTagFilterCache *workingsetcache.Cache @@ -128,7 +121,6 @@ func openIndexDB(path string, s *Storage) (*indexDB, error) { tagFiltersCache: workingsetcache.New(mem/32, time.Hour), s: s, - uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour), loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour), } return db, nil @@ -143,11 +135,6 @@ type IndexDBMetrics struct { TagFiltersCacheRequests uint64 TagFiltersCacheMisses uint64 - UselessTagFiltersCacheSize uint64 - UselessTagFiltersCacheSizeBytes uint64 - UselessTagFiltersCacheRequests uint64 - UselessTagFiltersCacheMisses uint64 - DeletedMetricsCount uint64 IndexDBRefCount uint64 @@ -157,11 +144,10 @@ type IndexDBMetrics struct { RecentHourMetricIDsSearchCalls uint64 RecentHourMetricIDsSearchHits uint64 - DateMetricIDsSearchCalls uint64 - DateMetricIDsSearchHits uint64 DateRangeSearchCalls uint64 DateRangeSearchHits uint64 + GlobalSearchCalls uint64 MissingMetricNamesForMetricID uint64 @@ -190,23 +176,15 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.TagFiltersCacheRequests += cs.GetCalls m.TagFiltersCacheMisses += cs.Misses - cs.Reset() - db.uselessTagFiltersCache.UpdateStats(&cs) - m.UselessTagFiltersCacheSize += cs.EntriesCount - m.UselessTagFiltersCacheSizeBytes += cs.BytesSize - m.UselessTagFiltersCacheRequests += cs.GetCalls - m.UselessTagFiltersCacheMisses += cs.Misses - m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len()) m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated) m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID) - m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls) - m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits) m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits) + m.GlobalSearchCalls += 
atomic.LoadUint64(&db.globalSearchCalls) m.MissingMetricNamesForMetricID += atomic.LoadUint64(&db.missingMetricNamesForMetricID) @@ -277,12 +255,10 @@ func (db *indexDB) decRef() { // Free space occupied by caches owned by db. db.tagFiltersCache.Stop() - db.uselessTagFiltersCache.Stop() db.loopsPerDateTagFilterCache.Stop() db.tagFiltersCache = nil db.s = nil - db.uselessTagFiltersCache = nil db.loopsPerDateTagFilterCache = nil if atomic.LoadUint64(&db.mustDrop) == 0 { @@ -1600,9 +1576,6 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { // Reset MetricName -> TSID cache, since it may contain deleted TSIDs. db.s.resetAndSaveTSIDCache() - // Do not reset uselessTagFiltersCache, since the found metricIDs - // on cache miss are filtered out later with deletedMetricIDs. - // Store the metricIDs as deleted. // Make this after updating the deletedMetricIDs and resetting caches // in order to exclude the possibility of the inconsistent state when the deleted metricIDs @@ -1966,167 +1939,6 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs return nil } -func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, *uint64set.Set, error) { - minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics) - if err == nil { - return minTf, minMetricIDs, nil - } - if err != errTooManyMetrics { - return nil, nil, err - } - - // All the tag filters match too many metrics. - - // Slow path: try filtering the matching metrics by time range. - // This should work well for cases when old metrics are constantly substituted - // by big number of new metrics. For example, prometheus-operator creates many new - // metrics for each new deployment. - // - // Allow fetching up to 20*maxMetrics metrics for the given time range - // in the hope these metricIDs will be filtered out by other filters later. 
- maxTimeRangeMetrics := 20 * maxMetrics - metricIDsForTimeRange, err := is.getMetricIDsForTimeRange(tr, maxTimeRangeMetrics+1) - if err == errMissingMetricIDsForDate { - return nil, nil, fmt.Errorf("cannot find tag filter matching less than %d time series; "+ - "either increase -search.maxUniqueTimeseries or use more specific tag filters", maxMetrics) - } - if err != nil { - return nil, nil, err - } - if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics { - return nil, metricIDsForTimeRange, nil - } - return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range", - maxMetrics, tr.String()) -} - -func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) { - appendCacheKeyPrefix := func(dst []byte, prefix byte) []byte { - dst = append(dst, prefix) - dst = append(dst, is.db.name...) - dst = encoding.MarshalUint64(dst, uint64(maxMetrics)) - return dst - } - kb := &is.kb - kb.B = appendCacheKeyPrefix(kb.B[:0], uselessMultiTagFiltersKeyPrefix) - kb.B = tfs.marshal(kb.B) - if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 { - // Skip useless work below, since the tfs doesn't contain tag filters matching less than maxMetrics metrics. - return nil, nil, errTooManyMetrics - } - - // Iteratively increase maxAllowedMetrics up to maxMetrics in order to limit - // the time required for founding the tag filter with minimum matching metrics. - maxAllowedMetrics := 16 - if maxAllowedMetrics > maxMetrics { - maxAllowedMetrics = maxMetrics - } - for { - minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCount(tfs, maxAllowedMetrics) - if err != errTooManyMetrics { - if err != nil { - return nil, nil, err - } - if minMetricIDs.Len() < maxAllowedMetrics { - // Found the tag filter with the minimum number of metrics. - return minTf, minMetricIDs, nil - } - } - - // Too many metrics matched. 
- if maxAllowedMetrics >= maxMetrics { - // The tag filter with minimum matching metrics matches at least maxMetrics metrics. - kb.B = appendCacheKeyPrefix(kb.B[:0], uselessMultiTagFiltersKeyPrefix) - kb.B = tfs.marshal(kb.B) - is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue) - return nil, nil, errTooManyMetrics - } - - // Increase maxAllowedMetrics and try again. - maxAllowedMetrics *= 4 - if maxAllowedMetrics > maxMetrics { - maxAllowedMetrics = maxMetrics - } - } -} - -var errTooManyMetrics = errors.New("all the tag filters match too many metrics") - -func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) { - appendCacheKeyPrefix := func(dst []byte, prefix byte) []byte { - dst = append(dst, prefix) - dst = append(dst, is.db.name...) - dst = encoding.MarshalUint64(dst, uint64(maxMetrics)) - return dst - } - var minMetricIDs *uint64set.Set - var minTf *tagFilter - kb := &is.kb - uselessTagFilters := 0 - for i := range tfs.tfs { - tf := &tfs.tfs[i] - if tf.isNegative { - // Skip negative filters. - continue - } - - kb.B = appendCacheKeyPrefix(kb.B[:0], uselessSingleTagFilterKeyPrefix) - kb.B = tf.Marshal(kb.B) - if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 { - // Skip useless work below, since the tf matches at least maxMetrics metrics. - uselessTagFilters++ - continue - } - - metricIDs, _, err := is.getMetricIDsForTagFilter(tf, maxMetrics, int64Max) - if err != nil { - return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %w", tf, err) - } - if metricIDs.Len() >= maxMetrics { - // The tf matches at least maxMetrics. 
Skip it - kb.B = appendCacheKeyPrefix(kb.B[:0], uselessSingleTagFilterKeyPrefix) - kb.B = tf.Marshal(kb.B) - is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue) - uselessTagFilters++ - continue - } - - minMetricIDs = metricIDs - minTf = tf - maxMetrics = minMetricIDs.Len() - if maxMetrics <= 1 { - // There is no need in inspecting other filters, since minTf - // already matches 0 or 1 metric. - break - } - } - if minTf != nil { - return minTf, minMetricIDs, nil - } - if uselessTagFilters == len(tfs.tfs) { - // All the tag filters return at least maxMetrics entries. - return nil, nil, errTooManyMetrics - } - - // There is no positive filter with small number of matching metrics. - // Create it, so it matches all the MetricIDs. - kb.B = appendCacheKeyPrefix(kb.B[:0], uselessNegativeTagFilterKeyPrefix) - kb.B = tfs.marshal(kb.B) - if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 { - return nil, nil, errTooManyMetrics - } - metricIDs := &uint64set.Set{} - if err := is.updateMetricIDsAll(metricIDs, maxMetrics); err != nil { - return nil, nil, err - } - if metricIDs.Len() >= maxMetrics { - kb.B = appendCacheKeyPrefix(kb.B[:0], uselessNegativeTagFilterKeyPrefix) - kb.B = tfs.marshal(kb.B) - is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue) - } - return nil, metricIDs, nil -} - func removeCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter { if !hasCompositeTagFilters(tfs, prefix) { return tfs @@ -2334,41 +2146,16 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf return err } - // Slow path - try searching over the whole inverted index. - - // Sort tag filters for faster ts.Seek below. - sort.Slice(tfs.tfs, func(i, j int) bool { - return tfs.tfs[i].Less(&tfs.tfs[j]) - }) - minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics) + // Slow path - fall back to search in the global inverted index. 
+ atomic.AddUint64(&is.db.globalSearchCalls, 1) + m, err := is.getMetricIDsForDateAndFilters(0, tfs, maxMetrics) if err != nil { return err } - - // Find intersection of minTf with other tfs. - for i := range tfs.tfs { - tf := &tfs.tfs[i] - if tf == minTf { - continue - } - mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs) - if err != nil { - return err - } - minMetricIDs = mIDs - } - metricIDs.UnionMayOwn(minMetricIDs) + metricIDs.UnionMayOwn(m) return nil } -const ( - uselessSingleTagFilterKeyPrefix = 0 - uselessMultiTagFiltersKeyPrefix = 1 - uselessNegativeTagFilterKeyPrefix = 2 -) - -var uselessTagFilterCacheValue = []byte("1") - func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) { if tf.isNegative { logger.Panicf("BUG: isNegative must be false") @@ -2376,7 +2163,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, m metricIDs := &uint64set.Set{} if len(tf.orSuffixes) > 0 { // Fast path for orSuffixes - seek for rows for each value from orSuffixes. - loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, metricIDs, maxMetrics, maxLoopsCount) + loopsCount, err := is.updateMetricIDsForOrSuffixes(tf, metricIDs, maxMetrics, maxLoopsCount) if err != nil { return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf) } @@ -2384,9 +2171,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, m } // Slow path - scan for all the rows with the given prefix. - // Pass nil filter to getMetricIDsForTagFilterSlow, since it works faster on production workloads - // than non-nil filter with many entries. 
- loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, nil, metricIDs.Add, maxLoopsCount) + loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, metricIDs.Add, maxLoopsCount) if err != nil { return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf) } @@ -2395,7 +2180,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, m var errTooManyLoops = fmt.Errorf("too many loops is needed for applying this filter") -func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, f func(metricID uint64), maxLoopsCount int64) (int64, error) { +func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, f func(metricID uint64), maxLoopsCount int64) (int64, error) { if len(tf.orSuffixes) > 0 { logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes) } @@ -2442,18 +2227,10 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6 // There is no need in checking it again with potentially // slow tf.matchSuffix, which may call regexp. for _, metricID := range mp.MetricIDs { - if filter != nil && !filter.Has(metricID) { - continue - } f(metricID) } continue } - if filter != nil && !mp.HasCommonMetricIDs(filter) { - // Faster path: there is no need in calling tf.matchSuffix, - // since the current row has no matching metricIDs. - continue - } // Slow path: need tf.matchSuffix call. ok, err := tf.matchSuffix(suffix) // Assume that tf.matchSuffix call needs 10x more time than a single metric scan iteration. @@ -2485,9 +2262,6 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6 prevMatch = true prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...) 
for _, metricID := range mp.MetricIDs { - if filter != nil && !filter.Has(metricID) { - continue - } f(metricID) } } @@ -2497,7 +2271,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6 return loopsCount, nil } -func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) { +func (is *indexSearch) updateMetricIDsForOrSuffixes(tf *tagFilter, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) { if tf.isNegative { logger.Panicf("BUG: isNegative must be false") } @@ -2508,7 +2282,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, metri kb.B = append(kb.B[:0], tf.prefix...) kb.B = append(kb.B, orSuffix...) kb.B = append(kb.B, tagSeparatorChar) - lc, err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, metricIDs, maxMetrics, maxLoopsCount-loopsCount) + lc, err := is.updateMetricIDsForOrSuffix(kb.B, metricIDs, maxMetrics, maxLoopsCount-loopsCount) loopsCount += lc if err != nil { return loopsCount, err @@ -2520,25 +2294,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, metri return loopsCount, nil } -func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set, maxLoopsCount int64) (int64, error) { - sortedFilter := filter.AppendTo(nil) - kb := kbPool.Get() - defer kbPool.Put(kb) - var loopsCount int64 - for _, orSuffix := range tf.orSuffixes { - kb.B = append(kb.B[:0], tf.prefix...) - kb.B = append(kb.B, orSuffix...) 
- kb.B = append(kb.B, tagSeparatorChar) - lc, err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative, maxLoopsCount-loopsCount) - loopsCount += lc - if err != nil { - return loopsCount, err - } - } - return loopsCount, nil -} - -func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) { +func (is *indexSearch) updateMetricIDsForOrSuffix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) { ts := &is.ts mp := &is.mp mp.Reset() @@ -2572,156 +2328,8 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, metricI return loopsCount, nil } -func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool, maxLoopsCount int64) (int64, error) { - if len(sortedFilter) == 0 { - return 0, nil - } - firstFilterMetricID := sortedFilter[0] - lastFilterMetricID := sortedFilter[len(sortedFilter)-1] - ts := &is.ts - mp := &is.mp - mp.Reset() - var loopsCount int64 - loopsPaceLimiter := 0 - ts.Seek(prefix) - var sf []uint64 - var metricID uint64 - for ts.NextItem() { - if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 { - if err := checkSearchDeadlineAndPace(is.deadline); err != nil { - return loopsCount, err - } - } - loopsPaceLimiter++ - item := ts.Item - if !bytes.HasPrefix(item, prefix) { - return loopsCount, nil - } - if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil { - return loopsCount, err - } - loopsCount += int64(mp.MetricIDsLen()) - if loopsCount > maxLoopsCount { - return loopsCount, errTooManyLoops - } - firstMetricID, lastMetricID := mp.FirstAndLastMetricIDs() - if lastMetricID < firstFilterMetricID { - // Skip the item, since it contains metricIDs lower - // than metricIDs in sortedFilter. 
- continue - } - if firstMetricID > lastFilterMetricID { - // Stop searching, since the current item and all the subsequent items - // contain metricIDs higher than metricIDs in sortedFilter. - return loopsCount, nil - } - sf = sortedFilter - mp.ParseMetricIDs() - matchingMetricIDs := mp.MetricIDs[:0] - for _, metricID = range mp.MetricIDs { - if len(sf) == 0 { - break - } - if metricID > sf[0] { - n := binarySearchUint64(sf, metricID) - sf = sf[n:] - if len(sf) == 0 { - break - } - } - if metricID < sf[0] { - continue - } - matchingMetricIDs = append(matchingMetricIDs, metricID) - sf = sf[1:] - } - if len(matchingMetricIDs) > 0 { - if isNegative { - for _, metricID := range matchingMetricIDs { - metricIDs.Del(metricID) - } - } else { - metricIDs.AddMulti(matchingMetricIDs) - } - } - } - if err := ts.Error(); err != nil { - return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err) - } - return loopsCount, nil -} - -func binarySearchUint64(a []uint64, v uint64) uint { - // Copy-pasted sort.Search from https://golang.org/src/sort/search.go?s=2246:2286#L49 - i, j := uint(0), uint(len(a)) - for i < j { - h := (i + j) >> 1 - if h < uint(len(a)) && a[h] < v { - i = h + 1 - } else { - j = h - } - } - return i -} - var errFallbackToGlobalSearch = errors.New("fall back from per-day index search to global index search") -var errMissingMetricIDsForDate = errors.New("missing metricIDs for date") - -func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) { - atomic.AddUint64(&is.db.dateMetricIDsSearchCalls, 1) - minDate := uint64(tr.MinTimestamp) / msecPerDay - maxDate := uint64(tr.MaxTimestamp) / msecPerDay - if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch { - // Too much dates must be covered. Give up. - return nil, errMissingMetricIDsForDate - } - if minDate == maxDate { - // Fast path - query on a single day. 
- metricIDs, err := is.getMetricIDsForDate(minDate, maxMetrics) - if err != nil { - return nil, err - } - atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1) - return metricIDs, nil - } - - // Slower path - query over multiple days in parallel. - metricIDs := &uint64set.Set{} - var wg sync.WaitGroup - var errGlobal error - var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below. - for minDate <= maxDate { - wg.Add(1) - go func(date uint64) { - defer wg.Done() - isLocal := is.db.getIndexSearch(is.deadline) - m, err := isLocal.getMetricIDsForDate(date, maxMetrics) - is.db.putIndexSearch(isLocal) - mu.Lock() - defer mu.Unlock() - if errGlobal != nil { - return - } - if err != nil { - errGlobal = err - return - } - if metricIDs.Len() < maxMetrics { - metricIDs.UnionMayOwn(m) - } - }(minDate) - minDate++ - } - wg.Wait() - if errGlobal != nil { - return nil, errGlobal - } - atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1) - return metricIDs, nil -} - const maxDaysForPerDaySearch = 40 func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error { @@ -2875,10 +2483,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter // so later they can be filtered out with negative filters. m, err := is.getMetricIDsForDate(date, maxDateMetrics) if err != nil { - if err == errMissingMetricIDsForDate { - // Zero time series were written on the given date. - return nil, nil - } return nil, fmt.Errorf("cannot obtain all the metricIDs: %w", err) } if m.Len() >= maxDateMetrics { @@ -3100,13 +2704,18 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) { } func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) { - // Augument tag filter prefix for per-date search instead of global search. 
if !bytes.HasPrefix(tf.prefix, commonPrefix) { logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix) } kb := kbPool.Get() - kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) - kb.B = encoding.MarshalUint64(kb.B, date) + if date != 0 { + // Use per-date search. + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + } else { + // Use global search if date isn't set. + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) + } kb.B = append(kb.B, tf.prefix[len(commonPrefix):]...) tfNew := *tf tfNew.isNegative = false // isNegative for the original tf is handled by the caller. @@ -3152,8 +2761,14 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64 // Extract all the metricIDs from (date, __name__=value)->metricIDs entries. kb := kbPool.Get() defer kbPool.Put(kb) - kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) - kb.B = encoding.MarshalUint64(kb.B, date) + if date != 0 { + // Use per-date search + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + } else { + // Use global search + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) + } kb.B = marshalTagValue(kb.B, nil) var metricIDs uint64set.Set if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil { @@ -3162,15 +2777,6 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64 return &metricIDs, nil } -func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error { - kb := kbPool.Get() - defer kbPool.Put(kb) - // Extract all the metricIDs from (__name__=value)->metricIDs entries. 
- kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) - kb.B = marshalTagValue(kb.B, nil) - return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics) -} - func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error { ts := &is.ts mp := &is.mp @@ -3211,38 +2817,6 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64 // The estimated number of index scan loops a single loop in updateMetricIDsByMetricNameMatch takes. const loopsCountPerMetricNameMatch = 150 -func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) { - if filter.Len() == 0 { - return nil, nil - } - metricIDs := filter - if !tf.isNegative { - metricIDs = &uint64set.Set{} - } - if len(tf.orSuffixes) > 0 { - // Fast path for orSuffixes - seek for rows for each value from orSuffixes. - _, err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter, int64Max) - if err != nil { - return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf) - } - return metricIDs, nil - } - - // Slow path - scan for all the rows with the given prefix. - _, err := is.getMetricIDsForTagFilterSlow(tf, filter, func(metricID uint64) { - if tf.isNegative { - // filter must be equal to metricIDs - metricIDs.Del(metricID) - } else { - metricIDs.Add(metricID) - } - }, int64Max) - if err != nil { - return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf) - } - return metricIDs, nil -} - var kbPool bytesutil.ByteBufferPool // Returns local unique MetricID. @@ -3367,21 +2941,6 @@ func (mp *tagToMetricIDsRowParser) EqualPrefix(x *tagToMetricIDsRowParser) bool return mp.Date == x.Date && mp.NSPrefix == x.NSPrefix } -// FirstAndLastMetricIDs returns the first and the last metricIDs in the mp.tail. 
-func (mp *tagToMetricIDsRowParser) FirstAndLastMetricIDs() (uint64, uint64) { - tail := mp.tail - if len(tail) < 8 { - logger.Panicf("BUG: cannot unmarshal metricID from %d bytes; need 8 bytes", len(tail)) - return 0, 0 - } - firstMetricID := encoding.UnmarshalUint64(tail) - lastMetricID := firstMetricID - if len(tail) > 8 { - lastMetricID = encoding.UnmarshalUint64(tail[len(tail)-8:]) - } - return firstMetricID, lastMetricID -} - // MetricIDsLen returns the number of MetricIDs in the mp.tail func (mp *tagToMetricIDsRowParser) MetricIDsLen() int { return len(mp.tail) / 8 @@ -3410,16 +2969,6 @@ func (mp *tagToMetricIDsRowParser) ParseMetricIDs() { } } -// HasCommonMetricIDs returns true if mp has at least one common metric id with filter. -func (mp *tagToMetricIDsRowParser) HasCommonMetricIDs(filter *uint64set.Set) bool { - for _, metricID := range mp.MetricIDs { - if filter.Has(metricID) { - return true - } - } - return false -} - // IsDeletedTag verifies whether the tag from mp is deleted according to dmis. // // dmis must contain deleted MetricIDs. 
diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 6cdeac28a0..3bae66b888 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1546,12 +1546,12 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } } - // Check that all the metrics are found in updateMetricIDsAll - var metricIDs uint64set.Set - if err := is2.updateMetricIDsAll(&metricIDs, metricsPerDay*days); err != nil { + // Check that all the metrics are found in global index + metricIDs, err := is2.getMetricIDsForDate(0, metricsPerDay*days) + if err != nil { t.Fatalf("unexpected error: %s", err) } - if !allMetricIDs.Equal(&metricIDs) { + if !allMetricIDs.Equal(metricIDs) { t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil)) } diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go index 6f88b6e54a..869fc951ac 100644 --- a/lib/storage/tag_filters.go +++ b/lib/storage/tag_filters.go @@ -209,13 +209,6 @@ func (tfs *TagFilters) Reset() { tfs.commonPrefix = marshalCommonPrefix(tfs.commonPrefix[:0], nsPrefixTagToMetricIDs) } -func (tfs *TagFilters) marshal(dst []byte) []byte { - for i := range tfs.tfs { - dst = tfs.tfs[i].Marshal(dst) - } - return dst -} - // tagFilter represents a filter used for filtering tags. type tagFilter struct { key []byte