VictoriaMetrics/lib/mergeset/metaindex_row.go
Aliaksandr Valialkin e1bf8440eb
lib/mergeset: prevent from possible too big indexBlockSize panic
This panic could occur when samples with too long label values are ingested into VictoriaMetrics.
This could result in too long firstItem and commonPrefix values at blockHeader (up to 64kb each).
This may inflate the maximum index block size by 4 * maxIndexBlockSize.
2024-02-08 12:54:10 +02:00

126 lines
3.6 KiB
Go

package mergeset
import (
"fmt"
"io"
"sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
// metaindexRow is a single metaindex entry. It describes one index block,
// i.e. a block of blockHeaders.
type metaindexRow struct {
	// firstItem is the first item in the first block.
	//
	// It is used for fast lookup of the required index block.
	firstItem []byte

	// blockHeadersCount is the number of blockHeaders the block contains.
	blockHeadersCount uint32

	// indexBlockOffset is the offset of the block in the index file.
	indexBlockOffset uint64

	// indexBlockSize is the size of the block in the index file.
	indexBlockSize uint32
}
// Reset clears all the fields of mr, so it can be reused.
//
// firstItem is truncated to zero length instead of being dropped,
// so its underlying buffer stays available for later reuse.
func (mr *metaindexRow) Reset() {
	mr.blockHeadersCount, mr.indexBlockOffset, mr.indexBlockSize = 0, 0, 0
	mr.firstItem = mr.firstItem[:0]
}
// Marshal appends the serialized mr to dst and returns the resulting slice.
//
// The fields are written in the following order: firstItem, blockHeadersCount,
// indexBlockOffset, indexBlockSize. Unmarshal reads them in the same order.
func (mr *metaindexRow) Marshal(dst []byte) []byte {
	out := encoding.MarshalBytes(dst, mr.firstItem)
	out = encoding.MarshalUint32(out, mr.blockHeadersCount)
	out = encoding.MarshalUint64(out, mr.indexBlockOffset)
	return encoding.MarshalUint32(out, mr.indexBlockSize)
}
// Unmarshal decodes a single metaindexRow from the beginning of src into mr
// and returns the remaining part of src.
//
// An error is returned if src is too short for any field, if the decoded
// blockHeadersCount is zero, or if the decoded indexBlockSize exceeds
// 4*maxIndexBlockSize. The fields decoded before the error is detected
// are already stored in mr when the error is returned.
func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) {
	// firstItem: copy it into mr's own buffer, so mr doesn't refer to src.
	rest, firstItem, err := encoding.UnmarshalBytes(src)
	if err != nil {
		return rest, fmt.Errorf("cannot unmarshal firstItem: %w", err)
	}
	mr.firstItem = append(mr.firstItem[:0], firstItem...)

	// blockHeadersCount
	if n := len(rest); n < 4 {
		return rest, fmt.Errorf("cannot unmarshal blockHeadersCount from %d bytes; need at least %d bytes", n, 4)
	}
	mr.blockHeadersCount = encoding.UnmarshalUint32(rest)
	rest = rest[4:]

	// indexBlockOffset
	if n := len(rest); n < 8 {
		return rest, fmt.Errorf("cannot unmarshal indexBlockOffset from %d bytes; need at least %d bytes", n, 8)
	}
	mr.indexBlockOffset = encoding.UnmarshalUint64(rest)
	rest = rest[8:]

	// indexBlockSize
	if n := len(rest); n < 4 {
		return rest, fmt.Errorf("cannot unmarshal indexBlockSize from %d bytes; need at least %d bytes", n, 4)
	}
	mr.indexBlockSize = encoding.UnmarshalUint32(rest)
	rest = rest[4:]

	// Validate the decoded values.
	switch {
	case mr.blockHeadersCount == 0:
		return rest, fmt.Errorf("blockHeadersCount must be bigger than 0; got %d", mr.blockHeadersCount)
	case mr.indexBlockSize > 4*maxIndexBlockSize:
		// The index block size can exceed maxIndexBlockSize by up to 4x,
		// since it can contain commonPrefix and firstItem at blockHeader
		// with the maximum length of maxIndexBlockSize per each field.
		return rest, fmt.Errorf("too big indexBlockSize: %d; cannot exceed %d", mr.indexBlockSize, 4*maxIndexBlockSize)
	}
	return rest, nil
}
// unmarshalMetaindexRows reads ZSTD-compressed metaindex data from r,
// decodes it into metaindexRow entries appended to dst and returns
// the extended dst.
//
// An error is returned if the data cannot be read, decompressed or decoded,
// if it contains no rows, or if the decoded rows aren't sorted by firstItem.
// The returned slice includes the rows appended before the error was detected.
func unmarshalMetaindexRows(dst []metaindexRow, r io.Reader) ([]metaindexRow, error) {
	// The metaindex is quite small, so it is ok to load it into memory as a whole.
	compressed, err := io.ReadAll(r)
	if err != nil {
		return dst, fmt.Errorf("cannot read metaindex data: %w", err)
	}
	rest, err := encoding.DecompressZSTD(nil, compressed)
	if err != nil {
		return dst, fmt.Errorf("cannot decompress metaindex data: %w", err)
	}

	base := len(dst)
	for len(rest) > 0 {
		// Extend dst by one row. Spare capacity is reused when available,
		// so the row already stored there is recycled.
		idx := len(dst)
		if idx < cap(dst) {
			dst = dst[:idx+1]
		} else {
			dst = append(dst, metaindexRow{})
		}
		tail, err := dst[idx].Unmarshal(rest)
		if err != nil {
			return dst, fmt.Errorf("cannot unmarshal metaindexRow #%d from metaindex data: %w", len(dst)-base, err)
		}
		rest = tail
	}
	if len(dst) == base {
		return dst, fmt.Errorf("expecting non-zero metaindex rows; got zero")
	}

	// Verify that the newly added rows are sorted by firstItem.
	rows := dst[base:]
	lessByFirstItem := func(i, j int) bool {
		return string(rows[i].firstItem) < string(rows[j].firstItem)
	}
	if !sort.SliceIsSorted(rows, lessByFirstItem) {
		return dst, fmt.Errorf("metaindex %d rows aren't sorted by firstItem", len(rows))
	}
	return dst, nil
}