// Source: VictoriaMetrics/lib/mergeset/block_stream_reader.go

package mergeset
import (
"fmt"
"io"
"path/filepath"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type blockStreamReader struct {
// Block contains the current block if Next returned true.
Block inmemoryBlock
// isInmemoryBlock is set to true if bsr was initialized with InitFromInmemoryBlock().
isInmemoryBlock bool
// The index of the current item in the Block, which is returned from CurrItem()
currItemIdx int
2019-05-22 23:16:55 +02:00
path string
// ph contains partHeader for the read part.
ph partHeader
// All the metaindexRows.
// The blockStreamReader doesn't own mrs - it must be alive
// during the read.
mrs []metaindexRow
// The index for the currently processed metaindexRow from mrs.
mrIdx int
// Currently processed blockHeaders.
bhs []blockHeader
// The index of the currently processed blockHeader.
bhIdx int
indexReader filestream.ReadCloser
itemsReader filestream.ReadCloser
lensReader filestream.ReadCloser
// Contains the current blockHeader.
bh *blockHeader
// Contains the current storageBlock.
sb storageBlock
// The number of items read so far.
itemsRead uint64
// The number of blocks read so far.
blocksRead uint64
// Whether the first item in the reader checked with ph.firstItem.
firstItemChecked bool
packedBuf []byte
unpackedBuf []byte
// The last error.
err error
}
func (bsr *blockStreamReader) reset() {
bsr.Block.Reset()
bsr.isInmemoryBlock = false
bsr.currItemIdx = 0
2019-05-22 23:16:55 +02:00
bsr.path = ""
bsr.ph.Reset()
bsr.mrs = nil
bsr.mrIdx = 0
bsr.bhs = bsr.bhs[:0]
bsr.bhIdx = 0
bsr.indexReader = nil
bsr.itemsReader = nil
bsr.lensReader = nil
bsr.bh = nil
bsr.sb.Reset()
bsr.itemsRead = 0
bsr.blocksRead = 0
bsr.firstItemChecked = false
bsr.packedBuf = bsr.packedBuf[:0]
bsr.unpackedBuf = bsr.unpackedBuf[:0]
bsr.err = nil
}
// String returns the part path if bsr has one;
// otherwise it returns the string representation of the part header.
func (bsr *blockStreamReader) String() string {
	if bsr.path == "" {
		return bsr.ph.String()
	}
	return bsr.path
}
// InitFromInmemoryBlock initializes bsr from the given ib.
//
// The contents of ib are copied into bsr.Block and the copy is sorted,
// so ib itself isn't modified by bsr.
func (bsr *blockStreamReader) InitFromInmemoryBlock(ib *inmemoryBlock) {
	bsr.reset()
	bsr.isInmemoryBlock = true

	dst := &bsr.Block
	dst.CopyFrom(ib)
	dst.SortItems()
}
// InitFromInmemoryPart initializes bsr from the given mp.
func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
2019-05-22 23:16:55 +02:00
bsr.reset()
var err error
bsr.mrs, err = unmarshalMetaindexRows(bsr.mrs[:0], mp.metaindexData.NewReader())
2019-05-22 23:16:55 +02:00
if err != nil {
logger.Panicf("BUG: cannot unmarshal metaindex rows from inmemory part: %s", err)
}
bsr.ph.CopyFrom(&mp.ph)
bsr.indexReader = mp.indexData.NewReader()
bsr.itemsReader = mp.itemsData.NewReader()
bsr.lensReader = mp.lensData.NewReader()
2019-05-22 23:16:55 +02:00
if bsr.ph.itemsCount <= 0 {
logger.Panicf("BUG: source inmemoryPart must contain at least a single item")
}
if bsr.ph.blocksCount <= 0 {
logger.Panicf("BUG: source inmemoryPart must contain at least a single block")
}
}
// InitFromFilePart initializes bsr from a file-based part on the given path.
//
// Part files are read without OS cache pollution, since the part is usually
// deleted after the merge.
func (bsr *blockStreamReader) InitFromFilePart(path string) error {
bsr.reset()
path = filepath.Clean(path)
all: add Windows build for VictoriaMetrics This commit changes background merge algorithm, so it becomes compatible with Windows file semantics. The previous algorithm for background merge: 1. Merge source parts into a destination part inside tmp directory. 2. Create a file in txn directory with instructions on how to atomically swap source parts with the destination part. 3. Perform instructions from the file. 4. Delete the file with instructions. This algorithm guarantees that either source parts or destination part is visible in the partition after unclean shutdown at any step above, since the remaining files with instructions is replayed on the next restart, after that the remaining contents of the tmp directory is deleted. Unfortunately this algorithm doesn't work under Windows because it disallows removing and moving files, which are in use. So the new algorithm for background merge has been implemented: 1. Merge source parts into a destination part inside the partition directory itself. E.g. now the partition directory may contain both complete and incomplete parts. 2. Atomically update the parts.json file with the new list of parts after the merge, e.g. remove the source parts from the list and add the destination part to the list before storing it to parts.json file. 3. Remove the source parts from disk when they are no longer used. This algorithm guarantees that either source parts or destination part is visible in the partition after unclean shutdown at any step above, since incomplete partitions from step 1 or old source parts from step 3 are removed on the next startup by inspecting parts.json file. This algorithm should work under Windows, since it doesn't remove or move files in use. This algorithm has also the following benefits: - It should work better for NFS. - It fits object storage semantics. 
The new algorithm changes data storage format, so it is impossible to downgrade to the previous versions of VictoriaMetrics after upgrading to this algorithm. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70
2023-03-19 09:36:05 +01:00
if err := bsr.ph.ReadMetadata(path); err != nil {
return fmt.Errorf("cannot read metadata from %q: %w", path, err)
2019-05-22 23:16:55 +02:00
}
metaindexPath := filepath.Join(path, metaindexFilename)
2019-05-22 23:16:55 +02:00
metaindexFile, err := filestream.Open(metaindexPath, true)
if err != nil {
return fmt.Errorf("cannot open metaindex file in stream mode: %w", err)
2019-05-22 23:16:55 +02:00
}
bsr.mrs, err = unmarshalMetaindexRows(bsr.mrs[:0], metaindexFile)
metaindexFile.MustClose()
if err != nil {
return fmt.Errorf("cannot unmarshal metaindex rows from file %q: %w", metaindexPath, err)
2019-05-22 23:16:55 +02:00
}
indexPath := filepath.Join(path, indexFilename)
2019-05-22 23:16:55 +02:00
indexFile, err := filestream.Open(indexPath, true)
if err != nil {
return fmt.Errorf("cannot open index file in stream mode: %w", err)
2019-05-22 23:16:55 +02:00
}
itemsPath := filepath.Join(path, itemsFilename)
2019-05-22 23:16:55 +02:00
itemsFile, err := filestream.Open(itemsPath, true)
if err != nil {
indexFile.MustClose()
return fmt.Errorf("cannot open items file in stream mode: %w", err)
2019-05-22 23:16:55 +02:00
}
lensPath := filepath.Join(path, lensFilename)
2019-05-22 23:16:55 +02:00
lensFile, err := filestream.Open(lensPath, true)
if err != nil {
indexFile.MustClose()
itemsFile.MustClose()
return fmt.Errorf("cannot open lens file in stream mode: %w", err)
2019-05-22 23:16:55 +02:00
}
bsr.path = path
bsr.indexReader = indexFile
bsr.itemsReader = itemsFile
bsr.lensReader = lensFile
return nil
}
// MustClose closes the bsr.
//
// It closes *Reader files passed to Init.
func (bsr *blockStreamReader) MustClose() {
if !bsr.isInmemoryBlock {
bsr.indexReader.MustClose()
bsr.itemsReader.MustClose()
bsr.lensReader.MustClose()
}
2019-05-22 23:16:55 +02:00
bsr.reset()
}
// CurrItem returns the string form of the item at currItemIdx in the current Block.
func (bsr *blockStreamReader) CurrItem() string {
	b := &bsr.Block
	it := b.items[bsr.currItemIdx]
	return it.String(b.data)
}
2019-05-22 23:16:55 +02:00
func (bsr *blockStreamReader) Next() bool {
if bsr.err != nil {
return false
}
if bsr.isInmemoryBlock {
bsr.err = io.EOF
return true
}
2019-05-22 23:16:55 +02:00
if bsr.bhIdx >= len(bsr.bhs) {
// The current index block is over. Try reading the next index block.
if err := bsr.readNextBHS(); err != nil {
if err == io.EOF {
// Check the last item.
b := &bsr.Block
lastItem := b.items[len(b.items)-1].Bytes(b.data)
2019-05-22 23:16:55 +02:00
if string(bsr.ph.lastItem) != string(lastItem) {
err = fmt.Errorf("unexpected last item; got %X; want %X", lastItem, bsr.ph.lastItem)
}
} else {
err = fmt.Errorf("cannot read the next index block: %w", err)
2019-05-22 23:16:55 +02:00
}
bsr.err = err
return false
}
}
bsr.bh = &bsr.bhs[bsr.bhIdx]
bsr.bhIdx++
bsr.sb.itemsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.itemsData, int(bsr.bh.itemsBlockSize))
2019-05-22 23:16:55 +02:00
if err := fs.ReadFullData(bsr.itemsReader, bsr.sb.itemsData); err != nil {
bsr.err = fmt.Errorf("cannot read compressed items block with size %d: %w", bsr.bh.itemsBlockSize, err)
2019-05-22 23:16:55 +02:00
return false
}
bsr.sb.lensData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.lensData, int(bsr.bh.lensBlockSize))
2019-05-22 23:16:55 +02:00
if err := fs.ReadFullData(bsr.lensReader, bsr.sb.lensData); err != nil {
bsr.err = fmt.Errorf("cannot read compressed lens block with size %d: %w", bsr.bh.lensBlockSize, err)
2019-05-22 23:16:55 +02:00
return false
}
if err := bsr.Block.UnmarshalData(&bsr.sb, bsr.bh.firstItem, bsr.bh.commonPrefix, bsr.bh.itemsCount, bsr.bh.marshalType); err != nil {
bsr.err = fmt.Errorf("cannot unmarshal inmemoryBlock from storageBlock with firstItem=%X, commonPrefix=%X, itemsCount=%d, marshalType=%d: %w",
2019-05-22 23:16:55 +02:00
bsr.bh.firstItem, bsr.bh.commonPrefix, bsr.bh.itemsCount, bsr.bh.marshalType, err)
return false
}
bsr.blocksRead++
if bsr.blocksRead > bsr.ph.blocksCount {
bsr.err = fmt.Errorf("too many blocks read: %d; must be smaller than partHeader.blocksCount %d", bsr.blocksRead, bsr.ph.blocksCount)
return false
}
bsr.currItemIdx = 0
2019-05-22 23:16:55 +02:00
bsr.itemsRead += uint64(len(bsr.Block.items))
if bsr.itemsRead > bsr.ph.itemsCount {
bsr.err = fmt.Errorf("too many items read: %d; must be smaller than partHeader.itemsCount %d", bsr.itemsRead, bsr.ph.itemsCount)
return false
}
if !bsr.firstItemChecked {
bsr.firstItemChecked = true
b := &bsr.Block
firstItem := b.items[0].Bytes(b.data)
if string(bsr.ph.firstItem) != string(firstItem) {
bsr.err = fmt.Errorf("unexpected first item; got %X; want %X", firstItem, bsr.ph.firstItem)
2019-05-22 23:16:55 +02:00
return false
}
}
return true
}
func (bsr *blockStreamReader) readNextBHS() error {
if bsr.mrIdx >= len(bsr.mrs) {
return io.EOF
}
mr := &bsr.mrs[bsr.mrIdx]
bsr.mrIdx++
// Read compressed index block.
bsr.packedBuf = bytesutil.ResizeNoCopyMayOverallocate(bsr.packedBuf, int(mr.indexBlockSize))
2019-05-22 23:16:55 +02:00
if err := fs.ReadFullData(bsr.indexReader, bsr.packedBuf); err != nil {
return fmt.Errorf("cannot read compressed index block with size %d: %w", mr.indexBlockSize, err)
2019-05-22 23:16:55 +02:00
}
// Unpack the compressed index block.
var err error
bsr.unpackedBuf, err = encoding.DecompressZSTD(bsr.unpackedBuf[:0], bsr.packedBuf)
if err != nil {
return fmt.Errorf("cannot decompress index block: %w", err)
2019-05-22 23:16:55 +02:00
}
// Unmarshal the unpacked index block into bsr.bhs.
bsr.bhs, err = unmarshalBlockHeadersNoCopy(bsr.bhs[:0], bsr.unpackedBuf, int(mr.blockHeadersCount))
if err != nil {
return fmt.Errorf("cannot unmarshal blockHeaders in the index block #%d: %w", bsr.mrIdx, err)
2019-05-22 23:16:55 +02:00
}
bsr.bhIdx = 0
2019-05-22 23:16:55 +02:00
return nil
}
// Error returns the last error recorded by bsr.
//
// io.EOF is reported as nil.
func (bsr *blockStreamReader) Error() error {
	if err := bsr.err; err != io.EOF {
		return err
	}
	return nil
}
// getBlockStreamReader returns a blockStreamReader from bsrPool,
// or a newly allocated one if the pool has nothing to offer.
func getBlockStreamReader() *blockStreamReader {
	pooled := bsrPool.Get()
	if pooled != nil {
		return pooled.(*blockStreamReader)
	}
	return new(blockStreamReader)
}
// putBlockStreamReader closes bsr and returns it to bsrPool.
//
// The caller should not use bsr afterwards, since it may be handed out
// again by getBlockStreamReader.
func putBlockStreamReader(bsr *blockStreamReader) {
	// MustClose also resets bsr, so the pooled object carries no state over.
	bsr.MustClose()

	bsrPool.Put(bsr)
}
// bsrPool holds blockStreamReader objects for reuse
// by getBlockStreamReader and putBlockStreamReader.
var bsrPool = sync.Pool{}