lib/storage: add more fine-grained pace limiting for search

This commit is contained in:
Aliaksandr Valialkin 2020-07-23 19:21:49 +03:00
parent 461481fbdf
commit 2a45871823
5 changed files with 72 additions and 11 deletions

View File

@ -116,7 +116,7 @@ func timeseriesWorker(workerID uint) {
} }
} }
// RunParallel runs in parallel f for all the results from rss. // RunParallel runs f in parallel for all the results from rss.
// //
// f shouldn't hold references to rs after returning. // f shouldn't hold references to rs after returning.
// workerID is the id of the worker goroutine that calls f. // workerID is the id of the worker goroutine that calls f.

View File

@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
@ -768,10 +769,15 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
mp := &is.mp mp := &is.mp
mp.Reset() mp.Reset()
dmis := is.db.getDeletedMetricIDs() dmis := is.db.getDeletedMetricIDs()
loopsPaceLimiter := 0
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
prefix := kb.B prefix := kb.B
ts.Seek(prefix) ts.Seek(prefix)
for len(tks) < maxTagKeys && ts.NextItem() { for len(tks) < maxTagKeys && ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
loopsPaceLimiter++
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, prefix) { if !bytes.HasPrefix(item, prefix) {
break break
@ -840,11 +846,16 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
mp := &is.mp mp := &is.mp
mp.Reset() mp.Reset()
dmis := is.db.getDeletedMetricIDs() dmis := is.db.getDeletedMetricIDs()
loopsPaceLimiter := 0
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, tagKey) kb.B = marshalTagValue(kb.B, tagKey)
prefix := kb.B prefix := kb.B
ts.Seek(prefix) ts.Seek(prefix)
for len(tvs) < maxTagValues && ts.NextItem() { for len(tvs) < maxTagValues && ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
loopsPaceLimiter++
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, prefix) { if !bytes.HasPrefix(item, prefix) {
break break
@ -908,12 +919,17 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
mp := &is.mp mp := &is.mp
loopsPaceLimiter := 0
var metricIDsLen uint64 var metricIDsLen uint64
// Extract the number of series from ((__name__=value): metricIDs) rows // Extract the number of series from ((__name__=value): metricIDs) rows
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, nil) kb.B = marshalTagValue(kb.B, nil)
ts.Seek(kb.B) ts.Seek(kb.B)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
loopsPaceLimiter++
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, kb.B) { if !bytes.HasPrefix(item, kb.B) {
break break
@ -974,11 +990,16 @@ func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus,
var labelValueCountByLabelName, seriesCountByLabelValuePair uint64 var labelValueCountByLabelName, seriesCountByLabelValuePair uint64
nameEqualBytes := []byte("__name__=") nameEqualBytes := []byte("__name__=")
loopsPaceLimiter := 0
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date) kb.B = encoding.MarshalUint64(kb.B, date)
prefix := kb.B prefix := kb.B
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
loopsPaceLimiter++
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, prefix) { if !bytes.HasPrefix(item, prefix) {
break break
@ -1496,7 +1517,10 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
// Obtain TSID values for the given metricIDs. // Obtain TSID values for the given metricIDs.
tsids := make([]TSID, len(metricIDs)) tsids := make([]TSID, len(metricIDs))
i := 0 i := 0
for _, metricID := range metricIDs { for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&(1<<10) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster // Try obtaining TSIDs from MetricID->TSID cache. This is much faster
// than scanning the mergeset if it contains a lot of metricIDs. // than scanning the mergeset if it contains a lot of metricIDs.
tsid := &tsids[i] tsid := &tsids[i]
@ -1563,7 +1587,10 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
defer kbPool.Put(metricName) defer kbPool.Put(metricName)
mn := GetMetricName() mn := GetMetricName()
defer PutMetricName(mn) defer PutMetricName(mn)
for _, metricID := range sortedMetricIDs { for loopsPaceLimiter, metricID := range sortedMetricIDs {
if loopsPaceLimiter&(1<<10) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
var err error var err error
metricName.B, err = is.searchMetricName(metricName.B[:0], metricID) metricName.B, err = is.searchMetricName(metricName.B[:0], metricID)
if err != nil { if err != nil {
@ -2036,16 +2063,21 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
} }
// Scan all the rows with tf.prefix and call f on every tf match. // Scan all the rows with tf.prefix and call f on every tf match.
loops := 0
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
mp := &is.mp mp := &is.mp
mp.Reset() mp.Reset()
var prevMatchingSuffix []byte var prevMatchingSuffix []byte
var prevMatch bool var prevMatch bool
loops := 0
loopsPaceLimiter := 0
prefix := tf.prefix prefix := tf.prefix
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<14) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
loopsPaceLimiter++
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, prefix) { if !bytes.HasPrefix(item, prefix) {
return nil return nil
@ -2161,8 +2193,13 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
mp.Reset() mp.Reset()
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
loops := 0 loops := 0
loopsPaceLimiter := 0
ts.Seek(prefix) ts.Seek(prefix)
for metricIDs.Len() < maxMetrics && ts.NextItem() { for metricIDs.Len() < maxMetrics && ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
loopsPaceLimiter++
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, prefix) { if !bytes.HasPrefix(item, prefix) {
return nil return nil
@ -2194,10 +2231,15 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
mp.Reset() mp.Reset()
maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric
loops := 0 loops := 0
loopsPaceLimiter := 0
ts.Seek(prefix) ts.Seek(prefix)
var sf []uint64 var sf []uint64
var metricID uint64 var metricID uint64
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<12) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
loopsPaceLimiter++
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, prefix) { if !bytes.HasPrefix(item, prefix) {
return nil return nil
@ -2730,8 +2772,13 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics i
func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error { func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error {
ts := &is.ts ts := &is.ts
mp := &is.mp mp := &is.mp
loopsPaceLimiter := 0
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
loopsPaceLimiter++
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, prefix) { if !bytes.HasPrefix(item, prefix) {
return nil return nil

View File

@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
) )
// BlockRef references a Block. // BlockRef references a Block.
@ -66,6 +67,8 @@ type Search struct {
err error err error
needClosing bool needClosing bool
loops int
} }
func (s *Search) reset() { func (s *Search) reset() {
@ -76,6 +79,7 @@ func (s *Search) reset() {
s.ts.reset() s.ts.reset()
s.err = nil s.err = nil
s.needClosing = false s.needClosing = false
s.loops = 0
} }
// Init initializes s from the given storage, tfss and tr. // Init initializes s from the given storage, tfss and tr.
@ -129,6 +133,10 @@ func (s *Search) NextMetricBlock() bool {
return false return false
} }
for s.ts.NextBlock() { for s.ts.NextBlock() {
if s.loops&(1<<10) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
s.loops++
tsid := &s.ts.BlockRef.bh.TSID tsid := &s.ts.BlockRef.bh.TSID
var err error var err error
s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID) s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID)

View File

@ -800,11 +800,6 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr. // searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
// Make sure that there are enough resources for processing data ingestion before starting the query.
// This should prevent from data ingestion starvation when provessing heavy queries.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 .
storagepacelimiter.Search.WaitIfNeeded()
// Do not cache tfss -> tsids here, since the caching is performed // Do not cache tfss -> tsids here, since the caching is performed
// on idb level. // on idb level.
tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics) tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics)
@ -818,10 +813,14 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
// //
// This should speed-up further searchMetricName calls for metricIDs from tsids. // This should speed-up further searchMetricName calls for metricIDs from tsids.
func (s *Storage) prefetchMetricNames(tsids []TSID) error { func (s *Storage) prefetchMetricNames(tsids []TSID) error {
if len(tsids) == 0 {
return nil
}
var metricIDs uint64Sorter var metricIDs uint64Sorter
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set) prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
for i := range tsids { for i := range tsids {
metricID := tsids[i].MetricID tsid := &tsids[i]
metricID := tsid.MetricID
if prefetchedMetricIDs.Has(metricID) { if prefetchedMetricIDs.Has(metricID) {
continue continue
} }
@ -840,7 +839,10 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error {
idb := s.idb() idb := s.idb()
is := idb.getIndexSearch() is := idb.getIndexSearch()
defer idb.putIndexSearch(is) defer idb.putIndexSearch(is)
for _, metricID := range metricIDs { for loops, metricID := range metricIDs {
if loops&(1<<10) == 0 {
storagepacelimiter.Search.WaitIfNeeded()
}
metricName, err = is.searchMetricName(metricName[:0], metricID) metricName, err = is.searchMetricName(metricName[:0], metricID)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %w", metricID, err) return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %w", metricID, err)

View File

@ -5,7 +5,11 @@ import (
) )
// Search limits the pace of search calls when there is at least a single in-flight assisted merge. // Search limits the pace of search calls when there is at least a single in-flight assisted merge.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291
var Search = pacelimiter.New() var Search = pacelimiter.New()
// BigMerges limits the pace for big merges when there is at least a single in-flight small merge. // BigMerges limits the pace for big merges when there is at least a single in-flight small merge.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
var BigMerges = pacelimiter.New() var BigMerges = pacelimiter.New()