VictoriaMetrics/lib/mergeset/table_search.go

224 lines
4.2 KiB
Go
Raw Normal View History

2019-05-22 23:16:55 +02:00
package mergeset
import (
"bytes"
"container/heap"
"fmt"
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// TableSearch is a reusable cursor used for searching in the Table.
type TableSearch struct {
// Item contains the next item after successful NextItem
// or FirstItemWithPrefix call.
//
// Item contents breaks after the next call to NextItem.
Item []byte
tb *Table
pws []*partWrapper
psPool []partSearch
psHeap partSearchHeap
err error
nextItemNoop bool
needClosing bool
}
func (ts *TableSearch) reset() {
ts.Item = nil
ts.tb = nil
for i := range ts.pws {
ts.pws[i] = nil
}
ts.pws = ts.pws[:0]
for i := range ts.psPool {
ts.psPool[i].reset()
}
ts.psPool = ts.psPool[:0]
for i := range ts.psHeap {
ts.psHeap[i] = nil
}
ts.psHeap = ts.psHeap[:0]
ts.err = nil
ts.nextItemNoop = false
ts.needClosing = false
}
// Init initializes ts for searching in the tb.
//
// MustClose must be called when the ts is no longer needed.
func (ts *TableSearch) Init(tb *Table, shouldCacheBlock func(item []byte) bool) {
2019-05-22 23:16:55 +02:00
if ts.needClosing {
logger.Panicf("BUG: missing MustClose call before the next call to Init")
}
ts.reset()
ts.tb = tb
ts.needClosing = true
ts.pws = ts.tb.getParts(ts.pws[:0])
// Initialize the psPool.
if n := len(ts.pws) - cap(ts.psPool); n > 0 {
ts.psPool = append(ts.psPool[:cap(ts.psPool)], make([]partSearch, n)...)
}
ts.psPool = ts.psPool[:len(ts.pws)]
for i, pw := range ts.pws {
ts.psPool[i].Init(pw.p, shouldCacheBlock)
2019-05-22 23:16:55 +02:00
}
}
// Seek seeks for the first item greater or equal to k in the ts.
func (ts *TableSearch) Seek(k []byte) {
if err := ts.Error(); err != nil {
// Do nothing on unrecoverable error.
return
}
ts.err = nil
// Initialize the psHeap.
var errors []error
ts.psHeap = ts.psHeap[:0]
for i := range ts.psPool {
ps := &ts.psPool[i]
ps.Seek(k)
if !ps.NextItem() {
if err := ps.Error(); err != nil {
errors = append(errors, err)
}
continue
}
ts.psHeap = append(ts.psHeap, ps)
}
if len(errors) > 0 {
// Return only the first error, since it has no sense in returning all errors.
ts.err = fmt.Errorf("cannot seek %q: %w", k, errors[0])
2019-05-22 23:16:55 +02:00
return
}
if len(ts.psHeap) == 0 {
ts.err = io.EOF
return
}
heap.Init(&ts.psHeap)
ts.Item = ts.psHeap[0].Item
ts.nextItemNoop = true
}
// FirstItemWithPrefix seeks for the first item with the given prefix in the ts.
//
// It returns io.EOF if such an item doesn't exist.
func (ts *TableSearch) FirstItemWithPrefix(prefix []byte) error {
ts.Seek(prefix)
if !ts.NextItem() {
if err := ts.Error(); err != nil {
return err
}
return io.EOF
}
if err := ts.Error(); err != nil {
return err
}
if !bytes.HasPrefix(ts.Item, prefix) {
return io.EOF
}
return nil
}
// NextItem advances to the next item.
func (ts *TableSearch) NextItem() bool {
if ts.err != nil {
return false
}
if ts.nextItemNoop {
ts.nextItemNoop = false
return true
}
ts.err = ts.nextBlock()
if ts.err != nil {
if ts.err != io.EOF {
ts.err = fmt.Errorf("cannot obtain the next block to search in the table: %w", ts.err)
2019-05-22 23:16:55 +02:00
}
return false
}
return true
}
func (ts *TableSearch) nextBlock() error {
psMin := ts.psHeap[0]
if psMin.NextItem() {
heap.Fix(&ts.psHeap, 0)
ts.Item = ts.psHeap[0].Item
return nil
}
if err := psMin.Error(); err != nil {
return err
}
heap.Pop(&ts.psHeap)
if len(ts.psHeap) == 0 {
return io.EOF
}
ts.Item = ts.psHeap[0].Item
return nil
}
// Error returns the last error in ts.
func (ts *TableSearch) Error() error {
if ts.err == io.EOF {
return nil
}
return ts.err
}
// MustClose closes the ts.
func (ts *TableSearch) MustClose() {
if !ts.needClosing {
logger.Panicf("BUG: missing Init call before MustClose call")
}
ts.tb.putParts(ts.pws)
ts.reset()
}
type partSearchHeap []*partSearch
func (psh *partSearchHeap) Len() int {
return len(*psh)
}
func (psh *partSearchHeap) Less(i, j int) bool {
x := *psh
return string(x[i].Item) < string(x[j].Item)
}
func (psh *partSearchHeap) Swap(i, j int) {
x := *psh
x[i], x[j] = x[j], x[i]
}
func (psh *partSearchHeap) Push(x interface{}) {
*psh = append(*psh, x.(*partSearch))
}
func (psh *partSearchHeap) Pop() interface{} {
a := *psh
v := a[len(a)-1]
*psh = a[:len(a)-1]
return v
}