diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index b3191beb2b..56efffff8e 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,6 +15,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* BUGFIX: consistently return matching log streams sorted by time from [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). Previously log streams could be returned in arbitrary order with every request. This could complicate using `stream_context` pipe. +* BUGFIX: add missing `_msg="---"` delimiter between stream contexts belonging to different [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). This should simplify investigating `stream_context` output for multiple matching log streams. + ## [v0.30.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.30.0-victorialogs) Released at 2024-09-27 diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index e9a6d790bd..06a86c39dc 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -530,7 +530,10 @@ func (pcp *pipeStreamContextProcessor) flush() error { pcp: pcp, } - for streamID, rows := range m { + // write output contexts in the ascending order of rows + streamIDs := getStreamIDsSortedByMinRowTimestamp(m) + for _, streamID := range streamIDs { + rows := m[streamID] streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget) if err != nil { return err @@ -557,6 +560,34 @@ func (pcp *pipeStreamContextProcessor) flush() error { return nil } +func getStreamIDsSortedByMinRowTimestamp(m map[string][]streamContextRow) []string { + type streamTimestamp struct { + streamID string + timestamp int64 + } + streamTimestamps := make([]streamTimestamp, 0, len(m)) + for streamID, rows := range m { + minTimestamp := rows[0].timestamp + for _, r := range rows[1:] { + if r.timestamp < minTimestamp { + minTimestamp = r.timestamp + } + } + streamTimestamps = append(streamTimestamps, streamTimestamp{ + streamID: streamID, + timestamp: minTimestamp, + }) + } + sort.Slice(streamTimestamps, func(i, j int) bool { + return streamTimestamps[i].timestamp < streamTimestamps[j].timestamp + }) + streamIDs := make([]string, len(streamTimestamps)) + for i := range streamIDs { + streamIDs[i] = streamTimestamps[i].streamID + } + return streamIDs +} + func newDelimiterRowFields(r *streamContextRow, streamID string) []Field { return []Field{ { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index fb7709fe9b..4fd7ed0ac1 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -662,7 +662,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 0 | stats count() rows`, [][]Field{ { - {"rows", "33"}, + {"rows", "66"}, }, }) }) @@ -671,7 +671,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 0 after 0 | stats count() rows`, [][]Field{ { - {"rows", "33"}, + {"rows", "66"}, }, }) }) @@ -680,7 +680,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 1 | stats count() rows`, [][]Field{ { - {"rows", "66"}, + {"rows", "99"}, }, }) }) @@ -689,7 +689,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context after 1 | stats count() rows`, [][]Field{ { - {"rows", "66"}, + {"rows", "99"}, }, }) }) @@ -698,7 +698,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 1 after 1 | stats count() rows`, [][]Field{ { - {"rows", "99"}, + {"rows", "132"}, }, }) })