mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-02 01:00:07 +01:00
8dce4eb189
- Move uniqueFields from rows to blockStreamMerger struct. This allows localizing all the references to uniqueFields inside blockStreamMerger.mustWriteBlock(), which should improve readability and maintainability of the code. - Remove logging of the event when blocks cannot be merged because they contain more than maxColumnsPerBlock, since the provided logging didn't provide the solution for the issue with too many columns. I couldn't figure out the proper solution, which could be helpful for end user, so decided to remove the logging until we find the solution. This commit also contains the following additional changes: - It truncates field names longer than 128 chars during logs ingestion. This should prevent from ingesting bogus field names. This also should prevent from too big columnsHeader blocks, which could negatively affect search query performance, since columnsHeader is read on every scan of the corresponding data block. - It limits the maximum length of const column value to 256. Longer values are stored in an ordinary columns. This helps limiting the size of columnsHeader blocks and improving search query performance by avoiding reading too long const columns on every scan of the corresponding data block. - It deduplicates columns with identical names during data ingestion and background merging. Previously it was possible to pass columns with duplicate names to block.mustInitFromRows(), and they were stored as is in the block. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4969
310 lines
8.5 KiB
Go
310 lines
8.5 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// mustMergeBlockStreams merges bsrs to bsw and updates ph accordingly.
|
|
//
|
|
// Finalize() is guaranteed to be called on bsrs and bsw before returning from the func.
|
|
func mustMergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}) {
|
|
bsm := getBlockStreamMerger()
|
|
bsm.mustInit(bsw, bsrs)
|
|
for len(bsm.readersHeap) > 0 {
|
|
if needStop(stopCh) {
|
|
break
|
|
}
|
|
bsr := bsm.readersHeap[0]
|
|
bsm.mustWriteBlock(&bsr.blockData, bsw)
|
|
if bsr.NextBlock() {
|
|
heap.Fix(&bsm.readersHeap, 0)
|
|
} else {
|
|
heap.Pop(&bsm.readersHeap)
|
|
}
|
|
}
|
|
bsm.mustFlushRows()
|
|
putBlockStreamMerger(bsm)
|
|
|
|
bsw.Finalize(ph)
|
|
mustCloseBlockStreamReaders(bsrs)
|
|
}
|
|
|
|
// blockStreamMerger merges block streams
|
|
type blockStreamMerger struct {
|
|
// bsw is the block stream writer to write the merged blocks.
|
|
bsw *blockStreamWriter
|
|
|
|
// bsrs contains the original readers passed to mustInit().
|
|
// They are used by ReadersPaths()
|
|
bsrs []*blockStreamReader
|
|
|
|
// readersHeap contains a heap of readers to read blocks to merge.
|
|
readersHeap blockStreamReadersHeap
|
|
|
|
// streamID is the stream ID for the pending data.
|
|
streamID streamID
|
|
|
|
// sbu is the unmarshaler for strings in rows and rowsTmp.
|
|
sbu *stringsBlockUnmarshaler
|
|
|
|
// vd is the decoder for unmarshaled strings.
|
|
vd *valuesDecoder
|
|
|
|
// bd is the pending blockData.
|
|
// bd is unpacked into rows when needed.
|
|
bd blockData
|
|
|
|
// rows is pending log entries.
|
|
rows rows
|
|
|
|
// rowsTmp is temporary storage for log entries during merge.
|
|
rowsTmp rows
|
|
|
|
// uncompressedRowsSizeBytes is the current size of uncompressed rows.
|
|
//
|
|
// It is used for flushing rows to blocks when their size reaches maxUncompressedBlockSize
|
|
uncompressedRowsSizeBytes uint64
|
|
|
|
// uniqueFields is an upper bound estimation for the number of unique fields in either rows or bd
|
|
//
|
|
// It is used for limiting the number of columns written per block
|
|
uniqueFields int
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) reset() {
|
|
bsm.bsw = nil
|
|
|
|
rhs := bsm.readersHeap
|
|
for i := range rhs {
|
|
rhs[i] = nil
|
|
}
|
|
bsm.readersHeap = rhs[:0]
|
|
|
|
bsm.streamID.reset()
|
|
bsm.resetRows()
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) resetRows() {
|
|
if bsm.sbu != nil {
|
|
putStringsBlockUnmarshaler(bsm.sbu)
|
|
bsm.sbu = nil
|
|
}
|
|
if bsm.vd != nil {
|
|
putValuesDecoder(bsm.vd)
|
|
bsm.vd = nil
|
|
}
|
|
bsm.bd.reset()
|
|
|
|
bsm.rows.reset()
|
|
bsm.rowsTmp.reset()
|
|
|
|
bsm.uncompressedRowsSizeBytes = 0
|
|
bsm.uniqueFields = 0
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStreamReader) {
|
|
bsm.reset()
|
|
|
|
bsm.bsw = bsw
|
|
bsm.bsrs = bsrs
|
|
|
|
rsh := bsm.readersHeap[:0]
|
|
for _, bsr := range bsrs {
|
|
if bsr.NextBlock() {
|
|
rsh = append(rsh, bsr)
|
|
}
|
|
}
|
|
bsm.readersHeap = rsh
|
|
heap.Init(&bsm.readersHeap)
|
|
}
|
|
|
|
// mustWriteBlock writes bd to bsm
|
|
func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWriter) {
|
|
bsm.checkNextBlock(bd)
|
|
uniqueFields := len(bd.columnsData) + len(bd.constColumns)
|
|
switch {
|
|
case !bd.streamID.equal(&bsm.streamID):
|
|
// The bd contains another streamID.
|
|
// Write the current log entries under the current streamID, then process the bd.
|
|
bsm.mustFlushRows()
|
|
bsm.streamID = bd.streamID
|
|
if bd.uncompressedSizeBytes >= maxUncompressedBlockSize {
|
|
// Fast path - write full bd to the output without extracting log entries from it.
|
|
bsw.MustWriteBlockData(bd)
|
|
} else {
|
|
// Slow path - copy the bd to the curr bd.
|
|
bsm.bd.copyFrom(bd)
|
|
bsm.uniqueFields = uniqueFields
|
|
}
|
|
case bsm.uniqueFields+uniqueFields >= maxColumnsPerBlock:
|
|
// Cannot merge bd with bsm.rows, because too many columns will be created.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
|
|
//
|
|
// Flush bsm.rows and copy the bd to the curr bd.
|
|
bsm.mustFlushRows()
|
|
if uniqueFields >= maxColumnsPerBlock {
|
|
bsw.MustWriteBlockData(bd)
|
|
} else {
|
|
bsm.bd.copyFrom(bd)
|
|
bsm.uniqueFields = uniqueFields
|
|
}
|
|
case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
|
|
// The bd contains the same streamID and it is full,
|
|
// so it can be written next after the current log entries
|
|
// without the need to merge the bd with the current log entries.
|
|
// Write the current log entries and then the bd.
|
|
bsm.mustFlushRows()
|
|
bsw.MustWriteBlockData(bd)
|
|
default:
|
|
// The bd contains the same streamID and it isn't full,
|
|
// so it must be merged with the current log entries.
|
|
bsm.mustMergeRows(bd)
|
|
bsm.uniqueFields += uniqueFields
|
|
}
|
|
}
|
|
|
|
// checkNextBlock checks whether the bd can be written next after the current data.
|
|
func (bsm *blockStreamMerger) checkNextBlock(bd *blockData) {
|
|
if len(bsm.rows.timestamps) > 0 && bsm.bd.rowsCount > 0 {
|
|
logger.Panicf("BUG: bsm.bd must be empty when bsm.rows isn't empty! got %d log entries in bsm.bd", bsm.bd.rowsCount)
|
|
}
|
|
if bd.streamID.less(&bsm.streamID) {
|
|
logger.Panicf("FATAL: cannot merge %s: the streamID=%s for the next block is smaller than the streamID=%s for the current block",
|
|
bsm.ReadersPaths(), &bd.streamID, &bsm.streamID)
|
|
}
|
|
if !bd.streamID.equal(&bsm.streamID) {
|
|
return
|
|
}
|
|
// streamID at bd equals streamID at bsm. Check that minTimestamp in bd is bigger or equal to the minTimestmap at bsm.
|
|
if bd.rowsCount == 0 {
|
|
return
|
|
}
|
|
nextMinTimestamp := bd.timestampsData.minTimestamp
|
|
if len(bsm.rows.timestamps) == 0 {
|
|
if bsm.bd.rowsCount == 0 {
|
|
return
|
|
}
|
|
minTimestamp := bsm.bd.timestampsData.minTimestamp
|
|
if nextMinTimestamp < minTimestamp {
|
|
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for the current block",
|
|
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
|
|
}
|
|
return
|
|
}
|
|
minTimestamp := bsm.rows.timestamps[0]
|
|
if nextMinTimestamp < minTimestamp {
|
|
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for log entries for the current block",
|
|
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
|
|
}
|
|
}
|
|
|
|
// ReadersPaths returns paths for input blockStreamReaders
|
|
func (bsm *blockStreamMerger) ReadersPaths() string {
|
|
paths := make([]string, len(bsm.bsrs))
|
|
for i, bsr := range bsm.bsrs {
|
|
paths[i] = bsr.Path()
|
|
}
|
|
return fmt.Sprintf("[%s]", strings.Join(paths, ","))
|
|
}
|
|
|
|
// mustMergeRows merges the current log entries inside bsm with bd log entries.
|
|
func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
|
|
if bsm.bd.rowsCount > 0 {
|
|
// Unmarshal log entries from bsm.bd
|
|
bsm.mustUnmarshalRows(&bsm.bd)
|
|
bsm.bd.reset()
|
|
}
|
|
|
|
// Unmarshal log entries from bd
|
|
rowsLen := len(bsm.rows.timestamps)
|
|
bsm.mustUnmarshalRows(bd)
|
|
|
|
// Merge unmarshaled log entries
|
|
timestamps := bsm.rows.timestamps
|
|
rows := bsm.rows.rows
|
|
bsm.rowsTmp.mergeRows(timestamps[:rowsLen], timestamps[rowsLen:], rows[:rowsLen], rows[rowsLen:])
|
|
bsm.rows, bsm.rowsTmp = bsm.rowsTmp, bsm.rows
|
|
bsm.rowsTmp.reset()
|
|
|
|
if bsm.uncompressedRowsSizeBytes >= maxUncompressedBlockSize {
|
|
bsm.mustFlushRows()
|
|
}
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustUnmarshalRows(bd *blockData) {
|
|
rowsLen := len(bsm.rows.timestamps)
|
|
if bsm.sbu == nil {
|
|
bsm.sbu = getStringsBlockUnmarshaler()
|
|
}
|
|
if bsm.vd == nil {
|
|
bsm.vd = getValuesDecoder()
|
|
}
|
|
if err := bd.unmarshalRows(&bsm.rows, bsm.sbu, bsm.vd); err != nil {
|
|
logger.Panicf("FATAL: cannot merge %s: cannot unmarshal log entries from blockData: %s", bsm.ReadersPaths(), err)
|
|
}
|
|
bsm.uncompressedRowsSizeBytes += uncompressedRowsSizeBytes(bsm.rows.rows[rowsLen:])
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustFlushRows() {
|
|
if len(bsm.rows.timestamps) == 0 {
|
|
bsm.bsw.MustWriteBlockData(&bsm.bd)
|
|
} else {
|
|
bsm.bsw.MustWriteRows(&bsm.streamID, bsm.rows.timestamps, bsm.rows.rows)
|
|
}
|
|
bsm.resetRows()
|
|
}
|
|
|
|
func getBlockStreamMerger() *blockStreamMerger {
|
|
v := blockStreamMergerPool.Get()
|
|
if v == nil {
|
|
return &blockStreamMerger{}
|
|
}
|
|
return v.(*blockStreamMerger)
|
|
}
|
|
|
|
func putBlockStreamMerger(bsm *blockStreamMerger) {
|
|
bsm.reset()
|
|
blockStreamMergerPool.Put(bsm)
|
|
}
|
|
|
|
var blockStreamMergerPool sync.Pool
|
|
|
|
type blockStreamReadersHeap []*blockStreamReader
|
|
|
|
func (h *blockStreamReadersHeap) Len() int {
|
|
return len(*h)
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Less(i, j int) bool {
|
|
x := *h
|
|
a := &x[i].blockData
|
|
b := &x[j].blockData
|
|
if !a.streamID.equal(&b.streamID) {
|
|
return a.streamID.less(&b.streamID)
|
|
}
|
|
return a.timestampsData.minTimestamp < b.timestampsData.minTimestamp
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Swap(i, j int) {
|
|
x := *h
|
|
x[i], x[j] = x[j], x[i]
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Push(v interface{}) {
|
|
bsr := v.(*blockStreamReader)
|
|
*h = append(*h, bsr)
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Pop() interface{} {
|
|
x := *h
|
|
bsr := x[len(x)-1]
|
|
x[len(x)-1] = nil
|
|
*h = x[:len(x)-1]
|
|
return bsr
|
|
}
|