mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-11 20:52:24 +01:00
116 lines
3.7 KiB
Go
116 lines
3.7 KiB
Go
package mergeset
|
|
|
|
import (
|
|
"path/filepath"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
type inmemoryPart struct {
|
|
ph partHeader
|
|
bh blockHeader
|
|
mr metaindexRow
|
|
|
|
metaindexData bytesutil.ByteBuffer
|
|
indexData bytesutil.ByteBuffer
|
|
itemsData bytesutil.ByteBuffer
|
|
lensData bytesutil.ByteBuffer
|
|
}
|
|
|
|
func (mp *inmemoryPart) Reset() {
|
|
mp.ph.Reset()
|
|
mp.bh.Reset()
|
|
mp.mr.Reset()
|
|
|
|
mp.metaindexData.Reset()
|
|
mp.indexData.Reset()
|
|
mp.itemsData.Reset()
|
|
mp.lensData.Reset()
|
|
}
|
|
|
|
// MustStoreToDisk stores mp to the given path on disk.
|
|
func (mp *inmemoryPart) MustStoreToDisk(path string) {
|
|
fs.MustMkdirFailIfExist(path)
|
|
|
|
metaindexPath := filepath.Join(path, metaindexFilename)
|
|
fs.MustWriteSync(metaindexPath, mp.metaindexData.B)
|
|
|
|
indexPath := filepath.Join(path, indexFilename)
|
|
fs.MustWriteSync(indexPath, mp.indexData.B)
|
|
|
|
itemsPath := filepath.Join(path, itemsFilename)
|
|
fs.MustWriteSync(itemsPath, mp.itemsData.B)
|
|
|
|
lensPath := filepath.Join(path, lensFilename)
|
|
fs.MustWriteSync(lensPath, mp.lensData.B)
|
|
|
|
mp.ph.MustWriteMetadata(path)
|
|
|
|
fs.MustSyncPath(path)
|
|
// Do not sync parent directory - it must be synced by the caller.
|
|
}
|
|
|
|
// Init initializes mp from ib.
|
|
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
|
|
mp.Reset()
|
|
|
|
// Re-use mp.itemsData and mp.lensData in sb.
|
|
// This eliminates copying itemsData and lensData from sb to mp later.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2247
|
|
sb := &storageBlock{}
|
|
sb.itemsData = mp.itemsData.B[:0]
|
|
sb.lensData = mp.lensData.B[:0]
|
|
|
|
// Use the minimum possible compressLevel for compressing inmemoryPart,
|
|
// since it will be merged into file part soon.
|
|
// See https://github.com/facebook/zstd/releases/tag/v1.3.4 for details about negative compression level
|
|
compressLevel := -5
|
|
mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)
|
|
|
|
mp.ph.itemsCount = uint64(len(ib.items))
|
|
mp.ph.blocksCount = 1
|
|
mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
|
|
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
|
|
|
|
mp.itemsData.B = sb.itemsData
|
|
mp.bh.itemsBlockOffset = 0
|
|
mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))
|
|
|
|
mp.lensData.B = sb.lensData
|
|
mp.bh.lensBlockOffset = 0
|
|
mp.bh.lensBlockSize = uint32(len(mp.lensData.B))
|
|
|
|
bb := inmemoryPartBytePool.Get()
|
|
bb.B = mp.bh.Marshal(bb.B[:0])
|
|
if len(bb.B) > 3*maxIndexBlockSize {
|
|
// marshaled blockHeader can exceed indexBlockSize when firstItem and commonPrefix sizes are close to indexBlockSize
|
|
logger.Panicf("BUG: too big index block: %d bytes; mustn't exceed %d bytes", len(bb.B), 3*maxIndexBlockSize)
|
|
}
|
|
mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, compressLevel)
|
|
|
|
mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
|
|
mp.mr.blockHeadersCount = 1
|
|
mp.mr.indexBlockOffset = 0
|
|
mp.mr.indexBlockSize = uint32(len(mp.indexData.B))
|
|
bb.B = mp.mr.Marshal(bb.B[:0])
|
|
mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, compressLevel)
|
|
inmemoryPartBytePool.Put(bb)
|
|
}
|
|
|
|
var inmemoryPartBytePool bytesutil.ByteBufferPool
|
|
|
|
// It is safe calling NewPart multiple times.
|
|
// It is unsafe re-using mp while the returned part is in use.
|
|
func (mp *inmemoryPart) NewPart() *part {
|
|
size := mp.size()
|
|
p := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData)
|
|
return p
|
|
}
|
|
|
|
func (mp *inmemoryPart) size() uint64 {
|
|
return uint64(cap(mp.metaindexData.B) + cap(mp.indexData.B) + cap(mp.itemsData.B) + cap(mp.lensData.B))
|
|
}
|