VictoriaMetrics/lib/mergeset/inmemory_part.go
Aliaksandr Valialkin eb7df27e20
lib/{mergeset,storage}: properly fsync part directory listing after writing in-memory part to disk
This is a follow-up after 42bba64aa7

Previously the part directory listing was fsync'ed implicitly inside partHeader.WriteMetadata()
by calling fs.WriteFileAtomically(). Now it must be fsync'ed explicitly.

There is no need in fsync'ing the parent directory, since it is fsync'ed by the caller
when updating parts.json file.
2023-04-13 21:21:46 -07:00

123 lines
4.1 KiB
Go

package mergeset
import (
"fmt"
"path/filepath"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type inmemoryPart struct {
ph partHeader
bh blockHeader
mr metaindexRow
metaindexData bytesutil.ByteBuffer
indexData bytesutil.ByteBuffer
itemsData bytesutil.ByteBuffer
lensData bytesutil.ByteBuffer
}
func (mp *inmemoryPart) Reset() {
mp.ph.Reset()
mp.bh.Reset()
mp.mr.Reset()
mp.metaindexData.Reset()
mp.indexData.Reset()
mp.itemsData.Reset()
mp.lensData.Reset()
}
// StoreToDisk stores mp to the given path on disk.
func (mp *inmemoryPart) StoreToDisk(path string) error {
if err := fs.MkdirAllIfNotExist(path); err != nil {
return fmt.Errorf("cannot create directory %q: %w", path, err)
}
metaindexPath := filepath.Join(path, metaindexFilename)
if err := fs.WriteFileAndSync(metaindexPath, mp.metaindexData.B); err != nil {
return fmt.Errorf("cannot store metaindex: %w", err)
}
indexPath := filepath.Join(path, indexFilename)
if err := fs.WriteFileAndSync(indexPath, mp.indexData.B); err != nil {
return fmt.Errorf("cannot store index: %w", err)
}
itemsPath := filepath.Join(path, itemsFilename)
if err := fs.WriteFileAndSync(itemsPath, mp.itemsData.B); err != nil {
return fmt.Errorf("cannot store items: %w", err)
}
lensPath := filepath.Join(path, lensFilename)
if err := fs.WriteFileAndSync(lensPath, mp.lensData.B); err != nil {
return fmt.Errorf("cannot store lens: %w", err)
}
if err := mp.ph.WriteMetadata(path); err != nil {
return fmt.Errorf("cannot store metadata: %w", err)
}
fs.MustSyncPath(path)
// Do not sync parent directory - it must be synced by the caller.
return nil
}
// Init initializes mp from ib.
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
mp.Reset()
// Re-use mp.itemsData and mp.lensData in sb.
// This eliminates copying itemsData and lensData from sb to mp later.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2247
sb := &storageBlock{}
sb.itemsData = mp.itemsData.B[:0]
sb.lensData = mp.lensData.B[:0]
// Use the minimum possible compressLevel for compressing inmemoryPart,
// since it will be merged into file part soon.
// See https://github.com/facebook/zstd/releases/tag/v1.3.4 for details about negative compression level
compressLevel := -5
mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)
mp.ph.itemsCount = uint64(len(ib.items))
mp.ph.blocksCount = 1
mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
mp.itemsData.B = sb.itemsData
mp.bh.itemsBlockOffset = 0
mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))
mp.lensData.B = sb.lensData
mp.bh.lensBlockOffset = 0
mp.bh.lensBlockSize = uint32(len(mp.lensData.B))
bb := inmemoryPartBytePool.Get()
bb.B = mp.bh.Marshal(bb.B[:0])
mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, compressLevel)
mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
mp.mr.blockHeadersCount = 1
mp.mr.indexBlockOffset = 0
mp.mr.indexBlockSize = uint32(len(mp.indexData.B))
bb.B = mp.mr.Marshal(bb.B[:0])
mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, compressLevel)
inmemoryPartBytePool.Put(bb)
}
var inmemoryPartBytePool bytesutil.ByteBufferPool
// It is safe calling NewPart multiple times.
// It is unsafe re-using mp while the returned part is in use.
func (mp *inmemoryPart) NewPart() *part {
size := mp.size()
p, err := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData)
if err != nil {
logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
}
return p
}
func (mp *inmemoryPart) size() uint64 {
return uint64(cap(mp.metaindexData.B) + cap(mp.indexData.B) + cap(mp.itemsData.B) + cap(mp.lensData.B))
}