package logstorage import ( "context" "fmt" "math" "sort" "strings" "sync" "sync/atomic" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) // pipeStreamContext processes '| stream_context ...' queries. // // See https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe type pipeStreamContext struct { // linesBefore is the number of lines to return before the matching line linesBefore int // linesAfter is the number of lines to return after the matching line linesAfter int } func (pc *pipeStreamContext) String() string { s := "stream_context" if pc.linesBefore > 0 { s += fmt.Sprintf(" before %d", pc.linesBefore) } if pc.linesAfter > 0 { s += fmt.Sprintf(" after %d", pc.linesAfter) } return s } func (pc *pipeStreamContext) canLiveTail() bool { return false } var neededFieldsForStreamContext = []string{ "_time", "_stream_id", } func (pc *pipeStreamContext) updateNeededFields(neededFields, unneededFields fieldsSet) { neededFields.addFields(neededFieldsForStreamContext) unneededFields.removeFields(neededFieldsForStreamContext) } func (pc *pipeStreamContext) optimize() { // nothing to do } func (pc *pipeStreamContext) hasFilterInWithQuery() bool { return false } func (pc *pipeStreamContext) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { return pc, nil } func (pc *pipeStreamContext) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { maxStateSize := int64(float64(memory.Allowed()) * 0.2) shards := make([]pipeStreamContextProcessorShard, workersCount) for i := range shards { shards[i] = pipeStreamContextProcessorShard{ pipeStreamContextProcessorShardNopad: pipeStreamContextProcessorShardNopad{ pc: pc, stateSizeBudget: stateSizeBudgetChunk, }, } maxStateSize -= stateSizeBudgetChunk } pcp := &pipeStreamContextProcessor{ pc: pc, stopCh: stopCh, cancel: cancel, ppNext: ppNext, shards: shards, maxStateSize: maxStateSize, } pcp.stateSizeBudget.Store(maxStateSize) return pcp } type pipeStreamContextProcessor struct { pc *pipeStreamContext stopCh <-chan struct{} cancel func() ppNext pipeProcessor shards []pipeStreamContextProcessorShard getStreamRows func(streamID string, stateSizeBudget int) ([]streamContextRow, error) maxStateSize int64 stateSizeBudget atomic.Int64 } func (pcp *pipeStreamContextProcessor) init(ctx context.Context, s *Storage, minTimestamp, maxTimestamp int64) { pcp.getStreamRows = func(streamID string, stateSizeBudget int) ([]streamContextRow, error) { return getStreamRows(ctx, s, streamID, minTimestamp, maxTimestamp, stateSizeBudget) } } func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestamp, maxTimestamp int64, stateSizeBudget int) ([]streamContextRow, error) { tenantID, ok := getTenantIDFromStreamIDString(streamID) if !ok { logger.Panicf("BUG: cannot obtain tenantID from streamID %q", streamID) } qStr := "_stream_id:" + streamID q, err := ParseQuery(qStr) if err != nil { logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err) } q.AddTimeFilter(minTimestamp, maxTimestamp) ctxWithCancel, cancel := context.WithCancel(ctx) defer cancel() var mu sync.Mutex var rows []streamContextRow stateSize := 0 writeBlock := func(_ uint, br *blockResult) { mu.Lock() defer mu.Unlock() if stateSize > stateSizeBudget { cancel() } cs := br.getColumns() for i, timestamp := range br.timestamps { fields := make([]Field, len(cs)) stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) for j, c := range cs { v := c.getValueAtRow(br, i) fields[j] = Field{ Name: strings.Clone(c.name), Value: strings.Clone(v), } stateSize += len(c.name) + len(v) } row := streamContextRow{ timestamp: timestamp, fields: fields, } stateSize += int(unsafe.Sizeof(row)) rows = append(rows, row) } } if err := s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil { return nil, err } if stateSize > stateSizeBudget { return nil, fmt.Errorf("more than %dMB of memory is needed for query [%s]", stateSizeBudget/(1<<20), q) } return rows, nil } func getTenantIDFromStreamIDString(s string) (TenantID, bool) { var sid streamID if !sid.tryUnmarshalFromString(s) { return TenantID{}, false } return sid.tenantID, true } type pipeStreamContextProcessorShard struct { pipeStreamContextProcessorShardNopad // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . _ [128 - unsafe.Sizeof(pipeStreamContextProcessorShardNopad{})%128]byte } type streamContextRow struct { timestamp int64 fields []Field } type pipeStreamContextProcessorShardNopad struct { // pc points to the parent pipeStreamContext. pc *pipeStreamContext // m holds per-stream matching rows m map[string][]streamContextRow // stateSizeBudget is the remaining budget for the whole state size for the shard. // The per-shard budget is provided in chunks from the parent pipeStreamContextProcessor. stateSizeBudget int } // writeBlock writes br to shard. func (shard *pipeStreamContextProcessorShard) writeBlock(br *blockResult) { m := shard.getM() cs := br.getColumns() cStreamID := br.getColumnByName("_stream_id") stateSize := 0 for i, timestamp := range br.timestamps { fields := make([]Field, len(cs)) stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) for j, c := range cs { v := c.getValueAtRow(br, i) fields[j] = Field{ Name: strings.Clone(c.name), Value: strings.Clone(v), } stateSize += len(c.name) + len(v) } row := streamContextRow{ timestamp: timestamp, fields: fields, } stateSize += int(unsafe.Sizeof(row)) streamID := cStreamID.getValueAtRow(br, i) rows, ok := m[streamID] if !ok { stateSize += len(streamID) } rows = append(rows, row) streamID = strings.Clone(streamID) m[streamID] = rows } shard.stateSizeBudget -= stateSize } func (shard *pipeStreamContextProcessorShard) getM() map[string][]streamContextRow { if shard.m == nil { shard.m = make(map[string][]streamContextRow) } return shard.m } func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult) { if len(br.timestamps) == 0 { return } if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 { // Fast path - there is no need to fetch stream context. pcp.ppNext.writeBlock(workerID, br) return } shard := &pcp.shards[workerID] for shard.stateSizeBudget < 0 { // steal some budget for the state size from the global budget. remaining := pcp.stateSizeBudget.Add(-stateSizeBudgetChunk) if remaining < 0 { // The state size is too big. Stop processing data in order to avoid OOM crash. if remaining+stateSizeBudgetChunk >= 0 { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. pcp.cancel() } return } shard.stateSizeBudget += stateSizeBudgetChunk } shard.writeBlock(br) } func (pcp *pipeStreamContextProcessor) flush() error { if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 { // Fast path - nothing to do. return nil } n := pcp.stateSizeBudget.Load() if n <= 0 { return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20)) } if n > math.MaxInt { logger.Panicf("BUG: stateSizeBudget shouldn't exceed math.MaxInt=%v; got %d", math.MaxInt, n) } stateSizeBudget := int(n) // merge state across shards shards := pcp.shards m := shards[0].getM() shards = shards[1:] for i := range shards { if needStop(pcp.stopCh) { return nil } for streamID, rowsSrc := range shards[i].getM() { rows, ok := m[streamID] if !ok { m[streamID] = rowsSrc } else { m[streamID] = append(rows, rowsSrc...) } } } // write result wctx := &pipeStreamContextWriteContext{ pcp: pcp, } for streamID, rows := range m { streamRows, err := pcp.getStreamRows(streamID, stateSizeBudget) if err != nil { return fmt.Errorf("cannot read rows for _stream_id=%q: %w", streamID, err) } if needStop(pcp.stopCh) { return nil } resultRows, err := getStreamContextRows(streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter) if err != nil { return fmt.Errorf("cannot obtain context rows for _stream_id=%q: %w", streamID, err) } for _, rowFields := range resultRows { wctx.writeRow(rowFields) } } wctx.flush() return nil } func getStreamContextRows(streamRows, rows []streamContextRow, linesBefore, linesAfter int) ([][]Field, error) { sortStreamContextRows(streamRows) sortStreamContextRows(rows) var resultRows [][]Field idxNext := 0 for _, r := range rows { idx := getStreamContextRowIdx(streamRows, r.timestamp) if idx < 0 { // This error may happen when streamRows became out of sync with rows. // For example, when some streamRows were deleted after obtaining rows. return nil, fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d", r.timestamp, len(streamRows), len(rows)) } idxStart := idx - linesBefore if idxStart < idxNext { idxStart = idxNext } for idxStart < idx { resultRows = append(resultRows, streamRows[idxStart].fields) idxStart++ } if idx >= idxNext { resultRows = append(resultRows, streamRows[idx].fields) idxNext = idx + 1 } idxEnd := idx + 1 + linesAfter for idxNext < idxEnd && idxNext < len(streamRows) { resultRows = append(resultRows, streamRows[idxNext].fields) idxNext++ } if idxNext >= len(streamRows) { break } } return resultRows, nil } func getStreamContextRowIdx(rows []streamContextRow, timestamp int64) int { n := sort.Search(len(rows), func(i int) bool { return rows[i].timestamp >= timestamp }) if n == len(rows) { return -1 } if rows[n].timestamp != timestamp { return -1 } return n } func sortStreamContextRows(rows []streamContextRow) { sort.SliceStable(rows, func(i, j int) bool { return rows[i].timestamp < rows[j].timestamp }) } type pipeStreamContextWriteContext struct { pcp *pipeStreamContextProcessor rcs []resultColumn br blockResult // rowsCount is the number of rows in the current block rowsCount int // valuesLen is the total length of values in the current block valuesLen int } func (wctx *pipeStreamContextWriteContext) writeRow(rowFields []Field) { rcs := wctx.rcs areEqualColumns := len(rcs) == len(rowFields) if areEqualColumns { for i, f := range rowFields { if rcs[i].name != f.Name { areEqualColumns = false break } } } if !areEqualColumns { // send the current block to ppNext and construct a block with new set of columns wctx.flush() rcs = wctx.rcs[:0] for _, f := range rowFields { rcs = appendResultColumnWithName(rcs, f.Name) } wctx.rcs = rcs } for i, f := range rowFields { v := f.Value rcs[i].addValue(v) wctx.valuesLen += len(v) } wctx.rowsCount++ if wctx.valuesLen >= 1_000_000 { wctx.flush() } } func (wctx *pipeStreamContextWriteContext) flush() { rcs := wctx.rcs br := &wctx.br wctx.valuesLen = 0 // Flush rcs to ppNext br.setResultColumns(rcs, wctx.rowsCount) wctx.rowsCount = 0 wctx.pcp.ppNext.writeBlock(0, br) br.reset() for i := range rcs { rcs[i].resetValues() } } func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) { if !lex.isKeyword("stream_context") { return nil, fmt.Errorf("expecting 'stream_context'; got %q", lex.token) } lex.nextToken() linesBefore := 0 beforeSet := false if lex.isKeyword("before") { lex.nextToken() f, s, err := parseNumber(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err) } if f < 0 { return nil, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s) } linesBefore = int(f) beforeSet = true } linesAfter := 0 afterSet := false if lex.isKeyword("after") { lex.nextToken() f, s, err := parseNumber(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err) } if f < 0 { return nil, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s) } linesAfter = int(f) afterSet = true } if !beforeSet && !afterSet { return nil, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'") } pc := &pipeStreamContext{ linesBefore: linesBefore, linesAfter: linesAfter, } return pc, nil }