mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-24 11:20:18 +01:00
512 lines
12 KiB
Go
512 lines
12 KiB
Go
|
package logstorage
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"math"
|
||
|
"sort"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"unsafe"
|
||
|
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||
|
)
|
||
|
|
||
|
// pipeStreamContext processes '| stream_context ...' queries.
|
||
|
//
|
||
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe
|
||
|
type pipeStreamContext struct {
|
||
|
// linesBefore is the number of lines to return before the matching line
|
||
|
linesBefore int
|
||
|
|
||
|
// linesAfter is the number of lines to return after the matching line
|
||
|
linesAfter int
|
||
|
}
|
||
|
|
||
|
func (pc *pipeStreamContext) String() string {
|
||
|
s := "stream_context"
|
||
|
if pc.linesBefore > 0 {
|
||
|
s += fmt.Sprintf(" before %d", pc.linesBefore)
|
||
|
}
|
||
|
if pc.linesAfter > 0 {
|
||
|
s += fmt.Sprintf(" after %d", pc.linesAfter)
|
||
|
}
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
func (pc *pipeStreamContext) canLiveTail() bool {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
var neededFieldsForStreamContext = []string{
|
||
|
"_time",
|
||
|
"_stream_id",
|
||
|
}
|
||
|
|
||
|
func (pc *pipeStreamContext) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||
|
neededFields.addFields(neededFieldsForStreamContext)
|
||
|
unneededFields.removeFields(neededFieldsForStreamContext)
|
||
|
}
|
||
|
|
||
|
func (pc *pipeStreamContext) optimize() {
|
||
|
// nothing to do
|
||
|
}
|
||
|
|
||
|
func (pc *pipeStreamContext) hasFilterInWithQuery() bool {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
func (pc *pipeStreamContext) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
|
||
|
return pc, nil
|
||
|
}
|
||
|
|
||
|
func (pc *pipeStreamContext) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
|
||
|
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
|
||
|
|
||
|
shards := make([]pipeStreamContextProcessorShard, workersCount)
|
||
|
for i := range shards {
|
||
|
shards[i] = pipeStreamContextProcessorShard{
|
||
|
pipeStreamContextProcessorShardNopad: pipeStreamContextProcessorShardNopad{
|
||
|
pc: pc,
|
||
|
stateSizeBudget: stateSizeBudgetChunk,
|
||
|
},
|
||
|
}
|
||
|
maxStateSize -= stateSizeBudgetChunk
|
||
|
}
|
||
|
|
||
|
pcp := &pipeStreamContextProcessor{
|
||
|
pc: pc,
|
||
|
stopCh: stopCh,
|
||
|
cancel: cancel,
|
||
|
ppNext: ppNext,
|
||
|
|
||
|
shards: shards,
|
||
|
|
||
|
maxStateSize: maxStateSize,
|
||
|
}
|
||
|
pcp.stateSizeBudget.Store(maxStateSize)
|
||
|
|
||
|
return pcp
|
||
|
}
|
||
|
|
||
|
type pipeStreamContextProcessor struct {
|
||
|
pc *pipeStreamContext
|
||
|
stopCh <-chan struct{}
|
||
|
cancel func()
|
||
|
ppNext pipeProcessor
|
||
|
|
||
|
shards []pipeStreamContextProcessorShard
|
||
|
|
||
|
getStreamRows func(streamID string, stateSizeBudget int) ([]streamContextRow, error)
|
||
|
|
||
|
maxStateSize int64
|
||
|
stateSizeBudget atomic.Int64
|
||
|
}
|
||
|
|
||
|
func (pcp *pipeStreamContextProcessor) init(ctx context.Context, s *Storage, minTimestamp, maxTimestamp int64) {
|
||
|
pcp.getStreamRows = func(streamID string, stateSizeBudget int) ([]streamContextRow, error) {
|
||
|
return getStreamRows(ctx, s, streamID, minTimestamp, maxTimestamp, stateSizeBudget)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestamp, maxTimestamp int64, stateSizeBudget int) ([]streamContextRow, error) {
|
||
|
tenantID, ok := getTenantIDFromStreamIDString(streamID)
|
||
|
if !ok {
|
||
|
logger.Panicf("BUG: cannot obtain tenantID from streamID %q", streamID)
|
||
|
}
|
||
|
|
||
|
qStr := "_stream_id:" + streamID
|
||
|
q, err := ParseQuery(qStr)
|
||
|
if err != nil {
|
||
|
logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err)
|
||
|
}
|
||
|
q.AddTimeFilter(minTimestamp, maxTimestamp)
|
||
|
|
||
|
ctxWithCancel, cancel := context.WithCancel(ctx)
|
||
|
defer cancel()
|
||
|
|
||
|
var mu sync.Mutex
|
||
|
var rows []streamContextRow
|
||
|
stateSize := 0
|
||
|
writeBlock := func(_ uint, br *blockResult) {
|
||
|
mu.Lock()
|
||
|
defer mu.Unlock()
|
||
|
|
||
|
if stateSize > stateSizeBudget {
|
||
|
cancel()
|
||
|
}
|
||
|
|
||
|
cs := br.getColumns()
|
||
|
for i, timestamp := range br.timestamps {
|
||
|
fields := make([]Field, len(cs))
|
||
|
stateSize += int(unsafe.Sizeof(fields[0])) * len(fields)
|
||
|
|
||
|
for j, c := range cs {
|
||
|
v := c.getValueAtRow(br, i)
|
||
|
fields[j] = Field{
|
||
|
Name: strings.Clone(c.name),
|
||
|
Value: strings.Clone(v),
|
||
|
}
|
||
|
stateSize += len(c.name) + len(v)
|
||
|
}
|
||
|
|
||
|
row := streamContextRow{
|
||
|
timestamp: timestamp,
|
||
|
fields: fields,
|
||
|
}
|
||
|
stateSize += int(unsafe.Sizeof(row))
|
||
|
rows = append(rows, row)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
if stateSize > stateSizeBudget {
|
||
|
return nil, fmt.Errorf("more than %dMB of memory is needed for query [%s]", stateSizeBudget/(1<<20), q)
|
||
|
}
|
||
|
|
||
|
return rows, nil
|
||
|
}
|
||
|
|
||
|
func getTenantIDFromStreamIDString(s string) (TenantID, bool) {
|
||
|
var sid streamID
|
||
|
if !sid.tryUnmarshalFromString(s) {
|
||
|
return TenantID{}, false
|
||
|
}
|
||
|
return sid.tenantID, true
|
||
|
}
|
||
|
|
||
|
type pipeStreamContextProcessorShard struct {
|
||
|
pipeStreamContextProcessorShardNopad
|
||
|
|
||
|
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
||
|
_ [128 - unsafe.Sizeof(pipeStreamContextProcessorShardNopad{})%128]byte
|
||
|
}
|
||
|
|
||
|
type streamContextRow struct {
|
||
|
timestamp int64
|
||
|
fields []Field
|
||
|
}
|
||
|
|
||
|
type pipeStreamContextProcessorShardNopad struct {
|
||
|
// pc points to the parent pipeStreamContext.
|
||
|
pc *pipeStreamContext
|
||
|
|
||
|
// m holds per-stream matching rows
|
||
|
m map[string][]streamContextRow
|
||
|
|
||
|
// stateSizeBudget is the remaining budget for the whole state size for the shard.
|
||
|
// The per-shard budget is provided in chunks from the parent pipeStreamContextProcessor.
|
||
|
stateSizeBudget int
|
||
|
}
|
||
|
|
||
|
// writeBlock writes br to shard.
|
||
|
func (shard *pipeStreamContextProcessorShard) writeBlock(br *blockResult) {
|
||
|
m := shard.getM()
|
||
|
|
||
|
cs := br.getColumns()
|
||
|
cStreamID := br.getColumnByName("_stream_id")
|
||
|
stateSize := 0
|
||
|
for i, timestamp := range br.timestamps {
|
||
|
fields := make([]Field, len(cs))
|
||
|
stateSize += int(unsafe.Sizeof(fields[0])) * len(fields)
|
||
|
|
||
|
for j, c := range cs {
|
||
|
v := c.getValueAtRow(br, i)
|
||
|
fields[j] = Field{
|
||
|
Name: strings.Clone(c.name),
|
||
|
Value: strings.Clone(v),
|
||
|
}
|
||
|
stateSize += len(c.name) + len(v)
|
||
|
}
|
||
|
|
||
|
row := streamContextRow{
|
||
|
timestamp: timestamp,
|
||
|
fields: fields,
|
||
|
}
|
||
|
stateSize += int(unsafe.Sizeof(row))
|
||
|
|
||
|
streamID := cStreamID.getValueAtRow(br, i)
|
||
|
rows, ok := m[streamID]
|
||
|
if !ok {
|
||
|
stateSize += len(streamID)
|
||
|
}
|
||
|
rows = append(rows, row)
|
||
|
streamID = strings.Clone(streamID)
|
||
|
m[streamID] = rows
|
||
|
}
|
||
|
|
||
|
shard.stateSizeBudget -= stateSize
|
||
|
}
|
||
|
|
||
|
func (shard *pipeStreamContextProcessorShard) getM() map[string][]streamContextRow {
|
||
|
if shard.m == nil {
|
||
|
shard.m = make(map[string][]streamContextRow)
|
||
|
}
|
||
|
return shard.m
|
||
|
}
|
||
|
|
||
|
func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult) {
|
||
|
if len(br.timestamps) == 0 {
|
||
|
return
|
||
|
}
|
||
|
if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 {
|
||
|
// Fast path - there is no need to fetch stream context.
|
||
|
pcp.ppNext.writeBlock(workerID, br)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
shard := &pcp.shards[workerID]
|
||
|
|
||
|
for shard.stateSizeBudget < 0 {
|
||
|
// steal some budget for the state size from the global budget.
|
||
|
remaining := pcp.stateSizeBudget.Add(-stateSizeBudgetChunk)
|
||
|
if remaining < 0 {
|
||
|
// The state size is too big. Stop processing data in order to avoid OOM crash.
|
||
|
if remaining+stateSizeBudgetChunk >= 0 {
|
||
|
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
|
||
|
pcp.cancel()
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
shard.stateSizeBudget += stateSizeBudgetChunk
|
||
|
}
|
||
|
|
||
|
shard.writeBlock(br)
|
||
|
}
|
||
|
|
||
|
func (pcp *pipeStreamContextProcessor) flush() error {
|
||
|
if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 {
|
||
|
// Fast path - nothing to do.
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
n := pcp.stateSizeBudget.Load()
|
||
|
if n <= 0 {
|
||
|
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20))
|
||
|
}
|
||
|
if n > math.MaxInt {
|
||
|
logger.Panicf("BUG: stateSizeBudget shouldn't exceed math.MaxInt=%v; got %d", math.MaxInt, n)
|
||
|
}
|
||
|
stateSizeBudget := int(n)
|
||
|
|
||
|
// merge state across shards
|
||
|
shards := pcp.shards
|
||
|
m := shards[0].getM()
|
||
|
shards = shards[1:]
|
||
|
for i := range shards {
|
||
|
if needStop(pcp.stopCh) {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
for streamID, rowsSrc := range shards[i].getM() {
|
||
|
rows, ok := m[streamID]
|
||
|
if !ok {
|
||
|
m[streamID] = rowsSrc
|
||
|
} else {
|
||
|
m[streamID] = append(rows, rowsSrc...)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// write result
|
||
|
wctx := &pipeStreamContextWriteContext{
|
||
|
pcp: pcp,
|
||
|
}
|
||
|
|
||
|
for streamID, rows := range m {
|
||
|
streamRows, err := pcp.getStreamRows(streamID, stateSizeBudget)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("cannot read rows for _stream_id=%q: %w", streamID, err)
|
||
|
}
|
||
|
if needStop(pcp.stopCh) {
|
||
|
return nil
|
||
|
}
|
||
|
resultRows, err := getStreamContextRows(streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("cannot obtain context rows for _stream_id=%q: %w", streamID, err)
|
||
|
}
|
||
|
for _, rowFields := range resultRows {
|
||
|
wctx.writeRow(rowFields)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
wctx.flush()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func getStreamContextRows(streamRows, rows []streamContextRow, linesBefore, linesAfter int) ([][]Field, error) {
|
||
|
sortStreamContextRows(streamRows)
|
||
|
sortStreamContextRows(rows)
|
||
|
|
||
|
var resultRows [][]Field
|
||
|
idxNext := 0
|
||
|
for _, r := range rows {
|
||
|
idx := getStreamContextRowIdx(streamRows, r.timestamp)
|
||
|
if idx < 0 {
|
||
|
// This error may happen when streamRows became out of sync with rows.
|
||
|
// For example, when some streamRows were deleted after obtaining rows.
|
||
|
return nil, fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d", r.timestamp, len(streamRows), len(rows))
|
||
|
}
|
||
|
|
||
|
idxStart := idx - linesBefore
|
||
|
if idxStart < idxNext {
|
||
|
idxStart = idxNext
|
||
|
}
|
||
|
for idxStart < idx {
|
||
|
resultRows = append(resultRows, streamRows[idxStart].fields)
|
||
|
idxStart++
|
||
|
}
|
||
|
|
||
|
if idx >= idxNext {
|
||
|
resultRows = append(resultRows, streamRows[idx].fields)
|
||
|
idxNext = idx + 1
|
||
|
}
|
||
|
|
||
|
idxEnd := idx + 1 + linesAfter
|
||
|
for idxNext < idxEnd && idxNext < len(streamRows) {
|
||
|
resultRows = append(resultRows, streamRows[idxNext].fields)
|
||
|
idxNext++
|
||
|
}
|
||
|
|
||
|
if idxNext >= len(streamRows) {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return resultRows, nil
|
||
|
}
|
||
|
|
||
|
func getStreamContextRowIdx(rows []streamContextRow, timestamp int64) int {
|
||
|
n := sort.Search(len(rows), func(i int) bool {
|
||
|
return rows[i].timestamp >= timestamp
|
||
|
})
|
||
|
if n == len(rows) {
|
||
|
return -1
|
||
|
}
|
||
|
if rows[n].timestamp != timestamp {
|
||
|
return -1
|
||
|
}
|
||
|
return n
|
||
|
}
|
||
|
|
||
|
func sortStreamContextRows(rows []streamContextRow) {
|
||
|
sort.SliceStable(rows, func(i, j int) bool {
|
||
|
return rows[i].timestamp < rows[j].timestamp
|
||
|
})
|
||
|
}
|
||
|
|
||
|
type pipeStreamContextWriteContext struct {
|
||
|
pcp *pipeStreamContextProcessor
|
||
|
rcs []resultColumn
|
||
|
br blockResult
|
||
|
|
||
|
// rowsCount is the number of rows in the current block
|
||
|
rowsCount int
|
||
|
|
||
|
// valuesLen is the total length of values in the current block
|
||
|
valuesLen int
|
||
|
}
|
||
|
|
||
|
func (wctx *pipeStreamContextWriteContext) writeRow(rowFields []Field) {
|
||
|
rcs := wctx.rcs
|
||
|
|
||
|
areEqualColumns := len(rcs) == len(rowFields)
|
||
|
if areEqualColumns {
|
||
|
for i, f := range rowFields {
|
||
|
if rcs[i].name != f.Name {
|
||
|
areEqualColumns = false
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
if !areEqualColumns {
|
||
|
// send the current block to ppNext and construct a block with new set of columns
|
||
|
wctx.flush()
|
||
|
|
||
|
rcs = wctx.rcs[:0]
|
||
|
for _, f := range rowFields {
|
||
|
rcs = appendResultColumnWithName(rcs, f.Name)
|
||
|
}
|
||
|
wctx.rcs = rcs
|
||
|
}
|
||
|
|
||
|
for i, f := range rowFields {
|
||
|
v := f.Value
|
||
|
rcs[i].addValue(v)
|
||
|
wctx.valuesLen += len(v)
|
||
|
}
|
||
|
|
||
|
wctx.rowsCount++
|
||
|
if wctx.valuesLen >= 1_000_000 {
|
||
|
wctx.flush()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (wctx *pipeStreamContextWriteContext) flush() {
|
||
|
rcs := wctx.rcs
|
||
|
br := &wctx.br
|
||
|
|
||
|
wctx.valuesLen = 0
|
||
|
|
||
|
// Flush rcs to ppNext
|
||
|
br.setResultColumns(rcs, wctx.rowsCount)
|
||
|
wctx.rowsCount = 0
|
||
|
wctx.pcp.ppNext.writeBlock(0, br)
|
||
|
br.reset()
|
||
|
for i := range rcs {
|
||
|
rcs[i].resetValues()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) {
|
||
|
if !lex.isKeyword("stream_context") {
|
||
|
return nil, fmt.Errorf("expecting 'stream_context'; got %q", lex.token)
|
||
|
}
|
||
|
lex.nextToken()
|
||
|
|
||
|
linesBefore := 0
|
||
|
beforeSet := false
|
||
|
if lex.isKeyword("before") {
|
||
|
lex.nextToken()
|
||
|
f, s, err := parseNumber(lex)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err)
|
||
|
}
|
||
|
if f < 0 {
|
||
|
return nil, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s)
|
||
|
}
|
||
|
linesBefore = int(f)
|
||
|
beforeSet = true
|
||
|
}
|
||
|
|
||
|
linesAfter := 0
|
||
|
afterSet := false
|
||
|
if lex.isKeyword("after") {
|
||
|
lex.nextToken()
|
||
|
f, s, err := parseNumber(lex)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err)
|
||
|
}
|
||
|
if f < 0 {
|
||
|
return nil, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s)
|
||
|
}
|
||
|
linesAfter = int(f)
|
||
|
afterSet = true
|
||
|
}
|
||
|
|
||
|
if !beforeSet && !afterSet {
|
||
|
return nil, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'")
|
||
|
}
|
||
|
|
||
|
pc := &pipeStreamContext{
|
||
|
linesBefore: linesBefore,
|
||
|
linesAfter: linesAfter,
|
||
|
}
|
||
|
return pc, nil
|
||
|
}
|