From e4e14697fa321f6ca1bec38dd6b29ab84cf49189 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 26 Sep 2024 22:22:21 +0200 Subject: [PATCH] lib/logstorage: improve performance for stream_context pipe over streams with big number of log entries Do not read timestamps for blocks, which cannot contain surrounding logs. This should improve peformance for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6730 . Also optimize min(_time) and max(_time) calculations a bit by avoiding conversion of timestamp to string when it isn't needed. This should improve performance for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070 . --- lib/logstorage/block_result.go | 47 +++++++++------ lib/logstorage/pipe_stream_context.go | 68 +++++++++++++++++----- lib/logstorage/pipe_stream_context_test.go | 68 +--------------------- lib/logstorage/stats_max.go | 10 +++- lib/logstorage/stats_min.go | 10 +++- lib/logstorage/stats_row_max.go | 10 +++- lib/logstorage/stats_row_min.go | 10 +++- 7 files changed, 118 insertions(+), 105 deletions(-) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 1d55f94118..d703c6c1c4 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -387,35 +387,46 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { br.bm = bm } -func (br *blockResult) getMinTimestamp() int64 { - if br.bm != nil && br.bm.bitsLen == br.rowsLen { - return br.bs.bsw.bh.timestampsHeader.minTimestamp +// intersectsTimeRange returns true if br timestamps intersect (minTimestamp .. maxTimestamp) time range. +func (br *blockResult) intersectsTimeRange(minTimestamp, maxTimestamp int64) bool { + return minTimestamp < br.getMaxTimestamp(minTimestamp) && maxTimestamp > br.getMinTimestamp(maxTimestamp) +} + +func (br *blockResult) getMinTimestamp(minTimestamp int64) int64 { + if br.bs != nil { + bh := &br.bs.bsw.bh + if bh.rowsCount == uint64(br.rowsLen) { + return min(minTimestamp, bh.timestampsHeader.minTimestamp) + } + if minTimestamp <= bh.timestampsHeader.minTimestamp { + return minTimestamp + } } + // Slow path - need to scan timestamps timestamps := br.getTimestamps() - if len(timestamps) == 0 { - return -1 << 63 - } - minTimestamp := timestamps[0] - for i := 1; i < len(timestamps); i++ { - if timestamps[i] < minTimestamp { - minTimestamp = timestamps[i] + for _, timestamp := range timestamps { + if timestamp < minTimestamp { + minTimestamp = timestamp } } return minTimestamp } -func (br *blockResult) getMaxTimestamp() int64 { - if br.bm != nil && br.bm.bitsLen == br.rowsLen { - return br.bs.bsw.bh.timestampsHeader.maxTimestamp +func (br *blockResult) getMaxTimestamp(maxTimestamp int64) int64 { + if br.bs != nil { + bh := &br.bs.bsw.bh + if bh.rowsCount == uint64(br.rowsLen) { + return max(maxTimestamp, bh.timestampsHeader.maxTimestamp) + } + if maxTimestamp >= bh.timestampsHeader.maxTimestamp { + return maxTimestamp + } } + // Slow path - need to scan timestamps timestamps := br.getTimestamps() - if len(timestamps) == 0 { - return (1 << 63) - 1 - } - maxTimestamp := timestamps[len(timestamps)-1] - for i := len(timestamps) - 2; i >= 0; i-- { + for i := len(timestamps) - 1; i >= 0; i-- { if timestamps[i] > maxTimestamp { maxTimestamp = timestamps[i] } diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index 9b4b9f5358..1f4c4017dd 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -35,6 +35,9 @@ func (pc *pipeStreamContext) String() string { if pc.linesAfter > 0 { s += fmt.Sprintf(" after %d", pc.linesAfter) } + if pc.linesBefore <= 0 && pc.linesAfter <= 0 { + s += " after 0" + } return s } @@ -163,21 +166,28 @@ func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRow if stateSize > stateSizeBudget { cancel() + return } - timestamps := br.getTimestamps() - for i, timestamp := range timestamps { + for i := range contextRows { if needStop(pcp.stopCh) { break } - for j := range contextRows { - if j > 0 && timestamp <= contextRows[j-1].neededTimestamp { + + if !contextRows[i].canUpdate(br) { + // Fast path - skip reading block timestamps for the given ctx. + continue + } + + timestamps := br.getTimestamps() + for j, timestamp := range timestamps { + if i > 0 && timestamp <= contextRows[i-1].neededTimestamp { continue } - if j+1 < len(contextRows) && timestamp >= contextRows[j+1].neededTimestamp { + if i+1 < len(contextRows) && timestamp >= contextRows[i+1].neededTimestamp { continue } - stateSize += contextRows[j].update(br, i, timestamp) + stateSize += contextRows[i].update(br, j, timestamp) } } } @@ -247,6 +257,42 @@ func (ctx *streamContextRows) getSortedRows() []*streamContextRow { return rows } +func (ctx *streamContextRows) canUpdate(br *blockResult) bool { + if ctx.linesBefore > 0 { + if len(ctx.rowsBefore) < ctx.linesBefore { + return true + } + minTimestamp := ctx.rowsBefore[0].timestamp - 1 + maxTimestamp := ctx.neededTimestamp + if br.intersectsTimeRange(minTimestamp, maxTimestamp) { + return true + } + } + + if ctx.linesAfter > 0 { + if len(ctx.rowsAfter) < ctx.linesAfter { + return true + } + minTimestamp := ctx.neededTimestamp + maxTimestamp := ctx.rowsAfter[0].timestamp + 1 + if br.intersectsTimeRange(minTimestamp, maxTimestamp) { + return true + } + } + + if ctx.linesBefore <= 0 && ctx.linesAfter <= 0 { + if len(ctx.rowsMatched) == 0 { + return true + } + timestamp := ctx.rowsMatched[0].timestamp + if br.intersectsTimeRange(timestamp-1, timestamp+1) { + return true + } + } + + return false +} + func (ctx *streamContextRows) update(br *blockResult, rowIdx int, rowTimestamp int64) int { if rowTimestamp < ctx.neededTimestamp { if ctx.linesBefore <= 0 { @@ -430,11 +476,6 @@ func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult if br.rowsLen == 0 { return } - if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 { - // Fast path - there is no need to fetch stream context. - pcp.ppNext.writeBlock(workerID, br) - return - } shard := &pcp.shards[workerID] @@ -456,11 +497,6 @@ func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult } func (pcp *pipeStreamContextProcessor) flush() error { - if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 { - // Fast path - nothing to do. - return nil - } - n := pcp.stateSizeBudget.Load() if n <= 0 { return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20)) diff --git a/lib/logstorage/pipe_stream_context_test.go b/lib/logstorage/pipe_stream_context_test.go index cd63f41283..5fd8d110b7 100644 --- a/lib/logstorage/pipe_stream_context_test.go +++ b/lib/logstorage/pipe_stream_context_test.go @@ -12,6 +12,7 @@ func TestParsePipeStreamContextSuccess(t *testing.T) { f(`stream_context before 5`) f(`stream_context after 10`) + f(`stream_context after 0`) f(`stream_context before 10 after 20`) } @@ -30,73 +31,6 @@ func TestParsePipeStreamContextFailure(t *testing.T) { f(`stream_context after -4`) } -func TestPipeStreamContext(t *testing.T) { - f := func(pipeStr string, rows, rowsExpected [][]Field) { - t.Helper() - expectPipeResults(t, pipeStr, rows, rowsExpected) - } - - f("stream_context before 0", [][]Field{ - { - {"a", `2`}, - {"b", `3`}, - }, - { - {"a", "2"}, - {"b", "3"}, - }, - { - {"a", `2`}, - {"b", `54`}, - {"c", "d"}, - }, - }, [][]Field{ - { - {"a", `2`}, - {"b", `3`}, - }, - { - {"a", "2"}, - {"b", "3"}, - }, - { - {"a", `2`}, - {"b", `54`}, - {"c", "d"}, - }, - }) - - f("stream_context after 0", [][]Field{ - { - {"a", `2`}, - {"b", `3`}, - }, - { - {"a", "2"}, - {"b", "3"}, - }, - { - {"a", `2`}, - {"b", `54`}, - {"c", "d"}, - }, - }, [][]Field{ - { - {"a", `2`}, - {"b", `3`}, - }, - { - {"a", "2"}, - {"b", "3"}, - }, - { - {"a", `2`}, - {"b", `54`}, - {"c", "d"}, - }, - }) -} - func TestPipeStreamContextUpdateNeededFields(t *testing.T) { f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { t.Helper() diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index 22eb0cbeec..7075215fc8 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -85,7 +85,15 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu } if c.isTime { - maxTimestamp := br.getMaxTimestamp() + timestamp, ok := TryParseTimestampRFC3339Nano(smp.max) + if !ok { + timestamp = -1 << 63 + } + maxTimestamp := br.getMaxTimestamp(timestamp) + if maxTimestamp <= timestamp { + return + } + bb := bbPool.Get() bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp) smp.updateStateBytes(bb.B) diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 4be8d550e0..458fb1ee9d 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -87,7 +87,15 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu } if c.isTime { - minTimestamp := br.getMinTimestamp() + timestamp, ok := TryParseTimestampRFC3339Nano(smp.min) + if !ok { + timestamp = (1 << 63) - 1 + } + minTimestamp := br.getMinTimestamp(timestamp) + if minTimestamp >= timestamp { + return + } + bb := bbPool.Get() bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp) smp.updateStateBytes(bb.B) diff --git a/lib/logstorage/stats_row_max.go b/lib/logstorage/stats_row_max.go index 9982abebd9..7af6bd9f3f 100644 --- a/lib/logstorage/stats_row_max.go +++ b/lib/logstorage/stats_row_max.go @@ -60,7 +60,15 @@ func (smp *statsRowMaxProcessor) updateStatsForAllRows(br *blockResult) int { return stateSizeIncrease } if c.isTime { - maxTimestamp := br.getMaxTimestamp() + timestamp, ok := TryParseTimestampRFC3339Nano(smp.max) + if !ok { + timestamp = -1 << 63 + } + maxTimestamp := br.getMaxTimestamp(timestamp) + if maxTimestamp <= timestamp { + return stateSizeIncrease + } + bb := bbPool.Get() bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp) v := bytesutil.ToUnsafeString(bb.B) diff --git a/lib/logstorage/stats_row_min.go b/lib/logstorage/stats_row_min.go index e0f724b792..051ddb57f5 100644 --- a/lib/logstorage/stats_row_min.go +++ b/lib/logstorage/stats_row_min.go @@ -60,7 +60,15 @@ func (smp *statsRowMinProcessor) updateStatsForAllRows(br *blockResult) int { return stateSizeIncrease } if c.isTime { - minTimestamp := br.getMinTimestamp() + timestamp, ok := TryParseTimestampRFC3339Nano(smp.min) + if !ok { + timestamp = (1 << 63) - 1 + } + minTimestamp := br.getMinTimestamp(timestamp) + if minTimestamp >= timestamp { + return stateSizeIncrease + } + bb := bbPool.Get() bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp) v := bytesutil.ToUnsafeString(bb.B)