VictoriaMetrics/lib/storage/block_stream_merger.go
Aliaksandr Valialkin 51f2e473f5
lib/storage: skip blocks outside the configured retention during search
Blocks outside the configured retention are eventually deleted during background merge.
But such blocks may reside in the storage for a long time until the next background merge.
Previously VictoriaMetrics could spend additional CPU time on processing such blocks
during search queries. Now these blocks are skipped.
2022-10-24 02:56:13 +03:00

156 lines
3.1 KiB
Go

package storage
import (
"container/heap"
"fmt"
"io"
)
// blockStreamMerger is used for merging block streams.
//
// The underlying readers are kept in bsrHeap, ordered by (TSID, MinTimestamp),
// so the reader at the top of the heap holds the smallest current block.
type blockStreamMerger struct {
	// The current block to work with.
	// It points at the Block field of the reader at the top of bsrHeap,
	// and is nil once the merge has finished or failed.
	Block *Block
	// Readers that still have blocks to merge, kept in heap order.
	bsrHeap blockStreamReaderHeap
	// Blocks with smaller timestamps are removed because of retention.
	// Set by Init and returned by getRetentionDeadline.
	retentionDeadline int64
	// Whether the call to NextBlock must be no-op.
	// Init loads the first block already, so the first NextBlock call
	// must return it instead of advancing.
	nextBlockNoop bool
	// The last error.
	// io.EOF means all streams are exhausted; Error() reports it as nil.
	err error
}
// reset returns bsm to an empty state so it can be initialized again.
// The bsrHeap backing array is kept for reuse, but its live entries are
// cleared so the merger stops referencing readers from a previous merge.
func (bsm *blockStreamMerger) reset() {
	bsm.err = nil
	bsm.nextBlockNoop = false
	bsm.retentionDeadline = 0
	bsm.Block = nil

	h := bsm.bsrHeap
	for i := 0; i < len(h); i++ {
		h[i] = nil
	}
	bsm.bsrHeap = h[:0]
}
// Init initializes bsm with the given bsrs.
//
// Every reader is advanced to its first block. Readers that yield no block and
// report no error are left out of the merge; the first reader error aborts
// initialization, and the wrapped error is later returned by Error.
// If no reader has a block, the merger starts at EOF. Otherwise bsm.Block
// points to the smallest block, and the next NextBlock call returns it
// without advancing.
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline int64) {
	bsm.reset()
	bsm.retentionDeadline = retentionDeadline

	for _, r := range bsrs {
		if !r.NextBlock() {
			err := r.Error()
			if err == nil {
				// This reader has no blocks to contribute.
				continue
			}
			bsm.err = fmt.Errorf("cannot obtain the next block to merge: %w", err)
			return
		}
		bsm.bsrHeap = append(bsm.bsrHeap, r)
	}

	if len(bsm.bsrHeap) == 0 {
		bsm.err = io.EOF
		return
	}

	heap.Init(&bsm.bsrHeap)
	bsm.Block = &bsm.bsrHeap[0].Block
	// The first block is already loaded, so the first NextBlock must not advance.
	bsm.nextBlockNoop = true
}
// getRetentionDeadline returns the timestamp below which blocks are outside
// the configured retention. The block header is not consulted here: every
// block shares the single deadline passed to Init.
func (bsm *blockStreamMerger) getRetentionDeadline(_ *blockHeader) int64 {
	deadline := bsm.retentionDeadline
	return deadline
}
// NextBlock stores the next block in bsm.Block.
//
// It returns true when a block is available. It returns false once all
// streams are exhausted or an error occurred; call Error to tell the two
// apart (Error returns nil on normal exhaustion).
//
// The blocks are sorted by (TSID, MinTimestamp). Two subsequent blocks
// for the same TSID may contain overlapped time ranges.
func (bsm *blockStreamMerger) NextBlock() bool {
	if bsm.err != nil {
		return false
	}
	if bsm.nextBlockNoop {
		// Init already loaded the first block into bsm.Block.
		bsm.nextBlockNoop = false
		return true
	}

	bsm.err = bsm.nextBlock()
	switch bsm.err {
	case nil:
		return true
	case io.EOF:
		return false
	default:
		bsm.err = fmt.Errorf("cannot obtain the next block to merge: %w", bsm.err)
		return false
	}
}
// nextBlock advances the reader at the top of the heap and points bsm.Block
// at the new smallest block. An exhausted reader is removed from the heap.
// It returns io.EOF when no readers remain, or the reader's error if
// advancing failed; in both cases bsm.Block is set to nil.
func (bsm *blockStreamMerger) nextBlock() error {
	top := bsm.bsrHeap[0]
	if !top.NextBlock() {
		if err := top.Error(); err != nil {
			bsm.Block = nil
			return err
		}
		// The top reader has no more blocks; drop it from the heap.
		heap.Pop(&bsm.bsrHeap)
		if len(bsm.bsrHeap) == 0 {
			bsm.Block = nil
			return io.EOF
		}
		bsm.Block = &bsm.bsrHeap[0].Block
		return nil
	}
	// The top reader moved to a new block, so its heap position may change.
	heap.Fix(&bsm.bsrHeap, 0)
	bsm.Block = &bsm.bsrHeap[0].Block
	return nil
}
// Error returns the error that stopped the merge, if any.
// Running out of blocks (io.EOF) is the normal end of a merge and is
// reported as nil.
func (bsm *blockStreamMerger) Error() error {
	err := bsm.err
	if err != io.EOF {
		return err
	}
	return nil
}
// blockStreamReaderHeap implements heap.Interface over block stream readers,
// ordered by the (TSID, MinTimestamp) of each reader's current block.
type blockStreamReaderHeap []*blockStreamReader

// Len implements heap.Interface.
func (bsrh *blockStreamReaderHeap) Len() int {
	return len(*bsrh)
}

// Less implements heap.Interface. Readers are ordered by TSID first and by
// the MinTimestamp of their current block second.
func (bsrh *blockStreamReaderHeap) Less(i, j int) bool {
	x := *bsrh
	a, b := &x[i].Block.bh, &x[j].Block.bh
	if a.TSID.MetricID == b.TSID.MetricID {
		// Fast path for identical TSID values.
		return a.MinTimestamp < b.MinTimestamp
	}
	// Slow path for distinct TSID values.
	return a.TSID.Less(&b.TSID)
}

// Swap implements heap.Interface.
func (bsrh *blockStreamReaderHeap) Swap(i, j int) {
	x := *bsrh
	x[i], x[j] = x[j], x[i]
}

// Push implements heap.Interface.
func (bsrh *blockStreamReaderHeap) Push(x interface{}) {
	*bsrh = append(*bsrh, x.(*blockStreamReader))
}

// Pop implements heap.Interface. It removes and returns the last element.
//
// The vacated slot is set to nil. The backing array outlives the popped
// element (blockStreamMerger.reset keeps its capacity and only clears the
// live [0:len] range). Without this, a popped reader stays referenced
// until its slot happens to be overwritten.
func (bsrh *blockStreamReaderHeap) Pop() interface{} {
	a := *bsrh
	n := len(a) - 1
	v := a[n]
	a[n] = nil // avoid retaining the reader via the backing array
	*bsrh = a[:n]
	return v
}