lib/logstorage: consistently sort stream contexts belonging to different streams by the minimum time seen in the matching logs

This should simplify debugging of stream_context output, since it remains stable over repeated requests.
This commit is contained in:
Aliaksandr Valialkin 2024-09-27 11:15:43 +02:00
parent bce56d430d
commit bc0bb0c36a
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
3 changed files with 40 additions and 6 deletions

View File

@ -15,6 +15,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* BUGFIX: consistently return matching log streams sorted by time from [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). Previously log streams could be returned in arbitrary order with every request. This could complicate using `stream_context` pipe.
* BUGFIX: add missing `_msg="---"` delimiter between stream contexts belonging to different [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). This should simplify investigating `stream_context` output for multiple matching log streams.
## [v0.30.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.30.0-victorialogs)
Released at 2024-09-27

View File

@ -530,7 +530,10 @@ func (pcp *pipeStreamContextProcessor) flush() error {
pcp: pcp,
}
for streamID, rows := range m {
// write output contexts in the ascending order of rows
streamIDs := getStreamIDsSortedByMinRowTimestamp(m)
for _, streamID := range streamIDs {
rows := m[streamID]
streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget)
if err != nil {
return err
@ -557,6 +560,34 @@ func (pcp *pipeStreamContextProcessor) flush() error {
return nil
}
func getStreamIDsSortedByMinRowTimestamp(m map[string][]streamContextRow) []string {
type streamTimestamp struct {
streamID string
timestamp int64
}
streamTimestamps := make([]streamTimestamp, 0, len(m))
for streamID, rows := range m {
minTimestamp := rows[0].timestamp
for _, r := range rows[1:] {
if r.timestamp < minTimestamp {
minTimestamp = r.timestamp
}
}
streamTimestamps = append(streamTimestamps, streamTimestamp{
streamID: streamID,
timestamp: minTimestamp,
})
}
sort.Slice(streamTimestamps, func(i, j int) bool {
return streamTimestamps[i].timestamp < streamTimestamps[j].timestamp
})
streamIDs := make([]string, len(streamTimestamps))
for i := range streamIDs {
streamIDs[i] = streamTimestamps[i].streamID
}
return streamIDs
}
func newDelimiterRowFields(r *streamContextRow, streamID string) []Field {
return []Field{
{

View File

@ -662,7 +662,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context before 0
| stats count() rows`, [][]Field{
{
{"rows", "33"},
{"rows", "66"},
},
})
})
@ -671,7 +671,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context before 0 after 0
| stats count() rows`, [][]Field{
{
{"rows", "33"},
{"rows", "66"},
},
})
})
@ -680,7 +680,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context before 1
| stats count() rows`, [][]Field{
{
{"rows", "66"},
{"rows", "99"},
},
})
})
@ -689,7 +689,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context after 1
| stats count() rows`, [][]Field{
{
{"rows", "66"},
{"rows", "99"},
},
})
})
@ -698,7 +698,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context before 1 after 1
| stats count() rows`, [][]Field{
{
{"rows", "99"},
{"rows", "132"},
},
})
})