VictoriaMetrics/lib/logstorage/rows.go
Zakhar Bessarab 876bce5a57
lib/logstorage: prevent from panic during background merge (#4969)
* lib/logstorage: prevent from panic during background merge

Fixes panic during background merge when resulting block would contain more columns than maxColumnsPerBlock.
Buffered data will be flushed and replaced by the next block.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/logstorage: clarify field description and comment

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2023-10-02 19:29:31 +02:00

134 lines
3.2 KiB
Go

package logstorage
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
// Field is a single field for the log entry.
type Field struct {
// Name is the name of the field
Name string
// Value is the value of the field
Value string
}
// Reset resets f for future re-use.
func (f *Field) Reset() {
f.Name = ""
f.Value = ""
}
// String returns string representation of f.
func (f *Field) String() string {
name := f.Name
if name == "" {
name = "_msg"
}
return fmt.Sprintf("%q:%q", name, f.Value)
}
func (f *Field) marshal(dst []byte) []byte {
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Name))
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Value))
return dst
}
func (f *Field) unmarshal(src []byte) ([]byte, error) {
srcOrig := src
// Unmarshal field name
tail, b, err := encoding.UnmarshalBytes(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal field name: %w", err)
}
// Do not use bytesutil.InternBytes(b) here, since it works slower than the string(b) in prod
f.Name = string(b)
src = tail
// Unmarshal field value
tail, b, err = encoding.UnmarshalBytes(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal field value: %w", err)
}
// Do not use bytesutil.InternBytes(b) here, since it works slower than the string(b) in prod
f.Value = string(b)
src = tail
return src, nil
}
// rows is an aux structure used during rows merge
type rows struct {
fieldsBuf []Field
// uniqueFields is the maximum estimated number of unique fields which are currently stored in fieldsBuf.
// it is used to perform worst case estimation when merging rows.
uniqueFields int
timestamps []int64
rows [][]Field
}
// reset resets rs
func (rs *rows) reset() {
fb := rs.fieldsBuf
for i := range fb {
fb[i].Reset()
}
rs.fieldsBuf = fb[:0]
rs.timestamps = rs.timestamps[:0]
rows := rs.rows
for i := range rows {
rows[i] = nil
}
rs.rows = rows[:0]
}
// appendRows appends rows with the given timestamps to rs.
func (rs *rows) appendRows(timestamps []int64, rows [][]Field) {
rs.timestamps = append(rs.timestamps, timestamps...)
fieldsBuf := rs.fieldsBuf
for _, fields := range rows {
fieldsLen := len(fieldsBuf)
fieldsBuf = append(fieldsBuf, fields...)
rs.rows = append(rs.rows, fieldsBuf[fieldsLen:])
}
rs.fieldsBuf = fieldsBuf
}
// mergeRows merges the args and appends them to rs.
func (rs *rows) mergeRows(timestampsA, timestampsB []int64, fieldsA, fieldsB [][]Field) {
for len(timestampsA) > 0 && len(timestampsB) > 0 {
i := 0
minTimestamp := timestampsB[0]
for i < len(timestampsA) && timestampsA[i] <= minTimestamp {
i++
}
rs.appendRows(timestampsA[:i], fieldsA[:i])
fieldsA = fieldsA[i:]
timestampsA = timestampsA[i:]
fieldsA, fieldsB = fieldsB, fieldsA
timestampsA, timestampsB = timestampsB, timestampsA
}
if len(timestampsA) == 0 {
rs.appendRows(timestampsB, fieldsB)
} else {
rs.appendRows(timestampsA, fieldsA)
}
}
// hasCapacityFor returns true if merging bd with rs won't create too many columns
// for creating a new block.
func (rs *rows) hasCapacityFor(bd *blockData) bool {
return rs.uniqueFields+len(bd.columnsData)+len(bd.constColumns) < maxColumnsPerBlock
}