mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 16:42:27 +01:00
7bb5f75a2a
- Move uniqueFields from rows to blockStreamMerger struct. This allows localizing all the references to uniqueFields inside blockStreamMerger.mustWriteBlock(), which should improve readability and maintainability of the code. - Remove logging of the event when blocks cannot be merged because they contain more than maxColumnsPerBlock, since the provided logging didn't provide the solution for the issue with too many columns. I couldn't figure out the proper solution, which could be helpful for end user, so decided to remove the logging until we find the solution. This commit also contains the following additional changes: - It truncates field names longer than 128 chars during logs ingestion. This should prevent from ingesting bogus field names. This also should prevent from too big columnsHeader blocks, which could negatively affect search query performance, since columnsHeader is read on every scan of the corresponding data block. - It limits the maximum length of const column value to 256. Longer values are stored in an ordinary columns. This helps limiting the size of columnsHeader blocks and improving search query performance by avoiding reading too long const columns on every scan of the corresponding data block. - It deduplicates columns with identical names during data ingestion and background merging. Previously it was possible to pass columns with duplicate names to block.mustInitFromRows(), and they were stored as is in the block. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4969
312 lines
7.1 KiB
Go
312 lines
7.1 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
)
|
|
|
|
// LogRows holds a set of rows needed for Storage.MustAddRows
|
|
//
|
|
// LogRows must be obtained via GetLogRows()
|
|
type LogRows struct {
|
|
// buf holds all the bytes referred by items in LogRows
|
|
buf []byte
|
|
|
|
// fieldsBuf holds all the fields referred by items in LogRows
|
|
fieldsBuf []Field
|
|
|
|
// streamIDs holds streamIDs for rows added to LogRows
|
|
streamIDs []streamID
|
|
|
|
// streamTagsCanonicals holds streamTagsCanonical entries for rows added to LogRows
|
|
streamTagsCanonicals [][]byte
|
|
|
|
// timestamps holds stimestamps for rows added to LogRows
|
|
timestamps []int64
|
|
|
|
// rows holds fields for rows atted to LogRows.
|
|
rows [][]Field
|
|
|
|
// sf is a helper for sorting fields in every added row
|
|
sf sortedFields
|
|
|
|
// streamFields contains names for stream fields
|
|
streamFields map[string]struct{}
|
|
|
|
// ignoreFields contains names for log fields, which must be skipped during data ingestion
|
|
ignoreFields map[string]struct{}
|
|
}
|
|
|
|
type sortedFields []Field
|
|
|
|
func (sf *sortedFields) Len() int {
|
|
return len(*sf)
|
|
}
|
|
|
|
func (sf *sortedFields) Less(i, j int) bool {
|
|
a := *sf
|
|
return a[i].Name < a[j].Name
|
|
}
|
|
|
|
func (sf *sortedFields) Swap(i, j int) {
|
|
a := *sf
|
|
a[i], a[j] = a[j], a[i]
|
|
}
|
|
|
|
// RowFormatter implementes fmt.Stringer for []Field aka a single log row
|
|
type RowFormatter []Field
|
|
|
|
// String returns user-readable representation for rf
|
|
func (rf *RowFormatter) String() string {
|
|
b := append([]byte{}, '{')
|
|
|
|
fields := *rf
|
|
if len(fields) > 0 {
|
|
b = append(b, fields[0].String()...)
|
|
fields = fields[1:]
|
|
for _, field := range fields {
|
|
b = append(b, ',')
|
|
b = append(b, field.String()...)
|
|
}
|
|
}
|
|
|
|
b = append(b, '}')
|
|
return string(b)
|
|
}
|
|
|
|
// Reset resets lr with all its settings.
|
|
//
|
|
// Call ResetKeepSettings() for resetting lr without resetting its settings.
|
|
func (lr *LogRows) Reset() {
|
|
lr.ResetKeepSettings()
|
|
|
|
sfs := lr.streamFields
|
|
for k := range sfs {
|
|
delete(sfs, k)
|
|
}
|
|
|
|
ifs := lr.ignoreFields
|
|
for k := range ifs {
|
|
delete(ifs, k)
|
|
}
|
|
}
|
|
|
|
// ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows().
|
|
func (lr *LogRows) ResetKeepSettings() {
|
|
lr.buf = lr.buf[:0]
|
|
|
|
fb := lr.fieldsBuf
|
|
for i := range fb {
|
|
fb[i].Reset()
|
|
}
|
|
lr.fieldsBuf = fb[:0]
|
|
|
|
sids := lr.streamIDs
|
|
for i := range sids {
|
|
sids[i].reset()
|
|
}
|
|
lr.streamIDs = sids[:0]
|
|
|
|
sns := lr.streamTagsCanonicals
|
|
for i := range sns {
|
|
sns[i] = nil
|
|
}
|
|
lr.streamTagsCanonicals = sns[:0]
|
|
|
|
lr.timestamps = lr.timestamps[:0]
|
|
|
|
rows := lr.rows
|
|
for i := range rows {
|
|
rows[i] = nil
|
|
}
|
|
lr.rows = rows[:0]
|
|
|
|
lr.sf = nil
|
|
}
|
|
|
|
// NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
|
|
func (lr *LogRows) NeedFlush() bool {
|
|
return len(lr.buf) > (maxUncompressedBlockSize/8)*7
|
|
}
|
|
|
|
// MustAdd adds a log entry with the given args to lr.
|
|
//
|
|
// It is OK to modify the args after returning from the function,
|
|
// since lr copies all the args to internal data.
|
|
//
|
|
// field names longer than MaxFieldNameSize are automatically truncated to MaxFieldNameSize length.
|
|
func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field) {
|
|
// Compose StreamTags from fields according to lr.streamFields
|
|
sfs := lr.streamFields
|
|
st := GetStreamTags()
|
|
for i := range fields {
|
|
f := &fields[i]
|
|
if _, ok := sfs[f.Name]; ok {
|
|
st.Add(f.Name, f.Value)
|
|
}
|
|
}
|
|
|
|
// Marshal StreamTags
|
|
bb := bbPool.Get()
|
|
bb.B = st.MarshalCanonical(bb.B)
|
|
PutStreamTags(st)
|
|
|
|
// Calculate the id for the StreamTags
|
|
var sid streamID
|
|
sid.tenantID = tenantID
|
|
sid.id = hash128(bb.B)
|
|
|
|
// Store the row
|
|
lr.mustAddInternal(sid, timestamp, fields, bb.B)
|
|
bbPool.Put(bb)
|
|
}
|
|
|
|
func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field, streamTagsCanonical []byte) {
|
|
buf := lr.buf
|
|
bufLen := len(buf)
|
|
buf = append(buf, streamTagsCanonical...)
|
|
|
|
lr.streamTagsCanonicals = append(lr.streamTagsCanonicals, buf[bufLen:])
|
|
lr.streamIDs = append(lr.streamIDs, sid)
|
|
lr.timestamps = append(lr.timestamps, timestamp)
|
|
|
|
// Store all the fields
|
|
ifs := lr.ignoreFields
|
|
fb := lr.fieldsBuf
|
|
fieldsLen := len(fb)
|
|
for i := range fields {
|
|
f := &fields[i]
|
|
|
|
if _, ok := ifs[f.Name]; ok {
|
|
// Skip fields from the ifs map
|
|
continue
|
|
}
|
|
if f.Value == "" {
|
|
// Skip fields without values
|
|
continue
|
|
}
|
|
|
|
fb = append(fb, Field{})
|
|
dstField := &fb[len(fb)-1]
|
|
|
|
bufLen = len(buf)
|
|
fieldName := f.Name
|
|
if len(fieldName) > MaxFieldNameSize {
|
|
fieldName = fieldName[:MaxFieldNameSize]
|
|
}
|
|
if fieldName != "_msg" {
|
|
buf = append(buf, fieldName...)
|
|
}
|
|
dstField.Name = bytesutil.ToUnsafeString(buf[bufLen:])
|
|
|
|
bufLen = len(buf)
|
|
buf = append(buf, f.Value...)
|
|
dstField.Value = bytesutil.ToUnsafeString(buf[bufLen:])
|
|
}
|
|
lr.sf = fb[fieldsLen:]
|
|
sort.Sort(&lr.sf)
|
|
lr.rows = append(lr.rows, lr.sf)
|
|
|
|
lr.fieldsBuf = fb
|
|
lr.buf = buf
|
|
}
|
|
|
|
// GetRowString returns string representation of the row with the given idx.
|
|
func (lr *LogRows) GetRowString(idx int) string {
|
|
tf := TimeFormatter(lr.timestamps[idx])
|
|
streamTags := getStreamTagsString(lr.streamTagsCanonicals[idx])
|
|
var rf RowFormatter
|
|
rf = append(rf[:0], lr.rows[idx]...)
|
|
rf = append(rf, Field{
|
|
Name: "_time",
|
|
Value: tf.String(),
|
|
})
|
|
rf = append(rf, Field{
|
|
Name: "_stream",
|
|
Value: streamTags,
|
|
})
|
|
sort.Slice(rf, func(i, j int) bool {
|
|
return rf[i].Name < rf[j].Name
|
|
})
|
|
return rf.String()
|
|
}
|
|
|
|
// GetLogRows returns LogRows from the pool for the given streamFields.
|
|
//
|
|
// streamFields is a set of field names, which must be associated with the stream.
|
|
// ignoreFields is a set of field names, which must be ignored during data ingestion.
|
|
//
|
|
// Return back it to the pool with PutLogRows() when it is no longer needed.
|
|
func GetLogRows(streamFields, ignoreFields []string) *LogRows {
|
|
v := logRowsPool.Get()
|
|
if v == nil {
|
|
v = &LogRows{}
|
|
}
|
|
lr := v.(*LogRows)
|
|
|
|
// Initialize streamFields
|
|
sfs := lr.streamFields
|
|
if sfs == nil {
|
|
sfs = make(map[string]struct{}, len(streamFields))
|
|
lr.streamFields = sfs
|
|
}
|
|
for _, f := range streamFields {
|
|
sfs[f] = struct{}{}
|
|
}
|
|
|
|
// Initialize ignoreFields
|
|
ifs := lr.ignoreFields
|
|
if ifs == nil {
|
|
ifs = make(map[string]struct{}, len(ignoreFields))
|
|
lr.ignoreFields = ifs
|
|
}
|
|
for _, f := range ignoreFields {
|
|
if f != "" {
|
|
ifs[f] = struct{}{}
|
|
}
|
|
}
|
|
|
|
return lr
|
|
}
|
|
|
|
// PutLogRows returns lr to the pool.
|
|
func PutLogRows(lr *LogRows) {
|
|
lr.Reset()
|
|
logRowsPool.Put(lr)
|
|
}
|
|
|
|
var logRowsPool sync.Pool
|
|
|
|
// Len returns the number of items in lr.
|
|
func (lr *LogRows) Len() int {
|
|
return len(lr.streamIDs)
|
|
}
|
|
|
|
// Less returns true if (streamID, timestamp) for row i is smaller than the (streamID, timestamp) for row j
|
|
func (lr *LogRows) Less(i, j int) bool {
|
|
a := &lr.streamIDs[i]
|
|
b := &lr.streamIDs[j]
|
|
if !a.equal(b) {
|
|
return a.less(b)
|
|
}
|
|
return lr.timestamps[i] < lr.timestamps[j]
|
|
}
|
|
|
|
// Swap swaps rows i and j in lr.
|
|
func (lr *LogRows) Swap(i, j int) {
|
|
a := &lr.streamIDs[i]
|
|
b := &lr.streamIDs[j]
|
|
*a, *b = *b, *a
|
|
|
|
tsA, tsB := &lr.timestamps[i], &lr.timestamps[j]
|
|
*tsA, *tsB = *tsB, *tsA
|
|
|
|
snA, snB := &lr.streamTagsCanonicals[i], &lr.streamTagsCanonicals[j]
|
|
*snA, *snB = *snB, *snA
|
|
|
|
fieldsA, fieldsB := &lr.rows[i], &lr.rows[j]
|
|
*fieldsA, *fieldsB = *fieldsB, *fieldsA
|
|
}
|