lib/storage: further tuning for time series selector code

This commit is contained in:
Aliaksandr Valialkin 2021-03-15 20:31:24 +02:00
parent 923cdb0552
commit 9c77e34ef9

View File

@ -2356,7 +2356,13 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set
metricIDs := &uint64set.Set{}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs)
var loopsCount uint64
var err error
if filter != nil {
loopsCount, err = is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter)
} else {
loopsCount, err = is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs)
}
if err != nil {
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
@ -2493,19 +2499,22 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMe
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) error {
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) (uint64, error) {
sortedFilter := filter.AppendTo(nil)
kb := kbPool.Get()
defer kbPool.Put(kb)
var loopsCount uint64
for _, orSuffix := range tf.orSuffixes {
kb.B = append(kb.B[:0], tf.prefix...)
kb.B = append(kb.B, orSuffix...)
kb.B = append(kb.B, tagSeparatorChar)
if err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative); err != nil {
return err
lc, err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative)
if err != nil {
return loopsCount, err
}
loopsCount += lc
}
return nil
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) {
@ -2539,15 +2548,16 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) error {
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) (uint64, error) {
if len(sortedFilter) == 0 {
return nil
return 0, nil
}
firstFilterMetricID := sortedFilter[0]
lastFilterMetricID := sortedFilter[len(sortedFilter)-1]
ts := &is.ts
mp := &is.mp
mp.Reset()
var loopsCount uint64
loopsPaceLimiter := 0
ts.Seek(prefix)
var sf []uint64
@ -2555,17 +2565,18 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
for ts.NextItem() {
if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
return loopsCount, err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
return nil
return loopsCount, nil
}
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
return err
return loopsCount, err
}
loopsCount += uint64(mp.MetricIDsLen())
firstMetricID, lastMetricID := mp.FirstAndLastMetricIDs()
if lastMetricID < firstFilterMetricID {
// Skip the item, since it contains metricIDs lower
@ -2575,10 +2586,11 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
if firstMetricID > lastFilterMetricID {
// Stop searching, since the current item and all the subsequent items
// contain metricIDs higher than metricIDs in sortedFilter.
return nil
return loopsCount, nil
}
sf = sortedFilter
mp.ParseMetricIDs()
matchingMetricIDs := mp.MetricIDs[:0]
for _, metricID = range mp.MetricIDs {
if len(sf) == 0 {
break
@ -2593,18 +2605,23 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
if metricID < sf[0] {
continue
}
if isNegative {
metricIDs.Del(metricID)
} else {
metricIDs.Add(metricID)
}
matchingMetricIDs = append(matchingMetricIDs, metricID)
sf = sf[1:]
}
if len(matchingMetricIDs) > 0 {
if isNegative {
for _, metricID := range matchingMetricIDs {
metricIDs.Del(metricID)
}
} else {
metricIDs.AddMulti(matchingMetricIDs)
}
}
}
if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
}
return nil
return loopsCount, nil
}
func binarySearchUint64(a []uint64, v uint64) uint {
@ -2792,10 +2809,9 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
})
// Populate metricIDs for the first non-negative filter.
var tfsPostponed []*tagFilter
var metricIDs *uint64set.Set
tfwsRemaining := tfws[:0]
maxDateMetrics := maxMetrics * 100
maxDateMetrics := maxMetrics * 50
for i := range tfws {
tfw := tfws[i]
tf := tfw.tf
@ -2804,13 +2820,16 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
continue
}
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics)
if loopsCount > tfw.loopsCount {
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
}
if err != nil {
return nil, err
}
if m.Len() >= maxDateMetrics {
// Too many time series found by a single tag filter. Postpone applying this filter via metricName match.
tfsPostponed = append(tfsPostponed, tf)
// Too many time series found by a single tag filter. Postpone applying this filter.
tfwsRemaining = append(tfwsRemaining, tfw)
tfw.loopsCount = loopsCount
continue
}
metricIDs = m
@ -2846,6 +2865,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// when the intial tag filters significantly reduce the number of found metricIDs,
// so the remaining filters could be performed via much faster metricName matching instead
// of slow selecting of matching metricIDs.
var tfsPostponed []*tagFilter
for i := range tfwsRemaining {
tfw := tfwsRemaining[i]
tf := tfw.tf
@ -2854,24 +2874,26 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// Short circuit - there is no need in applying the remaining filters to an empty set.
break
}
if uint64(metricIDsLen)*maxIndexScanLoopsPerMetric < tfw.loopsCount {
if tfw.loopsCount > uint64(metricIDsLen)*loopsCountPerMetricNameMatch {
// It should be faster performing metricName match on the remaining filters
// instead of scanning big number of entries in the inverted index for these filters.
for i < len(tfwsRemaining) {
tfw := tfwsRemaining[i]
tf := tfw.tf
tfsPostponed = append(tfsPostponed, tf)
// Store stats for non-executed tf, since it could be updated during protection from thundered herd.
is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount)
continue
i++
}
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
break
}
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, 0)
if loopsCount > tfw.loopsCount {
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
}
if err != nil {
return nil, err
}
if m.Len() >= maxDateMetrics {
// Too many time series found by a single tag filter. Postpone applying this filter via metricName match.
tfsPostponed = append(tfsPostponed, tf)
continue
}
if tf.isNegative {
metricIDs.Subtract(m)
} else {
@ -3057,7 +3079,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
// Set high loopsCount for failing filter, so it is moved to the end of filter list.
loopsCount = 20e9
}
if metricIDs.Len() >= maxMetrics {
if filter == nil && metricIDs.Len() >= maxMetrics {
// Increase loopsCount for tag filter matching too many metrics,
// So next time it is moved to the end of filter list.
loopsCount *= 2
@ -3154,10 +3176,8 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
return nil
}
// The maximum number of index scan loops.
// Bigger number of loops is slower than updateMetricIDsByMetricNameMatch
// over the found metrics.
const maxIndexScanLoopsPerMetric = 100
// The estimated number of index scan loops a single loop in updateMetricIDsByMetricNameMatch takes.
const loopsCountPerMetricNameMatch = 500
func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) {
if filter.Len() == 0 {
@ -3169,7 +3189,8 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui
}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
if err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter); err != nil {
_, err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter)
if err != nil {
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
return metricIDs, nil