lib/mergeset: do not update blockStreamReader.bh.firstItem during the merge

Instead, read the current item directly from blockStreamReader.Block.items
via the helper method blockStreamReader.CurrItem().
This commit is contained in:
Aliaksandr Valialkin 2022-07-27 23:04:58 +03:00
parent be1c82beb1
commit a3f5822dc2
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 18 additions and 14 deletions

View File

@ -17,7 +17,8 @@ type blockStreamReader struct {
// Block contains the current block if Next returned true.
Block inmemoryBlock
blockItemIdx int
// The index of the current item in the Block, which is returned from CurrItem()
currItemIdx int
path string
@ -66,7 +67,7 @@ type blockStreamReader struct {
func (bsr *blockStreamReader) reset() {
bsr.Block.Reset()
bsr.blockItemIdx = 0
bsr.currItemIdx = 0
bsr.path = ""
bsr.ph.Reset()
bsr.mrs = nil
@ -185,6 +186,10 @@ func (bsr *blockStreamReader) MustClose() {
bsr.reset()
}
// CurrItem returns the item located at bsr.currItemIdx in the current Block as a string.
//
// The item is looked up in bsr.Block.items and converted with its String method
// against bsr.Block.data. No bounds check is performed here, so the caller must
// make sure bsr.currItemIdx is a valid index into bsr.Block.items; otherwise
// the index expression panics.
func (bsr *blockStreamReader) CurrItem() string {
	items, data := bsr.Block.items, bsr.Block.data
	return items[bsr.currItemIdx].String(data)
}
func (bsr *blockStreamReader) Next() bool {
if bsr.err != nil {
return false
@ -233,7 +238,7 @@ func (bsr *blockStreamReader) Next() bool {
bsr.err = fmt.Errorf("too many blocks read: %d; must be smaller than partHeader.blocksCount %d", bsr.blocksRead, bsr.ph.blocksCount)
return false
}
bsr.blockItemIdx = 0
bsr.currItemIdx = 0
bsr.itemsRead += uint64(len(bsr.Block.items))
if bsr.itemsRead > bsr.ph.itemsCount {
bsr.err = fmt.Errorf("too many items read: %d; must be smaller than partHeader.itemsCount %d", bsr.itemsRead, bsr.ph.itemsCount)

View File

@ -116,18 +116,18 @@ again:
bsr := bsm.bsrHeap[0]
var nextItem []byte
var nextItem string
hasNextItem := false
if len(bsm.bsrHeap) > 1 {
bsr := bsm.bsrHeap.getNextReader()
nextItem = bsr.bh.firstItem
nextItem = bsr.CurrItem()
hasNextItem = true
}
items := bsr.Block.items
data := bsr.Block.data
for bsr.blockItemIdx < len(bsr.Block.items) {
item := items[bsr.blockItemIdx].Bytes(data)
if hasNextItem && string(item) > string(nextItem) {
for bsr.currItemIdx < len(bsr.Block.items) {
item := items[bsr.currItemIdx].Bytes(data)
if hasNextItem && string(item) > nextItem {
break
}
if !bsm.ib.Add(item) {
@ -135,9 +135,9 @@ again:
bsm.flushIB(bsw, ph, itemsMerged)
continue
}
bsr.blockItemIdx++
bsr.currItemIdx++
}
if bsr.blockItemIdx == len(bsr.Block.items) {
if bsr.currItemIdx == len(bsr.Block.items) {
// bsr.Block is fully read. Proceed to the next block.
if bsr.Next() {
heap.Fix(&bsm.bsrHeap, 0)
@ -151,8 +151,7 @@ again:
}
// The next item in the bsr.Block exceeds nextItem.
// Adjust bsr.bh.firstItem and return bsr to heap.
bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)
// Return bsr to heap.
heap.Fix(&bsm.bsrHeap, 0)
goto again
}
@ -212,7 +211,7 @@ func (bh bsrHeap) getNextReader() *blockStreamReader {
}
a := bh[1]
b := bh[2]
if string(a.bh.firstItem) <= string(b.bh.firstItem) {
if a.CurrItem() <= b.CurrItem() {
return a
}
return b
@ -229,7 +228,7 @@ func (bh *bsrHeap) Swap(i, j int) {
func (bh *bsrHeap) Less(i, j int) bool {
x := *bh
return string(x[i].bh.firstItem) < string(x[j].bh.firstItem)
return x[i].CurrItem() < x[j].CurrItem()
}
func (bh *bsrHeap) Pop() interface{} {