VictoriaMetrics/lib/logstorage/block.go
Aliaksandr Valialkin fe5f16b662
lib/logstorage: dynamically adjust the number of (bloom, values) shards in a part depending on the number of non-const columns
This allows reducing the amounts of data, which must be read during queries over logs with big number of fields (aka "wide events").
This, in turn, improves query performance when the data, which needs to be scanned during the query, doesn't fit OS page cache.

(cherry picked from commit 7a62eefa34)
2024-10-30 09:52:51 +01:00

659 lines
15 KiB
Go

package logstorage
import (
"fmt"
"sort"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// block represents a block of log entries.
type block struct {
// timestamps contains timestamps for log entries.
timestamps []int64
// columns contains values for fields seen in log entries.
columns []column
// constColumns contains fields with constant values across all the block entries.
constColumns []Field
}
func (b *block) reset() {
b.timestamps = b.timestamps[:0]
cs := b.columns
for i := range cs {
cs[i].reset()
}
b.columns = cs[:0]
ccs := b.constColumns
for i := range ccs {
ccs[i].Reset()
}
b.constColumns = ccs[:0]
}
// uncompressedSizeBytes returns the total size of the origianl log entries stored in b.
//
// It is supposed that every log entry has the following format:
//
// 2006-01-02T15:04:05.999999999Z07:00 field1=value1 ... fieldN=valueN
func (b *block) uncompressedSizeBytes() uint64 {
rowsCount := uint64(b.Len())
// Take into account timestamps
n := rowsCount * uint64(len(time.RFC3339Nano))
// Take into account columns
cs := b.columns
for i := range cs {
c := &cs[i]
nameLen := uint64(len(c.name))
if nameLen == 0 {
nameLen = uint64(len("_msg"))
}
for _, v := range c.values {
if len(v) > 0 {
n += nameLen + 2 + uint64(len(v))
}
}
}
// Take into account constColumns
ccs := b.constColumns
for i := range ccs {
cc := &ccs[i]
nameLen := uint64(len(cc.Name))
if nameLen == 0 {
nameLen = uint64(len("_msg"))
}
n += rowsCount * (2 + nameLen + uint64(len(cc.Value)))
}
return n
}
// uncompressedRowsSizeBytes returns the size of the uncompressed rows.
//
// It is supposed that every row has the following format:
//
// 2006-01-02T15:04:05.999999999Z07:00 field1=value1 ... fieldN=valueN
func uncompressedRowsSizeBytes(rows [][]Field) uint64 {
n := uint64(0)
for _, fields := range rows {
n += uncompressedRowSizeBytes(fields)
}
return n
}
// uncompressedRowSizeBytes returns the size of uncompressed row.
//
// It is supposed that the row has the following format:
//
// 2006-01-02T15:04:05.999999999Z07:00 field1=value1 ... fieldN=valueN
func uncompressedRowSizeBytes(fields []Field) uint64 {
n := uint64(len(time.RFC3339Nano)) // log timestamp
for _, f := range fields {
nameLen := len(f.Name)
if nameLen == 0 {
nameLen = len("_msg")
}
n += uint64(2 + nameLen + len(f.Value))
}
return n
}
// column contains values for the given field name seen in log entries.
type column struct {
// name is the field name
name string
// values is the values seen for the given log entries.
values []string
}
func (c *column) reset() {
c.name = ""
clear(c.values)
c.values = c.values[:0]
}
func (c *column) canStoreInConstColumn() bool {
values := c.values
if len(values) == 0 {
return true
}
value := values[0]
if len(value) > maxConstColumnValueSize {
return false
}
for _, v := range values[1:] {
if value != v {
return false
}
}
return true
}
func (c *column) resizeValues(valuesLen int) []string {
c.values = slicesutil.SetLength(c.values, valuesLen)
return c.values
}
// mustWriteTo writes c to sw and updates ch accordingly.
//
// ch is valid until c is changed.
func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
ch.reset()
ch.name = c.name
bloomValuesWriter := sw.getBloomValuesWriterForColumnName(ch.name)
// encode values
ve := getValuesEncoder()
ch.valueType, ch.minValue, ch.maxValue = ve.encode(c.values, &ch.valuesDict)
bb := longTermBufPool.Get()
defer longTermBufPool.Put(bb)
// marshal values
bb.B = marshalStringsBlock(bb.B[:0], ve.values)
putValuesEncoder(ve)
ch.valuesSize = uint64(len(bb.B))
if ch.valuesSize > maxValuesBlockSize {
logger.Panicf("BUG: too valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
}
ch.valuesOffset = bloomValuesWriter.values.bytesWritten
bloomValuesWriter.values.MustWrite(bb.B)
// create and marshal bloom filter for c.values
if ch.valueType != valueTypeDict {
hashesBuf := encoding.GetUint64s(0)
hashesBuf.A = tokenizeHashes(hashesBuf.A[:0], c.values)
bb.B = bloomFilterMarshalHashes(bb.B[:0], hashesBuf.A)
encoding.PutUint64s(hashesBuf)
} else {
// there is no need in ecoding bloom filter for dictionary type,
// since it isn't used during querying - all the dictionary values are available in ch.valuesDict
bb.B = bb.B[:0]
}
ch.bloomFilterSize = uint64(len(bb.B))
if ch.bloomFilterSize > maxBloomFilterBlockSize {
logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize)
}
ch.bloomFilterOffset = bloomValuesWriter.bloom.bytesWritten
bloomValuesWriter.bloom.MustWrite(bb.B)
}
func (b *block) assertValid() {
// Check that timestamps are in ascending order
timestamps := b.timestamps
for i := 1; i < len(timestamps); i++ {
if timestamps[i-1] > timestamps[i] {
logger.Panicf("BUG: log entries must be sorted by timestamp; got the previous entry with bigger timestamp %d than the current entry with timestamp %d",
timestamps[i-1], timestamps[i])
}
}
// Check that the number of items in each column matches the number of items in the block.
itemsCount := len(timestamps)
columns := b.columns
for _, c := range columns {
if len(c.values) != itemsCount {
logger.Panicf("BUG: unexpected number of values for column %q: got %d; want %d", c.name, len(c.values), itemsCount)
}
}
}
// MustInitFromRows initializes b from the given timestamps and rows.
//
// It is expected that timestamps are sorted.
//
// b is valid until rows are changed.
func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) {
b.reset()
assertTimestampsSorted(timestamps)
b.timestamps = append(b.timestamps, timestamps...)
b.mustInitFromRows(rows)
b.sortColumnsByName()
}
// mustInitiFromRows initializes b from rows.
//
// b is valid until rows are changed.
func (b *block) mustInitFromRows(rows [][]Field) {
rowsLen := len(rows)
if rowsLen == 0 {
// Nothing to do
return
}
if areSameFieldsInRows(rows) {
// Fast path - all the log entries have the same fields
fields := rows[0]
for i := range fields {
f := &fields[i]
if canStoreInConstColumn(rows, i) {
cc := b.extendConstColumns()
cc.Name = f.Name
cc.Value = f.Value
} else {
c := b.extendColumns()
c.name = f.Name
values := c.resizeValues(rowsLen)
for j := range rows {
values[j] = rows[j][i].Value
}
}
}
return
}
// Slow path - log entries contain different set of fields
// Determine indexes for columns
columnIdxs := getColumnIdxs()
for i := range rows {
fields := rows[i]
for j := range fields {
name := fields[j].Name
if _, ok := columnIdxs[name]; !ok {
columnIdxs[name] = len(columnIdxs)
}
}
}
// Initialize columns
cs := b.resizeColumns(len(columnIdxs))
for name, idx := range columnIdxs {
c := &cs[idx]
c.name = name
c.resizeValues(rowsLen)
}
// Write rows to block
for i := range rows {
for _, f := range rows[i] {
idx := columnIdxs[f.Name]
cs[idx].values[i] = f.Value
}
}
putColumnIdxs(columnIdxs)
// Detect const columns
for i := len(cs) - 1; i >= 0; i-- {
c := &cs[i]
if !c.canStoreInConstColumn() {
continue
}
cc := b.extendConstColumns()
cc.Name = c.name
cc.Value = c.values[0]
c.reset()
if i < len(cs)-1 {
swapColumns(c, &cs[len(cs)-1])
}
cs = cs[:len(cs)-1]
}
b.columns = cs
}
func swapColumns(a, b *column) {
*a, *b = *b, *a
}
func canStoreInConstColumn(rows [][]Field, colIdx int) bool {
if len(rows) == 0 {
return true
}
value := rows[0][colIdx].Value
if len(value) > maxConstColumnValueSize {
return false
}
rows = rows[1:]
for i := range rows {
if value != rows[i][colIdx].Value {
return false
}
}
return true
}
func assertTimestampsSorted(timestamps []int64) {
for i := range timestamps {
if i > 0 && timestamps[i-1] > timestamps[i] {
logger.Panicf("BUG: log entries must be sorted by timestamp; got the previous entry with bigger timestamp %d than the current entry with timestamp %d",
timestamps[i-1], timestamps[i])
}
}
}
func (b *block) extendConstColumns() *Field {
ccs := b.constColumns
if cap(ccs) > len(ccs) {
ccs = ccs[:len(ccs)+1]
} else {
ccs = append(ccs, Field{})
}
b.constColumns = ccs
return &ccs[len(ccs)-1]
}
func (b *block) extendColumns() *column {
cs := b.columns
if cap(cs) > len(cs) {
cs = cs[:len(cs)+1]
} else {
cs = append(cs, column{})
}
b.columns = cs
return &cs[len(cs)-1]
}
func (b *block) resizeColumns(columnsLen int) []column {
b.columns = slicesutil.SetLength(b.columns, columnsLen)
return b.columns
}
func (b *block) sortColumnsByName() {
if len(b.columns)+len(b.constColumns) > maxColumnsPerBlock {
logger.Panicf("BUG: too big number of columns detected in the block: %d; the number of columns mustn't exceed %d",
len(b.columns)+len(b.constColumns), maxColumnsPerBlock)
}
cs := getColumnsSorter()
cs.columns = b.columns
sort.Sort(cs)
putColumnsSorter(cs)
ccs := getConstColumnsSorter()
ccs.columns = b.constColumns
sort.Sort(ccs)
putConstColumnsSorter(ccs)
}
// Len returns the number of log entries in b.
func (b *block) Len() int {
return len(b.timestamps)
}
// InitFromBlockData unmarshals bd to b.
//
// sbu and vd are used as a temporary storage for unmarshaled column values.
//
// The b becomes outdated after sbu or vd is reset.
func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, vd *valuesDecoder) error {
b.reset()
if bd.rowsCount > maxRowsPerBlock {
return fmt.Errorf("too many entries found in the block: %d; mustn't exceed %d", bd.rowsCount, maxRowsPerBlock)
}
rowsCount := int(bd.rowsCount)
// unmarshal timestamps
td := &bd.timestampsData
var err error
b.timestamps, err = encoding.UnmarshalTimestamps(b.timestamps[:0], td.data, td.marshalType, td.minTimestamp, rowsCount)
if err != nil {
return fmt.Errorf("cannot unmarshal timestamps: %w", err)
}
// unmarshal columns
cds := bd.columnsData
cs := b.resizeColumns(len(cds))
for i := range cds {
cd := &cds[i]
c := &cs[i]
c.name = sbu.copyString(cd.name)
c.values, err = sbu.unmarshal(c.values[:0], cd.valuesData, uint64(rowsCount))
if err != nil {
return fmt.Errorf("cannot unmarshal column %d: %w", i, err)
}
if err = vd.decodeInplace(c.values, cd.valueType, cd.valuesDict.values); err != nil {
return fmt.Errorf("cannot decode column values: %w", err)
}
}
// unmarshal constColumns
b.constColumns = sbu.appendFields(b.constColumns[:0], bd.constColumns)
return nil
}
// mustWriteTo writes b with the given sid to sw and updates bh accordingly.
func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
b.assertValid()
bh.reset()
bh.streamID = *sid
bh.uncompressedSizeBytes = b.uncompressedSizeBytes()
bh.rowsCount = uint64(b.Len())
// Marshal timestamps
mustWriteTimestampsTo(&bh.timestampsHeader, b.timestamps, sw)
// Marshal columns
cs := b.columns
csh := getColumnsHeader()
chs := csh.resizeColumnHeaders(len(cs))
for i := range cs {
cs[i].mustWriteTo(&chs[i], sw)
}
csh.constColumns = append(csh.constColumns[:0], b.constColumns...)
csh.mustWriteTo(bh, sw, g)
putColumnsHeader(csh)
}
// appendRowsTo appends log entries from b to dst.
func (b *block) appendRowsTo(dst *rows) {
// copy timestamps
dst.timestamps = append(dst.timestamps, b.timestamps...)
// copy columns
fieldsBuf := dst.fieldsBuf
ccs := b.constColumns
cs := b.columns
for i := range b.timestamps {
fieldsLen := len(fieldsBuf)
// copy const columns
fieldsBuf = append(fieldsBuf, ccs...)
// copy other columns
for j := range cs {
c := &cs[j]
value := c.values[i]
if len(value) == 0 {
continue
}
fieldsBuf = append(fieldsBuf, Field{
Name: c.name,
Value: value,
})
}
dst.rows = append(dst.rows, fieldsBuf[fieldsLen:])
}
dst.fieldsBuf = fieldsBuf
}
func areSameFieldsInRows(rows [][]Field) bool {
if len(rows) < 2 {
return true
}
fields := rows[0]
// Verify that all the field names are unique
m := getFieldsSet()
for i := range fields {
f := &fields[i]
if _, ok := m[f.Name]; ok {
// Field name isn't unique
return false
}
m[f.Name] = struct{}{}
}
putFieldsSet(m)
// Verify that all the fields are the same across rows
rows = rows[1:]
for i := range rows {
leFields := rows[i]
if len(fields) != len(leFields) {
return false
}
for j := range leFields {
if leFields[j].Name != fields[j].Name {
return false
}
}
}
return true
}
func getFieldsSet() map[string]struct{} {
v := fieldsSetPool.Get()
if v == nil {
return make(map[string]struct{})
}
return v.(map[string]struct{})
}
func putFieldsSet(m map[string]struct{}) {
clear(m)
fieldsSetPool.Put(m)
}
var fieldsSetPool sync.Pool
var columnIdxsPool sync.Pool
func getColumnIdxs() map[string]int {
v := columnIdxsPool.Get()
if v == nil {
return make(map[string]int)
}
return v.(map[string]int)
}
func putColumnIdxs(m map[string]int) {
clear(m)
columnIdxsPool.Put(m)
}
func getBlock() *block {
v := blockPool.Get()
if v == nil {
return &block{}
}
return v.(*block)
}
func putBlock(b *block) {
b.reset()
blockPool.Put(b)
}
var blockPool sync.Pool
type columnsSorter struct {
columns []column
}
func (cs *columnsSorter) reset() {
cs.columns = nil
}
func (cs *columnsSorter) Len() int {
return len(cs.columns)
}
func (cs *columnsSorter) Less(i, j int) bool {
columns := cs.columns
return columns[i].name < columns[j].name
}
func (cs *columnsSorter) Swap(i, j int) {
columns := cs.columns
columns[i], columns[j] = columns[j], columns[i]
}
func getColumnsSorter() *columnsSorter {
v := columnsSorterPool.Get()
if v == nil {
return &columnsSorter{}
}
return v.(*columnsSorter)
}
func putColumnsSorter(cs *columnsSorter) {
cs.reset()
columnsSorterPool.Put(cs)
}
var columnsSorterPool sync.Pool
type constColumnsSorter struct {
columns []Field
}
func (ccs *constColumnsSorter) reset() {
ccs.columns = nil
}
func (ccs *constColumnsSorter) Len() int {
return len(ccs.columns)
}
func (ccs *constColumnsSorter) Less(i, j int) bool {
columns := ccs.columns
return columns[i].Name < columns[j].Name
}
func (ccs *constColumnsSorter) Swap(i, j int) {
columns := ccs.columns
columns[i], columns[j] = columns[j], columns[i]
}
func getConstColumnsSorter() *constColumnsSorter {
v := constColumnsSorterPool.Get()
if v == nil {
return &constColumnsSorter{}
}
return v.(*constColumnsSorter)
}
func putConstColumnsSorter(ccs *constColumnsSorter) {
ccs.reset()
constColumnsSorterPool.Put(ccs)
}
var constColumnsSorterPool sync.Pool
// mustWriteTimestampsTo writes timestamps to sw and updates th accordingly
func mustWriteTimestampsTo(th *timestampsHeader, timestamps []int64, sw *streamWriters) {
th.reset()
bb := longTermBufPool.Get()
bb.B, th.marshalType, th.minTimestamp = encoding.MarshalTimestamps(bb.B[:0], timestamps, 64)
if len(bb.B) > maxTimestampsBlockSize {
logger.Panicf("BUG: too big block with timestamps: %d bytes; the maximum supported size is %d bytes", len(bb.B), maxTimestampsBlockSize)
}
th.maxTimestamp = timestamps[len(timestamps)-1]
th.blockOffset = sw.timestampsWriter.bytesWritten
th.blockSize = uint64(len(bb.B))
sw.timestampsWriter.MustWrite(bb.B)
longTermBufPool.Put(bb)
}