lib/storage: parallelize tag filters execution a bit

This should reduce execution time when a query contains multiple tag filters and each such filter matches big number of time series.
This commit is contained in:
Aliaksandr Valialkin 2021-02-10 16:13:17 +02:00
parent 4262c2f7c2
commit b27288f1b0

View File

@ -2828,7 +2828,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
return a.tf.Less(b.tf) return a.tf.Less(b.tf)
}) })
// Populate metricIDs with the first non-negative filter. // Populate metricIDs for the first non-negative filter.
var tfsPostponed []*tagFilter var tfsPostponed []*tagFilter
var metricIDs *uint64set.Set var metricIDs *uint64set.Set
maxDateMetrics := maxMetrics * 50 maxDateMetrics := maxMetrics * 50
@ -2880,36 +2880,67 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
} }
// Intersect metricIDs with the rest of filters. // Intersect metricIDs with the rest of filters.
var mu sync.Mutex
var wg sync.WaitGroup
var errGlobal error
// Limit the number of concurrent goroutines for metricIDs filtering in the hope they reduce the number
// of matching metrics to quite low value, so the remaining filters could be matched by metricName.
concurrencyCh := make(chan struct{}, 2)
for i := range tfsRemainingWithCount { for i := range tfsRemainingWithCount {
tfWithCount := tfsRemainingWithCount[i] tfWithCount := tfsRemainingWithCount[i]
if n := uint64(metricIDs.Len()); n < 1000 || (n < tfWithCount.count/maxIndexScanLoopsPerMetric && n < uint64(maxMetrics)/10) { tf := tfWithCount.tf
wg.Add(1)
go func() {
concurrencyCh <- struct{}{}
defer func() {
<-concurrencyCh
wg.Done()
}()
mu.Lock()
metricIDsLen := metricIDs.Len()
mu.Unlock()
if metricIDsLen == 0 {
// Short circuit - there is no need in applying the remaining filters to empty set.
return
}
if n := uint64(metricIDsLen); n < 1000 || (n < tfWithCount.count/maxIndexScanLoopsPerMetric && n < uint64(maxMetrics)/10) {
// It should be faster performing metricName match on the remaining filters // It should be faster performing metricName match on the remaining filters
// instead of scanning big number of entries in the inverted index for these filters. // instead of scanning big number of entries in the inverted index for these filters.
for i < len(tfsRemainingWithCount) { mu.Lock()
tfsPostponed = append(tfsPostponed, tfsRemainingWithCount[i].tf) tfsPostponed = append(tfsPostponed, tf)
i++ mu.Unlock()
return
} }
break isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline)
} m, err := isLocal.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
tf := tfWithCount.tf isLocal.db.putIndexSearch(isLocal)
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
if err != nil { if err != nil {
return nil, err mu.Lock()
if errGlobal == nil {
errGlobal = err
}
mu.Unlock()
return
} }
if m.Len() >= maxDateMetrics { if m.Len() >= maxDateMetrics {
// Too many time series found by a single tag filter. Postpone applying this filter via metricName match. // Too many time series found by a single tag filter. Postpone applying this filter via metricName match.
mu.Lock()
tfsPostponed = append(tfsPostponed, tf) tfsPostponed = append(tfsPostponed, tf)
continue mu.Unlock()
return
} }
mu.Lock()
if tf.isNegative { if tf.isNegative {
metricIDs.Subtract(m) metricIDs.Subtract(m)
} else { } else {
metricIDs.Intersect(m) metricIDs.Intersect(m)
} }
if metricIDs.Len() == 0 { mu.Unlock()
// Short circuit - there is no need in applying the remaining filters to empty set. }()
return nil, nil
} }
wg.Wait()
if errGlobal != nil {
return nil, errGlobal
} }
if len(tfsPostponed) > 0 { if len(tfsPostponed) > 0 {
if n := metricIDs.Len(); n > 50000 && n > maxMetrics/10 { if n := metricIDs.Len(); n > 50000 && n > maxMetrics/10 {