mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-22 08:10:44 +01:00
165 lines
5.6 KiB
Go
165 lines
5.6 KiB
Go
|
package logstorage
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"io"
|
||
|
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||
|
)
|
||
|
|
||
|
// indexBlockHeader contains index information about multiple blocks.
|
||
|
//
|
||
|
// It allows locating the block by streamID and/or by time range.
|
||
|
type indexBlockHeader struct {
|
||
|
// streamID is the minimum streamID covered by the indexBlockHeader
|
||
|
streamID streamID
|
||
|
|
||
|
// minTimestamp is the mimumum timestamp seen across blocks covered by the indexBlockHeader
|
||
|
minTimestamp int64
|
||
|
|
||
|
// maxTimestamp is the maximum timestamp seen across blocks covered by the indexBlockHeader
|
||
|
maxTimestamp int64
|
||
|
|
||
|
// indexBlockOffset is an offset of the linked index block at indexFilename
|
||
|
indexBlockOffset uint64
|
||
|
|
||
|
// indexBlockSize is the size of the linked index block at indexFilename
|
||
|
indexBlockSize uint64
|
||
|
}
|
||
|
|
||
|
// reset resets ih for subsequent re-use.
|
||
|
func (ih *indexBlockHeader) reset() {
|
||
|
ih.streamID.reset()
|
||
|
ih.minTimestamp = 0
|
||
|
ih.maxTimestamp = 0
|
||
|
ih.indexBlockOffset = 0
|
||
|
ih.indexBlockSize = 0
|
||
|
}
|
||
|
|
||
|
// mustWriteIndexBlock writes data with the given additioanl args to sw and updates ih accordingly.
|
||
|
func (ih *indexBlockHeader) mustWriteIndexBlock(data []byte, sidFirst streamID, minTimestamp, maxTimestamp int64, sw *streamWriters) {
|
||
|
ih.streamID = sidFirst
|
||
|
ih.minTimestamp = minTimestamp
|
||
|
ih.maxTimestamp = maxTimestamp
|
||
|
|
||
|
bb := longTermBufPool.Get()
|
||
|
bb.B = encoding.CompressZSTDLevel(bb.B[:0], data, 1)
|
||
|
ih.indexBlockOffset = sw.indexWriter.bytesWritten
|
||
|
ih.indexBlockSize = uint64(len(bb.B))
|
||
|
sw.indexWriter.MustWrite(bb.B)
|
||
|
longTermBufPool.Put(bb)
|
||
|
}
|
||
|
|
||
|
// mustReadNextIndexBlock reads the next index block associated with ih from src, appends it to dst and returns the result.
|
||
|
func (ih *indexBlockHeader) mustReadNextIndexBlock(dst []byte, sr *streamReaders) []byte {
|
||
|
indexReader := &sr.indexReader
|
||
|
|
||
|
indexBlockSize := ih.indexBlockSize
|
||
|
if indexBlockSize > maxIndexBlockSize {
|
||
|
logger.Panicf("FATAL: %s: indexBlockHeader.indexBlockSize=%d cannot exceed %d bytes", indexReader.Path(), indexBlockSize, maxIndexBlockSize)
|
||
|
}
|
||
|
if ih.indexBlockOffset != indexReader.bytesRead {
|
||
|
logger.Panicf("FATAL: %s: indexBlockHeader.indexBlockOffset=%d must equal to %d", indexReader.Path(), ih.indexBlockOffset, indexReader.bytesRead)
|
||
|
}
|
||
|
bbCompressed := longTermBufPool.Get()
|
||
|
bbCompressed.B = bytesutil.ResizeNoCopyMayOverallocate(bbCompressed.B, int(indexBlockSize))
|
||
|
indexReader.MustReadFull(bbCompressed.B)
|
||
|
|
||
|
// Decompress bbCompressed to dst
|
||
|
var err error
|
||
|
dst, err = encoding.DecompressZSTD(dst, bbCompressed.B)
|
||
|
longTermBufPool.Put(bbCompressed)
|
||
|
if err != nil {
|
||
|
logger.Panicf("FATAL: %s: cannot decompress indexBlock read at offset %d with size %d: %s", indexReader.Path(), ih.indexBlockOffset, indexBlockSize, err)
|
||
|
}
|
||
|
return dst
|
||
|
}
|
||
|
|
||
|
// marshal appends marshaled ih to dst and returns the result.
|
||
|
func (ih *indexBlockHeader) marshal(dst []byte) []byte {
|
||
|
dst = ih.streamID.marshal(dst)
|
||
|
dst = encoding.MarshalUint64(dst, uint64(ih.minTimestamp))
|
||
|
dst = encoding.MarshalUint64(dst, uint64(ih.maxTimestamp))
|
||
|
dst = encoding.MarshalUint64(dst, ih.indexBlockOffset)
|
||
|
dst = encoding.MarshalUint64(dst, ih.indexBlockSize)
|
||
|
return dst
|
||
|
}
|
||
|
|
||
|
// unmarshal unmarshals ih from src and returns the tail left.
|
||
|
func (ih *indexBlockHeader) unmarshal(src []byte) ([]byte, error) {
|
||
|
srcOrig := src
|
||
|
|
||
|
// unmarshal ih.streamID
|
||
|
tail, err := ih.streamID.unmarshal(src)
|
||
|
if err != nil {
|
||
|
return srcOrig, fmt.Errorf("cannot unmarshal streamID: %w", err)
|
||
|
}
|
||
|
src = tail
|
||
|
|
||
|
// unmarshal the rest of indexBlockHeader fields
|
||
|
if len(src) < 32 {
|
||
|
return srcOrig, fmt.Errorf("cannot unmarshal indexBlockHeader from %d bytes; need at least 32 bytes", len(src))
|
||
|
}
|
||
|
ih.minTimestamp = int64(encoding.UnmarshalUint64(src))
|
||
|
ih.maxTimestamp = int64(encoding.UnmarshalUint64(src[8:]))
|
||
|
ih.indexBlockOffset = encoding.UnmarshalUint64(src[16:])
|
||
|
ih.indexBlockSize = encoding.UnmarshalUint64(src[24:])
|
||
|
|
||
|
return src[32:], nil
|
||
|
}
|
||
|
|
||
|
// mustReadIndexBlockHeaders reads indexBlockHeader entries from r, appends them to dst and returns the result.
|
||
|
func mustReadIndexBlockHeaders(dst []indexBlockHeader, r *readerWithStats) []indexBlockHeader {
|
||
|
data, err := io.ReadAll(r)
|
||
|
if err != nil {
|
||
|
logger.Panicf("FATAL: cannot read indexBlockHeader entries from %s: %s", r.Path(), err)
|
||
|
}
|
||
|
|
||
|
bb := longTermBufPool.Get()
|
||
|
bb.B, err = encoding.DecompressZSTD(bb.B[:0], data)
|
||
|
if err != nil {
|
||
|
logger.Panicf("FATAL: cannot decompress indexBlockHeader entries from %s: %s", r.Path(), err)
|
||
|
}
|
||
|
dst, err = unmarshalIndexBlockHeaders(dst, bb.B)
|
||
|
if len(bb.B) < 1024*1024 {
|
||
|
longTermBufPool.Put(bb)
|
||
|
}
|
||
|
if err != nil {
|
||
|
logger.Panicf("FATAL: cannot parse indexBlockHeader entries from %s: %s", r.Path(), err)
|
||
|
}
|
||
|
return dst
|
||
|
}
|
||
|
|
||
|
// unmarshalIndexBlockHeaders appends unmarshaled from src indexBlockHeader entries to dst and returns the result.
|
||
|
func unmarshalIndexBlockHeaders(dst []indexBlockHeader, src []byte) ([]indexBlockHeader, error) {
|
||
|
dstOrig := dst
|
||
|
for len(src) > 0 {
|
||
|
if len(dst) < cap(dst) {
|
||
|
dst = dst[:len(dst)+1]
|
||
|
} else {
|
||
|
dst = append(dst, indexBlockHeader{})
|
||
|
}
|
||
|
ih := &dst[len(dst)-1]
|
||
|
tail, err := ih.unmarshal(src)
|
||
|
if err != nil {
|
||
|
return dstOrig, fmt.Errorf("cannot unmarshal indexBlockHeader %d: %w", len(dst)-len(dstOrig), err)
|
||
|
}
|
||
|
src = tail
|
||
|
}
|
||
|
if err := validateIndexBlockHeaders(dst[len(dstOrig):]); err != nil {
|
||
|
return dstOrig, err
|
||
|
}
|
||
|
return dst, nil
|
||
|
}
|
||
|
|
||
|
func validateIndexBlockHeaders(ihs []indexBlockHeader) error {
|
||
|
for i := 1; i < len(ihs); i++ {
|
||
|
if ihs[i].streamID.less(&ihs[i-1].streamID) {
|
||
|
return fmt.Errorf("unexpected indexBlockHeader with smaller streamID=%s after bigger streamID=%s", &ihs[i].streamID, &ihs[i-1].streamID)
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|