lib/storage: return back in-order applying of tag filters, since concurrently executing tag filters can result in CPU and RAM waste in common case

This commit is contained in:
Aliaksandr Valialkin 2021-02-10 22:40:20 +02:00
parent b51c23dc5b
commit 0e26b7168a

View File

@ -2853,73 +2853,45 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
}
metricIDs = m
}
if metricIDs.Len() == 0 {
// There is no sense in inspecting tfsRemainingWithCount, since the result will be empty.
return nil, nil
}
// Intersect metricIDs with the rest of filters.
var mu sync.Mutex
var wg sync.WaitGroup
var errGlobal error
// Limit the number of concurrent goroutines for metricIDs filtering in the hope they reduce the number
// of matching metrics to quite low value, so the remaining filters could be matched by metricName.
concurrencyCh := make(chan struct{}, 2)
//
// Do not run these tag filters in parallel, since this may result in CPU and RAM waste
// when the intial tag filters significantly reduce the number of found metricIDs,
// so the remaining filters could be performed via much faster metricName matching instead
// of slow selecting of matching metricIDs.
for i := range tfsRemainingWithCount {
tfWithCount := tfsRemainingWithCount[i]
tf := tfWithCount.tf
wg.Add(1)
go func() {
concurrencyCh <- struct{}{}
defer func() {
<-concurrencyCh
wg.Done()
}()
mu.Lock()
metricIDsLen := metricIDs.Len()
mu.Unlock()
if metricIDsLen == 0 {
// Short circuit - there is no need in applying the remaining filters to empty set.
return
}
if n := uint64(metricIDsLen); n < 1000 || (n < tfWithCount.count/maxIndexScanLoopsPerMetric && n < uint64(maxMetrics)/10) {
// It should be faster performing metricName match on the remaining filters
// instead of scanning big number of entries in the inverted index for these filters.
mu.Lock()
tfsPostponed = append(tfsPostponed, tf)
mu.Unlock()
return
}
isLocal := is.db.getIndexSearch(is.deadline)
m, err := isLocal.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
isLocal.db.putIndexSearch(isLocal)
if err != nil {
mu.Lock()
if errGlobal == nil {
errGlobal = err
}
mu.Unlock()
return
}
if m.Len() >= maxDateMetrics {
// Too many time series found by a single tag filter. Postpone applying this filter via metricName match.
mu.Lock()
tfsPostponed = append(tfsPostponed, tf)
mu.Unlock()
return
}
mu.Lock()
if tf.isNegative {
metricIDs.Subtract(m)
} else {
metricIDs.Intersect(m)
}
mu.Unlock()
}()
metricIDsLen := metricIDs.Len()
if metricIDsLen == 0 {
// Short circuit - there is no need in applying the remaining filters to an empty set.
break
}
if n := uint64(metricIDsLen); n < 1000 || (n < tfWithCount.count/maxIndexScanLoopsPerMetric && n < uint64(maxMetrics)/10) {
// It should be faster performing metricName match on the remaining filters
// instead of scanning big number of entries in the inverted index for these filters.
tfsPostponed = append(tfsPostponed, tf)
continue
}
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
if err != nil {
return nil, err
}
if m.Len() >= maxDateMetrics {
// Too many time series found by a single tag filter. Postpone applying this filter via metricName match.
tfsPostponed = append(tfsPostponed, tf)
continue
}
if tf.isNegative {
metricIDs.Subtract(m)
} else {
metricIDs.Intersect(m)
}
}
wg.Wait()
if errGlobal != nil {
return nil, errGlobal
if metricIDs.Len() == 0 {
// There is no need in applying tfsPostponed, since the result is empty.
return nil, nil
}
if len(tfsPostponed) > 0 {
if n := metricIDs.Len(); n > 50000 && n > maxMetrics/10 {