mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
9ba6be4179
This will allow storing wide events with up to 2K fields per event into VictoriaLogs.
While at it, remove the misleading comment that columnsHeader is read in full per each matching block.
This is no longer the case after the improvements made at 202eb429a7
.
Now only the needed columnHeader is read for the column mentioned in the query.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6425#issuecomment-2418337124
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
317 lines
8.6 KiB
Go
317 lines
8.6 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// mustMergeBlockStreams merges bsrs to bsw and updates ph accordingly.
|
|
//
|
|
// Finalize() is guaranteed to be called on bsrs and bsw before returning from the func.
|
|
func mustMergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}) {
|
|
bsm := getBlockStreamMerger()
|
|
bsm.mustInit(bsw, bsrs)
|
|
for len(bsm.readersHeap) > 0 {
|
|
if needStop(stopCh) {
|
|
break
|
|
}
|
|
bsr := bsm.readersHeap[0]
|
|
bsm.mustWriteBlock(&bsr.blockData, bsw)
|
|
if bsr.NextBlock() {
|
|
heap.Fix(&bsm.readersHeap, 0)
|
|
} else {
|
|
heap.Pop(&bsm.readersHeap)
|
|
}
|
|
}
|
|
bsm.mustFlushRows()
|
|
putBlockStreamMerger(bsm)
|
|
|
|
bsw.Finalize(ph)
|
|
mustCloseBlockStreamReaders(bsrs)
|
|
}
|
|
|
|
// blockStreamMerger merges block streams
|
|
type blockStreamMerger struct {
|
|
// bsw is the block stream writer to write the merged blocks.
|
|
bsw *blockStreamWriter
|
|
|
|
// bsrs contains the original readers passed to mustInit().
|
|
// They are used by ReadersPaths()
|
|
bsrs []*blockStreamReader
|
|
|
|
// readersHeap contains a heap of readers to read blocks to merge.
|
|
readersHeap blockStreamReadersHeap
|
|
|
|
// streamID is the stream ID for the pending data.
|
|
streamID streamID
|
|
|
|
// sbu is the unmarshaler for strings in rows and rowsTmp.
|
|
sbu *stringsBlockUnmarshaler
|
|
|
|
// vd is the decoder for unmarshaled strings.
|
|
vd *valuesDecoder
|
|
|
|
// bd is the pending blockData.
|
|
// bd is unpacked into rows when needed.
|
|
bd blockData
|
|
|
|
// a holds bd data.
|
|
a arena
|
|
|
|
// rows is pending log entries.
|
|
rows rows
|
|
|
|
// rowsTmp is temporary storage for log entries during merge.
|
|
rowsTmp rows
|
|
|
|
// uncompressedRowsSizeBytes is the current size of uncompressed rows.
|
|
//
|
|
// It is used for flushing rows to blocks when their size reaches maxUncompressedBlockSize
|
|
uncompressedRowsSizeBytes uint64
|
|
|
|
// uniqueFields is an upper bound estimation for the number of unique fields in either rows or bd
|
|
//
|
|
// It is used for limiting the number of columns written per block
|
|
uniqueFields int
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) reset() {
|
|
bsm.bsw = nil
|
|
|
|
rhs := bsm.readersHeap
|
|
for i := range rhs {
|
|
rhs[i] = nil
|
|
}
|
|
bsm.readersHeap = rhs[:0]
|
|
|
|
bsm.streamID.reset()
|
|
bsm.resetRows()
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) resetRows() {
|
|
if bsm.sbu != nil {
|
|
putStringsBlockUnmarshaler(bsm.sbu)
|
|
bsm.sbu = nil
|
|
}
|
|
if bsm.vd != nil {
|
|
putValuesDecoder(bsm.vd)
|
|
bsm.vd = nil
|
|
}
|
|
bsm.bd.reset()
|
|
bsm.a.reset()
|
|
|
|
bsm.rows.reset()
|
|
bsm.rowsTmp.reset()
|
|
|
|
bsm.uncompressedRowsSizeBytes = 0
|
|
bsm.uniqueFields = 0
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStreamReader) {
|
|
bsm.reset()
|
|
|
|
bsm.bsw = bsw
|
|
bsm.bsrs = bsrs
|
|
|
|
rsh := bsm.readersHeap[:0]
|
|
for _, bsr := range bsrs {
|
|
if bsr.NextBlock() {
|
|
rsh = append(rsh, bsr)
|
|
}
|
|
}
|
|
bsm.readersHeap = rsh
|
|
heap.Init(&bsm.readersHeap)
|
|
}
|
|
|
|
// mustWriteBlock writes bd to bsm
|
|
func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWriter) {
|
|
bsm.checkNextBlock(bd)
|
|
uniqueFields := len(bd.columnsData) + len(bd.constColumns)
|
|
switch {
|
|
case !bd.streamID.equal(&bsm.streamID):
|
|
// The bd contains another streamID.
|
|
// Write the current log entries under the current streamID, then process the bd.
|
|
bsm.mustFlushRows()
|
|
bsm.streamID = bd.streamID
|
|
if bd.uncompressedSizeBytes >= maxUncompressedBlockSize {
|
|
// Fast path - write full bd to the output without extracting log entries from it.
|
|
bsw.MustWriteBlockData(bd)
|
|
} else {
|
|
// Slow path - copy the bd to the curr bd.
|
|
bsm.a.reset()
|
|
bsm.bd.copyFrom(&bsm.a, bd)
|
|
bsm.uniqueFields = uniqueFields
|
|
}
|
|
case bsm.uniqueFields+uniqueFields > maxColumnsPerBlock:
|
|
// Cannot merge bd with bsm.rows, because too many columns will be created.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
|
|
//
|
|
// Flush bsm.rows and copy the bd to the curr bd.
|
|
bsm.mustFlushRows()
|
|
if uniqueFields >= maxColumnsPerBlock {
|
|
bsw.MustWriteBlockData(bd)
|
|
} else {
|
|
bsm.a.reset()
|
|
bsm.bd.copyFrom(&bsm.a, bd)
|
|
bsm.uniqueFields = uniqueFields
|
|
}
|
|
case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
|
|
// The bd contains the same streamID and it is full,
|
|
// so it can be written next after the current log entries
|
|
// without the need to merge the bd with the current log entries.
|
|
// Write the current log entries and then the bd.
|
|
bsm.mustFlushRows()
|
|
bsw.MustWriteBlockData(bd)
|
|
default:
|
|
// The bd contains the same streamID and it isn't full,
|
|
// so it must be merged with the current log entries.
|
|
bsm.mustMergeRows(bd)
|
|
bsm.uniqueFields += uniqueFields
|
|
}
|
|
}
|
|
|
|
// checkNextBlock checks whether the bd can be written next after the current data.
|
|
func (bsm *blockStreamMerger) checkNextBlock(bd *blockData) {
|
|
if len(bsm.rows.timestamps) > 0 && bsm.bd.rowsCount > 0 {
|
|
logger.Panicf("BUG: bsm.bd must be empty when bsm.rows isn't empty! got %d log entries in bsm.bd", bsm.bd.rowsCount)
|
|
}
|
|
if bd.streamID.less(&bsm.streamID) {
|
|
logger.Panicf("FATAL: cannot merge %s: the streamID=%s for the next block is smaller than the streamID=%s for the current block",
|
|
bsm.ReadersPaths(), &bd.streamID, &bsm.streamID)
|
|
}
|
|
if !bd.streamID.equal(&bsm.streamID) {
|
|
return
|
|
}
|
|
// streamID at bd equals streamID at bsm. Check that minTimestamp in bd is bigger or equal to the minTimestmap at bsm.
|
|
if bd.rowsCount == 0 {
|
|
return
|
|
}
|
|
nextMinTimestamp := bd.timestampsData.minTimestamp
|
|
if len(bsm.rows.timestamps) == 0 {
|
|
if bsm.bd.rowsCount == 0 {
|
|
return
|
|
}
|
|
minTimestamp := bsm.bd.timestampsData.minTimestamp
|
|
if nextMinTimestamp < minTimestamp {
|
|
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for the current block",
|
|
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
|
|
}
|
|
return
|
|
}
|
|
minTimestamp := bsm.rows.timestamps[0]
|
|
if nextMinTimestamp < minTimestamp {
|
|
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for log entries for the current block",
|
|
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
|
|
}
|
|
}
|
|
|
|
// ReadersPaths returns paths for input blockStreamReaders
|
|
func (bsm *blockStreamMerger) ReadersPaths() string {
|
|
paths := make([]string, len(bsm.bsrs))
|
|
for i, bsr := range bsm.bsrs {
|
|
paths[i] = bsr.Path()
|
|
}
|
|
return fmt.Sprintf("[%s]", strings.Join(paths, ","))
|
|
}
|
|
|
|
// mustMergeRows merges the current log entries inside bsm with bd log entries.
|
|
func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
|
|
if bsm.bd.rowsCount > 0 {
|
|
// Unmarshal log entries from bsm.bd
|
|
bsm.mustUnmarshalRows(&bsm.bd)
|
|
bsm.bd.reset()
|
|
bsm.a.reset()
|
|
}
|
|
|
|
// Unmarshal log entries from bd
|
|
rowsLen := len(bsm.rows.timestamps)
|
|
bsm.mustUnmarshalRows(bd)
|
|
|
|
// Merge unmarshaled log entries
|
|
timestamps := bsm.rows.timestamps
|
|
rows := bsm.rows.rows
|
|
bsm.rowsTmp.mergeRows(timestamps[:rowsLen], timestamps[rowsLen:], rows[:rowsLen], rows[rowsLen:])
|
|
bsm.rows, bsm.rowsTmp = bsm.rowsTmp, bsm.rows
|
|
bsm.rowsTmp.reset()
|
|
|
|
if bsm.uncompressedRowsSizeBytes >= maxUncompressedBlockSize {
|
|
bsm.mustFlushRows()
|
|
}
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustUnmarshalRows(bd *blockData) {
|
|
rowsLen := len(bsm.rows.timestamps)
|
|
if bsm.sbu == nil {
|
|
bsm.sbu = getStringsBlockUnmarshaler()
|
|
}
|
|
if bsm.vd == nil {
|
|
bsm.vd = getValuesDecoder()
|
|
}
|
|
if err := bd.unmarshalRows(&bsm.rows, bsm.sbu, bsm.vd); err != nil {
|
|
logger.Panicf("FATAL: cannot merge %s: cannot unmarshal log entries from blockData: %s", bsm.ReadersPaths(), err)
|
|
}
|
|
bsm.uncompressedRowsSizeBytes += uncompressedRowsSizeBytes(bsm.rows.rows[rowsLen:])
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustFlushRows() {
|
|
if len(bsm.rows.timestamps) == 0 {
|
|
bsm.bsw.MustWriteBlockData(&bsm.bd)
|
|
} else {
|
|
bsm.bsw.MustWriteRows(&bsm.streamID, bsm.rows.timestamps, bsm.rows.rows)
|
|
}
|
|
bsm.resetRows()
|
|
}
|
|
|
|
func getBlockStreamMerger() *blockStreamMerger {
|
|
v := blockStreamMergerPool.Get()
|
|
if v == nil {
|
|
return &blockStreamMerger{}
|
|
}
|
|
return v.(*blockStreamMerger)
|
|
}
|
|
|
|
func putBlockStreamMerger(bsm *blockStreamMerger) {
|
|
bsm.reset()
|
|
blockStreamMergerPool.Put(bsm)
|
|
}
|
|
|
|
var blockStreamMergerPool sync.Pool
|
|
|
|
type blockStreamReadersHeap []*blockStreamReader
|
|
|
|
func (h *blockStreamReadersHeap) Len() int {
|
|
return len(*h)
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Less(i, j int) bool {
|
|
x := *h
|
|
a := &x[i].blockData
|
|
b := &x[j].blockData
|
|
if !a.streamID.equal(&b.streamID) {
|
|
return a.streamID.less(&b.streamID)
|
|
}
|
|
return a.timestampsData.minTimestamp < b.timestampsData.minTimestamp
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Swap(i, j int) {
|
|
x := *h
|
|
x[i], x[j] = x[j], x[i]
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Push(v any) {
|
|
bsr := v.(*blockStreamReader)
|
|
*h = append(*h, bsr)
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Pop() any {
|
|
x := *h
|
|
bsr := x[len(x)-1]
|
|
x[len(x)-1] = nil
|
|
*h = x[:len(x)-1]
|
|
return bsr
|
|
}
|