mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 22:32:20 +01:00
1cda542c48
When WriteFileAndSync fails, then the caller eventually logs the error message and exits. The error message returned by WriteFileAndSync already contains the path to the file, which couldn't be created. This information alongside the call stack is enough for debugging the issue. So just use log.Panicf("FATAL: ...") inside MustWriteAndSync(). This simplifies error handling at caller side a bit.
118 lines
3.7 KiB
Go
118 lines
3.7 KiB
Go
package mergeset
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
type inmemoryPart struct {
|
|
ph partHeader
|
|
bh blockHeader
|
|
mr metaindexRow
|
|
|
|
metaindexData bytesutil.ByteBuffer
|
|
indexData bytesutil.ByteBuffer
|
|
itemsData bytesutil.ByteBuffer
|
|
lensData bytesutil.ByteBuffer
|
|
}
|
|
|
|
func (mp *inmemoryPart) Reset() {
|
|
mp.ph.Reset()
|
|
mp.bh.Reset()
|
|
mp.mr.Reset()
|
|
|
|
mp.metaindexData.Reset()
|
|
mp.indexData.Reset()
|
|
mp.itemsData.Reset()
|
|
mp.lensData.Reset()
|
|
}
|
|
|
|
// StoreToDisk stores mp to the given path on disk.
|
|
func (mp *inmemoryPart) StoreToDisk(path string) error {
|
|
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
|
return fmt.Errorf("cannot create directory %q: %w", path, err)
|
|
}
|
|
metaindexPath := filepath.Join(path, metaindexFilename)
|
|
fs.MustWriteFileAndSync(metaindexPath, mp.metaindexData.B)
|
|
|
|
indexPath := filepath.Join(path, indexFilename)
|
|
fs.MustWriteFileAndSync(indexPath, mp.indexData.B)
|
|
|
|
itemsPath := filepath.Join(path, itemsFilename)
|
|
fs.MustWriteFileAndSync(itemsPath, mp.itemsData.B)
|
|
|
|
lensPath := filepath.Join(path, lensFilename)
|
|
fs.MustWriteFileAndSync(lensPath, mp.lensData.B)
|
|
|
|
mp.ph.MustWriteMetadata(path)
|
|
|
|
fs.MustSyncPath(path)
|
|
// Do not sync parent directory - it must be synced by the caller.
|
|
return nil
|
|
}
|
|
|
|
// Init initializes mp from ib.
|
|
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
|
|
mp.Reset()
|
|
|
|
// Re-use mp.itemsData and mp.lensData in sb.
|
|
// This eliminates copying itemsData and lensData from sb to mp later.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2247
|
|
sb := &storageBlock{}
|
|
sb.itemsData = mp.itemsData.B[:0]
|
|
sb.lensData = mp.lensData.B[:0]
|
|
|
|
// Use the minimum possible compressLevel for compressing inmemoryPart,
|
|
// since it will be merged into file part soon.
|
|
// See https://github.com/facebook/zstd/releases/tag/v1.3.4 for details about negative compression level
|
|
compressLevel := -5
|
|
mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)
|
|
|
|
mp.ph.itemsCount = uint64(len(ib.items))
|
|
mp.ph.blocksCount = 1
|
|
mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
|
|
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
|
|
|
|
mp.itemsData.B = sb.itemsData
|
|
mp.bh.itemsBlockOffset = 0
|
|
mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))
|
|
|
|
mp.lensData.B = sb.lensData
|
|
mp.bh.lensBlockOffset = 0
|
|
mp.bh.lensBlockSize = uint32(len(mp.lensData.B))
|
|
|
|
bb := inmemoryPartBytePool.Get()
|
|
bb.B = mp.bh.Marshal(bb.B[:0])
|
|
mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, compressLevel)
|
|
|
|
mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
|
|
mp.mr.blockHeadersCount = 1
|
|
mp.mr.indexBlockOffset = 0
|
|
mp.mr.indexBlockSize = uint32(len(mp.indexData.B))
|
|
bb.B = mp.mr.Marshal(bb.B[:0])
|
|
mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, compressLevel)
|
|
inmemoryPartBytePool.Put(bb)
|
|
}
|
|
|
|
var inmemoryPartBytePool bytesutil.ByteBufferPool
|
|
|
|
// It is safe calling NewPart multiple times.
|
|
// It is unsafe re-using mp while the returned part is in use.
|
|
func (mp *inmemoryPart) NewPart() *part {
|
|
size := mp.size()
|
|
p, err := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData)
|
|
if err != nil {
|
|
logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
|
|
}
|
|
return p
|
|
}
|
|
|
|
func (mp *inmemoryPart) size() uint64 {
|
|
return uint64(cap(mp.metaindexData.B) + cap(mp.indexData.B) + cap(mp.itemsData.B) + cap(mp.lensData.B))
|
|
}
|