diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index 15ddc8dfe2..b67eb570a6 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) @@ -346,6 +347,10 @@ func ProcessLiveTailRequest(ctx context.Context, w http.ResponseWriter, r *http. end := time.Now().UnixNano() doneCh := ctxWithCancel.Done() + flusher, ok := w.(http.Flusher) + if !ok { + logger.Panicf("BUG: it is expected that http.ResponseWriter (%T) supports http.Flusher interface", w) + } for { start := end - tailOffsetNsecs end = time.Now().UnixNano() @@ -361,7 +366,10 @@ func ProcessLiveTailRequest(ctx context.Context, w http.ResponseWriter, r *http. httpserver.Errorf(w, r, "cannot get tail results for query [%q]: %s", q, err) return } - WriteJSONRows(w, resultRows) + if len(resultRows) > 0 { + WriteJSONRows(w, resultRows) + flusher.Flush() + } select { case <-doneCh: @@ -418,16 +426,12 @@ func (tp *tailProcessor) writeBlock(_ uint, timestamps []int64, columns []logsto return } - // Make sure columns contain _time and _stream_id fields. - // These fields are needed for proper tail work. + // Make sure columns contain _time field, since it is needed for proper tail work. hasTime := false - hasStreamID := false for _, c := range columns { if c.Name == "_time" { hasTime = true - } - if c.Name == "_stream_id" { - hasStreamID = true + break } } if !hasTime { @@ -435,11 +439,6 @@ func (tp *tailProcessor) writeBlock(_ uint, timestamps []int64, columns []logsto tp.cancel() return } - if !hasStreamID { - tp.err = fmt.Errorf("missing _stream_id field") - tp.cancel() - return - } // Copy block rows to tp.perStreamRows for i, timestamp := range timestamps { @@ -458,6 +457,7 @@ func (tp *tailProcessor) writeBlock(_ uint, timestamps []int64, columns []logsto streamID = value } } + tp.perStreamRows[streamID] = append(tp.perStreamRows[streamID], logRow{ timestamp: timestamp, fields: fields, @@ -477,15 +477,14 @@ func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { lastTimestamp, ok := tp.lastTimestamps[streamID] if ok { // Skip already written rows - for i := range rows { - if rows[i].timestamp > lastTimestamp { - rows = rows[i:] - break - } + for len(rows) > 0 && rows[0].timestamp <= lastTimestamp { + rows = rows[1:] } } - resultRows = append(resultRows, rows...) - tp.lastTimestamps[streamID] = rows[len(rows)-1].timestamp + if len(rows) > 0 { + resultRows = append(resultRows, rows...) + tp.lastTimestamps[streamID] = rows[len(rows)-1].timestamp + } } clear(tp.perStreamRows) diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index 8df7c69d6f..750292806b 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -132,7 +132,7 @@ VictoriaLogs provides `/select/logsql/tail?query=` HTTP endpoint, which r e.g. it works in the way similar to `tail -f` unix command. For example, the following command returns live tailing logs with the `error` word: ```sh -curl http://localhost:9428/select/logsql/tail -d 'query=error' +curl -N http://localhost:9428/select/logsql/tail -d 'query=error' ``` The `` must conform the following restrictions: @@ -143,16 +143,18 @@ The `` must conform the following restrictions: [`uniq`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe), [`top`](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe), [`unroll`](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe), etc. pipes. -- It must return [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_stream_id`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) - fields, e.g. these fields must be left when using [`fields`](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe), - [`delete`](https://docs.victoriametrics.com/victorialogs/logsql/#delete-pipe) or [`rename`](https://docs.victoriametrics.com/victorialogs/logsql/#rename-pipe) pipes. +- It must return [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) field. For example, this fields must be mentioned + in [`fields`](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe) pipe if this pipe is used. + +- It is recommended to return [`_stream_id`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field for more accurate live tailing + across multiple streams. By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried. If you need querying other tenant, then specify it via `AccountID` and `ProjectID` http request headers. For example, the following query performs live tailing for `(AccountID=12, ProjectID=34)` tenant: ```sh -curl http://localhost:9428/select/logsql/tail -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=error' +curl -N http://localhost:9428/select/logsql/tail -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=error' ``` The number of currently executed live tailing requests to `/select/logsql/tail` can be [monitored](https://docs.victoriametrics.com/victorialogs/#monitoring) diff --git a/lib/httpserver/httpserver.go b/lib/httpserver/httpserver.go index 4c9c768e25..b52cd4c10f 100644 --- a/lib/httpserver/httpserver.go +++ b/lib/httpserver/httpserver.go @@ -568,6 +568,21 @@ func (rwa *responseWriterWithAbort) WriteHeader(statusCode int) { rwa.sentHeaders = true } +// Flush implements net/http.Flusher interface +func (rwa *responseWriterWithAbort) Flush() { + if rwa.aborted { + return + } + if !rwa.sentHeaders { + rwa.sentHeaders = true + } + flusher, ok := rwa.ResponseWriter.(http.Flusher) + if !ok { + logger.Panicf("BUG: it is expected http.ResponseWriter (%T) supports http.Flusher interface", rwa.ResponseWriter) + } + flusher.Flush() +} + // abort aborts the client connection associated with rwa. // // The last http chunk in the response stream is intentionally written incorrectly, @@ -618,6 +633,7 @@ func Errorf(w http.ResponseWriter, r *http.Request, format string, args ...inter break } } + if rwa, ok := w.(*responseWriterWithAbort); ok && rwa.sentHeaders { // HTTP status code has been already sent to client, so it cannot be sent again. // Just write errStr to the response and abort the client connection, so the client could notice the error.