diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index ad636b239..cb8ef56fc 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -45,6 +45,7 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ } if step <= 0 { httpserver.Errorf(w, r, "'step' must be bigger than zero") + return } // Obtain offset @@ -563,6 +564,137 @@ func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { return tailRows, nil } +// ProcessStatsQueryRangeRequest handles /select/logsql/stats_query_range request. +// +// See https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats +func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { + q, tenantIDs, err := parseCommonArgs(r) + if err != nil { + httpserver.SendPrometheusError(w, r, err) + return + } + + // Obtain step + stepStr := r.FormValue("step") + if stepStr == "" { + stepStr = "1d" + } + step, err := promutils.ParseDuration(stepStr) + if err != nil { + err = fmt.Errorf("cannot parse 'step' arg: %s", err) + httpserver.SendPrometheusError(w, r, err) + return + } + if step <= 0 { + err := fmt.Errorf("'step' must be bigger than zero") + httpserver.SendPrometheusError(w, r, err) + return + } + + // Obtain `by(...)` fields from the last `| stats` pipe in q. + // Add `_time:step` to the `by(...)` list. + byFields, ok := q.GetStatsByFields(int64(step)) + if !ok { + err := fmt.Errorf("the query must end with '| stats ...'; got [%s]", q) + httpserver.SendPrometheusError(w, r, err) + return + } + + q.Optimize() + + m := make(map[string]*statsSeries) + var mLock sync.Mutex + + writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { + clonedColumnNames := make([]string, len(columns)) + for i, c := range columns { + clonedColumnNames[i] = strings.Clone(c.Name) + } + for i := range timestamps { + timestamp := q.GetTimestamp() + labels := make([]logstorage.Field, 0, len(byFields)) + for j, c := range columns { + if c.Name == "_time" { + nsec, ok := logstorage.TryParseTimestampRFC3339Nano(c.Values[i]) + if ok { + timestamp = nsec + continue + } + } + if slices.Contains(byFields, c.Name) { + labels = append(labels, logstorage.Field{ + Name: clonedColumnNames[j], + Value: strings.Clone(c.Values[i]), + }) + } + } + + var dst []byte + for j, c := range columns { + if !slices.Contains(byFields, c.Name) { + name := clonedColumnNames[j] + dst = dst[:0] + dst = append(dst, name...) + dst = logstorage.MarshalFieldsToJSON(dst, labels) + key := string(dst) + p := statsPoint{ + Timestamp: timestamp, + Value: strings.Clone(c.Values[i]), + } + + mLock.Lock() + ss := m[key] + if ss == nil { + ss = &statsSeries{ + key: key, + Name: name, + Labels: labels, + } + m[key] = ss + } + ss.Points = append(ss.Points, p) + mLock.Unlock() + } + } + } + } + + if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil { + err = fmt.Errorf("cannot execute query [%s]: %s", q, err) + httpserver.SendPrometheusError(w, r, err) + return + } + + // Sort the collected stats by time + rows := make([]*statsSeries, 0, len(m)) + for _, ss := range m { + points := ss.Points + sort.Slice(points, func(i, j int) bool { + return points[i].Timestamp < points[j].Timestamp + }) + rows = append(rows, ss) + } + sort.Slice(rows, func(i, j int) bool { + return rows[i].key < rows[j].key + }) + + w.Header().Set("Content-Type", "application/json") + WriteStatsQueryRangeResponse(w, rows) +} + +type statsSeries struct { + key string + + Name string + Labels []logstorage.Field + Points []statsPoint +} + +type statsPoint struct { + Timestamp int64 + Value string +} + // ProcessStatsQueryRequest handles /select/logsql/stats_query request. // // See https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats @@ -573,8 +705,8 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt return } - // Verify that q ends with `| stats` pipe - byFields, ok := q.GetStatsByFields() + // Obtain `by(...)` fields from the last `| stats` pipe in q. + byFields, ok := q.GetStatsByFields(0) if !ok { err := fmt.Errorf("the query must end with '| stats ...'; got [%s]", q) httpserver.SendPrometheusError(w, r, err) @@ -611,6 +743,7 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt Timestamp: timestamp, Value: strings.Clone(c.Values[i]), } + rowsLock.Lock() rows = append(rows, r) rowsLock.Unlock() diff --git a/app/vlselect/logsql/stats_query_range_response.qtpl b/app/vlselect/logsql/stats_query_range_response.qtpl new file mode 100644 index 000000000..2c71233f0 --- /dev/null +++ b/app/vlselect/logsql/stats_query_range_response.qtpl @@ -0,0 +1,52 @@ +{% stripspace %} + +// StatsQueryRangeResponse generates response for /select/logsql/stats_query_range +{% func StatsQueryRangeResponse(rows []*statsSeries) %} +{ + "status":"success", + "data":{ + "resultType":"matrix", + "result":[ + {% if len(rows) > 0 %} + {%= formatStatsSeries(rows[0]) %} + {% code rows = rows[1:] %} + {% for i := range rows %} + ,{%= formatStatsSeries(rows[i]) %} + {% endfor %} + {% endif %} + ] + } +} +{% endfunc %} + +{% func formatStatsSeries(ss *statsSeries) %} +{ + "metric":{ + "__name__":{%q= ss.Name %} + {% if len(ss.Labels) > 0 %} + {% for _, label := range ss.Labels %} + ,{%q= label.Name %}:{%q= label.Value %} + {% endfor %} + {% endif %} + }, + "values":[ + {% code points := ss.Points %} + {% if len(points) > 0 %} + {%= formatStatsPoint(&points[0]) %} + {% code points = points[1:] %} + {% for i := range points %} + ,{%= formatStatsPoint(&points[i]) %} + {% endfor %} + {% endif %} + ] +} +{% endfunc %} + +{% func formatStatsPoint(p *statsPoint) %} +[ + {%f= float64(p.Timestamp)/1e9 %}, + {%q= p.Value %} +] +{% endfunc %} + +{% endstripspace %} diff --git a/app/vlselect/logsql/stats_query_range_response.qtpl.go b/app/vlselect/logsql/stats_query_range_response.qtpl.go new file mode 100644 index 000000000..ac8afd72f --- /dev/null +++ b/app/vlselect/logsql/stats_query_range_response.qtpl.go @@ -0,0 +1,188 @@ +// Code generated by qtc from "stats_query_range_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +// StatsQueryRangeResponse generates response for /select/logsql/stats_query_range + +//line app/vlselect/logsql/stats_query_range_response.qtpl:4 +package logsql + +//line app/vlselect/logsql/stats_query_range_response.qtpl:4 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vlselect/logsql/stats_query_range_response.qtpl:4 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vlselect/logsql/stats_query_range_response.qtpl:4 +func StreamStatsQueryRangeResponse(qw422016 *qt422016.Writer, rows []*statsSeries) { +//line app/vlselect/logsql/stats_query_range_response.qtpl:4 + qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:10 + if len(rows) > 0 { +//line app/vlselect/logsql/stats_query_range_response.qtpl:11 + streamformatStatsSeries(qw422016, rows[0]) +//line app/vlselect/logsql/stats_query_range_response.qtpl:12 + rows = rows[1:] + +//line app/vlselect/logsql/stats_query_range_response.qtpl:13 + for i := range rows { +//line app/vlselect/logsql/stats_query_range_response.qtpl:13 + qw422016.N().S(`,`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:14 + streamformatStatsSeries(qw422016, rows[i]) +//line app/vlselect/logsql/stats_query_range_response.qtpl:15 + } +//line app/vlselect/logsql/stats_query_range_response.qtpl:16 + } +//line app/vlselect/logsql/stats_query_range_response.qtpl:16 + qw422016.N().S(`]}}`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 +} + +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 +func WriteStatsQueryRangeResponse(qq422016 qtio422016.Writer, rows []*statsSeries) { +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 + StreamStatsQueryRangeResponse(qw422016, rows) +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 +} + +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 +func StatsQueryRangeResponse(rows []*statsSeries) string { +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 + WriteStatsQueryRangeResponse(qb422016, rows) +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 + return qs422016 +//line app/vlselect/logsql/stats_query_range_response.qtpl:20 +} + +//line app/vlselect/logsql/stats_query_range_response.qtpl:22 +func streamformatStatsSeries(qw422016 *qt422016.Writer, ss *statsSeries) { +//line app/vlselect/logsql/stats_query_range_response.qtpl:22 + qw422016.N().S(`{"metric":{"__name__":`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:25 + qw422016.N().Q(ss.Name) +//line app/vlselect/logsql/stats_query_range_response.qtpl:26 + if len(ss.Labels) > 0 { +//line app/vlselect/logsql/stats_query_range_response.qtpl:27 + for _, label := range ss.Labels { +//line app/vlselect/logsql/stats_query_range_response.qtpl:27 + qw422016.N().S(`,`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:28 + qw422016.N().Q(label.Name) +//line app/vlselect/logsql/stats_query_range_response.qtpl:28 + qw422016.N().S(`:`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:28 + qw422016.N().Q(label.Value) +//line app/vlselect/logsql/stats_query_range_response.qtpl:29 + } +//line app/vlselect/logsql/stats_query_range_response.qtpl:30 + } +//line app/vlselect/logsql/stats_query_range_response.qtpl:30 + qw422016.N().S(`},"values":[`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:33 + points := ss.Points + +//line app/vlselect/logsql/stats_query_range_response.qtpl:34 + if len(points) > 0 { +//line app/vlselect/logsql/stats_query_range_response.qtpl:35 + streamformatStatsPoint(qw422016, &points[0]) +//line app/vlselect/logsql/stats_query_range_response.qtpl:36 + points = points[1:] + +//line app/vlselect/logsql/stats_query_range_response.qtpl:37 + for i := range points { +//line app/vlselect/logsql/stats_query_range_response.qtpl:37 + qw422016.N().S(`,`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:38 + streamformatStatsPoint(qw422016, &points[i]) +//line app/vlselect/logsql/stats_query_range_response.qtpl:39 + } +//line app/vlselect/logsql/stats_query_range_response.qtpl:40 + } +//line app/vlselect/logsql/stats_query_range_response.qtpl:40 + qw422016.N().S(`]}`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 +} + +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 +func writeformatStatsSeries(qq422016 qtio422016.Writer, ss *statsSeries) { +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 + streamformatStatsSeries(qw422016, ss) +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 +} + +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 +func formatStatsSeries(ss *statsSeries) string { +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 + writeformatStatsSeries(qb422016, ss) +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 + return qs422016 +//line app/vlselect/logsql/stats_query_range_response.qtpl:43 +} + +//line app/vlselect/logsql/stats_query_range_response.qtpl:45 +func streamformatStatsPoint(qw422016 *qt422016.Writer, p *statsPoint) { +//line app/vlselect/logsql/stats_query_range_response.qtpl:45 + qw422016.N().S(`[`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:47 + qw422016.N().F(float64(p.Timestamp) / 1e9) +//line app/vlselect/logsql/stats_query_range_response.qtpl:47 + qw422016.N().S(`,`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:48 + qw422016.N().Q(p.Value) +//line app/vlselect/logsql/stats_query_range_response.qtpl:48 + qw422016.N().S(`]`) +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 +} + +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 +func writeformatStatsPoint(qq422016 qtio422016.Writer, p *statsPoint) { +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 + streamformatStatsPoint(qw422016, p) +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 +} + +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 +func formatStatsPoint(p *statsPoint) string { +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 + writeformatStatsPoint(qb422016, p) +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 + return qs422016 +//line app/vlselect/logsql/stats_query_range_response.qtpl:50 +} diff --git a/app/vlselect/main.go b/app/vlselect/main.go index 9033f4040..c08a53884 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -197,6 +197,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re logsqlStatsQueryRequests.Inc() logsql.ProcessStatsQueryRequest(ctx, w, r) return true + case "/select/logsql/stats_query_range": + logsqlStatsQueryRangeRequests.Inc() + logsql.ProcessStatsQueryRangeRequest(ctx, w, r) + return true case "/select/logsql/stream_field_names": logsqlStreamFieldNamesRequests.Inc() logsql.ProcessStreamFieldNamesRequest(ctx, w, r) @@ -237,6 +241,7 @@ var ( logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`) logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`) logsqlStatsQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stats_query"}`) + logsqlStatsQueryRangeRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stats_query_range"}`) logsqlStreamFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_names"}`) logsqlStreamFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_values"}`) logsqlStreamIDsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_ids"}`) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 2ae28e32c..1481ebd40 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip * FEATURE: add [`/select/logsql/stats_query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats), which is going to be used by [vmalert](https://docs.victoriametrics.com/vmalert/) for executing alerting and recording rules against VictoriaLogs. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6942) for details. +* FEATURE: add [`/select/logsql/stats_query_range` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats), which is going to be used by [VictoriaLogs plugin for Grafana](https://docs.victoriametrics.com/victorialogs/victorialogs-datasource/) for building time series panels. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6943) for details. * FEATURE: optimize [multi-exact queries](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter) with many phrases to search. For example, `ip:in(path:="/foo/bar" | keep ip)` when there are many unique values for `ip` field among log entries with `/foo/bar` path. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add support for displaying the top 5 log streams in the hits graph. The remaining log streams are grouped into an "other" label. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6545). * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add the ability to customize the graph display with options for bar, line, stepped line, and points. diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index a10f5768f..f170d7b65 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -14,6 +14,7 @@ VictoriaLogs provides the following HTTP endpoints: - [`/select/logsql/tail`](#live-tailing) for live tailing of query results. - [`/select/logsql/hits`](#querying-hits-stats) for querying log hits stats over the given time range. - [`/select/logsql/stats_query`](#querying-log-stats) for querying log stats at the given time. +- [`/select/logsql/stats_query_range`](#querying-log-range-stats) for querying log stats over the given time range. - [`/select/logsql/stream_ids`](#querying-stream_ids) for querying `_stream_id` values of [log streams](#https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). - [`/select/logsql/streams`](#querying-streams) for querying [log streams](#https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). - [`/select/logsql/stream_field_names`](#querying-stream-field-names) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field names. @@ -107,6 +108,7 @@ See also: - [Live tailing](#live-tailing) - [Querying hits stats](#querying-hits-stats) - [Querying log stats](#querying-log-stats) +- [Querying log range stats](#querying-log-range-stats) - [Querying streams](#querying-streams) - [Querying stream field names](#querying-stream-field-names) - [Querying stream field values](#querying-stream-field-values) @@ -276,6 +278,7 @@ See also: - [Querying logs](#querying-logs) - [Querying log stats](#querying-log-stats) +- [Querying log range stats](#querying-log-range-stats) - [Querying streams](#querying-streams) - [HTTP API](#http-api) @@ -285,12 +288,12 @@ VictoriaLogs provides `/select/logsql/stats_query?query=&time=` HTTP e for the given [`query`](https://docs.victoriametrics.com/victorialogs/logsql/) at the given timestamp `t` in the format compatible with [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries). -The `` arg can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). -If `` is missing, then it equals to the current time. - The `` must contain [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). The calculated stats is converted into metrics with labels enumerated in `by(...)` clause of the `| stats by(...)` pipe. +The `` arg can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). +If `` is missing, then it equals to the current time. + For example, the following command returns the number of logs per each `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across logs over `2024-01-01` day by UTC: @@ -345,6 +348,100 @@ The `/select/logsql/stats_query` API is useful for generating Prometheus-compati See also: +- [Querying log range stats](#querying-log-range-stats) +- [Querying logs](#querying-logs) +- [Querying hits stats](#querying-hits-stats) +- [HTTP API](#http-api) + +### Querying log range stats + +VictoriaLogs provides `/select/logsql/stats_query_range?query=&start=&end=&step=` HTTP endpoint, which returns log stats +for the given [`query`](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[start ... end]` time range with the given `step` interval. +The stats is returned in the format compatible with [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries). + +The `` must contain [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). The calculated stats is converted into metrics +with labels enumerated in `by(...)` clause of the `| stats by(...)` pipe. + +The `` and `` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). +If `` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs. +If `` is missing, then it equals to the maximum timestamp across logs stored in VictoriaLogs. + +The `` arg can contain values in [the format specified here](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-time-buckets). +If `` is missing, then it equals to `1d` (one day). + +For example, the following command returns the number of logs per each `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +across logs over `2024-01-01` day by UTC with 6-hour granularity: + +```sh +curl http://localhost:9428/select/logsql/stats_query_range -d 'query=* | stats by (level) count(*)' -d 'start=2024-01-01' -d 'end=2024-01-02' -d 'step=6h' +``` + +Below is an example JSON output returned from this endpoint: + +```json +{ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": { + "__name__": "count(*)", + "level": "info" + }, + "values": [ + [ + 1704067200, + "103125" + ], + [ + 1704088800, + "102500" + ], + [ + 1704110400, + "103125" + ], + [ + 1704132000, + "102500" + ] + ] + }, + { + "metric": { + "__name__": "count(*)", + "level": "error" + }, + "values": [ + [ + 1704067200, + "31" + ], + [ + 1704088800, + "25" + ], + [ + 1704110400, + "31" + ], + [ + 1704132000, + "125" + ] + ] + } + ] + } +} +``` + +The `/select/logsql/stats_query_range` API is useful for generating Prometheus-compatible graphs in Grafana. + +See also: + +- [Querying log stats](#querying-log-stats) - [Querying logs](#querying-logs) - [Querying hits stats](#querying-hits-stats) - [HTTP API](#http-api) diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index a95a34e1f..57831ca29 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -454,10 +454,12 @@ func (q *Query) Optimize() { } } -// GetStatsByFields returns `| stats by (...)` fields from q if q contains safe `| stats ...` pipe in the end. +// GetStatsByFields returns `by (...)` fields from the last `stats` pipe at q. +// +// If step > 0, then _time:step field is added to the last `stats by(...)` pipe at q. // // False is returned if q doesn't contain safe `| stats ...` pipe. -func (q *Query) GetStatsByFields() ([]string, bool) { +func (q *Query) GetStatsByFields(step int64) ([]string, bool) { pipes := q.pipes idx := getLastPipeStatsIdx(pipes) @@ -465,8 +467,13 @@ func (q *Query) GetStatsByFields() ([]string, bool) { return nil, false } + ps := pipes[idx].(*pipeStats) + + // add _time:step to ps.byFields if it doesn't contain it yet. + ps.byFields = addByTimeField(ps.byFields, step) + // extract by(...) field names from stats pipe - byFields := pipes[idx].(*pipeStats).byFields + byFields := ps.byFields fields := make([]string, len(byFields)) for i, f := range byFields { fields[i] = f.name @@ -525,6 +532,34 @@ func getLastPipeStatsIdx(pipes []pipe) int { return -1 } +func addByTimeField(byFields []*byStatsField, step int64) []*byStatsField { + if step <= 0 { + return byFields + } + stepStr := fmt.Sprintf("%d", step) + dstFields := make([]*byStatsField, 0, len(byFields)+1) + hasByTime := false + for _, f := range byFields { + if f.name == "_time" { + f = &byStatsField{ + name: "_time", + bucketSizeStr: stepStr, + bucketSize: float64(step), + } + hasByTime = true + } + dstFields = append(dstFields, f) + } + if !hasByTime { + dstFields = append(dstFields, &byStatsField{ + name: "_time", + bucketSizeStr: stepStr, + bucketSize: float64(step), + }) + } + return dstFields +} + func removeStarFilters(f filter) filter { visitFunc := func(f filter) bool { fp, ok := f.(*filterPrefix) diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 4cbf18cef..366ed9c80 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -2101,6 +2101,35 @@ func TestQueryDropAllPipes(t *testing.T) { f(`foo | filter bar:baz | stats by (x) min(y)`, `foo bar:baz`) } +func TestQueryGetStatsByFields_PositiveStep(t *testing.T) { + f := func(qStr string, step int64, fieldsExpected []string, qExpected string) { + t.Helper() + + q, err := ParseQuery(qStr) + if err != nil { + t.Fatalf("cannot parse [%s]: %s", qStr, err) + } + fields, ok := q.GetStatsByFields(step) + if !ok { + t.Fatalf("cannot obtain byFields from the query [%s]", qStr) + } + if !reflect.DeepEqual(fields, fieldsExpected) { + t.Fatalf("unexpected byFields;\ngot\n%q\nwant\n%q", fields, fieldsExpected) + } + + // Verify the resulting query + qResult := q.String() + if qResult != qExpected { + t.Fatalf("unexpected query\ngot\n%s\nwant\n%s", qResult, qExpected) + } + } + + f(`* | count()`, nsecsPerHour, []string{"_time"}, `* | stats by (_time:3600000000000) count(*) as "count(*)"`) + f(`* | by (level) count() x`, nsecsPerDay, []string{"level", "_time"}, `* | stats by (level, _time:86400000000000) count(*) as x`) + f(`* | by (_time:1m) count() x`, nsecsPerDay, []string{"_time"}, `* | stats by (_time:86400000000000) count(*) as x`) + f(`* | by (_time:1m offset 30s,level) count() x, count_uniq(z) y`, nsecsPerDay, []string{"_time", "level"}, `* | stats by (_time:86400000000000, level) count(*) as x, count_uniq(z) as y`) +} + func TestQueryGetStatsByFields_Success(t *testing.T) { f := func(qStr string, fieldsExpected []string) { t.Helper() @@ -2109,7 +2138,7 @@ func TestQueryGetStatsByFields_Success(t *testing.T) { if err != nil { t.Fatalf("cannot parse [%s]: %s", qStr, err) } - fields, ok := q.GetStatsByFields() + fields, ok := q.GetStatsByFields(0) if !ok { t.Fatalf("cannot obtain byFields from the query [%s]", qStr) } @@ -2156,7 +2185,7 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) { if err != nil { t.Fatalf("cannot parse [%s]: %s", qStr, err) } - fields, ok := q.GetStatsByFields() + fields, ok := q.GetStatsByFields(0) if ok { t.Fatalf("expecting failure to get byFields for the query [%s]", qStr) }