mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-21 07:56:26 +01:00
94627113db
* lib/logstorage: prevent from panic during background merge Fixes panic during background merge when resulting block would contain more columns than maxColumnsPerBlock. Buffered data will be flushed and replaced by the next block. See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/logstorage: clarify field description and comment Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
314 lines
8.8 KiB
Go
314 lines
8.8 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// mustMergeBlockStreams merges bsrs to bsw and updates ph accordingly.
|
|
//
|
|
// Finalize() is guaranteed to be called on bsrs and bsw before returning from the func.
|
|
func mustMergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}) {
|
|
bsm := getBlockStreamMerger()
|
|
bsm.mustInit(bsw, bsrs)
|
|
for len(bsm.readersHeap) > 0 {
|
|
if needStop(stopCh) {
|
|
break
|
|
}
|
|
bsr := bsm.readersHeap[0]
|
|
bsm.mustWriteBlock(&bsr.blockData, bsw)
|
|
if bsr.NextBlock() {
|
|
heap.Fix(&bsm.readersHeap, 0)
|
|
} else {
|
|
heap.Pop(&bsm.readersHeap)
|
|
}
|
|
}
|
|
bsm.mustFlushRows()
|
|
putBlockStreamMerger(bsm)
|
|
|
|
bsw.Finalize(ph)
|
|
mustCloseBlockStreamReaders(bsrs)
|
|
}
|
|
|
|
// blockStreamMerger merges block streams
|
|
type blockStreamMerger struct {
|
|
// bsw is the block stream writer to write the merged blocks.
|
|
bsw *blockStreamWriter
|
|
|
|
// bsrs contains the original readers passed to mustInit().
|
|
// They are used by ReadersPaths()
|
|
bsrs []*blockStreamReader
|
|
|
|
// readersHeap contains a heap of readers to read blocks to merge.
|
|
readersHeap blockStreamReadersHeap
|
|
|
|
// streamID is the stream ID for the pending data.
|
|
streamID streamID
|
|
|
|
// sbu is the unmarshaler for strings in rows and rowsTmp.
|
|
sbu *stringsBlockUnmarshaler
|
|
|
|
// vd is the decoder for unmarshaled strings.
|
|
vd *valuesDecoder
|
|
|
|
// bd is the pending blockData.
|
|
// bd is unpacked into rows when needed.
|
|
bd blockData
|
|
|
|
// rows is pending log entries.
|
|
rows rows
|
|
|
|
// rowsTmp is temporary storage for log entries during merge.
|
|
rowsTmp rows
|
|
|
|
// uncompressedRowsSizeBytes is the current size of uncompressed rows.
|
|
//
|
|
// It is used for flushing rows to blocks when their size reaches maxUncompressedBlockSize
|
|
uncompressedRowsSizeBytes uint64
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) reset() {
|
|
bsm.bsw = nil
|
|
|
|
rhs := bsm.readersHeap
|
|
for i := range rhs {
|
|
rhs[i] = nil
|
|
}
|
|
bsm.readersHeap = rhs[:0]
|
|
|
|
bsm.streamID.reset()
|
|
bsm.resetRows()
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) resetRows() {
|
|
if bsm.sbu != nil {
|
|
putStringsBlockUnmarshaler(bsm.sbu)
|
|
bsm.sbu = nil
|
|
}
|
|
if bsm.vd != nil {
|
|
putValuesDecoder(bsm.vd)
|
|
bsm.vd = nil
|
|
}
|
|
bsm.bd.reset()
|
|
|
|
bsm.rows.reset()
|
|
bsm.rowsTmp.reset()
|
|
|
|
bsm.uncompressedRowsSizeBytes = 0
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStreamReader) {
|
|
bsm.reset()
|
|
|
|
bsm.bsw = bsw
|
|
bsm.bsrs = bsrs
|
|
|
|
rsh := bsm.readersHeap[:0]
|
|
for _, bsr := range bsrs {
|
|
if bsr.NextBlock() {
|
|
rsh = append(rsh, bsr)
|
|
}
|
|
}
|
|
bsm.readersHeap = rsh
|
|
heap.Init(&bsm.readersHeap)
|
|
}
|
|
|
|
// mergeStreamsExceedLogger is used by mergeStreamsLimitWarn for reporting
// merges, which would exceed maxColumnsPerBlock.
//
// NOTE(review): presumably logger.WithThrottler limits the messages to one per 10 seconds - confirm against lib/logger.
var mergeStreamsExceedLogger = logger.WithThrottler("mergeStreamsExceed", 10*time.Second)
|
|
|
|
func (bsm *blockStreamMerger) mergeStreamsLimitWarn(bd *blockData) {
|
|
attempted := bsm.rows.uniqueFields + len(bd.columnsData) + len(bd.constColumns)
|
|
mergeStreamsExceedLogger.Warnf("cannot perform background merge: too many columns for block after merge: %d, max columns: %d; "+
|
|
"check ingestion configuration; see: https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#troubleshooting", attempted, maxColumnsPerBlock)
|
|
}
|
|
|
|
// mustWriteBlock writes bd to bsm
|
|
func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWriter) {
|
|
bsm.checkNextBlock(bd)
|
|
switch {
|
|
case !bd.streamID.equal(&bsm.streamID):
|
|
// The bd contains another streamID.
|
|
// Write the current log entries under the current streamID, then process the bd.
|
|
bsm.mustFlushRows()
|
|
bsm.streamID = bd.streamID
|
|
if bd.uncompressedSizeBytes >= maxUncompressedBlockSize {
|
|
// Fast path - write full bd to the output without extracting log entries from it.
|
|
bsw.MustWriteBlockData(bd)
|
|
} else {
|
|
// Slow path - copy the bd to the curr bd.
|
|
bsm.bd.copyFrom(bd)
|
|
}
|
|
case !bsm.rows.hasCapacityFor(bd):
|
|
// Cannot merge bd with bsm.rows as too many columns will be created.
|
|
// Flush bsm.rows and write bd as is.
|
|
bsm.mergeStreamsLimitWarn(bd)
|
|
bsm.mustFlushRows()
|
|
bsw.MustWriteBlockData(bd)
|
|
case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
|
|
// The bd contains the same streamID and it is full,
|
|
// so it can be written next after the current log entries
|
|
// without the need to merge the bd with the current log entries.
|
|
// Write the current log entries and then the bd.
|
|
bsm.mustFlushRows()
|
|
bsw.MustWriteBlockData(bd)
|
|
default:
|
|
// The bd contains the same streamID and it isn't full,
|
|
// so it must be merged with the current log entries.
|
|
bsm.mustMergeRows(bd)
|
|
}
|
|
}
|
|
|
|
// checkNextBlock checks whether the bd can be written next after the current data.
|
|
func (bsm *blockStreamMerger) checkNextBlock(bd *blockData) {
|
|
if len(bsm.rows.timestamps) > 0 && bsm.bd.rowsCount > 0 {
|
|
logger.Panicf("BUG: bsm.bd must be empty when bsm.rows isn't empty! got %d log entries in bsm.bd", bsm.bd.rowsCount)
|
|
}
|
|
if bd.streamID.less(&bsm.streamID) {
|
|
logger.Panicf("FATAL: cannot merge %s: the streamID=%s for the next block is smaller than the streamID=%s for the current block",
|
|
bsm.ReadersPaths(), &bd.streamID, &bsm.streamID)
|
|
}
|
|
if !bd.streamID.equal(&bsm.streamID) {
|
|
return
|
|
}
|
|
// streamID at bd equals streamID at bsm. Check that minTimestamp in bd is bigger or equal to the minTimestmap at bsm.
|
|
if bd.rowsCount == 0 {
|
|
return
|
|
}
|
|
nextMinTimestamp := bd.timestampsData.minTimestamp
|
|
if len(bsm.rows.timestamps) == 0 {
|
|
if bsm.bd.rowsCount == 0 {
|
|
return
|
|
}
|
|
minTimestamp := bsm.bd.timestampsData.minTimestamp
|
|
if nextMinTimestamp < minTimestamp {
|
|
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for the current block",
|
|
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
|
|
}
|
|
return
|
|
}
|
|
minTimestamp := bsm.rows.timestamps[0]
|
|
if nextMinTimestamp < minTimestamp {
|
|
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for log entries for the current block",
|
|
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
|
|
}
|
|
}
|
|
|
|
// ReadersPaths returns paths for input blockStreamReaders
|
|
func (bsm *blockStreamMerger) ReadersPaths() string {
|
|
paths := make([]string, len(bsm.bsrs))
|
|
for i, bsr := range bsm.bsrs {
|
|
paths[i] = bsr.Path()
|
|
}
|
|
return fmt.Sprintf("[%s]", strings.Join(paths, ","))
|
|
}
|
|
|
|
// mustMergeRows merges the current log entries inside bsm with bd log entries.
|
|
func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
|
|
if bsm.bd.rowsCount > 0 {
|
|
// Unmarshal log entries from bsm.bd
|
|
bsm.mustUnmarshalRows(&bsm.bd)
|
|
bsm.bd.reset()
|
|
}
|
|
|
|
if !bsm.rows.hasCapacityFor(bd) {
|
|
// Cannot merge bd with bsm.rows as too many columns will be created.
|
|
// Flush bsm.rows and write bd as is.
|
|
bsm.mergeStreamsLimitWarn(bd)
|
|
bsm.mustFlushRows()
|
|
bsm.bsw.MustWriteBlockData(bd)
|
|
return
|
|
}
|
|
|
|
// Unmarshal log entries from bd
|
|
rowsLen := len(bsm.rows.timestamps)
|
|
bsm.mustUnmarshalRows(bd)
|
|
|
|
// Merge unmarshaled log entries
|
|
timestamps := bsm.rows.timestamps
|
|
rows := bsm.rows.rows
|
|
bsm.rowsTmp.mergeRows(timestamps[:rowsLen], timestamps[rowsLen:], rows[:rowsLen], rows[rowsLen:])
|
|
bsm.rows, bsm.rowsTmp = bsm.rowsTmp, bsm.rows
|
|
bsm.rows.uniqueFields = bsm.rowsTmp.uniqueFields
|
|
bsm.rowsTmp.reset()
|
|
|
|
if bsm.uncompressedRowsSizeBytes >= maxUncompressedBlockSize {
|
|
bsm.mustFlushRows()
|
|
}
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustUnmarshalRows(bd *blockData) {
|
|
rowsLen := len(bsm.rows.timestamps)
|
|
if bsm.sbu == nil {
|
|
bsm.sbu = getStringsBlockUnmarshaler()
|
|
}
|
|
if bsm.vd == nil {
|
|
bsm.vd = getValuesDecoder()
|
|
}
|
|
if err := bd.unmarshalRows(&bsm.rows, bsm.sbu, bsm.vd); err != nil {
|
|
logger.Panicf("FATAL: cannot merge %s: cannot unmarshal log entries from blockData: %s", bsm.ReadersPaths(), err)
|
|
}
|
|
bsm.uncompressedRowsSizeBytes += uncompressedRowsSizeBytes(bsm.rows.rows[rowsLen:])
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustFlushRows() {
|
|
if len(bsm.rows.timestamps) == 0 {
|
|
bsm.bsw.MustWriteBlockData(&bsm.bd)
|
|
} else {
|
|
bsm.bsw.MustWriteRows(&bsm.streamID, bsm.rows.timestamps, bsm.rows.rows)
|
|
}
|
|
bsm.resetRows()
|
|
}
|
|
|
|
func getBlockStreamMerger() *blockStreamMerger {
|
|
v := blockStreamMergerPool.Get()
|
|
if v == nil {
|
|
return &blockStreamMerger{}
|
|
}
|
|
return v.(*blockStreamMerger)
|
|
}
|
|
|
|
func putBlockStreamMerger(bsm *blockStreamMerger) {
|
|
bsm.reset()
|
|
blockStreamMergerPool.Put(bsm)
|
|
}
|
|
|
|
// blockStreamMergerPool holds blockStreamMerger objects for reuse.
//
// Use getBlockStreamMerger() and putBlockStreamMerger() for accessing it.
var blockStreamMergerPool sync.Pool
|
|
|
|
// blockStreamReadersHeap is a list of block stream readers, which is used
// with the container/heap package.
//
// The readers are ordered by the streamID and then by the minTimestamp of their current block.
type blockStreamReadersHeap []*blockStreamReader
|
|
|
|
func (h *blockStreamReadersHeap) Len() int {
|
|
return len(*h)
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Less(i, j int) bool {
|
|
x := *h
|
|
a := &x[i].blockData
|
|
b := &x[j].blockData
|
|
if !a.streamID.equal(&b.streamID) {
|
|
return a.streamID.less(&b.streamID)
|
|
}
|
|
return a.timestampsData.minTimestamp < b.timestampsData.minTimestamp
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Swap(i, j int) {
|
|
x := *h
|
|
x[i], x[j] = x[j], x[i]
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Push(v interface{}) {
|
|
bsr := v.(*blockStreamReader)
|
|
*h = append(*h, bsr)
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Pop() interface{} {
|
|
x := *h
|
|
bsr := x[len(x)-1]
|
|
x[len(x)-1] = nil
|
|
*h = x[:len(x)-1]
|
|
return bsr
|
|
}
|