mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
167 lines
4.6 KiB
Go
167 lines
4.6 KiB
Go
|
package storage
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"io/ioutil"
|
||
|
"sort"
|
||
|
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||
|
)
|
||
|
|
||
|
// metaindexRow is a single metaindex row.
|
||
|
//
|
||
|
// The row points to a single index block containing block headers.
|
||
|
type metaindexRow struct {
|
||
|
// TSID is the first TSID in the corresponding index block.
|
||
|
TSID TSID
|
||
|
|
||
|
// BlockHeadersCount is the number of block headers
|
||
|
// in the given index block.
|
||
|
BlockHeadersCount uint32
|
||
|
|
||
|
// MinTimestamp is the minimum timestamp in the given index block.
|
||
|
MinTimestamp int64
|
||
|
|
||
|
// MaxTimestamp is the maximum timestamp in the given index block.
|
||
|
MaxTimestamp int64
|
||
|
|
||
|
// IndexBlockOffset is the offset of index block.
|
||
|
IndexBlockOffset uint64
|
||
|
|
||
|
// IndexBlockSize is the size of compressed index block.
|
||
|
IndexBlockSize uint32
|
||
|
}
|
||
|
|
||
|
// Reset resets the mr using the given tsid.
|
||
|
func (mr *metaindexRow) Reset() {
|
||
|
mr.TSID = TSID{}
|
||
|
mr.BlockHeadersCount = 0
|
||
|
mr.MinTimestamp = (1 << 63) - 1
|
||
|
mr.MaxTimestamp = -1 << 63
|
||
|
mr.IndexBlockOffset = 0
|
||
|
mr.IndexBlockSize = 0
|
||
|
}
|
||
|
|
||
|
// RegisterBlockHeader registers the given bh in the mr.
|
||
|
func (mr *metaindexRow) RegisterBlockHeader(bh *blockHeader) {
|
||
|
mr.BlockHeadersCount++
|
||
|
if mr.BlockHeadersCount == 1 {
|
||
|
mr.TSID = bh.TSID
|
||
|
mr.MinTimestamp = bh.MinTimestamp
|
||
|
mr.MaxTimestamp = bh.MaxTimestamp
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if bh.MinTimestamp < mr.MinTimestamp {
|
||
|
mr.MinTimestamp = bh.MinTimestamp
|
||
|
}
|
||
|
if bh.MaxTimestamp > mr.MaxTimestamp {
|
||
|
mr.MaxTimestamp = bh.MaxTimestamp
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Marshal appends marshaled mr to dst and returns the result.
|
||
|
func (mr *metaindexRow) Marshal(dst []byte) []byte {
|
||
|
dst = mr.TSID.Marshal(dst)
|
||
|
dst = encoding.MarshalUint32(dst, mr.BlockHeadersCount)
|
||
|
dst = encoding.MarshalInt64(dst, mr.MinTimestamp)
|
||
|
dst = encoding.MarshalInt64(dst, mr.MaxTimestamp)
|
||
|
dst = encoding.MarshalUint64(dst, mr.IndexBlockOffset)
|
||
|
dst = encoding.MarshalUint32(dst, mr.IndexBlockSize)
|
||
|
return dst
|
||
|
}
|
||
|
|
||
|
// Unmarshal unmarshals mr from src and returns the tail of src.
|
||
|
func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) {
|
||
|
// Unmarshal TSID
|
||
|
tail, err := mr.TSID.Unmarshal(src)
|
||
|
if err != nil {
|
||
|
return src, fmt.Errorf("cannot unmarshal TSID: %s", err)
|
||
|
}
|
||
|
src = tail
|
||
|
|
||
|
// Unmarshal BlockHeadersCount
|
||
|
if len(src) < 4 {
|
||
|
return src, fmt.Errorf("cannot unmarshal BlockHeadersCount from %d bytes; want at least %d bytes", len(src), 4)
|
||
|
}
|
||
|
mr.BlockHeadersCount = encoding.UnmarshalUint32(src)
|
||
|
src = src[4:]
|
||
|
|
||
|
// Unmarshal MinTimestamp
|
||
|
if len(src) < 8 {
|
||
|
return src, fmt.Errorf("cannot unmarshal MinTimestamp from %d bytes; want at least %d bytes", len(src), 8)
|
||
|
}
|
||
|
mr.MinTimestamp = encoding.UnmarshalInt64(src)
|
||
|
src = src[8:]
|
||
|
|
||
|
// Unmarshal MaxTimestamp
|
||
|
if len(src) < 8 {
|
||
|
return src, fmt.Errorf("cannot unmarshal MaxTimestamp from %d bytes; want at least %d bytes", len(src), 8)
|
||
|
}
|
||
|
mr.MaxTimestamp = encoding.UnmarshalInt64(src)
|
||
|
src = src[8:]
|
||
|
|
||
|
// Unmarshal IndexBlockOffset
|
||
|
if len(src) < 8 {
|
||
|
return src, fmt.Errorf("cannot unmarshal IndexBlockOffset from %d bytes; want at least %d bytes", len(src), 8)
|
||
|
}
|
||
|
mr.IndexBlockOffset = encoding.UnmarshalUint64(src)
|
||
|
src = src[8:]
|
||
|
|
||
|
// Unmarshal IndexBlockSize
|
||
|
if len(src) < 4 {
|
||
|
return src, fmt.Errorf("cannot unmarshal IndexBlockSize from %d bytes; want at least %d bytes", len(src), 4)
|
||
|
}
|
||
|
mr.IndexBlockSize = encoding.UnmarshalUint32(src)
|
||
|
src = src[4:]
|
||
|
|
||
|
// Validate unmarshaled data.
|
||
|
if mr.BlockHeadersCount <= 0 {
|
||
|
return src, fmt.Errorf("BlockHeadersCount must be greater than 0")
|
||
|
}
|
||
|
if mr.IndexBlockSize > 2*8*maxBlockSize {
|
||
|
return src, fmt.Errorf("too big IndexBlockSize; got %d; cannot exceed %d", mr.IndexBlockSize, 2*8*maxBlockSize)
|
||
|
}
|
||
|
|
||
|
return src, nil
|
||
|
}
|
||
|
|
||
|
func unmarshalMetaindexRows(dst []metaindexRow, r io.Reader) ([]metaindexRow, error) {
|
||
|
compressedData, err := ioutil.ReadAll(r)
|
||
|
if err != nil {
|
||
|
return dst, fmt.Errorf("cannot read metaindex rows: %s", err)
|
||
|
}
|
||
|
data, err := encoding.DecompressZSTD(nil, compressedData)
|
||
|
if err != nil {
|
||
|
return dst, fmt.Errorf("cannot decompress metaindex rows: %s", err)
|
||
|
}
|
||
|
|
||
|
dstLen := len(dst)
|
||
|
for len(data) > 0 {
|
||
|
if len(dst) < cap(dst) {
|
||
|
dst = dst[:len(dst)+1]
|
||
|
} else {
|
||
|
dst = append(dst, metaindexRow{})
|
||
|
}
|
||
|
mr := &dst[len(dst)-1]
|
||
|
tail, err := mr.Unmarshal(data)
|
||
|
if err != nil {
|
||
|
return dst, fmt.Errorf("cannot unmarshal metaindexRow #%d from metaindex data: %s", len(dst)-dstLen, err)
|
||
|
}
|
||
|
data = tail
|
||
|
}
|
||
|
if dstLen == len(dst) {
|
||
|
return dst, fmt.Errorf("expecting non-zero metaindex rows; got zero")
|
||
|
}
|
||
|
|
||
|
// Make sure metaindex rows are sorted by tsid
|
||
|
tmp := dst[dstLen:]
|
||
|
ok := sort.SliceIsSorted(tmp, func(i, j int) bool { return tmp[i].TSID.Less(&tmp[j].TSID) })
|
||
|
if !ok {
|
||
|
return dst, fmt.Errorf("metaindexRow values must be sorted by TSID; got %+v", tmp)
|
||
|
}
|
||
|
|
||
|
return dst, nil
|
||
|
}
|