lib/storage: small refactoring: move retentionDeadline to blockStreamMerger

This allows defining per-block retention in the future by updating only the getRetentionDeadline function.
This commit is contained in:
Aliaksandr Valialkin 2022-10-23 14:30:16 +03:00
parent 31071347ca
commit 63419d8e7c
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 14 additions and 4 deletions

View File

@ -13,6 +13,9 @@ type blockStreamMerger struct {
bsrHeap blockStreamReaderHeap bsrHeap blockStreamReaderHeap
// Blocks with smaller timestamps are removed because of retention.
retentionDeadline int64
// Whether the call to NextBlock must be no-op. // Whether the call to NextBlock must be no-op.
nextBlockNoop bool nextBlockNoop bool
@ -26,13 +29,15 @@ func (bsm *blockStreamMerger) reset() {
bsm.bsrHeap[i] = nil bsm.bsrHeap[i] = nil
} }
bsm.bsrHeap = bsm.bsrHeap[:0] bsm.bsrHeap = bsm.bsrHeap[:0]
bsm.retentionDeadline = 0
bsm.nextBlockNoop = false bsm.nextBlockNoop = false
bsm.err = nil bsm.err = nil
} }
// Init initializes bsm with the given bsrs. // Init initializes bsm with the given bsrs.
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) { func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline int64) {
bsm.reset() bsm.reset()
bsm.retentionDeadline = retentionDeadline
for _, bsr := range bsrs { for _, bsr := range bsrs {
if bsr.NextBlock() { if bsr.NextBlock() {
bsm.bsrHeap = append(bsm.bsrHeap, bsr) bsm.bsrHeap = append(bsm.bsrHeap, bsr)
@ -54,6 +59,10 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) {
bsm.nextBlockNoop = true bsm.nextBlockNoop = true
} }
// getRetentionDeadline returns the retention deadline for the given block b.
//
// Blocks with bh.MaxTimestamp smaller than the returned deadline are dropped
// during merge (see mergeBlockStreamsInternal).
//
// Currently the deadline is the same for every block — bsm.retentionDeadline,
// set once in Init(). The b parameter is intentionally unused for now: it
// exists so that per-block retention can be implemented later by changing
// only this function.
func (bsm *blockStreamMerger) getRetentionDeadline(b *Block) int64 {
	return bsm.retentionDeadline
}
// NextBlock stores the next block in bsm.Block. // NextBlock stores the next block in bsm.Block.
// //
// The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks // The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks

View File

@ -20,8 +20,8 @@ func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStre
ph.Reset() ph.Reset()
bsm := bsmPool.Get().(*blockStreamMerger) bsm := bsmPool.Get().(*blockStreamMerger)
bsm.Init(bsrs) bsm.Init(bsrs, retentionDeadline)
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted) err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
bsm.reset() bsm.reset()
bsmPool.Put(bsm) bsmPool.Put(bsm)
bsw.MustClose() bsw.MustClose()
@ -40,7 +40,7 @@ var bsmPool = &sync.Pool{
var errForciblyStopped = fmt.Errorf("forcibly stopped") var errForciblyStopped = fmt.Errorf("forcibly stopped")
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error { dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
pendingBlockIsEmpty := true pendingBlockIsEmpty := true
pendingBlock := getBlock() pendingBlock := getBlock()
defer putBlock(pendingBlock) defer putBlock(pendingBlock)
@ -58,6 +58,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
continue continue
} }
retentionDeadline := bsm.getRetentionDeadline(b)
if b.bh.MaxTimestamp < retentionDeadline { if b.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention. // Skip blocks out of the given retention.
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))