VictoriaMetrics/lib/storage/partition_search.go
2022-07-11 19:23:25 +03:00

205 lines
4.2 KiB
Go

package storage
import (
"container/heap"
"fmt"
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// partitionSearch is a reusable iterator over the blocks of a single partition.
//
// Typical usage is Init, then NextBlock in a loop, then Error, then MustClose.
type partitionSearch struct {
	// BlockRef points to the current block after a successful NextBlock call.
	// It refers to the BlockRef of the partSearch at the top of psHeap.
	BlockRef *BlockRef

	// pt is the partition being searched.
	pt *partition

	// pws holds the parts snapshot obtained from pt.GetParts during Init.
	// The snapshot is handed back via pt.PutParts in MustClose.
	pws []*partWrapper

	// psPool contains one partSearch per entry in pws.
	// Its backing array is kept between searches.
	psPool []partSearch

	// psHeap contains pointers to psPool entries which still have blocks,
	// ordered as a heap by the block header of their current BlockRef.
	psHeap partSearchHeap

	// err is the sticky search state: io.EOF means the search is finished,
	// any other non-nil value is an error reported by Error.
	err error

	// nextBlockNoop is set by Init when the first block is already available,
	// so the first NextBlock call must return it without advancing.
	nextBlockNoop bool

	// needClosing is set by Init and cleared by reset. It is used for detecting
	// a missing MustClose call before Init and a missing Init call before MustClose.
	needClosing bool
}
// reset clears pts so it can be re-used by the next Init call.
//
// The slices are truncated to zero length instead of being dropped,
// so their backing arrays are kept for the next search.
func (pts *partitionSearch) reset() {
	// Scalar state.
	pts.BlockRef = nil
	pts.pt = nil
	pts.err = nil
	pts.nextBlockNoop = false
	pts.needClosing = false

	// Drop references to part wrappers before truncating the slice.
	wrappers := pts.pws
	for idx := range wrappers {
		wrappers[idx] = nil
	}
	pts.pws = wrappers[:0]

	// Reset every per-part search in place.
	pool := pts.psPool
	for idx := range pool {
		pool[idx].reset()
	}
	pts.psPool = pool[:0]

	// Drop pointers into psPool held by the heap.
	heapItems := pts.psHeap
	for idx := range heapItems {
		heapItems[idx] = nil
	}
	pts.psHeap = heapItems[:0]
}
// Init prepares pts for searching blocks belonging to the given tsids
// on the given time range tr in the partition pt.
//
// tsids must be sorted.
// tsids cannot be modified after the Init call, since it is owned by pts.
//
// Init panics if the previous search wasn't finished with MustClose.
// Any initialization failure is stored in pts and is available via Error.
//
// MustClose must be called when partition search is done.
func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
	if pts.needClosing {
		logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
	}

	pts.reset()
	pts.pt = pt
	pts.needClosing = true

	// Fast paths: there is nothing to search if there are no tsids
	// or if the partition time range doesn't intersect tr.
	if len(tsids) == 0 || tr.MaxTimestamp < pt.tr.MinTimestamp || tr.MinTimestamp > pt.tr.MaxTimestamp {
		pts.err = io.EOF
		return
	}

	// Take the parts snapshot. It is returned to pt in MustClose.
	pts.pws = pt.GetParts(pts.pws[:0])
	partsCount := len(pts.pws)

	// Make sure psPool has room for a partSearch per part, then initialize them.
	if missing := partsCount - cap(pts.psPool); missing > 0 {
		pts.psPool = append(pts.psPool[:cap(pts.psPool)], make([]partSearch, missing)...)
	}
	pts.psPool = pts.psPool[:partsCount]
	for i := range pts.psPool {
		pts.psPool[i].Init(pts.pws[i].p, tsids, tr)
	}

	// Position every partSearch at its first block and collect
	// those which have at least one block into psHeap.
	pts.psHeap = pts.psHeap[:0]
	for i := range pts.psPool {
		ps := &pts.psPool[i]
		if ps.NextBlock() {
			pts.psHeap = append(pts.psHeap, ps)
			continue
		}
		if err := ps.Error(); err != nil {
			// Return only the first error, since it has no sense in returning all errors.
			pts.err = fmt.Errorf("cannot initialize partition search: %w", err)
			return
		}
	}
	if len(pts.psHeap) == 0 {
		// None of the parts has matching blocks.
		pts.err = io.EOF
		return
	}

	heap.Init(&pts.psHeap)
	pts.BlockRef = &pts.psHeap[0].BlockRef
	// The first block is already available, so the first NextBlock call mustn't advance.
	pts.nextBlockNoop = true
}
// NextBlock moves pts to the next block and returns true on success.
// The block is available via pts.BlockRef.
//
// It returns false when there are no more blocks or when an error occurs.
// Call Error for distinguishing these cases.
//
// The blocks are sorted by (TSID, MinTimestamp). Two subsequent blocks
// for the same TSID may contain overlapped time ranges.
func (pts *partitionSearch) NextBlock() bool {
	switch {
	case pts.err != nil:
		// The search is already finished or failed.
		return false
	case pts.nextBlockNoop:
		// Init has already positioned pts at the first block.
		pts.nextBlockNoop = false
		return true
	}

	err := pts.nextBlock()
	if err == nil {
		return true
	}
	// io.EOF is stored as is, since Error relies on the exact value.
	if err != io.EOF {
		err = fmt.Errorf("cannot obtain the next block to search in the partition: %w", err)
	}
	pts.err = err
	return false
}
// nextBlock advances the partSearch at the top of psHeap and updates pts.BlockRef.
//
// It returns io.EOF when psHeap becomes empty and the error
// from the advanced partSearch if it fails.
func (pts *partitionSearch) nextBlock() error {
	top := pts.psHeap[0]
	if top.NextBlock() {
		// The top item has a new block, so restore the heap order.
		heap.Fix(&pts.psHeap, 0)
	} else {
		if err := top.Error(); err != nil {
			return err
		}
		// The top item has no more blocks. Remove it from the heap.
		heap.Pop(&pts.psHeap)
		if len(pts.psHeap) == 0 {
			return io.EOF
		}
	}
	pts.BlockRef = &pts.psHeap[0].BlockRef
	return nil
}
// Error returns the last error occurred during the search.
//
// io.EOF isn't treated as an error - nil is returned for it.
func (pts *partitionSearch) Error() error {
	if err := pts.err; err != io.EOF {
		return err
	}
	return nil
}
// MustClose finishes the search: it hands the parts snapshot taken in Init
// back to the partition and resets pts, so it can be re-used.
//
// It panics if Init wasn't called before.
func (pts *partitionSearch) MustClose() {
	if !pts.needClosing {
		logger.Panicf("BUG: missing Init call before the MustClose call")
	}

	pt, pws := pts.pt, pts.pws
	pt.PutParts(pws)
	pts.reset()
}
// partSearchHeap is a slice of partSearch pointers with the methods
// required by heap.Interface from container/heap.
//
// Items are ordered by the block header of their current BlockRef (see Less).
type partSearchHeap []*partSearch
// Len returns the number of items in the heap.
func (psh *partSearchHeap) Len() int {
	items := *psh
	return len(items)
}
// Less reports whether the block header of the current block at index i
// is less than the block header of the current block at index j.
func (psh *partSearchHeap) Less(i, j int) bool {
	left, right := (*psh)[i], (*psh)[j]
	return left.BlockRef.bh.Less(&right.BlockRef.bh)
}
// Swap exchanges the items at indexes i and j.
func (psh *partSearchHeap) Swap(i, j int) {
	items := *psh
	tmp := items[i]
	items[i] = items[j]
	items[j] = tmp
}
// Push appends x to the end of the heap slice.
// x must be a *partSearch, otherwise the type assertion panics.
func (psh *partSearchHeap) Push(x interface{}) {
	ps := x.(*partSearch)
	*psh = append(*psh, ps)
}
// Pop removes the last item from the heap slice and returns it.
func (psh *partSearchHeap) Pop() interface{} {
	items := *psh
	last := len(items) - 1
	ps := items[last]
	*psh = items[:last]
	return ps
}