mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
cdfe854c9b
This improves the code readability and debuggability, since the output of these functions stops depending on global state.
274 lines
8.0 KiB
Go
274 lines
8.0 KiB
Go
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// blockStreamWriter represents block stream writer.
|
|
type blockStreamWriter struct {
|
|
compressLevel int
|
|
path string
|
|
|
|
// Use io.Writer type for timestampsWriter and valuesWriter
|
|
// in order to remove I2I conversion in WriteExternalBlock
|
|
// when passing them to fs.MustWriteData
|
|
timestampsWriter io.Writer
|
|
valuesWriter io.Writer
|
|
|
|
indexWriter filestream.WriteCloser
|
|
metaindexWriter filestream.WriteCloser
|
|
|
|
mr metaindexRow
|
|
|
|
timestampsBlockOffset uint64
|
|
valuesBlockOffset uint64
|
|
indexBlockOffset uint64
|
|
|
|
indexData []byte
|
|
compressedIndexData []byte
|
|
|
|
metaindexData []byte
|
|
compressedMetaindexData []byte
|
|
|
|
// prevTimestamps* is used as an optimization for reducing disk space usage
|
|
// when serially written blocks have identical timestamps.
|
|
// This is usually the case when adjacent blocks contain metrics scraped from the same target,
|
|
// since such metrics have identical timestamps.
|
|
prevTimestampsData []byte
|
|
prevTimestampsBlockOffset uint64
|
|
}
|
|
|
|
func (bsw *blockStreamWriter) assertWriteClosers() {
|
|
_ = bsw.timestampsWriter.(filestream.WriteCloser)
|
|
_ = bsw.valuesWriter.(filestream.WriteCloser)
|
|
}
|
|
|
|
// Init initializes bsw with the given writers.
|
|
func (bsw *blockStreamWriter) reset() {
|
|
bsw.compressLevel = 0
|
|
bsw.path = ""
|
|
|
|
bsw.timestampsWriter = nil
|
|
bsw.valuesWriter = nil
|
|
bsw.indexWriter = nil
|
|
bsw.metaindexWriter = nil
|
|
|
|
bsw.mr.Reset()
|
|
|
|
bsw.timestampsBlockOffset = 0
|
|
bsw.valuesBlockOffset = 0
|
|
bsw.indexBlockOffset = 0
|
|
|
|
bsw.indexData = bsw.indexData[:0]
|
|
bsw.compressedIndexData = bsw.compressedIndexData[:0]
|
|
|
|
bsw.metaindexData = bsw.metaindexData[:0]
|
|
bsw.compressedMetaindexData = bsw.compressedMetaindexData[:0]
|
|
|
|
bsw.prevTimestampsData = bsw.prevTimestampsData[:0]
|
|
bsw.prevTimestampsBlockOffset = 0
|
|
}
|
|
|
|
// InitFromInmemoryPart initialzes bsw from inmemory part.
|
|
func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart) {
|
|
bsw.reset()
|
|
|
|
// Use the minimum compression level for in-memory blocks,
|
|
// since they are going to be re-compressed during the merge into file-based blocks.
|
|
bsw.compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
|
|
|
|
bsw.timestampsWriter = &mp.timestampsData
|
|
bsw.valuesWriter = &mp.valuesData
|
|
bsw.indexWriter = &mp.indexData
|
|
bsw.metaindexWriter = &mp.metaindexData
|
|
|
|
bsw.assertWriteClosers()
|
|
}
|
|
|
|
// InitFromFilePart initializes bsw from a file-based part on the given path.
|
|
//
|
|
// The bsw doesn't pollute OS page cache if nocache is set.
|
|
func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compressLevel int) error {
|
|
path = filepath.Clean(path)
|
|
|
|
// Create the directory
|
|
if err := fs.MkdirAllFailIfExist(path); err != nil {
|
|
return fmt.Errorf("cannot create directory %q: %w", path, err)
|
|
}
|
|
|
|
// Create part files in the directory.
|
|
timestampsPath := path + "/timestamps.bin"
|
|
timestampsFile, err := filestream.Create(timestampsPath, nocache)
|
|
if err != nil {
|
|
fs.MustRemoveAll(path)
|
|
return fmt.Errorf("cannot create timestamps file: %w", err)
|
|
}
|
|
|
|
valuesPath := path + "/values.bin"
|
|
valuesFile, err := filestream.Create(valuesPath, nocache)
|
|
if err != nil {
|
|
timestampsFile.MustClose()
|
|
fs.MustRemoveAll(path)
|
|
return fmt.Errorf("cannot create values file: %w", err)
|
|
}
|
|
|
|
indexPath := path + "/index.bin"
|
|
indexFile, err := filestream.Create(indexPath, nocache)
|
|
if err != nil {
|
|
timestampsFile.MustClose()
|
|
valuesFile.MustClose()
|
|
fs.MustRemoveAll(path)
|
|
return fmt.Errorf("cannot create index file: %w", err)
|
|
}
|
|
|
|
// Always cache metaindex file in OS page cache, since it is immediately
|
|
// read after the merge.
|
|
metaindexPath := path + "/metaindex.bin"
|
|
metaindexFile, err := filestream.Create(metaindexPath, false)
|
|
if err != nil {
|
|
timestampsFile.MustClose()
|
|
valuesFile.MustClose()
|
|
indexFile.MustClose()
|
|
fs.MustRemoveAll(path)
|
|
return fmt.Errorf("cannot create metaindex file: %w", err)
|
|
}
|
|
|
|
bsw.reset()
|
|
bsw.compressLevel = compressLevel
|
|
bsw.path = path
|
|
|
|
bsw.timestampsWriter = timestampsFile
|
|
bsw.valuesWriter = valuesFile
|
|
bsw.indexWriter = indexFile
|
|
bsw.metaindexWriter = metaindexFile
|
|
|
|
bsw.assertWriteClosers()
|
|
|
|
return nil
|
|
}
|
|
|
|
// MustClose closes the bsw.
|
|
//
|
|
// It closes *Writer files passed to Init*.
|
|
func (bsw *blockStreamWriter) MustClose() {
|
|
// Flush remaining data.
|
|
bsw.flushIndexData()
|
|
|
|
// Write metaindex data.
|
|
bsw.compressedMetaindexData = encoding.CompressZSTDLevel(bsw.compressedMetaindexData[:0], bsw.metaindexData, bsw.compressLevel)
|
|
fs.MustWriteData(bsw.metaindexWriter, bsw.compressedMetaindexData)
|
|
|
|
// Close writers.
|
|
bsw.timestampsWriter.(filestream.WriteCloser).MustClose()
|
|
bsw.valuesWriter.(filestream.WriteCloser).MustClose()
|
|
bsw.indexWriter.MustClose()
|
|
bsw.metaindexWriter.MustClose()
|
|
|
|
// Sync bsw.path contents to make sure it doesn't disappear
|
|
// after system crash or power loss.
|
|
if bsw.path != "" {
|
|
fs.MustSyncPath(bsw.path)
|
|
}
|
|
|
|
bsw.reset()
|
|
}
|
|
|
|
// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
|
|
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64, needDedup bool) {
|
|
atomic.AddUint64(rowsMerged, uint64(b.rowsCount()))
|
|
if needDedup {
|
|
di := GetDedupInterval()
|
|
dedupInterval := di.Milliseconds()
|
|
b.deduplicateSamplesDuringMerge(dedupInterval)
|
|
}
|
|
headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
|
|
usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData)
|
|
if usePrevTimestamps {
|
|
// The current timestamps block equals to the previous timestamps block.
|
|
// Update headerData so it points to the previous timestamps block. This saves disk space.
|
|
headerData, timestampsData, valuesData = b.MarshalData(bsw.prevTimestampsBlockOffset, bsw.valuesBlockOffset)
|
|
atomic.AddUint64(×tampsBlocksMerged, 1)
|
|
atomic.AddUint64(×tampsBytesSaved, uint64(len(timestampsData)))
|
|
}
|
|
bsw.indexData = append(bsw.indexData, headerData...)
|
|
bsw.mr.RegisterBlockHeader(&b.bh)
|
|
if len(bsw.indexData) >= maxBlockSize {
|
|
bsw.flushIndexData()
|
|
}
|
|
if !usePrevTimestamps {
|
|
bsw.prevTimestampsData = append(bsw.prevTimestampsData[:0], timestampsData...)
|
|
bsw.prevTimestampsBlockOffset = bsw.timestampsBlockOffset
|
|
fs.MustWriteData(bsw.timestampsWriter, timestampsData)
|
|
bsw.timestampsBlockOffset += uint64(len(timestampsData))
|
|
}
|
|
fs.MustWriteData(bsw.valuesWriter, valuesData)
|
|
bsw.valuesBlockOffset += uint64(len(valuesData))
|
|
updatePartHeader(b, ph)
|
|
}
|
|
|
|
var (
|
|
timestampsBlocksMerged uint64
|
|
timestampsBytesSaved uint64
|
|
)
|
|
|
|
func updatePartHeader(b *Block, ph *partHeader) {
|
|
ph.BlocksCount++
|
|
ph.RowsCount += uint64(b.bh.RowsCount)
|
|
if b.bh.MinTimestamp < ph.MinTimestamp {
|
|
ph.MinTimestamp = b.bh.MinTimestamp
|
|
}
|
|
if b.bh.MaxTimestamp > ph.MaxTimestamp {
|
|
ph.MaxTimestamp = b.bh.MaxTimestamp
|
|
}
|
|
}
|
|
|
|
func (bsw *blockStreamWriter) flushIndexData() {
|
|
if len(bsw.indexData) == 0 {
|
|
return
|
|
}
|
|
|
|
// Write compressed index block to index data.
|
|
bsw.compressedIndexData = encoding.CompressZSTDLevel(bsw.compressedIndexData[:0], bsw.indexData, bsw.compressLevel)
|
|
indexBlockSize := len(bsw.compressedIndexData)
|
|
if uint64(indexBlockSize) >= 1<<32 {
|
|
logger.Panicf("BUG: indexBlock size must fit uint32; got %d", indexBlockSize)
|
|
}
|
|
fs.MustWriteData(bsw.indexWriter, bsw.compressedIndexData)
|
|
|
|
// Write metaindex row to metaindex data.
|
|
bsw.mr.IndexBlockOffset = bsw.indexBlockOffset
|
|
bsw.mr.IndexBlockSize = uint32(indexBlockSize)
|
|
bsw.metaindexData = bsw.mr.Marshal(bsw.metaindexData)
|
|
|
|
// Update offsets.
|
|
bsw.indexBlockOffset += uint64(indexBlockSize)
|
|
|
|
bsw.indexData = bsw.indexData[:0]
|
|
bsw.mr.Reset()
|
|
}
|
|
|
|
func getBlockStreamWriter() *blockStreamWriter {
|
|
v := bswPool.Get()
|
|
if v == nil {
|
|
return &blockStreamWriter{}
|
|
}
|
|
return v.(*blockStreamWriter)
|
|
}
|
|
|
|
func putBlockStreamWriter(bsw *blockStreamWriter) {
|
|
bsw.reset()
|
|
bswPool.Put(bsw)
|
|
}
|
|
|
|
// bswPool holds blockStreamWriter objects for reuse
// by getBlockStreamWriter and putBlockStreamWriter.
var bswPool sync.Pool
|