From d07e09b1e4a4f054815dfcc4fc1be6053ffe3e25 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 9 Oct 2024 12:26:07 +0200 Subject: [PATCH] app/vlogscli: add support for live tailing (cherry picked from commit e31625e0b20146d328ca18578fbc08a237546fa8) Signed-off-by: hagen1778 # Conflicts: # Makefile --- app/vlogscli/json_prettifier.go | 30 +++++- app/vlogscli/main.go | 135 ++++++++++++++++++------- docs/VictoriaLogs/CHANGELOG.md | 3 + docs/VictoriaLogs/querying/vlogscli.md | 16 +++ 4 files changed, 145 insertions(+), 39 deletions(-) diff --git a/app/vlogscli/json_prettifier.go b/app/vlogscli/json_prettifier.go index 0fe4f6d5f3..1369155b45 100644 --- a/app/vlogscli/json_prettifier.go +++ b/app/vlogscli/json_prettifier.go @@ -17,6 +17,7 @@ const ( outputModeJSONMultiline = outputMode(0) outputModeJSONSingleline = outputMode(1) outputModeLogfmt = outputMode(2) + outputModeCompact = outputMode(3) ) func getOutputFormatter(outputMode outputMode) func(w io.Writer, fields []logstorage.Field) error { @@ -31,6 +32,8 @@ func getOutputFormatter(outputMode outputMode) func(w io.Writer, fields []logsto } case outputModeLogfmt: return writeLogfmtObject + case outputModeCompact: + return writeCompactObject default: panic(fmt.Errorf("BUG: unexpected outputMode=%d", outputMode)) } @@ -94,8 +97,13 @@ func (jp *jsonPrettifier) prettifyJSONLines() error { if err := jp.formatter(jp.bw, fields); err != nil { return err } + + // Flush bw after every output line in order to show results as soon as they appear. + if err := jp.bw.Flush(); err != nil { + return err + } } - return jp.bw.Flush() + return nil } func (jp *jsonPrettifier) Close() error { @@ -161,6 +169,26 @@ func writeLogfmtObject(w io.Writer, fields []logstorage.Field) error { return err } +func writeCompactObject(w io.Writer, fields []logstorage.Field) error { + if len(fields) == 1 { + // Just write field value as is without name + _, err := fmt.Fprintf(w, "%s\n", fields[0].Value) + return err + } + if len(fields) == 2 && fields[0].Name == "_time" || fields[1].Name == "_time" { + // Write _time\tfieldValue as is + if fields[0].Name == "_time" { + _, err := fmt.Fprintf(w, "%s\t%s\n", fields[0].Value, fields[1].Value) + return err + } + _, err := fmt.Fprintf(w, "%s\t%s\n", fields[1].Value, fields[0].Value) + return err + } + + // Fall back to logfmt + return writeLogfmtObject(w, fields) +} + func writeJSONObject(w io.Writer, fields []logstorage.Field, isMultiline bool) error { if len(fields) == 0 { fmt.Fprintf(w, "{}\n") diff --git a/app/vlogscli/main.go b/app/vlogscli/main.go index 055cea13ff..ef4a3145a6 100644 --- a/app/vlogscli/main.go +++ b/app/vlogscli/main.go @@ -27,7 +27,9 @@ import ( var ( datasourceURL = flag.String("datasource.url", "http://localhost:9428/select/logsql/query", "URL for querying VictoriaLogs; "+ - "see https://docs.victoriametrics.com/victorialogs/querying/#querying-logs") + "see https://docs.victoriametrics.com/victorialogs/querying/#querying-logs . See also -tail.url") + tailURL = flag.String("tail.url", "", "URL for live tailing queries to VictoriaLogs; see https://docs.victoriametrics.com/victorialogs/querying/#live-tailing ."+ + "The url is automatically detected from -datasource.url by replacing /query with /tail at the end if -tail.url is empty") historyFile = flag.String("historyFile", "vlogscli-history", "Path to file with command history") header = flagutil.NewArrayString("header", "Optional header to pass in request -datasource.url in the form 'HeaderName: value'") ) @@ -95,9 +97,7 @@ func runReadlineLoop(rl *readline.Instance, incompleteLine *string) { case io.EOF: if s != "" { // This is non-interactive query execution. - if err := executeQuery(context.Background(), rl, s, outputMode); err != nil { - fmt.Fprintf(rl, "%s\n", err) - } + executeQuery(context.Background(), rl, s, outputMode) } return case readline.ErrInterrupt: @@ -147,6 +147,13 @@ func runReadlineLoop(rl *readline.Instance, incompleteLine *string) { s = "" continue } + if s == `\c` { + fmt.Fprintf(rl, "compact output mode\n") + outputMode = outputModeCompact + historyLines = pushToHistory(rl, historyLines, s) + s = "" + continue + } if s == `\logfmt` { fmt.Fprintf(rl, "logfmt output mode\n") outputMode = outputModeLogfmt @@ -163,18 +170,9 @@ func runReadlineLoop(rl *readline.Instance, incompleteLine *string) { // Execute the query ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) - err = executeQuery(ctx, rl, s, outputMode) + executeQuery(ctx, rl, s, outputMode) cancel() - if err != nil { - if errors.Is(err, context.Canceled) { - fmt.Fprintf(rl, "\n") - } else { - fmt.Fprintf(rl, "%s\n", err) - } - // Save queries in the history even if they weren't finished successfully - } - historyLines = pushToHistory(rl, historyLines, s) s = "" rl.SetPrompt(firstLinePrompt) @@ -257,26 +255,90 @@ func printCommandsHelp(w io.Writer) { \h - show this help \s - singleline json output mode \m - multiline json output mode +\c - compact output \logfmt - logfmt output mode +\tail - live tail results `) } -func executeQuery(ctx context.Context, output io.Writer, s string, outputMode outputMode) error { - // Parse the query and convert it to canonical view. - s = strings.TrimSuffix(s, ";") - q, err := logstorage.ParseQuery(s) - if err != nil { - return fmt.Errorf("cannot parse query: %w", err) +func executeQuery(ctx context.Context, output io.Writer, qStr string, outputMode outputMode) { + if strings.HasPrefix(qStr, `\tail `) { + tailQuery(ctx, output, qStr, outputMode) + return } - qStr := q.String() + + respBody := getQueryResponse(ctx, output, qStr, outputMode, *datasourceURL) + if respBody == nil { + return + } + defer func() { + _ = respBody.Close() + }() + + if err := readWithLess(respBody); err != nil { + fmt.Fprintf(output, "error when reading query response: %s\n", err) + return + } +} + +func tailQuery(ctx context.Context, output io.Writer, qStr string, outputMode outputMode) { + qStr = strings.TrimPrefix(qStr, `\tail `) + qURL, err := getTailURL() + if err != nil { + fmt.Fprintf(output, "%s\n", err) + return + } + + respBody := getQueryResponse(ctx, output, qStr, outputMode, qURL) + if respBody == nil { + return + } + defer func() { + _ = respBody.Close() + }() + + if _, err := io.Copy(output, respBody); err != nil { + if !errors.Is(err, context.Canceled) && !isErrPipe(err) { + fmt.Fprintf(output, "error when live tailing query response: %s\n", err) + } + fmt.Fprintf(output, "\n") + return + } +} + +func getTailURL() (string, error) { + if *tailURL != "" { + return *tailURL, nil + } + + u, err := url.Parse(*datasourceURL) + if err != nil { + return "", fmt.Errorf("cannot parse -datasource.url=%q: %w", *datasourceURL, err) + } + if !strings.HasSuffix(u.Path, "/query") { + return "", fmt.Errorf("cannot find /query suffix in -datasource.url=%q", *datasourceURL) + } + u.Path = u.Path[:len(u.Path)-len("/query")] + "/tail" + return u.String(), nil +} + +func getQueryResponse(ctx context.Context, output io.Writer, qStr string, outputMode outputMode, qURL string) io.ReadCloser { + // Parse the query and convert it to canonical view. + qStr = strings.TrimSuffix(qStr, ";") + q, err := logstorage.ParseQuery(qStr) + if err != nil { + fmt.Fprintf(output, "cannot parse query: %s\n", err) + return nil + } + qStr = q.String() fmt.Fprintf(output, "executing [%s]...", qStr) - // Prepare HTTP request for VictoriaLogs + // Prepare HTTP request for qURL args := make(url.Values) args.Set("query", qStr) data := strings.NewReader(args.Encode()) - req, err := http.NewRequestWithContext(ctx, "POST", *datasourceURL, data) + req, err := http.NewRequestWithContext(ctx, "POST", qURL, data) if err != nil { panic(fmt.Errorf("BUG: cannot prepare request to server: %w", err)) } @@ -285,37 +347,34 @@ func executeQuery(ctx context.Context, output io.Writer, s string, outputMode ou req.Header.Set(h.Name, h.Value) } - // Execute HTTP request at VictoriaLogs + // Execute HTTP request at qURL startTime := time.Now() resp, err := httpClient.Do(req) queryDuration := time.Since(startTime) fmt.Fprintf(output, "; duration: %.3fs\n", queryDuration.Seconds()) if err != nil { - return fmt.Errorf("cannot execute query: %w", err) + if errors.Is(err, context.Canceled) { + fmt.Fprintf(output, "\n") + } else { + fmt.Fprintf(output, "cannot execute query: %s\n", err) + } + return nil } - defer func() { - _ = resp.Body.Close() - }() + // Verify response code if resp.StatusCode != http.StatusOK { body, err := io.ReadAll(resp.Body) if err != nil { body = []byte(fmt.Sprintf("cannot read response body: %s", err)) } - return fmt.Errorf("unexpected status code: %d; response body:\n%s", resp.StatusCode, body) + fmt.Fprintf(output, "unexpected status code: %d; response body:\n%s\n", resp.StatusCode, body) + return nil } - // Prettify the response and stream it to 'less'. + // Prettify the response body jp := newJSONPrettifier(resp.Body, outputMode) - defer func() { - _ = jp.Close() - }() - if err := readWithLess(jp); err != nil { - return fmt.Errorf("error when reading query response: %w", err) - } - - return nil + return jp } var httpClient = &http.Client{} diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 6c5d107a37..d09d15be3f 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -17,6 +17,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## [v0.34.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.34.0-victorialogs) +* FEATURE: [vlogscli](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/): add ability to live tail query results - see [these docs](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/#live-tailing). +* FEATURE: [vlogscli](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/): add compact output mode for query results. It can be enabled by typing `\c` and then pressing `enter`. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/#output-modes). + Released at 2024-10-08 * FEATURE: [vlogscli](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/): add ability to display results in `logfmt` mode, single-line and multi-line JSON modes according [these docs](https://docs.victoriametrics.com/victorialogs/querying/vlogscli/#output-modes). diff --git a/docs/VictoriaLogs/querying/vlogscli.md b/docs/VictoriaLogs/querying/vlogscli.md index 440e5be95e..a2186c546b 100644 --- a/docs/VictoriaLogs/querying/vlogscli.md +++ b/docs/VictoriaLogs/querying/vlogscli.md @@ -77,6 +77,17 @@ See also [`less` docs](https://man7.org/linux/man-pages/man1/less.1.html) and [command-line integration docs for VictoriaMetrics](https://docs.victoriametrics.com/victorialogs/querying/#command-line). +## Live tailing + +`vlogsql` enters live tailing mode when the query is prepended with `\tail ` command. For example: + +``` +;> \tail {kubernetes_container_name="vmagent"}; +``` + +By default `vlogscli` derives [the URL for live tailing](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) from the `-datasource.url` command-line flag +by replacing `/query` with `/tail` at the end of `-datasource.url`. The URL for live tailing can be specified explicitly via `-tail.url` command-line flag. + ## Query history `vlogsql` supports query history - press `up` and `down` keys for navigating the history. @@ -90,6 +101,7 @@ Press `Enter` when the needed query is found in order to execute it. Press `Ctrl+C` for exit from the `search history` mode. See also [other available shortcuts](https://github.com/chzyer/readline/blob/f533ef1caae91a1fcc90875ff9a5a030f0237c6a/doc/shortcut.md). + ## Output modes By default `vlogscli` displays query results as prettified JSON object with every field on a separate line. @@ -99,4 +111,8 @@ Fields in every JSON object are sorted in alphabetical order. This simplifies lo * A single JSON line per every result. Type `\s` and press `enter` for this mode. * Multline JSON per every result. Type `\m` and press `enter` for this mode. +* Compact output. Type `\c` and press `enter` for this mode. + This mode shows field values as is if the response contains a single field + (for example if [`fields _msg` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe) is used) + plus optional [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). * [Logfmt output](https://brandur.org/logfmt). Type `\logfmt` and press `enter` for this mode.