VictoriaMetrics/lib/logstorage/pipe_stream_context.go
Aliaksandr Valialkin 4b1611267f
lib/logstorage: properly return surrounding logs outside the selected time range by stream_context pipe
Previously only logs inside the selected time range could be returned by stream_context pipe.
For example, the following query could return up to 10 surrounding logs only for the last 5 minutes,
while most users expect this query should return up to 10 surrounding logs without restrictions on the time range.

    _time:5m panic | stream_context before 10

This enables the ability to implement stream context feature at VictoriaLogs web UI: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7063 .

Reduce memory usage when returning stream context over big log streams with millions of entries.
The new logic scans over all the log messages for the selected log stream, while keeping in memory only
the given number of surrounding logs. Previously all the logs for the given log stream on the selected time range
were loaded in memory before selecting the needed surrounding logs.
This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6730 .

Reduce the scan performance for big log streams by fetching only the requested fields. For example, the following
query should be executed much faster than before if logs contain many fields other than _stream, _msg and _time:

    panic | stream_context after 30 | fields _stream, _msg, _time
2024-09-26 17:03:45 +02:00

713 lines
17 KiB
Go

package logstorage
import (
"container/heap"
"fmt"
"math"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/contextutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// pipeStreamContext processes '| stream_context ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe
type pipeStreamContext struct {
// linesBefore is the number of lines to return before the matching line
linesBefore int
// linesAfter is the number of lines to return after the matching line
linesAfter int
}
func (pc *pipeStreamContext) String() string {
s := "stream_context"
if pc.linesBefore > 0 {
s += fmt.Sprintf(" before %d", pc.linesBefore)
}
if pc.linesAfter > 0 {
s += fmt.Sprintf(" after %d", pc.linesAfter)
}
return s
}
func (pc *pipeStreamContext) canLiveTail() bool {
return false
}
var neededFieldsForStreamContext = []string{
"_time",
"_stream_id",
}
func (pc *pipeStreamContext) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.addFields(neededFieldsForStreamContext)
unneededFields.removeFields(neededFieldsForStreamContext)
}
func (pc *pipeStreamContext) optimize() {
// nothing to do
}
func (pc *pipeStreamContext) hasFilterInWithQuery() bool {
return false
}
func (pc *pipeStreamContext) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pc, nil
}
func (pc *pipeStreamContext) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
shards := make([]pipeStreamContextProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeStreamContextProcessorShard{
pipeStreamContextProcessorShardNopad: pipeStreamContextProcessorShardNopad{
pc: pc,
stateSizeBudget: stateSizeBudgetChunk,
},
}
maxStateSize -= stateSizeBudgetChunk
}
pcp := &pipeStreamContextProcessor{
pc: pc,
stopCh: stopCh,
cancel: cancel,
ppNext: ppNext,
shards: shards,
maxStateSize: maxStateSize,
}
pcp.stateSizeBudget.Store(maxStateSize)
return pcp
}
type pipeStreamContextProcessor struct {
pc *pipeStreamContext
stopCh <-chan struct{}
cancel func()
ppNext pipeProcessor
s *Storage
neededColumnNames []string
unneededColumnNames []string
shards []pipeStreamContextProcessorShard
maxStateSize int64
stateSizeBudget atomic.Int64
}
func (pcp *pipeStreamContextProcessor) init(s *Storage, neededColumnNames, unneededColumnNames []string) {
pcp.s = s
pcp.neededColumnNames = neededColumnNames
pcp.unneededColumnNames = unneededColumnNames
}
func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRows []streamContextRow, stateSizeBudget int) ([][]*streamContextRow, error) {
tenantID, ok := getTenantIDFromStreamIDString(streamID)
if !ok {
logger.Panicf("BUG: cannot obtain tenantID from streamID %q", streamID)
}
// construct the query for selecting all the rows for the given streamID
qStr := "_stream_id:" + streamID
if slices.Contains(pcp.neededColumnNames, "*") {
if len(pcp.unneededColumnNames) > 0 {
qStr += " | delete " + fieldNamesString(pcp.unneededColumnNames)
}
} else {
if len(pcp.neededColumnNames) > 0 {
qStr += " | fields " + fieldNamesString(pcp.neededColumnNames)
}
}
q, err := ParseQuery(qStr)
if err != nil {
logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err)
}
// mu protects contextRows and stateSize inside writeBlock callback.
var mu sync.Mutex
contextRows := make([]streamContextRows, len(neededRows))
for i := range neededRows {
contextRows[i] = streamContextRows{
neededTimestamp: neededRows[i].timestamp,
linesBefore: pcp.pc.linesBefore,
linesAfter: pcp.pc.linesAfter,
}
}
sort.Slice(contextRows, func(i, j int) bool {
return contextRows[i].neededTimestamp < contextRows[j].neededTimestamp
})
stateSize := 0
ctxWithCancel, cancel := contextutil.NewStopChanContext(pcp.stopCh)
defer cancel()
writeBlock := func(_ uint, br *blockResult) {
mu.Lock()
defer mu.Unlock()
if stateSize > stateSizeBudget {
cancel()
}
timestamps := br.getTimestamps()
for i, timestamp := range timestamps {
if needStop(pcp.stopCh) {
break
}
for j := range contextRows {
if j > 0 && timestamp <= contextRows[j-1].neededTimestamp {
continue
}
if j+1 < len(contextRows) && timestamp >= contextRows[j+1].neededTimestamp {
continue
}
stateSize += contextRows[j].update(br, i, timestamp)
}
}
}
if err := pcp.s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil {
return nil, err
}
if stateSize > stateSizeBudget {
return nil, fmt.Errorf("more than %dMB of memory is needed for fetching the surrounding logs for %d matching logs", stateSizeBudget/(1<<20), len(neededRows))
}
// return sorted results from contextRows
rowss := make([][]*streamContextRow, len(contextRows))
for i, ctx := range contextRows {
rowss[i] = ctx.getSortedRows()
}
rowss = deduplicateStreamRowss(rowss)
return rowss, nil
}
func deduplicateStreamRowss(streamRowss [][]*streamContextRow) [][]*streamContextRow {
var lastSeenRow *streamContextRow
for _, streamRows := range streamRowss {
if len(streamRows) > 0 {
lastSeenRow = streamRows[len(streamRows)-1]
break
}
}
if lastSeenRow == nil {
return nil
}
resultRowss := streamRowss[:1]
for _, streamRows := range streamRowss[1:] {
i := 0
for i < len(streamRows) && !lastSeenRow.less(streamRows[i]) {
i++
}
streamRows = streamRows[i:]
if len(streamRows) == 0 {
continue
}
resultRowss = append(resultRowss, streamRows)
lastSeenRow = streamRows[len(streamRows)-1]
}
return resultRowss
}
type streamContextRows struct {
neededTimestamp int64
linesBefore int
linesAfter int
rowsBefore streamContextRowsHeapMin
rowsAfter streamContextRowsHeapMax
rowsMatched []*streamContextRow
}
func (ctx *streamContextRows) getSortedRows() []*streamContextRow {
var rows []*streamContextRow
rows = append(rows, ctx.rowsBefore...)
rows = append(rows, ctx.rowsMatched...)
rows = append(rows, ctx.rowsAfter...)
sort.Slice(rows, func(i, j int) bool {
return rows[i].less(rows[j])
})
return rows
}
func (ctx *streamContextRows) update(br *blockResult, rowIdx int, rowTimestamp int64) int {
if rowTimestamp < ctx.neededTimestamp {
if ctx.linesBefore <= 0 {
return 0
}
if len(ctx.rowsBefore) < ctx.linesBefore {
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
heap.Push(&ctx.rowsBefore, r)
return r.sizeBytes()
}
if rowTimestamp <= ctx.rowsBefore[0].timestamp {
return 0
}
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
stateSizeChange := r.sizeBytes() - ctx.rowsBefore[0].sizeBytes()
ctx.rowsBefore[0] = r
heap.Fix(&ctx.rowsBefore, 0)
return stateSizeChange
}
if rowTimestamp > ctx.neededTimestamp {
if ctx.linesAfter <= 0 {
return 0
}
if len(ctx.rowsAfter) < ctx.linesAfter {
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
heap.Push(&ctx.rowsAfter, r)
return r.sizeBytes()
}
if rowTimestamp >= ctx.rowsAfter[0].timestamp {
return 0
}
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
stateSizeChange := r.sizeBytes() - ctx.rowsAfter[0].sizeBytes()
ctx.rowsAfter[0] = r
heap.Fix(&ctx.rowsAfter, 0)
return stateSizeChange
}
// rowTimestamp == ctx.neededTimestamp
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
ctx.rowsMatched = append(ctx.rowsMatched, r)
return r.sizeBytes()
}
func (ctx *streamContextRows) copyRowAtIdx(br *blockResult, rowIdx int, rowTimestamp int64) *streamContextRow {
cs := br.getColumns()
fields := make([]Field, len(cs))
for i, c := range cs {
v := c.getValueAtRow(br, rowIdx)
fields[i] = Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
}
}
return &streamContextRow{
timestamp: rowTimestamp,
fields: fields,
}
}
func getTenantIDFromStreamIDString(s string) (TenantID, bool) {
var sid streamID
if !sid.tryUnmarshalFromString(s) {
return TenantID{}, false
}
return sid.tenantID, true
}
type pipeStreamContextProcessorShard struct {
pipeStreamContextProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeStreamContextProcessorShardNopad{})%128]byte
}
type streamContextRow struct {
timestamp int64
fields []Field
}
func (r *streamContextRow) sizeBytes() int {
n := 0
fields := r.fields
for _, f := range fields {
n += len(f.Name) + len(f.Value) + int(unsafe.Sizeof(f))
}
n += int(unsafe.Sizeof(*r) + unsafe.Sizeof(r))
return n
}
func (r *streamContextRow) less(other *streamContextRow) bool {
// compare timestamps at first
if r.timestamp != other.timestamp {
return r.timestamp < other.timestamp
}
// compare fields then
i := 0
aFields := r.fields
bFields := other.fields
for i < len(aFields) && i < len(bFields) {
af := &aFields[i]
bf := &bFields[i]
if af.Name != bf.Name {
return af.Name < bf.Name
}
if af.Value != bf.Value {
return af.Value < bf.Value
}
i++
}
if len(aFields) != len(bFields) {
return len(aFields) < len(bFields)
}
return false
}
type pipeStreamContextProcessorShardNopad struct {
// pc points to the parent pipeStreamContext.
pc *pipeStreamContext
// m holds per-stream matching rows
m map[string][]streamContextRow
// stateSizeBudget is the remaining budget for the whole state size for the shard.
// The per-shard budget is provided in chunks from the parent pipeStreamContextProcessor.
stateSizeBudget int
}
// writeBlock writes br to shard.
func (shard *pipeStreamContextProcessorShard) writeBlock(br *blockResult) {
m := shard.getM()
cs := br.getColumns()
cStreamID := br.getColumnByName("_stream_id")
stateSize := 0
timestamps := br.getTimestamps()
for i, timestamp := range timestamps {
fields := make([]Field, len(cs))
stateSize += int(unsafe.Sizeof(fields[0])) * len(fields)
for j, c := range cs {
v := c.getValueAtRow(br, i)
fields[j] = Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
}
stateSize += len(c.name) + len(v)
}
row := streamContextRow{
timestamp: timestamp,
fields: fields,
}
stateSize += int(unsafe.Sizeof(row))
streamID := cStreamID.getValueAtRow(br, i)
rows, ok := m[streamID]
if !ok {
stateSize += len(streamID)
}
rows = append(rows, row)
streamID = strings.Clone(streamID)
m[streamID] = rows
}
shard.stateSizeBudget -= stateSize
}
func (shard *pipeStreamContextProcessorShard) getM() map[string][]streamContextRow {
if shard.m == nil {
shard.m = make(map[string][]streamContextRow)
}
return shard.m
}
func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult) {
if br.rowsLen == 0 {
return
}
if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 {
// Fast path - there is no need to fetch stream context.
pcp.ppNext.writeBlock(workerID, br)
return
}
shard := &pcp.shards[workerID]
for shard.stateSizeBudget < 0 {
// steal some budget for the state size from the global budget.
remaining := pcp.stateSizeBudget.Add(-stateSizeBudgetChunk)
if remaining < 0 {
// The state size is too big. Stop processing data in order to avoid OOM crash.
if remaining+stateSizeBudgetChunk >= 0 {
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
pcp.cancel()
}
return
}
shard.stateSizeBudget += stateSizeBudgetChunk
}
shard.writeBlock(br)
}
func (pcp *pipeStreamContextProcessor) flush() error {
if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 {
// Fast path - nothing to do.
return nil
}
n := pcp.stateSizeBudget.Load()
if n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20))
}
if n > math.MaxInt {
logger.Panicf("BUG: stateSizeBudget shouldn't exceed math.MaxInt=%v; got %d", math.MaxInt, n)
}
stateSizeBudget := int(n)
// merge state across shards
shards := pcp.shards
m := shards[0].getM()
shards = shards[1:]
for i := range shards {
if needStop(pcp.stopCh) {
return nil
}
for streamID, rowsSrc := range shards[i].getM() {
rows, ok := m[streamID]
if !ok {
m[streamID] = rowsSrc
} else {
m[streamID] = append(rows, rowsSrc...)
}
}
}
// write result
wctx := &pipeStreamContextWriteContext{
pcp: pcp,
}
for streamID, rows := range m {
streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget)
if err != nil {
return err
}
if needStop(pcp.stopCh) {
return nil
}
// Write streamRows to the output.
for _, streamRows := range streamRowss {
for _, streamRow := range streamRows {
wctx.writeRow(streamRow.fields)
}
if len(streamRowss) > 1 {
lastRow := streamRows[len(streamRows)-1]
fields := newDelimiterRowFields(lastRow, streamID)
wctx.writeRow(fields)
}
}
}
wctx.flush()
return nil
}
func newDelimiterRowFields(r *streamContextRow, streamID string) []Field {
return []Field{
{
Name: "_time",
Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)),
},
{
Name: "_stream_id",
Value: streamID,
},
{
Name: "_stream",
Value: getFieldValue(r.fields, "_stream"),
},
{
Name: "_msg",
Value: "---",
},
}
}
type pipeStreamContextWriteContext struct {
pcp *pipeStreamContextProcessor
rcs []resultColumn
br blockResult
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
func (wctx *pipeStreamContextWriteContext) writeRow(rowFields []Field) {
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(rowFields)
if areEqualColumns {
for i, f := range rowFields {
if rcs[i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to ppNext and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
for _, f := range rowFields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
wctx.rcs = rcs
}
for i, f := range rowFields {
v := f.Value
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeStreamContextWriteContext) flush() {
rcs := wctx.rcs
br := &wctx.br
wctx.valuesLen = 0
// Flush rcs to ppNext
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.pcp.ppNext.writeBlock(0, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
}
func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) {
if !lex.isKeyword("stream_context") {
return nil, fmt.Errorf("expecting 'stream_context'; got %q", lex.token)
}
lex.nextToken()
linesBefore, linesAfter, err := parsePipeStreamContextBeforeAfter(lex)
if err != nil {
return nil, err
}
pc := &pipeStreamContext{
linesBefore: linesBefore,
linesAfter: linesAfter,
}
return pc, nil
}
func parsePipeStreamContextBeforeAfter(lex *lexer) (int, int, error) {
linesBefore := 0
linesAfter := 0
beforeSet := false
afterSet := false
for {
switch {
case lex.isKeyword("before"):
lex.nextToken()
f, s, err := parseNumber(lex)
if err != nil {
return 0, 0, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err)
}
if f < 0 {
return 0, 0, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s)
}
linesBefore = int(f)
beforeSet = true
case lex.isKeyword("after"):
lex.nextToken()
f, s, err := parseNumber(lex)
if err != nil {
return 0, 0, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err)
}
if f < 0 {
return 0, 0, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s)
}
linesAfter = int(f)
afterSet = true
default:
if !beforeSet && !afterSet {
return 0, 0, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'")
}
return linesBefore, linesAfter, nil
}
}
}
type streamContextRowsHeapMax []*streamContextRow
func (h *streamContextRowsHeapMax) Len() int {
return len(*h)
}
func (h *streamContextRowsHeapMax) Less(i, j int) bool {
a := *h
return a[i].timestamp > a[j].timestamp
}
func (h *streamContextRowsHeapMax) Swap(i, j int) {
a := *h
a[i], a[j] = a[j], a[i]
}
func (h *streamContextRowsHeapMax) Push(v any) {
x := v.(*streamContextRow)
*h = append(*h, x)
}
func (h *streamContextRowsHeapMax) Pop() any {
a := *h
x := a[len(a)-1]
a[len(a)-1] = nil
*h = a[:len(a)-1]
return x
}
type streamContextRowsHeapMin streamContextRowsHeapMax
func (h *streamContextRowsHeapMin) Len() int {
return len(*h)
}
func (h *streamContextRowsHeapMin) Less(i, j int) bool {
a := *h
return a[i].timestamp < a[j].timestamp
}
func (h *streamContextRowsHeapMin) Swap(i, j int) {
a := *h
a[i], a[j] = a[j], a[i]
}
func (h *streamContextRowsHeapMin) Push(v any) {
x := v.(*streamContextRow)
*h = append(*h, x)
}
func (h *streamContextRowsHeapMin) Pop() any {
a := *h
x := a[len(a)-1]
a[len(a)-1] = nil
*h = a[:len(a)-1]
return x
}