mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-04 13:52:05 +01:00
223 lines
5.9 KiB
Go
223 lines
5.9 KiB
Go
package mergeset
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// PrepareBlockCallback can transform the passed items allocated at the given data.
|
|
//
|
|
// The callback is called during merge before flushing full block of the given items
|
|
// to persistent storage.
|
|
//
|
|
// The callback must return sorted items. The first and the last item must be unchanged.
|
|
// The callback can re-use data and items for storing the result.
|
|
type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte)
|
|
|
|
// mergeBlockStreams merges bsrs and writes result to bsw.
|
|
//
|
|
// It also fills ph.
|
|
//
|
|
// prepareBlock is optional.
|
|
//
|
|
// The function immediately returns when stopCh is closed.
|
|
//
|
|
// It also atomically adds the number of items merged to itemsMerged.
|
|
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{},
|
|
itemsMerged *uint64) error {
|
|
bsm := bsmPool.Get().(*blockStreamMerger)
|
|
if err := bsm.Init(bsrs, prepareBlock); err != nil {
|
|
return fmt.Errorf("cannot initialize blockStreamMerger: %w", err)
|
|
}
|
|
err := bsm.Merge(bsw, ph, stopCh, itemsMerged)
|
|
bsm.reset()
|
|
bsmPool.Put(bsm)
|
|
bsw.MustClose()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("cannot merge %d block streams: %s: %w", len(bsrs), bsrs, err)
|
|
}
|
|
|
|
var bsmPool = &sync.Pool{
|
|
New: func() interface{} {
|
|
return &blockStreamMerger{}
|
|
},
|
|
}
|
|
|
|
type blockStreamMerger struct {
|
|
prepareBlock PrepareBlockCallback
|
|
|
|
bsrHeap bsrHeap
|
|
|
|
// ib is a scratch block with pending items.
|
|
ib inmemoryBlock
|
|
|
|
phFirstItemCaught bool
|
|
|
|
// This are auxiliary buffers used in flushIB
|
|
// for consistency checks after prepareBlock call.
|
|
firstItem []byte
|
|
lastItem []byte
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) reset() {
|
|
bsm.prepareBlock = nil
|
|
|
|
for i := range bsm.bsrHeap {
|
|
bsm.bsrHeap[i] = nil
|
|
}
|
|
bsm.bsrHeap = bsm.bsrHeap[:0]
|
|
bsm.ib.Reset()
|
|
|
|
bsm.phFirstItemCaught = false
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback) error {
|
|
bsm.reset()
|
|
bsm.prepareBlock = prepareBlock
|
|
for _, bsr := range bsrs {
|
|
if bsr.Next() {
|
|
bsm.bsrHeap = append(bsm.bsrHeap, bsr)
|
|
}
|
|
|
|
if err := bsr.Error(); err != nil {
|
|
return fmt.Errorf("cannot obtain the next block from blockStreamReader %q: %w", bsr.path, err)
|
|
}
|
|
}
|
|
heap.Init(&bsm.bsrHeap)
|
|
|
|
if len(bsm.bsrHeap) == 0 {
|
|
return fmt.Errorf("bsrHeap cannot be empty")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var errForciblyStopped = fmt.Errorf("forcibly stopped")
|
|
|
|
func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error {
|
|
again:
|
|
if len(bsm.bsrHeap) == 0 {
|
|
// Write the last (maybe incomplete) inmemoryBlock to bsw.
|
|
bsm.flushIB(bsw, ph, itemsMerged)
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-stopCh:
|
|
return errForciblyStopped
|
|
default:
|
|
}
|
|
|
|
bsr := heap.Pop(&bsm.bsrHeap).(*blockStreamReader)
|
|
|
|
var nextItem []byte
|
|
hasNextItem := false
|
|
if len(bsm.bsrHeap) > 0 {
|
|
nextItem = bsm.bsrHeap[0].bh.firstItem
|
|
hasNextItem = true
|
|
}
|
|
for bsr.blockItemIdx < len(bsr.Block.items) {
|
|
item := bsr.Block.items[bsr.blockItemIdx]
|
|
if hasNextItem && string(item) > string(nextItem) {
|
|
break
|
|
}
|
|
if !bsm.ib.Add(item) {
|
|
// The bsm.ib is full. Flush it to bsw and continue.
|
|
bsm.flushIB(bsw, ph, itemsMerged)
|
|
continue
|
|
}
|
|
bsr.blockItemIdx++
|
|
}
|
|
if bsr.blockItemIdx == len(bsr.Block.items) {
|
|
// bsr.Block is fully read. Proceed to the next block.
|
|
if bsr.Next() {
|
|
heap.Push(&bsm.bsrHeap, bsr)
|
|
goto again
|
|
}
|
|
if err := bsr.Error(); err != nil {
|
|
return fmt.Errorf("cannot read storageBlock: %w", err)
|
|
}
|
|
goto again
|
|
}
|
|
|
|
// The next item in the bsr.Block exceeds nextItem.
|
|
// Adjust bsr.bh.firstItem and return bsr to heap.
|
|
bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx]...)
|
|
heap.Push(&bsm.bsrHeap, bsr)
|
|
goto again
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *uint64) {
|
|
if len(bsm.ib.items) == 0 {
|
|
// Nothing to flush.
|
|
return
|
|
}
|
|
atomic.AddUint64(itemsMerged, uint64(len(bsm.ib.items)))
|
|
if bsm.prepareBlock != nil {
|
|
bsm.firstItem = append(bsm.firstItem[:0], bsm.ib.items[0]...)
|
|
bsm.lastItem = append(bsm.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...)
|
|
bsm.ib.data, bsm.ib.items = bsm.prepareBlock(bsm.ib.data, bsm.ib.items)
|
|
if len(bsm.ib.items) == 0 {
|
|
// Nothing to flush
|
|
return
|
|
}
|
|
// Consistency checks after prepareBlock call.
|
|
firstItem := bsm.ib.items[0]
|
|
if string(firstItem) != string(bsm.firstItem) {
|
|
logger.Panicf("BUG: prepareBlock must return first item equal to the original first item;\ngot\n%X\nwant\n%X", firstItem, bsm.firstItem)
|
|
}
|
|
lastItem := bsm.ib.items[len(bsm.ib.items)-1]
|
|
if string(lastItem) != string(bsm.lastItem) {
|
|
logger.Panicf("BUG: prepareBlock must return last item equal to the original last item;\ngot\n%X\nwant\n%X", lastItem, bsm.lastItem)
|
|
}
|
|
// Verify whether the bsm.ib.items are sorted only in tests, since this
|
|
// can be expensive check in prod for items with long common prefix.
|
|
if isInTest && !bsm.ib.isSorted() {
|
|
logger.Panicf("BUG: prepareBlock must return sorted items;\ngot\n%s", bsm.ib.debugItemsString())
|
|
}
|
|
}
|
|
ph.itemsCount += uint64(len(bsm.ib.items))
|
|
if !bsm.phFirstItemCaught {
|
|
ph.firstItem = append(ph.firstItem[:0], bsm.ib.items[0]...)
|
|
bsm.phFirstItemCaught = true
|
|
}
|
|
ph.lastItem = append(ph.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...)
|
|
bsw.WriteBlock(&bsm.ib)
|
|
bsm.ib.Reset()
|
|
ph.blocksCount++
|
|
}
|
|
|
|
type bsrHeap []*blockStreamReader
|
|
|
|
func (bh *bsrHeap) Len() int {
|
|
return len(*bh)
|
|
}
|
|
|
|
func (bh *bsrHeap) Swap(i, j int) {
|
|
x := *bh
|
|
x[i], x[j] = x[j], x[i]
|
|
}
|
|
|
|
func (bh *bsrHeap) Less(i, j int) bool {
|
|
x := *bh
|
|
return string(x[i].bh.firstItem) < string(x[j].bh.firstItem)
|
|
}
|
|
|
|
func (bh *bsrHeap) Pop() interface{} {
|
|
a := *bh
|
|
v := a[len(a)-1]
|
|
*bh = a[:len(a)-1]
|
|
return v
|
|
}
|
|
|
|
func (bh *bsrHeap) Push(x interface{}) {
|
|
v := x.(*blockStreamReader)
|
|
*bh = append(*bh, v)
|
|
}
|