mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-11 20:52:24 +01:00
397bb8771b
Commit 3c246cdf00
added an optimization where the previous metaindexRow
could be saved to disk when the current block header couldn't be added to the indexBlock because the resulting
indexBlock size became too big. This could result in an empty metaindexRow.firstItem for the next metaindexRow.
201 lines
5.6 KiB
Go
201 lines
5.6 KiB
Go
package mergeset
|
|
|
|
import (
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
)
|
|
|
|
type blockStreamWriter struct {
|
|
compressLevel int
|
|
|
|
metaindexWriter filestream.WriteCloser
|
|
indexWriter filestream.WriteCloser
|
|
itemsWriter filestream.WriteCloser
|
|
lensWriter filestream.WriteCloser
|
|
|
|
sb storageBlock
|
|
bh blockHeader
|
|
mr metaindexRow
|
|
|
|
unpackedIndexBlockBuf []byte
|
|
packedIndexBlockBuf []byte
|
|
|
|
unpackedMetaindexBuf []byte
|
|
packedMetaindexBuf []byte
|
|
|
|
itemsBlockOffset uint64
|
|
lensBlockOffset uint64
|
|
indexBlockOffset uint64
|
|
|
|
// whether the first item for mr has been caught.
|
|
mrFirstItemCaught bool
|
|
}
|
|
|
|
func (bsw *blockStreamWriter) reset() {
|
|
bsw.compressLevel = 0
|
|
|
|
bsw.metaindexWriter = nil
|
|
bsw.indexWriter = nil
|
|
bsw.itemsWriter = nil
|
|
bsw.lensWriter = nil
|
|
|
|
bsw.sb.Reset()
|
|
bsw.bh.Reset()
|
|
bsw.mr.Reset()
|
|
|
|
bsw.unpackedIndexBlockBuf = bsw.unpackedIndexBlockBuf[:0]
|
|
bsw.packedIndexBlockBuf = bsw.packedIndexBlockBuf[:0]
|
|
|
|
bsw.unpackedMetaindexBuf = bsw.unpackedMetaindexBuf[:0]
|
|
bsw.packedMetaindexBuf = bsw.packedMetaindexBuf[:0]
|
|
|
|
bsw.itemsBlockOffset = 0
|
|
bsw.lensBlockOffset = 0
|
|
bsw.indexBlockOffset = 0
|
|
|
|
bsw.mrFirstItemCaught = false
|
|
}
|
|
|
|
func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compressLevel int) {
|
|
bsw.reset()
|
|
|
|
bsw.compressLevel = compressLevel
|
|
bsw.metaindexWriter = &mp.metaindexData
|
|
bsw.indexWriter = &mp.indexData
|
|
bsw.itemsWriter = &mp.itemsData
|
|
bsw.lensWriter = &mp.lensData
|
|
}
|
|
|
|
// MustInitFromFilePart initializes bsw from a file-based part on the given path.
|
|
//
|
|
// The bsw doesn't pollute OS page cache if nocache is set.
|
|
func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) {
|
|
path = filepath.Clean(path)
|
|
|
|
// Create the directory
|
|
fs.MustMkdirFailIfExist(path)
|
|
|
|
// Create part files in the directory.
|
|
|
|
// Always cache metaindex file in OS page cache, since it is immediately
|
|
// read after the merge.
|
|
metaindexPath := filepath.Join(path, metaindexFilename)
|
|
metaindexFile := filestream.MustCreate(metaindexPath, false)
|
|
|
|
indexPath := filepath.Join(path, indexFilename)
|
|
indexFile := filestream.MustCreate(indexPath, nocache)
|
|
|
|
itemsPath := filepath.Join(path, itemsFilename)
|
|
itemsFile := filestream.MustCreate(itemsPath, nocache)
|
|
|
|
lensPath := filepath.Join(path, lensFilename)
|
|
lensFile := filestream.MustCreate(lensPath, nocache)
|
|
|
|
bsw.reset()
|
|
bsw.compressLevel = compressLevel
|
|
|
|
bsw.metaindexWriter = metaindexFile
|
|
bsw.indexWriter = indexFile
|
|
bsw.itemsWriter = itemsFile
|
|
bsw.lensWriter = lensFile
|
|
}
|
|
|
|
// MustClose closes the bsw.
|
|
//
|
|
// It closes *Writer files passed to Init*.
|
|
func (bsw *blockStreamWriter) MustClose() {
|
|
// Flush the remaining data.
|
|
bsw.flushIndexData()
|
|
|
|
// Compress and write metaindex.
|
|
bsw.packedMetaindexBuf = encoding.CompressZSTDLevel(bsw.packedMetaindexBuf[:0], bsw.unpackedMetaindexBuf, bsw.compressLevel)
|
|
fs.MustWriteData(bsw.metaindexWriter, bsw.packedMetaindexBuf)
|
|
|
|
// Close all the writers.
|
|
bsw.metaindexWriter.MustClose()
|
|
bsw.indexWriter.MustClose()
|
|
bsw.itemsWriter.MustClose()
|
|
bsw.lensWriter.MustClose()
|
|
|
|
bsw.reset()
|
|
}
|
|
|
|
// WriteBlock writes ib to bsw.
|
|
//
|
|
// ib must be sorted.
|
|
func (bsw *blockStreamWriter) WriteBlock(ib *inmemoryBlock) {
|
|
bsw.bh.firstItem, bsw.bh.commonPrefix, bsw.bh.itemsCount, bsw.bh.marshalType = ib.MarshalSortedData(&bsw.sb, bsw.bh.firstItem[:0], bsw.bh.commonPrefix[:0], bsw.compressLevel)
|
|
|
|
// Write itemsData
|
|
fs.MustWriteData(bsw.itemsWriter, bsw.sb.itemsData)
|
|
bsw.bh.itemsBlockSize = uint32(len(bsw.sb.itemsData))
|
|
bsw.bh.itemsBlockOffset = bsw.itemsBlockOffset
|
|
bsw.itemsBlockOffset += uint64(bsw.bh.itemsBlockSize)
|
|
|
|
// Write lensData
|
|
fs.MustWriteData(bsw.lensWriter, bsw.sb.lensData)
|
|
bsw.bh.lensBlockSize = uint32(len(bsw.sb.lensData))
|
|
bsw.bh.lensBlockOffset = bsw.lensBlockOffset
|
|
bsw.lensBlockOffset += uint64(bsw.bh.lensBlockSize)
|
|
|
|
// Write blockHeader
|
|
unpackedIndexBlockBufLen := len(bsw.unpackedIndexBlockBuf)
|
|
bsw.unpackedIndexBlockBuf = bsw.bh.Marshal(bsw.unpackedIndexBlockBuf)
|
|
if len(bsw.unpackedIndexBlockBuf) > maxIndexBlockSize {
|
|
bsw.unpackedIndexBlockBuf = bsw.unpackedIndexBlockBuf[:unpackedIndexBlockBufLen]
|
|
bsw.flushIndexData()
|
|
bsw.unpackedIndexBlockBuf = bsw.bh.Marshal(bsw.unpackedIndexBlockBuf)
|
|
}
|
|
|
|
if !bsw.mrFirstItemCaught {
|
|
bsw.mr.firstItem = append(bsw.mr.firstItem[:0], bsw.bh.firstItem...)
|
|
bsw.mrFirstItemCaught = true
|
|
}
|
|
bsw.bh.Reset()
|
|
bsw.mr.blockHeadersCount++
|
|
}
|
|
|
|
// The maximum size of index block with multiple blockHeaders.
|
|
const maxIndexBlockSize = 64 << 10 // 64 KiB
|
|
|
|
func (bsw *blockStreamWriter) flushIndexData() {
|
|
if len(bsw.unpackedIndexBlockBuf) == 0 {
|
|
// Nothing to flush.
|
|
return
|
|
}
|
|
|
|
// Write indexBlock.
|
|
bsw.packedIndexBlockBuf = encoding.CompressZSTDLevel(bsw.packedIndexBlockBuf[:0], bsw.unpackedIndexBlockBuf, bsw.compressLevel)
|
|
fs.MustWriteData(bsw.indexWriter, bsw.packedIndexBlockBuf)
|
|
bsw.mr.indexBlockSize = uint32(len(bsw.packedIndexBlockBuf))
|
|
bsw.mr.indexBlockOffset = bsw.indexBlockOffset
|
|
bsw.indexBlockOffset += uint64(bsw.mr.indexBlockSize)
|
|
bsw.unpackedIndexBlockBuf = bsw.unpackedIndexBlockBuf[:0]
|
|
|
|
// Write metaindexRow.
|
|
bsw.unpackedMetaindexBuf = bsw.mr.Marshal(bsw.unpackedMetaindexBuf)
|
|
bsw.mr.Reset()
|
|
|
|
// Notify that the next call to WriteBlock must catch the first item.
|
|
bsw.mrFirstItemCaught = false
|
|
}
|
|
|
|
func getBlockStreamWriter() *blockStreamWriter {
|
|
v := bswPool.Get()
|
|
if v == nil {
|
|
return &blockStreamWriter{}
|
|
}
|
|
return v.(*blockStreamWriter)
|
|
}
|
|
|
|
func putBlockStreamWriter(bsw *blockStreamWriter) {
|
|
bsw.reset()
|
|
bswPool.Put(bsw)
|
|
}
|
|
|
|
// bswPool holds blockStreamWriter objects released via putBlockStreamWriter,
// so getBlockStreamWriter can reuse them.
var bswPool sync.Pool
|