diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index 6cf2b266f0..c09887872f 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -2,13 +2,17 @@ package logsql import ( "context" + "fmt" + "math" "net/http" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) // ProcessQueryRequest handles /select/logsql/query request. @@ -19,23 +23,46 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req httpserver.Errorf(w, r, "%s", err) return } - limit, err := httputils.GetInt(r, "limit") - if err != nil { - httpserver.Errorf(w, r, "%s", err) - return - } + // Parse query qStr := r.FormValue("query") q, err := logstorage.ParseQuery(qStr) if err != nil { httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err) return } - w.Header().Set("Content-Type", "application/stream+json; charset=utf-8") + // Parse optional start and end args + start, okStart, err := getTimeNsec(r, "start") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + end, okEnd, err := getTimeNsec(r, "end") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + if okStart || okEnd { + if !okStart { + start = math.MinInt64 + } + if !okEnd { + end = math.MaxInt64 + } + q.AddTimeFilter(start, end) + } + + // Parse limit query arg + limit, err := httputils.GetInt(r, "limit") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } if limit > 0 { q.AddPipeLimit(uint64(limit)) } + q.Optimize() tenantIDs := []logstorage.TenantID{tenantID} @@ -54,6 +81,7 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req blockResultPool.Put(bb) } + w.Header().Set("Content-Type", "application/stream+json; charset=utf-8") err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock) bw.FlushIgnoreErrors() @@ -66,3 +94,16 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req } var blockResultPool bytesutil.ByteBufferPool + +func getTimeNsec(r *http.Request, argName string) (int64, bool, error) { + s := r.FormValue(argName) + if s == "" { + return 0, false, nil + } + currentTimestamp := float64(time.Now().UnixNano()) / 1e9 + secs, err := promutils.ParseTimeAt(s, currentTimestamp) + if err != nil { + return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err) + } + return int64(secs * 1e9), true, nil +} diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 7f57884498..aef94f305c 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,10 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip +* FEATURE: add support for optional `start` and `end` query args to [HTTP querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api), which can be used for limiting the time range for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/). +* FEATURE: add ability to return the first `N` results from [`sort` pipe](#https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). This is useful when `N` biggest or `N` smallest values must be returned from large amounts of logs. +* FEATURE: add [`quantile`](https://docs.victoriametrics.com/victorialogs/logsql/#quantile-stats) and [`median`](https://docs.victoriametrics.com/victorialogs/logsql/#median-stats) [stats functions](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). + ## [v0.6.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.1-victorialogs) Released at 2024-05-14 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 44291699c0..62b6a4979c 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1128,6 +1128,7 @@ By default rows are selected in arbitrary order because of performance reasons, See also: +- [`sort` pipe](#sort-pipe) - [`offset` pipe](#offset-pipe) ### offset pipe @@ -1147,6 +1148,7 @@ Rows can be sorted with [`sort` pipe](#sort-pipe). See also: - [`limit` pipe](#limit-pipe) +- [`sort` pipe](#sort-pipe) ### rename pipe @@ -1198,11 +1200,31 @@ The reverse order can be applied globally via `desc` keyword after `by(...)` cla _time:5m | sort by (foo, bar) desc ``` +Sorting of big number of logs can consume a lot of CPU time and memory. Sometimes it is enough to return the first `N` entries with the biggest +or the smallest values. This can be done by adding `limit N` to the end of `sort ...` pipe. +Such a query consumes lower amounts of memory when sorting big number of logs, since it keeps in memory only `N` log entries. +For example, the following query returns top 10 log entries with the biggest values +for the `request_duration` [field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) during the last hour: + +```logsql +_time:1h | sort by (request_duration desc) limit 10 +``` + +If the first `N` sorted results must be skipped, then `offset N` can be added to `sort` pipe. For example, +the following query skips the first 10 logs with the biggest `request_duration` [field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model), +and then returns the next 20 sorted logs for the last 5 minutes: + +```logsql +_time:1h | sort by (request_duration desc) offset 10 limit 20 +``` + Note that sorting of big number of logs can be slow and can consume a lot of additional memory. It is recommended limiting the number of logs before sorting with the following approaches: +- Adding `limit N` to the end of `sort ...` pipe. - Reducing the selected time range with [time filter](#time-filter). - Using more specific [filters](#filters), so they select less logs. +- Limiting the number of selected [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) via [`fields` pipe](#fields-pipe). See also: @@ -1371,8 +1393,11 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe): - [`count_empty`](#count_empty-stats) calculates the number logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`count_uniq`](#count_uniq-stats) calculates the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`max`](#max-stats) calcualtes the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`median`](#median-stats) calcualtes the [median](https://en.wikipedia.org/wiki/Median) value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`min`](#min-stats) calculates the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`quantile`](#quantile-stats) calculates the given quantile for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`sum`](#sum-stats) calculates the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`sum_len`](#sum_len-stats) calculates the sum of lengths for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`values`](#values-stats) returns all the values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -1391,6 +1416,8 @@ _time:5m | stats avg(duration) avg_duration See also: +- [`median`](#median-stats) +- [`quantile`](#quantile-stats) - [`min`](#min-stats) - [`max`](#max-stats) - [`sum`](#sum-stats) @@ -1492,10 +1519,28 @@ _time:5m | stats max(duration) max_duration See also: - [`min`](#min-stats) +- [`quantile`](#quantile-stats) - [`avg`](#avg-stats) - [`sum`](#sum-stats) - [`count`](#count-stats) +### median stats + +`median(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the [median](https://en.wikipedia.org/wiki/Median) value across +the give numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). + +For example, the following query return median for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +over logs for the last 5 minutes: + +```logsql +_time:5m | stats median(duration) median_duration +``` + +See also: + +- [`quantile`](#quantile-stats) +- [`avg`](#avg-stats) + ### min stats `min(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the minimum value across @@ -1512,10 +1557,34 @@ _time:5m | stats min(duration) min_duration See also: - [`max`](#max-stats) +- [`quantile`](#quantile-stats) - [`avg`](#avg-stats) - [`sum`](#sum-stats) - [`count`](#count-stats) +### quantile stats + +`quantile(phi, field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates `phi` [percentile](https://en.wikipedia.org/wiki/Percentile) over numeric values +for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). The `phi` must be in the range `0 ... 1`, where `0` means `0th` percentile, +while `1` means `100th` percentile. + +For example, the following query calculates `50th`, `90th` and `99th` percentiles for the `request_duration_seconds` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +over logs for the last 5 minutes: + +```logsql +_time:5m | stats + quantile(0.5, request_duration_seconds) p50, + quantile(0.9, request_duration_seconds) p90, + quantile(0.99, request_duration_seconds) p99 +``` + +See also: + +- [`min`](#min-stats) +- [`max`](#max-stats) +- [`median`](#median-stats) +- [`avg`](#avg-stats) + ### sum stats `sum(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the sum of numeric values across @@ -1535,6 +1604,22 @@ See also: - [`max`](#max-stats) - [`min`](#min-stats) +### sum_len stats + +`sum_len(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the sum of lengths of all the values +for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). + +For example, the following query returns the sum of lengths of [`_msg` fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +across all the logs for the last 5 minutes: + +```logsql +_time:5m | stats sum_len(_msg) messages_len +``` + +See also: + +- [`count`](#count-stats) + ### uniq_values stats `uniq_values(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the unique non-empty values across @@ -1631,8 +1716,6 @@ Stats over the selected logs can be calculated via [`stats` pipe](#stats-pipe). LogsQL will support calculating the following additional stats based on the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) and fields created by [transformations](#transformations): -- The median and [percentile](https://en.wikipedia.org/wiki/Percentile) for the given field. - It will be possible specifying an optional condition [filter](#post-filters) when calculating the stats. For example, `sum(response_size) if (is_admin:true)` calculates the total response size for admins only. diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index 2755dac0a1..bfad23e75e 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -50,6 +50,13 @@ By default the `/select/logsql/query` returns all the log entries matching the g ```sh curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10' ``` +- By adding [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) to the query. For example: + ```sh + curl http://localhost:9428/select/logsql/query -d 'query=error | limit 10' + ``` +- By adding [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter). The time range for the query can be specified via optional + `start` and `end` query ars formatted according to [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#timestamp-formats). +- By adding other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) to the query. The `/select/logsql/query` endpoint returns [a stream of JSON lines](https://jsonlines.org/), where each line contains JSON-encoded log entry in the form `{field1="value1",...,fieldN="valueN"}`. diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index a873a1c0ff..d257775f0f 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1738,6 +1738,55 @@ func (c *blockResultColumn) getMinValue() float64 { } } +func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 { + if c.isConst { + v := c.encodedValues[0] + return uint64(len(v)) * uint64(len(br.timestamps)) + } + if c.isTime { + return uint64(len(time.RFC3339Nano)) * uint64(len(br.timestamps)) + } + + switch c.valueType { + case valueTypeString: + return c.sumLenStringValues(br) + case valueTypeDict: + n := uint64(0) + dictValues := c.dictValues + for _, v := range c.encodedValues { + idx := v[0] + v := dictValues[idx] + n += uint64(len(v)) + } + return n + case valueTypeUint8: + return c.sumLenStringValues(br) + case valueTypeUint16: + return c.sumLenStringValues(br) + case valueTypeUint32: + return c.sumLenStringValues(br) + case valueTypeUint64: + return c.sumLenStringValues(br) + case valueTypeFloat64: + return c.sumLenStringValues(br) + case valueTypeIPv4: + return c.sumLenStringValues(br) + case valueTypeTimestampISO8601: + return uint64(len(iso8601Timestamp)) * uint64(len(br.timestamps)) + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 + } +} + +func (c *blockResultColumn) sumLenStringValues(br *blockResult) uint64 { + n := uint64(0) + for _, v := range c.getValues(br) { + n += uint64(len(v)) + } + return n +} + func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { if c.isConst { v := c.encodedValues[0] diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index b93a9eaeda..a1f1422a85 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -206,6 +206,29 @@ func (q *Query) String() string { return s } +// AddTimeFilter adds global filter _time:[start ... end] to q. +func (q *Query) AddTimeFilter(start, end int64) { + startStr := marshalTimestampRFC3339Nano(nil, start) + endStr := marshalTimestampRFC3339Nano(nil, end) + ft := &filterTime{ + minTimestamp: start, + maxTimestamp: end, + stringRepr: fmt.Sprintf("[%s, %s]", startStr, endStr), + } + + fa, ok := q.f.(*filterAnd) + if ok { + filters := make([]filter, len(fa.filters)+1) + filters[0] = ft + copy(filters[1:], fa.filters) + fa.filters = filters + } else { + q.f = &filterAnd{ + filters: []filter{ft, q.f}, + } + } +} + // AddPipeLimit adds `| limit n` pipe to q. // // See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe @@ -215,6 +238,56 @@ func (q *Query) AddPipeLimit(n uint64) { }) } +// Optimize tries optimizing the query. +func (q *Query) Optimize() { + q.pipes = optimizeSortOffsetPipes(q.pipes) + q.pipes = optimizeSortLimitPipes(q.pipes) +} + +func optimizeSortOffsetPipes(pipes []pipe) []pipe { + // Merge 'sort ... | offset ...' into 'sort ... offset ...' + i := 1 + for i < len(pipes) { + po, ok := pipes[i].(*pipeOffset) + if !ok { + i++ + continue + } + ps, ok := pipes[i-1].(*pipeSort) + if !ok { + i++ + continue + } + if ps.offset == 0 && ps.limit == 0 { + ps.offset = po.n + } + pipes = append(pipes[:i], pipes[i+1:]...) + } + return pipes +} + +func optimizeSortLimitPipes(pipes []pipe) []pipe { + // Merge 'sort ... | limit ...' into 'sort ... limit ...' + i := 1 + for i < len(pipes) { + pl, ok := pipes[i].(*pipeLimit) + if !ok { + i++ + continue + } + ps, ok := pipes[i-1].(*pipeSort) + if !ok { + i++ + continue + } + if ps.limit == 0 || pl.n < ps.limit { + ps.limit = pl.n + } + pipes = append(pipes[:i], pipes[i+1:]...) + } + return pipes +} + func (q *Query) getNeededColumns() ([]string, []string) { neededFields := newFieldsSet() neededFields.add("*") diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index ee9498a0b3..1f0289e584 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -926,6 +926,27 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats by(x) values() limit 1_000 AS y`, `* | stats by (x) values(*) limit 1000 as y`) f(`* | stats by(x) values(a,*,b) y`, `* | stats by (x) values(*) as y`) + // stats pipe sum_len + f(`* | stats Sum_len(foo) bar`, `* | stats sum_len(foo) as bar`) + f(`* | stats BY(x, y, ) SUM_Len(foo,bar,) bar`, `* | stats by (x, y) sum_len(foo, bar) as bar`) + f(`* | stats sum_len() x`, `* | stats sum_len(*) as x`) + f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`) + f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`) + + // stats pipe quantile + f(`* | stats quantile(0, foo) bar`, `* | stats quantile(0, foo) as bar`) + f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`) + f(`* | stats quantile(0.5, a, b, c) bar`, `* | stats quantile(0.5, a, b, c) as bar`) + f(`* | stats quantile(0.99, *) bar`, `* | stats quantile(0.99, *) as bar`) + f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99, *) as bar`) + + // stats pipe median + f(`* | stats Median(foo) bar`, `* | stats median(foo) as bar`) + f(`* | stats BY(x, y, ) MEDIAN(foo,bar,) bar`, `* | stats by (x, y) median(foo, bar) as bar`) + f(`* | stats median() x`, `* | stats median(*) as x`) + f(`* | stats median(*) x`, `* | stats median(*) as x`) + f(`* | stats median(foo,*,bar) x`, `* | stats median(*) as x`) + // stats pipe multiple funcs f(`* | stats count() "foo.bar:baz", count_uniq(a) bar`, `* | stats count(*) as "foo.bar:baz", count_uniq(a) as bar`) f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`) @@ -953,6 +974,15 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | sort bY (foo)`, `* | sort by (foo)`) f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`) f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`) + f(`* | sort limit 10`, `* | sort limit 10`) + f(`* | sort offset 20 limit 10`, `* | sort offset 20 limit 10`) + f(`* | sort desc limit 10`, `* | sort desc limit 10`) + f(`* | sort desc offset 20 limit 10`, `* | sort desc offset 20 limit 10`) + f(`* | sort by (foo desc, bar) limit 10`, `* | sort by (foo desc, bar) limit 10`) + f(`* | sort by (foo desc, bar) oFFset 20 limit 10`, `* | sort by (foo desc, bar) offset 20 limit 10`) + f(`* | sort by (foo desc, bar) desc limit 10`, `* | sort by (foo desc, bar) desc limit 10`) + f(`* | sort by (foo desc, bar) desc OFFSET 30 limit 10`, `* | sort by (foo desc, bar) desc offset 30 limit 10`) + f(`* | sort by (foo desc, bar) desc limit 10 OFFSET 30`, `* | sort by (foo desc, bar) desc offset 30 limit 10`) // uniq pipe f(`* | uniq`, `* | uniq`) @@ -1275,6 +1305,18 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats values(a) limit 0.5`) f(`foo | stats values(a) limit -1`) + // invalid stats sum_len + f(`foo | stats sum_len`) + f(`foo | stats sum_len()`) + + // invalid stats quantile + f(`foo | stats quantile`) + f(`foo | stats quantile() foo`) + f(`foo | stats quantile(bar, baz) foo`) + f(`foo | stats quantile(0.5) foo`) + f(`foo | stats quantile(-1, x) foo`) + f(`foo | stats quantile(10, x) foo`) + // invalid stats grouping fields f(`foo | stats by(foo:bar) count() baz`) f(`foo | stats by(foo:/bar) count() baz`) @@ -1297,6 +1339,16 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | sort by(baz`) f(`foo | sort by(baz,`) f(`foo | sort by(bar) foo`) + f(`foo | sort by(bar) limit`) + f(`foo | sort by(bar) limit foo`) + f(`foo | sort by(bar) limit -1234`) + f(`foo | sort by(bar) limit 12.34`) + f(`foo | sort by(bar) limit 10 limit 20`) + f(`foo | sort by(bar) offset`) + f(`foo | sort by(bar) offset limit`) + f(`foo | sort by(bar) offset -1234`) + f(`foo | sort by(bar) offset 12.34`) + f(`foo | sort by(bar) offset 10 offset 20`) // invalid uniq pipe f(`foo | uniq bar`) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 55c2bab727..7623b6075b 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -65,7 +65,7 @@ func parsePipes(lex *lexer) ([]pipe, error) { var pipes []pipe for !lex.isKeyword(")", "") { if !lex.isKeyword("|") { - return nil, fmt.Errorf("expecting '|'") + return nil, fmt.Errorf("expecting '|'; got %q", lex.token) } if !lex.mustNextToken() { return nil, fmt.Errorf("missing token after '|'") diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index aee2028cfb..4787b56bb5 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -25,6 +25,14 @@ type pipeSort struct { // whether to apply descending order isDesc bool + + // how many results to skip + offset uint64 + + // how many results to return + // + // if zero, then all the results are returned + limit uint64 } func (ps *pipeSort) String() string { @@ -39,6 +47,12 @@ func (ps *pipeSort) String() string { if ps.isDesc { s += " desc" } + if ps.offset > 0 { + s += fmt.Sprintf(" offset %d", ps.offset) + } + if ps.limit > 0 { + s += fmt.Sprintf(" limit %d", ps.limit) + } return s } @@ -55,6 +69,13 @@ func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) { } func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + if ps.limit > 0 { + return newPipeTopkProcessor(ps, workersCount, stopCh, cancel, ppBase) + } + return newPipeSortProcessor(ps, workersCount, stopCh, cancel, ppBase) +} + +func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { maxStateSize := int64(float64(memory.Allowed()) * 0.2) shards := make([]pipeSortProcessorShard, workersCount) @@ -117,6 +138,9 @@ type pipeSortProcessorShardNopad struct { // stateSizeBudget is the remaining budget for the whole state size for the shard. // The per-shard budget is provided in chunks from the parent pipeSortProcessor. stateSizeBudget int + + // columnValues is used as temporary buffer at pipeSortProcessorShard.writeBlock + columnValues [][]string } // sortBlock represents a block of logs for sorting. @@ -176,16 +200,23 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { if len(byFields) == 0 { // Sort by all the columns + columnValues := shard.columnValues[:0] + for _, c := range cs { + columnValues = append(columnValues, c.getValues(br)) + } + shard.columnValues = columnValues + // Generate byColumns var rc resultColumn + bb := bbPool.Get() - for i := range br.timestamps { - // JSON-encode all the columns per each row into a single string + for rowIdx := range br.timestamps { + // Marshal all the columns per each row into a single string // and sort rows by the resulting string. bb.B = bb.B[:0] - for _, c := range cs { - v := c.getValueAtRow(br, i) - bb.B = marshalJSONKeyValue(bb.B, c.name, v) + for i, values := range columnValues { + v := values[rowIdx] + bb.B = marshalJSONKeyValue(bb.B, cs[i].name, v) bb.B = append(bb.B, ',') } rc.addValue(bytesutil.ToUnsafeString(bb.B)) @@ -358,10 +389,8 @@ func (psp *pipeSortProcessor) flush() error { return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20)) } - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } // Sort every shard in parallel @@ -377,17 +406,15 @@ func (psp *pipeSortProcessor) flush() error { } wg.Wait() - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } // Merge sorted results across shards sh := pipeSortProcessorShardsHeap(make([]*pipeSortProcessorShard, 0, len(shards))) for i := range shards { shard := &shards[i] - if shard.Len() > 0 { + if len(shard.rowRefs) > 0 { sh = append(sh, shard) } } @@ -400,49 +427,43 @@ func (psp *pipeSortProcessor) flush() error { wctx := &pipeSortWriteContext{ psp: psp, } - var shardNext *pipeSortProcessorShard + shardNextIdx := 0 for len(sh) > 1 { shard := sh[0] - wctx.writeRow(shard, shard.rowRefNext) - shard.rowRefNext++ + wctx.writeNextRow(shard) if shard.rowRefNext >= len(shard.rowRefs) { _ = heap.Pop(&sh) - shardNext = nil + shardNextIdx = 0 - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } continue } - if shardNext == nil { - shardNext = sh[1] - if len(sh) > 2 && sortBlockLess(sh[2], sh[2].rowRefNext, shardNext, shardNext.rowRefNext) { - shardNext = sh[2] + if shardNextIdx == 0 { + shardNextIdx = 1 + if len(sh) > 2 && sh.Less(2, 1) { + shardNextIdx = 2 } } - if sortBlockLess(shardNext, shardNext.rowRefNext, shard, shard.rowRefNext) { + if sh.Less(shardNextIdx, 0) { heap.Fix(&sh, 0) - shardNext = nil + shardNextIdx = 0 - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } } } if len(sh) == 1 { shard := sh[0] for shard.rowRefNext < len(shard.rowRefs) { - wctx.writeRow(shard, shard.rowRefNext) - shard.rowRefNext++ + wctx.writeNextRow(shard) } } wctx.flush() @@ -455,14 +476,25 @@ type pipeSortWriteContext struct { rcs []resultColumn br blockResult - valuesLen int + rowsWritten uint64 + valuesLen int } -func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx int) { +func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { + ps := shard.ps + + rowIdx := shard.rowRefNext + shard.rowRefNext++ + + wctx.rowsWritten++ + if wctx.rowsWritten <= ps.offset { + return + } + rr := shard.rowRefs[rowIdx] b := &shard.blocks[rr.blockIdx] - byFields := shard.ps.byFields + byFields := ps.byFields rcs := wctx.rcs areEqualColumns := len(rcs) == len(byFields)+len(b.otherColumns) @@ -671,7 +703,36 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) { ps.isDesc = true } - return &ps, nil + for { + switch { + case lex.isKeyword("offset"): + lex.nextToken() + s := lex.token + n, ok := tryParseUint64(s) + lex.nextToken() + if !ok { + return nil, fmt.Errorf("cannot parse 'offset %s'", s) + } + if ps.offset > 0 { + return nil, fmt.Errorf("duplicate 'offset'; the previous one is %d; the new one is %s", ps.offset, s) + } + ps.offset = n + case lex.isKeyword("limit"): + lex.nextToken() + s := lex.token + n, ok := tryParseUint64(s) + lex.nextToken() + if !ok { + return nil, fmt.Errorf("cannot parse 'limit %s'", s) + } + if ps.limit > 0 { + return nil, fmt.Errorf("duplicate 'limit'; the previous one is %d; the new one is %s", ps.limit, s) + } + ps.limit = n + default: + return &ps, nil + } + } } // bySortField represents 'by (...)' part of the pipeSort. @@ -725,13 +786,6 @@ func parseBySortFields(lex *lexer) ([]*bySortField, error) { } } -func marshalJSONKeyValue(dst []byte, k, v string) []byte { - dst = strconv.AppendQuote(dst, k) - dst = append(dst, ':') - dst = strconv.AppendQuote(dst, v) - return dst -} - func tryParseInt64(s string) (int64, bool) { if len(s) == 0 { return 0, false @@ -756,3 +810,10 @@ func tryParseInt64(s string) (int64, bool) { } return -int64(u64), true } + +func marshalJSONKeyValue(dst []byte, k, v string) []byte { + dst = strconv.AppendQuote(dst, k) + dst = append(dst, ':') + dst = strconv.AppendQuote(dst, v) + return dst +} diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 3a9d8980bc..56e0864601 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -345,10 +345,8 @@ func (psp *pipeStatsProcessor) flush() error { for key, psg := range shard.m { // shard.m may be quite big, so this loop can take a lot of time and CPU. // Stop processing data as soon as stopCh is closed without wasting additional CPU time. - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } spgBase := m[key] @@ -388,10 +386,8 @@ func (psp *pipeStatsProcessor) flush() error { for key, psg := range m { // m may be quite big, so this loop can take a lot of time and CPU. // Stop processing data as soon as stopCh is closed without wasting additional CPU time. - select { - case <-psp.stopCh: + if needStop(psp.stopCh) { return nil - default: } // Unmarshal values for byFields from key. @@ -534,6 +530,24 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) { return nil, "", fmt.Errorf("cannot parse 'values' func: %w", err) } sf = svs + case lex.isKeyword("sum_len"): + sss, err := parseStatsSumLen(lex) + if err != nil { + return nil, "", fmt.Errorf("cannot parse 'sum_len' func: %w", err) + } + sf = sss + case lex.isKeyword("quantile"): + sqs, err := parseStatsQuantile(lex) + if err != nil { + return nil, "", fmt.Errorf("cannot parse 'quantile' func: %w", err) + } + sf = sqs + case lex.isKeyword("median"): + sms, err := parseStatsMedian(lex) + if err != nil { + return nil, "", fmt.Errorf("cannot parse 'median' func: %w", err) + } + sf = sms default: return nil, "", fmt.Errorf("unknown stats func %q", lex.token) } diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go new file mode 100644 index 0000000000..42b18df6a4 --- /dev/null +++ b/lib/logstorage/pipe_topk.go @@ -0,0 +1,553 @@ +package logstorage + +import ( + "container/heap" + "fmt" + "strings" + "sync" + "sync/atomic" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" +) + +func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + maxStateSize := int64(float64(memory.Allowed()) * 0.2) + + shards := make([]pipeTopkProcessorShard, workersCount) + for i := range shards { + shard := &shards[i] + shard.ps = ps + shard.stateSizeBudget = stateSizeBudgetChunk + maxStateSize -= stateSizeBudgetChunk + } + + ptp := &pipeTopkProcessor{ + ps: ps, + stopCh: stopCh, + cancel: cancel, + ppBase: ppBase, + + shards: shards, + + maxStateSize: maxStateSize, + } + ptp.stateSizeBudget.Store(maxStateSize) + + return ptp +} + +type pipeTopkProcessor struct { + ps *pipeSort + stopCh <-chan struct{} + cancel func() + ppBase pipeProcessor + + shards []pipeTopkProcessorShard + + maxStateSize int64 + stateSizeBudget atomic.Int64 +} + +type pipeTopkProcessorShard struct { + pipeTopkProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeTopkProcessorShardNopad{})%128]byte +} + +type pipeTopkProcessorShardNopad struct { + // ps points to the parent pipeSort. + ps *pipeSort + + // rows contains rows tracked by the given shard. + rows []*pipeTopkRow + + // rowNext points to the next index at rows during merge shards phase + rowNext int + + // tmpRow is used as a temporary row when determining whether the next ingested row must be stored in the shard. + tmpRow pipeTopkRow + + // these are aux fields for determining whether the next row must be stored in rows. + byColumnValues [][]string + otherColumnValues []pipeTopkOtherColumn + byColumns []string + otherColumns []Field + + // stateSizeBudget is the remaining budget for the whole state size for the shard. + // The per-shard budget is provided in chunks from the parent pipeTopkProcessor. + stateSizeBudget int +} + +type pipeTopkRow struct { + byColumns []string + otherColumns []Field +} + +type pipeTopkOtherColumn struct { + name string + values []string +} + +func (r *pipeTopkRow) clone() *pipeTopkRow { + byColumnsCopy := make([]string, len(r.byColumns)) + for i := range byColumnsCopy { + byColumnsCopy[i] = strings.Clone(r.byColumns[i]) + } + + otherColumnsCopy := make([]Field, len(r.otherColumns)) + for i := range otherColumnsCopy { + src := &r.otherColumns[i] + dst := &otherColumnsCopy[i] + dst.Name = strings.Clone(src.Name) + dst.Value = strings.Clone(src.Value) + } + + return &pipeTopkRow{ + byColumns: byColumnsCopy, + otherColumns: otherColumnsCopy, + } +} + +func (r *pipeTopkRow) sizeBytes() int { + n := int(unsafe.Sizeof(*r)) + + for _, v := range r.byColumns { + n += len(v) + } + n += len(r.byColumns) * int(unsafe.Sizeof(r.byColumns[0])) + + for _, f := range r.otherColumns { + n += len(f.Name) + len(f.Value) + } + n += len(r.otherColumns) * int(unsafe.Sizeof(r.otherColumns[0])) + + return n +} + +func (shard *pipeTopkProcessorShard) Len() int { + return len(shard.rows) +} + +func (shard *pipeTopkProcessorShard) Swap(i, j int) { + rows := shard.rows + rows[i], rows[j] = rows[j], rows[i] +} + +func (shard *pipeTopkProcessorShard) Less(i, j int) bool { + rows := shard.rows + + // This is max heap + return topkLess(shard.ps, rows[j], rows[i]) +} + +func (shard *pipeTopkProcessorShard) Push(x any) { + r := x.(*pipeTopkRow) + shard.rows = append(shard.rows, r) +} + +func (shard *pipeTopkProcessorShard) Pop() any { + rows := shard.rows + x := rows[len(rows)-1] + rows[len(rows)-1] = nil + shard.rows = rows[:len(rows)-1] + return x +} + +// writeBlock writes br to shard. +func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { + cs := br.getColumns() + + byFields := shard.ps.byFields + if len(byFields) == 0 { + // Sort by all the fields + + byColumnValues := shard.byColumnValues[:0] + for _, c := range cs { + byColumnValues = append(byColumnValues, c.getValues(br)) + } + shard.byColumnValues = byColumnValues + + byColumns := shard.byColumns[:0] + otherColumns := shard.otherColumns[:0] + bb := bbPool.Get() + for rowIdx := range br.timestamps { + byColumns = byColumns[:0] + bb.B = bb.B[:0] + for i, values := range byColumnValues { + v := values[rowIdx] + bb.B = marshalJSONKeyValue(bb.B, cs[i].name, v) + bb.B = append(bb.B, ',') + } + byColumns = append(byColumns, bytesutil.ToUnsafeString(bb.B)) + + otherColumns = otherColumns[:0] + for i, values := range byColumnValues { + otherColumns = append(otherColumns, Field{ + Name: cs[i].name, + Value: values[rowIdx], + }) + } + + shard.addRow(byColumns, otherColumns) + } + bbPool.Put(bb) + shard.byColumns = byColumns + shard.otherColumns = otherColumns + } else { + // Sort by byFields + + byColumnValues := shard.byColumnValues[:0] + for _, bf := range byFields { + c := br.getColumnByName(bf.name) + byColumnValues = append(byColumnValues, c.getValues(br)) + } + shard.byColumnValues = byColumnValues + + otherColumnValues := shard.otherColumnValues[:0] + for _, c := range cs { + isByField := false + for _, bf := range byFields { + if bf.name == c.name { + isByField = true + break + } + } + if !isByField { + otherColumnValues = append(otherColumnValues, pipeTopkOtherColumn{ + name: c.name, + values: c.getValues(br), + }) + } + } + shard.otherColumnValues = otherColumnValues + + // add rows to shard + byColumns := shard.byColumns[:0] + otherColumns := shard.otherColumns[:0] + for rowIdx := range br.timestamps { + byColumns = byColumns[:0] + for _, values := range byColumnValues { + byColumns = append(byColumns, values[rowIdx]) + } + + otherColumns = otherColumns[:0] + for _, ocv := range otherColumnValues { + otherColumns = append(otherColumns, Field{ + Name: ocv.name, + Value: ocv.values[rowIdx], + }) + } + + shard.addRow(byColumns, otherColumns) + } + shard.byColumns = byColumns + shard.otherColumns = otherColumns + } +} + +func (shard *pipeTopkProcessorShard) addRow(byColumns []string, otherColumns []Field) { + r := &shard.tmpRow + r.byColumns = byColumns + r.otherColumns = otherColumns + + rows := shard.rows + if len(rows) > 0 && !topkLess(shard.ps, r, rows[0]) { + // Fast path - nothing to add. + return + } + + // Slow path - add r to shard.rows. + r = r.clone() + shard.stateSizeBudget -= r.sizeBytes() + if uint64(len(rows)) < shard.ps.limit { + heap.Push(shard, r) + shard.stateSizeBudget -= int(unsafe.Sizeof(r)) + } else { + shard.stateSizeBudget += rows[0].sizeBytes() + rows[0] = r + heap.Fix(shard, 0) + } +} + +func (shard *pipeTopkProcessorShard) sortRows(stopCh <-chan struct{}) { + rows := shard.rows + for i := len(rows) - 1; i > 0; i-- { + x := heap.Pop(shard) + rows[i] = x.(*pipeTopkRow) + + if needStop(stopCh) { + return + } + } + shard.rows = rows +} + +func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &ptp.shards[workerID] + + for shard.stateSizeBudget < 0 { + // steal some budget for the state size from the global budget. + remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk) + if remaining < 0 { + // The state size is too big. Stop processing data in order to avoid OOM crash. + if remaining+stateSizeBudgetChunk >= 0 { + // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. + ptp.cancel() + } + return + } + shard.stateSizeBudget += stateSizeBudgetChunk + } + + shard.writeBlock(br) +} + +func (ptp *pipeTopkProcessor) flush() error { + if n := ptp.stateSizeBudget.Load(); n <= 0 { + return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.ps.String(), ptp.maxStateSize/(1<<20)) + } + + if needStop(ptp.stopCh) { + return nil + } + + // Sort every shard in parallel + var wg sync.WaitGroup + shards := ptp.shards + for i := range shards { + wg.Add(1) + go func(shard *pipeTopkProcessorShard) { + shard.sortRows(ptp.stopCh) + wg.Done() + }(&shards[i]) + } + wg.Wait() + + if needStop(ptp.stopCh) { + return nil + } + + // Merge sorted results across shards + sh := pipeTopkProcessorShardsHeap(make([]*pipeTopkProcessorShard, 0, len(shards))) + for i := range shards { + shard := &shards[i] + if len(shard.rows) > 0 { + sh = append(sh, shard) + } + } + if len(sh) == 0 { + return nil + } + + heap.Init(&sh) + + wctx := &pipeTopkWriteContext{ + ptp: ptp, + } + shardNextIdx := 0 + + for len(sh) > 1 { + shard := sh[0] + if !wctx.writeNextRow(shard) { + break + } + + if shard.rowNext >= len(shard.rows) { + _ = heap.Pop(&sh) + shardNextIdx = 0 + + if needStop(ptp.stopCh) { + return nil + } + + continue + } + + if shardNextIdx == 0 { + shardNextIdx = 1 + if len(sh) > 2 && sh.Less(2, 1) { + shardNextIdx = 2 + } + } + + if sh.Less(shardNextIdx, 0) { + heap.Fix(&sh, 0) + shardNextIdx = 0 + + if needStop(ptp.stopCh) { + return nil + } + } + } + if len(sh) == 1 { + shard := sh[0] + for shard.rowNext < len(shard.rows) { + if !wctx.writeNextRow(shard) { + break + } + } + } + wctx.flush() + + return nil +} + +type pipeTopkWriteContext struct { + ptp *pipeTopkProcessor + rcs []resultColumn + br blockResult + + rowsWritten uint64 + valuesLen int +} + +func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool { + ps := shard.ps + + rowIdx := shard.rowNext + shard.rowNext++ + + wctx.rowsWritten++ + if wctx.rowsWritten <= ps.offset { + return true + } + if wctx.rowsWritten > ps.offset+ps.limit { + return false + } + + r := shard.rows[rowIdx] + + byFields := ps.byFields + rcs := wctx.rcs + + areEqualColumns := len(rcs) == len(byFields)+len(r.otherColumns) + if areEqualColumns { + for i, c := range r.otherColumns { + if rcs[len(byFields)+i].name != c.Name { + areEqualColumns = false + break + } + } + } + if !areEqualColumns { + // send the current block to bbBase and construct a block with new set of columns + wctx.flush() + + rcs = wctx.rcs[:0] + for _, bf := range byFields { + rcs = append(rcs, resultColumn{ + name: bf.name, + }) + } + for _, c := range r.otherColumns { + rcs = append(rcs, resultColumn{ + name: c.Name, + }) + } + wctx.rcs = rcs + } + + byColumns := r.byColumns + for i := range byFields { + v := byColumns[i] + rcs[i].addValue(v) + wctx.valuesLen += len(v) + } + + for i, c := range r.otherColumns { + v := c.Value + rcs[len(byFields)+i].addValue(v) + wctx.valuesLen += len(v) + } + + if wctx.valuesLen >= 1_000_000 { + wctx.flush() + } + + return true +} + +func (wctx *pipeTopkWriteContext) flush() { + rcs := wctx.rcs + br := &wctx.br + + wctx.valuesLen = 0 + + if len(rcs) == 0 { + return + } + + // Flush rcs to ppBase + br.setResultColumns(rcs) + wctx.ptp.ppBase.writeBlock(0, br) + br.reset() + for i := range rcs { + rcs[i].resetKeepName() + } +} + +type pipeTopkProcessorShardsHeap []*pipeTopkProcessorShard + +func (sh *pipeTopkProcessorShardsHeap) Len() int { + return len(*sh) +} + +func (sh *pipeTopkProcessorShardsHeap) Swap(i, j int) { + a := *sh + a[i], a[j] = a[j], a[i] +} + +func (sh *pipeTopkProcessorShardsHeap) Less(i, j int) bool { + a := *sh + shardA := a[i] + shardB := a[j] + return topkLess(shardA.ps, shardA.rows[shardA.rowNext], shardB.rows[shardB.rowNext]) +} + +func (sh *pipeTopkProcessorShardsHeap) Push(x any) { + shard := x.(*pipeTopkProcessorShard) + *sh = append(*sh, shard) +} + +func (sh *pipeTopkProcessorShardsHeap) Pop() any { + a := *sh + x := a[len(a)-1] + a[len(a)-1] = nil + *sh = a[:len(a)-1] + return x +} + +func topkLess(ps *pipeSort, a, b *pipeTopkRow) bool { + byFields := ps.byFields + + csA := a.byColumns + csB := b.byColumns + + for k := range csA { + isDesc := ps.isDesc + if len(byFields) > 0 && byFields[k].isDesc { + isDesc = !isDesc + } + + vA := csA[k] + vB := csB[k] + + if vA == vB { + continue + } + + if isDesc { + return stringsutil.LessNatural(vB, vA) + } + return stringsutil.LessNatural(vA, vB) + } + return false +} diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 6cdd46fc7d..3b72aaac2c 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -209,10 +209,8 @@ func (pup *pipeUniqProcessor) flush() error { m := shards[0].m shards = shards[1:] for i := range shards { - select { - case <-pup.stopCh: + if needStop(pup.stopCh) { return nil - default: } for k := range shards[i].m { @@ -229,10 +227,8 @@ func (pup *pipeUniqProcessor) flush() error { if len(byFields) == 0 { for k := range m { - select { - case <-pup.stopCh: + if needStop(pup.stopCh) { return nil - default: } rowFields = rowFields[:0] @@ -259,10 +255,8 @@ func (pup *pipeUniqProcessor) flush() error { } } else { for k := range m { - select { - case <-pup.stopCh: + if needStop(pup.stopCh) { return nil - default: } rowFields = rowFields[:0] diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index 62e215d5f4..1c1f7dbf51 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -86,7 +86,7 @@ func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) { } func (smp *statsMaxProcessor) finalizeStats() string { - return strconv.FormatFloat(smp.max, 'g', -1, 64) + return strconv.FormatFloat(smp.max, 'f', -1, 64) } func parseStatsMax(lex *lexer) (*statsMax, error) { diff --git a/lib/logstorage/stats_median.go b/lib/logstorage/stats_median.go new file mode 100644 index 0000000000..30cc3f1b1e --- /dev/null +++ b/lib/logstorage/stats_median.go @@ -0,0 +1,65 @@ +package logstorage + +import ( + "slices" + "unsafe" +) + +type statsMedian struct { + fields []string + containsStar bool +} + +func (sm *statsMedian) String() string { + return "median(" + fieldNamesString(sm.fields) + ")" +} + +func (sm *statsMedian) neededFields() []string { + return sm.fields +} + +func (sm *statsMedian) newStatsProcessor() (statsProcessor, int) { + smp := &statsMedianProcessor{ + sqp: &statsQuantileProcessor{ + sq: &statsQuantile{ + fields: sm.fields, + containsStar: sm.containsStar, + phi: 0.5, + }, + }, + } + return smp, int(unsafe.Sizeof(*smp)) + int(unsafe.Sizeof(*smp.sqp)) + int(unsafe.Sizeof(*smp.sqp.sq)) +} + +type statsMedianProcessor struct { + sqp *statsQuantileProcessor +} + +func (smp *statsMedianProcessor) updateStatsForAllRows(br *blockResult) int { + return smp.sqp.updateStatsForAllRows(br) +} + +func (smp *statsMedianProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + return smp.sqp.updateStatsForRow(br, rowIdx) +} + +func (smp *statsMedianProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsMedianProcessor) + smp.sqp.mergeState(src.sqp) +} + +func (smp *statsMedianProcessor) finalizeStats() string { + return smp.sqp.finalizeStats() +} + +func parseStatsMedian(lex *lexer) (*statsMedian, error) { + fields, err := parseFieldNamesForStatsFunc(lex, "median") + if err != nil { + return nil, err + } + sm := &statsMedian{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + } + return sm, nil +} diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index bf157cb203..5dcf1dd970 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -86,7 +86,7 @@ func (smp *statsMinProcessor) mergeState(sfp statsProcessor) { } func (smp *statsMinProcessor) finalizeStats() string { - return strconv.FormatFloat(smp.min, 'g', -1, 64) + return strconv.FormatFloat(smp.min, 'f', -1, 64) } func parseStatsMin(lex *lexer) (*statsMin, error) { diff --git a/lib/logstorage/stats_quantile.go b/lib/logstorage/stats_quantile.go new file mode 100644 index 0000000000..cbdcfc07c9 --- /dev/null +++ b/lib/logstorage/stats_quantile.go @@ -0,0 +1,215 @@ +package logstorage + +import ( + "fmt" + "math" + "slices" + "strconv" + "unsafe" + + "github.com/valyala/fastrand" +) + +type statsQuantile struct { + fields []string + containsStar bool + + phi float64 +} + +func (sq *statsQuantile) String() string { + return fmt.Sprintf("quantile(%g, %s)", sq.phi, fieldNamesString(sq.fields)) +} + +func (sq *statsQuantile) neededFields() []string { + return sq.fields +} + +func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) { + sqp := &statsQuantileProcessor{ + sq: sq, + } + return sqp, int(unsafe.Sizeof(*sqp)) +} + +type statsQuantileProcessor struct { + sq *statsQuantile + + h histogram +} + +func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int { + h := &sqp.h + stateSizeIncrease := 0 + + if sqp.sq.containsStar { + for _, c := range br.getColumns() { + for _, v := range c.getValues(br) { + f, ok := tryParseFloat64(v) + if ok { + stateSizeIncrease += h.update(f) + } + } + } + } else { + for _, field := range sqp.sq.fields { + c := br.getColumnByName(field) + for _, v := range c.getValues(br) { + f, ok := tryParseFloat64(v) + if ok { + stateSizeIncrease += h.update(f) + } + } + } + } + + return stateSizeIncrease +} + +func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + h := &sqp.h + stateSizeIncrease := 0 + + if sqp.sq.containsStar { + for _, c := range br.getColumns() { + f := c.getFloatValueAtRow(rowIdx) + if !math.IsNaN(f) { + stateSizeIncrease += h.update(f) + } + } + } else { + for _, field := range sqp.sq.fields { + c := br.getColumnByName(field) + f := c.getFloatValueAtRow(rowIdx) + if !math.IsNaN(f) { + stateSizeIncrease += h.update(f) + } + } + } + + return stateSizeIncrease +} + +func (sqp *statsQuantileProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsQuantileProcessor) + sqp.h.mergeState(&src.h) +} + +func (sqp *statsQuantileProcessor) finalizeStats() string { + q := sqp.h.quantile(sqp.sq.phi) + return strconv.FormatFloat(q, 'f', -1, 64) +} + +func parseStatsQuantile(lex *lexer) (*statsQuantile, error) { + if !lex.isKeyword("quantile") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "quantile") + } + lex.nextToken() + + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'quantile' args: %w", err) + } + if len(fields) < 2 { + return nil, fmt.Errorf("'quantile' must have at least two args: phi and field name") + } + + // Parse phi + phi, ok := tryParseFloat64(fields[0]) + if !ok { + return nil, fmt.Errorf("phi arg in 'quantile' must be floating point number; got %q", fields[0]) + } + if phi < 0 || phi > 1 { + return nil, fmt.Errorf("phi arg in 'quantile' must be in the range [0..1]; got %q", fields[0]) + } + + // Parse fields + fields = fields[1:] + if slices.Contains(fields, "*") { + fields = []string{"*"} + } + + sq := &statsQuantile{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + + phi: phi, + } + return sq, nil +} + +type histogram struct { + a []float64 + min float64 + max float64 + count uint64 + + rng fastrand.RNG +} + +func (h *histogram) update(f float64) int { + if h.count == 0 || f < h.min { + h.min = f + } + if h.count == 0 || f > h.max { + h.max = f + } + + h.count++ + if len(h.a) < maxHistogramSamples { + h.a = append(h.a, f) + return int(unsafe.Sizeof(f)) + } + + if n := h.rng.Uint32n(uint32(h.count)); n < uint32(len(h.a)) { + h.a[n] = f + } + return 0 +} + +const maxHistogramSamples = 100_000 + +func (h *histogram) mergeState(src *histogram) { + if src.count == 0 { + // Nothing to merge + return + } + if h.count == 0 { + h.a = append(h.a, src.a...) + h.min = src.min + h.max = src.max + h.count = src.count + return + } + + h.a = append(h.a, src.a...) + if src.min < h.min { + h.min = src.min + } + if src.max > h.max { + h.max = src.max + } + h.count += src.count +} + +func (h *histogram) quantile(phi float64) float64 { + if len(h.a) == 0 { + return nan + } + if len(h.a) == 1 { + return h.a[0] + } + if phi <= 0 { + return h.min + } + if phi >= 1 { + return h.max + } + + slices.Sort(h.a) + idx := int(phi * float64(len(h.a))) + if idx == len(h.a) { + return h.max + } + return h.a[idx] +} diff --git a/lib/logstorage/stats_quantile_test.go b/lib/logstorage/stats_quantile_test.go new file mode 100644 index 0000000000..f497258ade --- /dev/null +++ b/lib/logstorage/stats_quantile_test.go @@ -0,0 +1,55 @@ +package logstorage + +import ( + "math" + "testing" +) + +func TestHistogramQuantile(t *testing.T) { + f := func(a []float64, phi, qExpected float64) { + t.Helper() + + var h histogram + for _, f := range a { + h.update(f) + } + q := h.quantile(phi) + + if math.IsNaN(qExpected) { + if !math.IsNaN(q) { + t.Fatalf("unexpected result for q=%v, phi=%v; got %v; want %v", a, phi, q, qExpected) + } + } else if q != qExpected { + t.Fatalf("unexpected result for q=%v, phi=%v; got %v; want %v", a, phi, q, qExpected) + } + } + + f(nil, -1, nan) + f(nil, 0, nan) + f(nil, 0.5, nan) + f(nil, 1, nan) + f(nil, 10, nan) + + f([]float64{123}, -1, 123) + f([]float64{123}, 0, 123) + f([]float64{123}, 0.5, 123) + f([]float64{123}, 1, 123) + f([]float64{123}, 10, 123) + + f([]float64{5, 1}, -1, 1) + f([]float64{5, 1}, 0, 1) + f([]float64{5, 1}, 0.5-1e-5, 1) + f([]float64{5, 1}, 0.5, 5) + f([]float64{5, 1}, 1, 5) + f([]float64{5, 1}, 10, 5) + + f([]float64{5, 1, 3}, -1, 1) + f([]float64{5, 1, 3}, 0, 1) + f([]float64{5, 1, 3}, 1.0/3-1e-5, 1) + f([]float64{5, 1, 3}, 1.0/3, 3) + f([]float64{5, 1, 3}, 2.0/3-1e-5, 3) + f([]float64{5, 1, 3}, 2.0/3, 5) + f([]float64{5, 1, 3}, 1-1e-5, 5) + f([]float64{5, 1, 3}, 1, 5) + f([]float64{5, 1, 3}, 10, 5) +} diff --git a/lib/logstorage/stats_sum_len.go b/lib/logstorage/stats_sum_len.go new file mode 100644 index 0000000000..f09e48418b --- /dev/null +++ b/lib/logstorage/stats_sum_len.go @@ -0,0 +1,89 @@ +package logstorage + +import ( + "slices" + "strconv" + "unsafe" +) + +type statsSumLen struct { + fields []string + containsStar bool +} + +func (ss *statsSumLen) String() string { + return "sum_len(" + fieldNamesString(ss.fields) + ")" +} + +func (ss *statsSumLen) neededFields() []string { + return ss.fields +} + +func (ss *statsSumLen) newStatsProcessor() (statsProcessor, int) { + ssp := &statsSumLenProcessor{ + ss: ss, + sumLen: 0, + } + return ssp, int(unsafe.Sizeof(*ssp)) +} + +type statsSumLenProcessor struct { + ss *statsSumLen + + sumLen uint64 +} + +func (ssp *statsSumLenProcessor) updateStatsForAllRows(br *blockResult) int { + if ssp.ss.containsStar { + // Sum all the columns + for _, c := range br.getColumns() { + ssp.sumLen += c.sumLenValues(br) + } + } else { + // Sum the requested columns + for _, field := range ssp.ss.fields { + c := br.getColumnByName(field) + ssp.sumLen += c.sumLenValues(br) + } + } + return 0 +} + +func (ssp *statsSumLenProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + if ssp.ss.containsStar { + // Sum all the fields for the given row + for _, c := range br.getColumns() { + v := c.getValueAtRow(br, rowIdx) + ssp.sumLen += uint64(len(v)) + } + } else { + // Sum only the given fields for the given row + for _, field := range ssp.ss.fields { + c := br.getColumnByName(field) + v := c.getValueAtRow(br, rowIdx) + ssp.sumLen += uint64(len(v)) + } + } + return 0 +} + +func (ssp *statsSumLenProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsSumLenProcessor) + ssp.sumLen += src.sumLen +} + +func (ssp *statsSumLenProcessor) finalizeStats() string { + return strconv.FormatUint(ssp.sumLen, 10) +} + +func parseStatsSumLen(lex *lexer) (*statsSumLen, error) { + fields, err := parseFieldNamesForStatsFunc(lex, "sum_len") + if err != nil { + return nil, err + } + ss := &statsSumLen{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + } + return ss, nil +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 2b1ea3ba41..5ed9b80d06 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -182,12 +182,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch bsws := bswb.bsws for i := range bsws { bsw := &bsws[i] - select { - case <-stopCh: + if needStop(stopCh) { // The search has been canceled. Just skip all the scheduled work in order to save CPU time. bsw.reset() continue - default: } bs.search(bsw) @@ -266,11 +264,9 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs type partitionSearchFinalizer func() func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { - select { - case <-stopCh: + if needStop(stopCh) { // Do not spend CPU time on search, since it is already stopped. return func() {} - default: } tenantIDs := so.tenantIDs @@ -436,10 +432,8 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c // it is assumed that ibhs are sorted ibhs := p.indexBlockHeaders for len(ibhs) > 0 && len(tenantIDs) > 0 { - select { - case <-stopCh: + if needStop(stopCh) { return - default: } // locate tenantID equal or bigger than the tenantID in ibhs[0] @@ -541,10 +535,8 @@ func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh c ibhs := p.indexBlockHeaders for len(ibhs) > 0 && len(streamIDs) > 0 { - select { - case <-stopCh: + if needStop(stopCh) { return - default: } // locate streamID equal or bigger than the streamID in ibhs[0]