From b51c23dc5bfd04a72eb076259c584d444b2b6700 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 10 Feb 2021 16:13:17 +0200 Subject: [PATCH] lib/storage: parallelize tag filters execution a bit This should reduce execution time when a query contains multiple tag filters and each such filter matches a big number of time series. --- lib/storage/index_db.go | 87 ++++++++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 28 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 53a4eb1e9..b7f410ab1 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -2807,7 +2807,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter return a.tf.Less(b.tf) }) - // Populate metricIDs with the first non-negative filter. + // Populate metricIDs for the first non-negative filter. var tfsPostponed []*tagFilter var metricIDs *uint64set.Set maxDateMetrics := maxMetrics * 50 @@ -2859,36 +2859,67 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter } // Intersect metricIDs with the rest of filters. + var mu sync.Mutex + var wg sync.WaitGroup + var errGlobal error + // Limit the number of concurrent goroutines for metricIDs filtering in the hope they reduce the number + // of matching metrics to quite low value, so the remaining filters could be matched by metricName. + concurrencyCh := make(chan struct{}, 2) for i := range tfsRemainingWithCount { tfWithCount := tfsRemainingWithCount[i] - if n := uint64(metricIDs.Len()); n < 1000 || (n < tfWithCount.count/maxIndexScanLoopsPerMetric && n < uint64(maxMetrics)/10) { - // It should be faster performing metricName match on the remaining filters - // instead of scanning big number of entries in the inverted index for these filters. 
- for i < len(tfsRemainingWithCount) { - tfsPostponed = append(tfsPostponed, tfsRemainingWithCount[i].tf) - i++ - } - break - } tf := tfWithCount.tf - m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics) - if err != nil { - return nil, err - } - if m.Len() >= maxDateMetrics { - // Too many time series found by a single tag filter. Postpone applying this filter via metricName match. - tfsPostponed = append(tfsPostponed, tf) - continue - } - if tf.isNegative { - metricIDs.Subtract(m) - } else { - metricIDs.Intersect(m) - } - if metricIDs.Len() == 0 { - // Short circuit - there is no need in applying the remaining filters to empty set. - return nil, nil - } + wg.Add(1) + go func() { + concurrencyCh <- struct{}{} + defer func() { + <-concurrencyCh + wg.Done() + }() + mu.Lock() + metricIDsLen := metricIDs.Len() + mu.Unlock() + if metricIDsLen == 0 { + // Short circuit - there is no need in applying the remaining filters to empty set. + return + } + if n := uint64(metricIDsLen); n < 1000 || (n < tfWithCount.count/maxIndexScanLoopsPerMetric && n < uint64(maxMetrics)/10) { + // It should be faster performing metricName match on the remaining filters + // instead of scanning big number of entries in the inverted index for these filters. + mu.Lock() + tfsPostponed = append(tfsPostponed, tf) + mu.Unlock() + return + } + isLocal := is.db.getIndexSearch(is.deadline) + m, err := isLocal.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics) + isLocal.db.putIndexSearch(isLocal) + if err != nil { + mu.Lock() + if errGlobal == nil { + errGlobal = err + } + mu.Unlock() + return + } + if m.Len() >= maxDateMetrics { + // Too many time series found by a single tag filter. Postpone applying this filter via metricName match. 
+ mu.Lock() + tfsPostponed = append(tfsPostponed, tf) + mu.Unlock() + return + } + mu.Lock() + if tf.isNegative { + metricIDs.Subtract(m) + } else { + metricIDs.Intersect(m) + } + mu.Unlock() + }() + } + wg.Wait() + if errGlobal != nil { + return nil, errGlobal } if len(tfsPostponed) > 0 { if n := metricIDs.Len(); n > 50000 && n > maxMetrics/10 {