mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
7bb5f75a2a
- Move uniqueFields from rows to the blockStreamMerger struct. This allows localizing all the references to uniqueFields inside blockStreamMerger.mustWriteBlock(), which should improve readability and maintainability of the code.
- Remove logging of the event when blocks cannot be merged because they contain more than maxColumnsPerBlock columns, since the logging didn't provide a solution for the issue with too many columns. I couldn't figure out a proper solution that could be helpful for the end user, so I decided to remove the logging until we find one.

This commit also contains the following additional changes:

- It truncates field names longer than 128 chars during log ingestion. This should prevent ingesting bogus field names. It should also prevent overly big columnsHeader blocks, which could negatively affect search query performance, since columnsHeader is read on every scan of the corresponding data block.
- It limits the maximum length of a const column value to 256. Longer values are stored in ordinary columns. This helps limit the size of columnsHeader blocks and improves search query performance by avoiding reading overly long const columns on every scan of the corresponding data block.
- It deduplicates columns with identical names during data ingestion and background merging. Previously it was possible to pass columns with duplicate names to block.mustInitFromRows(), and they were stored as is in the block.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4969
124 lines
2.8 KiB
Go
124 lines
2.8 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
)
|
|
|
|
// Field is a single field for the log entry.
//
// It is a plain name/value pair of strings.
type Field struct {
	// Name is the name of the field.
	//
	// An empty Name is rendered as "_msg" by String.
	Name string

	// Value is the value of the field.
	Value string
}
|
|
|
|
// Reset resets f for future re-use.
|
|
func (f *Field) Reset() {
|
|
f.Name = ""
|
|
f.Value = ""
|
|
}
|
|
|
|
// String returns string representation of f.
|
|
func (f *Field) String() string {
|
|
name := f.Name
|
|
if name == "" {
|
|
name = "_msg"
|
|
}
|
|
return fmt.Sprintf("%q:%q", name, f.Value)
|
|
}
|
|
|
|
func (f *Field) marshal(dst []byte) []byte {
|
|
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Name))
|
|
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Value))
|
|
return dst
|
|
}
|
|
|
|
func (f *Field) unmarshal(src []byte) ([]byte, error) {
|
|
srcOrig := src
|
|
|
|
// Unmarshal field name
|
|
tail, b, err := encoding.UnmarshalBytes(src)
|
|
if err != nil {
|
|
return srcOrig, fmt.Errorf("cannot unmarshal field name: %w", err)
|
|
}
|
|
// Do not use bytesutil.InternBytes(b) here, since it works slower than the string(b) in prod
|
|
f.Name = string(b)
|
|
src = tail
|
|
|
|
// Unmarshal field value
|
|
tail, b, err = encoding.UnmarshalBytes(src)
|
|
if err != nil {
|
|
return srcOrig, fmt.Errorf("cannot unmarshal field value: %w", err)
|
|
}
|
|
// Do not use bytesutil.InternBytes(b) here, since it works slower than the string(b) in prod
|
|
f.Value = string(b)
|
|
src = tail
|
|
|
|
return src, nil
|
|
}
|
|
|
|
// rows is an aux structure used during rows merge
|
|
type rows struct {
|
|
fieldsBuf []Field
|
|
|
|
timestamps []int64
|
|
|
|
rows [][]Field
|
|
}
|
|
|
|
// reset resets rs
|
|
func (rs *rows) reset() {
|
|
fb := rs.fieldsBuf
|
|
for i := range fb {
|
|
fb[i].Reset()
|
|
}
|
|
rs.fieldsBuf = fb[:0]
|
|
|
|
rs.timestamps = rs.timestamps[:0]
|
|
|
|
rows := rs.rows
|
|
for i := range rows {
|
|
rows[i] = nil
|
|
}
|
|
rs.rows = rows[:0]
|
|
}
|
|
|
|
// appendRows appends rows with the given timestamps to rs.
|
|
func (rs *rows) appendRows(timestamps []int64, rows [][]Field) {
|
|
rs.timestamps = append(rs.timestamps, timestamps...)
|
|
|
|
fieldsBuf := rs.fieldsBuf
|
|
for _, fields := range rows {
|
|
fieldsLen := len(fieldsBuf)
|
|
fieldsBuf = append(fieldsBuf, fields...)
|
|
rs.rows = append(rs.rows, fieldsBuf[fieldsLen:])
|
|
}
|
|
rs.fieldsBuf = fieldsBuf
|
|
}
|
|
|
|
// mergeRows merges the args and appends them to rs.
|
|
func (rs *rows) mergeRows(timestampsA, timestampsB []int64, fieldsA, fieldsB [][]Field) {
|
|
for len(timestampsA) > 0 && len(timestampsB) > 0 {
|
|
i := 0
|
|
minTimestamp := timestampsB[0]
|
|
for i < len(timestampsA) && timestampsA[i] <= minTimestamp {
|
|
i++
|
|
}
|
|
rs.appendRows(timestampsA[:i], fieldsA[:i])
|
|
fieldsA = fieldsA[i:]
|
|
timestampsA = timestampsA[i:]
|
|
|
|
fieldsA, fieldsB = fieldsB, fieldsA
|
|
timestampsA, timestampsB = timestampsB, timestampsA
|
|
}
|
|
if len(timestampsA) == 0 {
|
|
rs.appendRows(timestampsB, fieldsB)
|
|
} else {
|
|
rs.appendRows(timestampsA, fieldsA)
|
|
}
|
|
}
|