lib/storage: skip blocks outside the configured retention during search

Blocks outside the configured retention are eventually deleted during background merge.
But such blocks may reside in the storage for long time until background merge.
Previously VictoriaMetrics could spend additional CPU time on processing such blocks
during search queries. Now these blocks are skipped.
This commit is contained in:
Aliaksandr Valialkin 2022-10-24 02:52:38 +03:00
parent e2f0b76ebf
commit dba218a8ce
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
4 changed files with 15 additions and 4 deletions

View File

@ -316,7 +316,7 @@ func CopyDirectory(srcPath, dstPath string) error {
}
src := filepath.Join(srcPath, de.Name())
dst := filepath.Join(dstPath, de.Name())
if err := copyFile(src, dst); err != nil {
if err := CopyFile(src, dst); err != nil {
return err
}
}
@ -324,7 +324,8 @@ func CopyDirectory(srcPath, dstPath string) error {
return nil
}
func copyFile(srcPath, dstPath string) error {
// CopyFile copies the file from srcPath to dstPath.
func CopyFile(srcPath, dstPath string) error {
src, err := os.Open(srcPath)
if err != nil {
return err

View File

@ -61,7 +61,7 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline
bsm.nextBlockNoop = true
}
func (bsm *blockStreamMerger) getRetentionDeadline(b *Block) int64 {
func (bsm *blockStreamMerger) getRetentionDeadline(bh *blockHeader) int64 {
return bsm.retentionDeadline
}

View File

@ -57,7 +57,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
continue
}
retentionDeadline := bsm.getRetentionDeadline(b)
retentionDeadline := bsm.getRetentionDeadline(&b.bh)
if b.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention.
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))

View File

@ -96,6 +96,9 @@ type Search struct {
// idb is used for MetricName lookup for the found data blocks.
idb *indexDB
// retentionDeadline is used for filtering out blocks outside the configured retention.
retentionDeadline int64
ts tableSearch
// tr contains time range used in the serach.
@ -121,6 +124,7 @@ func (s *Search) reset() {
s.MetricBlockRef.BlockRef = nil
s.idb = nil
s.retentionDeadline = 0
s.ts.reset()
s.tr = TimeRange{}
s.tfss = nil
@ -142,9 +146,11 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
if s.needClosing {
logger.Panicf("BUG: missing MustClose call before the next call to Init")
}
retentionDeadline := int64(fasttime.UnixTimestamp()*1e3) - storage.retentionMsecs
s.reset()
s.idb = storage.idb()
s.retentionDeadline = retentionDeadline
s.tr = tr
s.tfss = tfss
s.deadline = deadline
@ -202,6 +208,10 @@ func (s *Search) NextMetricBlock() bool {
s.loops++
tsid := &s.ts.BlockRef.bh.TSID
if tsid.MetricID != s.prevMetricID {
if s.ts.BlockRef.bh.MaxTimestamp < s.retentionDeadline {
// Skip the block, since it contains only data outside the configured retention.
continue
}
var err error
s.MetricBlockRef.MetricName, err = s.idb.searchMetricNameWithCache(s.MetricBlockRef.MetricName[:0], tsid.MetricID)
if err != nil {