mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-12 12:46:23 +01:00
lib/logstorage: work-in-progress
This commit is contained in:
parent
886f545f81
commit
d5224f3363
@ -1,13 +1,13 @@
|
||||
{
|
||||
"files": {
|
||||
"main.css": "./static/css/main.2fa7c03f.css",
|
||||
"main.js": "./static/js/main.68f1bd69.js",
|
||||
"main.css": "./static/css/main.1041c3d4.css",
|
||||
"main.js": "./static/js/main.e54f9531.js",
|
||||
"static/js/685.bebe1265.chunk.js": "./static/js/685.bebe1265.chunk.js",
|
||||
"static/media/MetricsQL.md": "./static/media/MetricsQL.cb83d071da309a358bc0.md",
|
||||
"index.html": "./index.html"
|
||||
},
|
||||
"entrypoints": [
|
||||
"static/css/main.2fa7c03f.css",
|
||||
"static/js/main.68f1bd69.js"
|
||||
"static/css/main.1041c3d4.css",
|
||||
"static/js/main.e54f9531.js"
|
||||
]
|
||||
}
|
@ -1 +1 @@
|
||||
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="./favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=5"/><meta name="theme-color" content="#000000"/><meta name="description" content="UI for VictoriaMetrics"/><link rel="apple-touch-icon" href="./apple-touch-icon.png"/><link rel="icon" type="image/png" sizes="32x32" href="./favicon-32x32.png"><link rel="manifest" href="./manifest.json"/><title>VM UI</title><script src="./dashboards/index.js" type="module"></script><meta name="twitter:card" content="summary_large_image"><meta name="twitter:image" content="./preview.jpg"><meta name="twitter:title" content="UI for VictoriaMetrics"><meta name="twitter:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta name="twitter:site" content="@VictoriaMetrics"><meta property="og:title" content="Metric explorer for VictoriaMetrics"><meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta property="og:image" content="./preview.jpg"><meta property="og:type" content="website"><script defer="defer" src="./static/js/main.68f1bd69.js"></script><link href="./static/css/main.2fa7c03f.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>
|
||||
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="./favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=5"/><meta name="theme-color" content="#000000"/><meta name="description" content="UI for VictoriaMetrics"/><link rel="apple-touch-icon" href="./apple-touch-icon.png"/><link rel="icon" type="image/png" sizes="32x32" href="./favicon-32x32.png"><link rel="manifest" href="./manifest.json"/><title>VM UI</title><script src="./dashboards/index.js" type="module"></script><meta name="twitter:card" content="summary_large_image"><meta name="twitter:image" content="./preview.jpg"><meta name="twitter:title" content="UI for VictoriaMetrics"><meta name="twitter:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta name="twitter:site" content="@VictoriaMetrics"><meta property="og:title" content="Metric explorer for VictoriaMetrics"><meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta property="og:image" content="./preview.jpg"><meta property="og:type" content="website"><script defer="defer" src="./static/js/main.e54f9531.js"></script><link href="./static/css/main.1041c3d4.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>
|
1
app/vlselect/vmui/static/css/main.1041c3d4.css
Normal file
1
app/vlselect/vmui/static/css/main.1041c3d4.css
Normal file
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
2
app/vlselect/vmui/static/js/main.e54f9531.js
Normal file
2
app/vlselect/vmui/static/js/main.e54f9531.js
Normal file
File diff suppressed because one or more lines are too long
@ -42,7 +42,7 @@ services:
|
||||
# storing logs and serving read queries.
|
||||
victorialogs:
|
||||
container_name: victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
|
||||
command:
|
||||
- "--storageDataPath=/vlogs"
|
||||
- "--httpListenAddr=:9428"
|
||||
|
@ -22,7 +22,7 @@ services:
|
||||
- -beat.uri=http://filebeat-victorialogs:5066
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-filebeat-docker-vl:/vlogs
|
||||
ports:
|
||||
|
@ -13,7 +13,7 @@ services:
|
||||
- "5140:5140"
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-filebeat-syslog-vl:/vlogs
|
||||
ports:
|
||||
|
@ -11,7 +11,7 @@ services:
|
||||
- "5140:5140"
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-fluentbit-vl:/vlogs
|
||||
ports:
|
||||
|
@ -14,7 +14,7 @@ services:
|
||||
- "5140:5140"
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-logstash-vl:/vlogs
|
||||
ports:
|
||||
|
@ -12,7 +12,7 @@ services:
|
||||
- "5140:5140"
|
||||
|
||||
vlogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-promtail-docker:/vlogs
|
||||
ports:
|
||||
|
@ -22,7 +22,7 @@ services:
|
||||
condition: service_healthy
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-vector-docker-vl:/vlogs
|
||||
ports:
|
||||
|
@ -3,7 +3,7 @@ version: '3'
|
||||
services:
|
||||
# Run `make package-victoria-logs` to build victoria-logs image
|
||||
vlogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
|
||||
volumes:
|
||||
- vlogs:/vlogs
|
||||
ports:
|
||||
|
@ -19,7 +19,12 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||
|
||||
## tip
|
||||
|
||||
## [v0.21.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.21.0-victorialogs)
|
||||
|
||||
Released at 2024-06-20
|
||||
|
||||
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add a bar chart displaying the number of log entries over a time range. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6404).
|
||||
* FEATURE: expose `_stream_id` field, which uniquely identifies [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). This field can be used for quick obtaining of all the logs belonging to a particular stream via [`_stream_id` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter).
|
||||
|
||||
## [v0.20.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.20.2-victorialogs)
|
||||
|
||||
|
@ -427,7 +427,7 @@ See also:
|
||||
|
||||
### Stream filter
|
||||
|
||||
VictoriaLogs provides an optimized way to select log entries, which belong to particular [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
|
||||
VictoriaLogs provides an optimized way to select logs, which belong to particular [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
|
||||
This can be done via `_stream:{...}` filter. The `{...}` may contain arbitrary
|
||||
[Prometheus-compatible label selector](https://docs.victoriametrics.com/keyconcepts/#filtering)
|
||||
over fields associated with [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
|
||||
@ -456,9 +456,34 @@ Performance tips:
|
||||
|
||||
See also:
|
||||
|
||||
- [`_stream_id` filter](#_stream_id-filter)
|
||||
- [Time filter](#time-filter)
|
||||
- [Exact filter](#exact-filter)
|
||||
|
||||
### _stream_id filter
|
||||
|
||||
Every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) in VictoriaMetrics is uniquely identified by `_stream_id` field.
|
||||
The `_stream_id:...` filter allows quickly selecting all the logs belonging to the particular stream.
|
||||
|
||||
For example, the following query selects all the logs, which belong to the [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
|
||||
with `_stream_id` equal to `0000007b000001c850d9950ea6196b1a4812081265faa1c7`:
|
||||
|
||||
```logsql
|
||||
_stream_id:0000007b000001c850d9950ea6196b1a4812081265faa1c7
|
||||
```
|
||||
|
||||
If the log stream contains too many logs, then it is good idea limiting the number of returned logs with [time filter](#time-filter). For example, the following
|
||||
query selects logs for the given stream for the last hour:
|
||||
|
||||
```logsql
|
||||
_time:1h _stream_id:0000007b000001c850d9950ea6196b1a4812081265faa1c7
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [stream filter](#stream-filter)
|
||||
|
||||
|
||||
### Word filter
|
||||
|
||||
The simplest LogsQL query consists of a single [word](#word) to search in log messages. For example, the following query matches
|
||||
|
@ -36,8 +36,8 @@ Just download archive for the needed Operating system and architecture, unpack i
|
||||
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
|
||||
|
||||
```sh
|
||||
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.20.2-victorialogs/victoria-logs-linux-amd64-v0.20.2-victorialogs.tar.gz
|
||||
tar xzf victoria-logs-linux-amd64-v0.20.2-victorialogs.tar.gz
|
||||
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.21.0-victorialogs/victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz
|
||||
tar xzf victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz
|
||||
./victoria-logs-prod
|
||||
```
|
||||
|
||||
@ -61,7 +61,7 @@ Here is the command to run VictoriaLogs in a Docker container:
|
||||
|
||||
```sh
|
||||
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
|
||||
docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
|
||||
docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
|
||||
```
|
||||
|
||||
See also:
|
||||
|
@ -161,7 +161,7 @@ the search to a particular time range.
|
||||
|
||||
### Stream fields
|
||||
|
||||
Some [structured logging](#data-model) fields may uniquely identify the application instance, which generates log entries.
|
||||
Some [structured logging](#data-model) fields may uniquely identify the application instance, which generates logs.
|
||||
This may be either a single field such as `instance="host123:456"` or a set of fields such as
|
||||
`{datacenter="...", env="...", job="...", instance="..."}` or
|
||||
`{kubernetes.namespace="...", kubernetes.node.name="...", kubernetes.pod.name="...", kubernetes.container.name="..."}`.
|
||||
@ -176,16 +176,18 @@ This provides the following benefits:
|
||||
- Increased query performance, since VictoriaLogs needs to scan lower amounts of data
|
||||
when [searching by stream fields](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter).
|
||||
|
||||
Every ingested log entry is associated with a log stream. The name of this stream is stored in `_stream` field.
|
||||
This field has the format similar to [labels in Prometheus metrics](https://docs.victoriametrics.com/keyconcepts/#labels):
|
||||
Every ingested log entry is associated with a log stream. Every log stream consists of two fields:
|
||||
|
||||
```
|
||||
{field1="value1", ..., fieldN="valueN"}
|
||||
```
|
||||
- `_stream_id` - this is an unique identifier for the log stream. All the logs for the particular stream can be selected
|
||||
via [`_stream_id:...` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter).
|
||||
|
||||
For example, if `host` and `app` fields are associated with the stream, then the `_stream` field will have `{host="host-123",app="my-app"}` value
|
||||
for the log entry with `host="host-123"` and `app="my-app"` fields. The `_stream` field can be searched
|
||||
with [stream filters](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter).
|
||||
- `_stream` - this field contains stream labels in the format similar to [labels in Prometheus metrics](https://docs.victoriametrics.com/keyconcepts/#labels):
|
||||
```
|
||||
{field1="value1", ..., fieldN="valueN"}
|
||||
```
|
||||
For example, if `host` and `app` fields are associated with the stream, then the `_stream` field will have `{host="host-123",app="my-app"}` value
|
||||
for the log entry with `host="host-123"` and `app="my-app"` fields. The `_stream` field can be searched
|
||||
with [stream filters](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter).
|
||||
|
||||
By default the value of `_stream` field is `{}`, since VictoriaLogs cannot determine automatically,
|
||||
which fields uniquely identify every log stream. This may lead to not-so-optimal resource usage and query performance.
|
||||
|
@ -263,10 +263,15 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) {
|
||||
br.addTimeColumn()
|
||||
}
|
||||
|
||||
if !slices.Contains(unneededColumnNames, "_stream_id") {
|
||||
// Add _stream_id column
|
||||
br.addStreamIDColumn(bs)
|
||||
}
|
||||
|
||||
if !slices.Contains(unneededColumnNames, "_stream") {
|
||||
// Add _stream column
|
||||
if !br.addStreamColumn(bs) {
|
||||
// Skip the current block, since the associated stream tags are missing.
|
||||
// Skip the current block, since the associated stream tags are missing
|
||||
br.reset()
|
||||
return
|
||||
}
|
||||
@ -315,6 +320,8 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) {
|
||||
func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) {
|
||||
for _, columnName := range bs.bsw.so.neededColumnNames {
|
||||
switch columnName {
|
||||
case "_stream_id":
|
||||
br.addStreamIDColumn(bs)
|
||||
case "_stream":
|
||||
if !br.addStreamColumn(bs) {
|
||||
// Skip the current block, since the associated stream tags are missing.
|
||||
@ -485,6 +492,13 @@ func (br *blockResult) addTimeColumn() {
|
||||
br.csInitialized = false
|
||||
}
|
||||
|
||||
func (br *blockResult) addStreamIDColumn(bs *blockSearch) {
|
||||
bb := bbPool.Get()
|
||||
bb.B = bs.bsw.bh.streamID.marshalString(bb.B)
|
||||
br.addConstColumn("_stream_id", bytesutil.ToUnsafeString(bb.B))
|
||||
bbPool.Put(bb)
|
||||
}
|
||||
|
||||
func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
|
||||
if !bs.prevStreamID.equal(&bs.bsw.bh.streamID) {
|
||||
return br.addStreamColumnSlow(bs)
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterAnd(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
|
@ -21,8 +21,14 @@ type filterAnyCasePhrase struct {
|
||||
phraseLowercaseOnce sync.Once
|
||||
phraseLowercase string
|
||||
|
||||
phraseUppercaseOnce sync.Once
|
||||
phraseUppercase string
|
||||
|
||||
tokensOnce sync.Once
|
||||
tokens []string
|
||||
|
||||
tokensUppercaseOnce sync.Once
|
||||
tokensUppercase []string
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePhrase) String() string {
|
||||
@ -42,6 +48,20 @@ func (fp *filterAnyCasePhrase) initTokens() {
|
||||
fp.tokens = tokenizeStrings(nil, []string{fp.phrase})
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePhrase) getTokensUppercase() []string {
|
||||
fp.tokensUppercaseOnce.Do(fp.initTokensUppercase)
|
||||
return fp.tokensUppercase
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePhrase) initTokensUppercase() {
|
||||
tokens := fp.getTokens()
|
||||
tokensUppercase := make([]string, len(tokens))
|
||||
for i, token := range tokens {
|
||||
tokensUppercase[i] = strings.ToUpper(token)
|
||||
}
|
||||
fp.tokensUppercase = tokensUppercase
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePhrase) getPhraseLowercase() string {
|
||||
fp.phraseLowercaseOnce.Do(fp.initPhraseLowercase)
|
||||
return fp.phraseLowercase
|
||||
@ -51,6 +71,15 @@ func (fp *filterAnyCasePhrase) initPhraseLowercase() {
|
||||
fp.phraseLowercase = strings.ToLower(fp.phrase)
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePhrase) getPhraseUppercase() string {
|
||||
fp.phraseUppercaseOnce.Do(fp.initPhraseUppercase)
|
||||
return fp.phraseUppercase
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePhrase) initPhraseUppercase() {
|
||||
fp.phraseUppercase = strings.ToUpper(fp.phrase)
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePhrase) applyToBlockResult(br *blockResult, bm *bitmap) {
|
||||
phraseLowercase := fp.getPhraseLowercase()
|
||||
applyToBlockResultGeneric(br, bm, fp.fieldName, phraseLowercase, matchAnyCasePhrase)
|
||||
@ -100,8 +129,9 @@ func (fp *filterAnyCasePhrase) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
|
||||
case valueTypeIPv4:
|
||||
matchIPv4ByPhrase(bs, ch, bm, phraseLowercase, tokens)
|
||||
case valueTypeTimestampISO8601:
|
||||
phraseUppercase := strings.ToUpper(fp.phrase)
|
||||
matchTimestampISO8601ByPhrase(bs, ch, bm, phraseUppercase, tokens)
|
||||
phraseUppercase := fp.getPhraseUppercase()
|
||||
tokensUppercase := fp.getTokensUppercase()
|
||||
matchTimestampISO8601ByPhrase(bs, ch, bm, phraseUppercase, tokensUppercase)
|
||||
default:
|
||||
logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
|
||||
}
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestMatchAnyCasePhrase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s, phraseLowercase string, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := matchAnyCasePhrase(s, phraseLowercase)
|
||||
@ -39,7 +41,11 @@ func TestMatchAnyCasePhrase(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("single-row", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -113,6 +119,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "other-column",
|
||||
@ -222,6 +230,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -277,6 +287,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -347,6 +359,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -412,6 +426,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -476,6 +492,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -540,6 +558,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -603,6 +623,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -708,6 +730,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -804,6 +828,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -22,8 +22,14 @@ type filterAnyCasePrefix struct {
|
||||
prefixLowercaseOnce sync.Once
|
||||
prefixLowercase string
|
||||
|
||||
prefixUppercaseOnce sync.Once
|
||||
prefixUppercase string
|
||||
|
||||
tokensOnce sync.Once
|
||||
tokens []string
|
||||
|
||||
tokensUppercaseOnce sync.Once
|
||||
tokensUppercase []string
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePrefix) String() string {
|
||||
@ -46,6 +52,20 @@ func (fp *filterAnyCasePrefix) initTokens() {
|
||||
fp.tokens = getTokensSkipLast(fp.prefix)
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePrefix) getTokensUppercase() []string {
|
||||
fp.tokensUppercaseOnce.Do(fp.initTokensUppercase)
|
||||
return fp.tokensUppercase
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePrefix) initTokensUppercase() {
|
||||
tokens := fp.getTokens()
|
||||
tokensUppercase := make([]string, len(tokens))
|
||||
for i, token := range tokens {
|
||||
tokensUppercase[i] = strings.ToUpper(token)
|
||||
}
|
||||
fp.tokensUppercase = tokensUppercase
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePrefix) getPrefixLowercase() string {
|
||||
fp.prefixLowercaseOnce.Do(fp.initPrefixLowercase)
|
||||
return fp.prefixLowercase
|
||||
@ -55,6 +75,15 @@ func (fp *filterAnyCasePrefix) initPrefixLowercase() {
|
||||
fp.prefixLowercase = strings.ToLower(fp.prefix)
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePrefix) getPrefixUppercase() string {
|
||||
fp.prefixUppercaseOnce.Do(fp.initPrefixUppercase)
|
||||
return fp.prefixUppercase
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePrefix) initPrefixUppercase() {
|
||||
fp.prefixUppercase = strings.ToUpper(fp.prefix)
|
||||
}
|
||||
|
||||
func (fp *filterAnyCasePrefix) applyToBlockResult(br *blockResult, bm *bitmap) {
|
||||
prefixLowercase := fp.getPrefixLowercase()
|
||||
applyToBlockResultGeneric(br, bm, fp.fieldName, prefixLowercase, matchAnyCasePrefix)
|
||||
@ -101,8 +130,9 @@ func (fp *filterAnyCasePrefix) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
|
||||
case valueTypeIPv4:
|
||||
matchIPv4ByPrefix(bs, ch, bm, prefixLowercase, tokens)
|
||||
case valueTypeTimestampISO8601:
|
||||
prefixUppercase := strings.ToUpper(fp.prefix)
|
||||
matchTimestampISO8601ByPrefix(bs, ch, bm, prefixUppercase, tokens)
|
||||
prefixUppercase := fp.getPrefixUppercase()
|
||||
tokensUppercase := fp.getTokensUppercase()
|
||||
matchTimestampISO8601ByPrefix(bs, ch, bm, prefixUppercase, tokensUppercase)
|
||||
default:
|
||||
logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
|
||||
}
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestMatchAnyCasePrefix(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s, prefixLowercase string, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := matchAnyCasePrefix(s, prefixLowercase)
|
||||
@ -39,7 +41,11 @@ func TestMatchAnyCasePrefix(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("single-row", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -131,6 +137,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "other-column",
|
||||
@ -246,6 +254,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -301,6 +311,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -383,6 +395,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -448,6 +462,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -512,6 +528,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -576,6 +594,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -639,6 +659,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -750,6 +772,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -846,6 +870,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -115,7 +115,7 @@ func (fr *filterDayRange) matchTimestampValue(timestamp int64) bool {
|
||||
}
|
||||
|
||||
func (fr *filterDayRange) dayRangeOffset(timestamp int64) int64 {
|
||||
timestamp += fr.offset
|
||||
timestamp -= fr.offset
|
||||
return timestamp % nsecsPerDay
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterDayRange(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
timestamps := []int64{
|
||||
1,
|
||||
9,
|
||||
@ -35,7 +37,7 @@ func TestFilterDayRange(t *testing.T) {
|
||||
ft = &filterDayRange{
|
||||
start: 1,
|
||||
end: 1,
|
||||
offset: 9,
|
||||
offset: 8,
|
||||
}
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, []int{1})
|
||||
|
||||
@ -44,7 +46,7 @@ func TestFilterDayRange(t *testing.T) {
|
||||
end: 10,
|
||||
offset: -9,
|
||||
}
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, []int{1})
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, []int{0})
|
||||
|
||||
ft = &filterDayRange{
|
||||
start: 2,
|
||||
|
@ -5,7 +5,11 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterExactPrefix(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("single-row", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -55,6 +59,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -106,6 +112,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -149,6 +157,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -201,6 +211,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -254,6 +266,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -307,6 +321,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -360,6 +376,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -384,7 +402,7 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
fieldName: "foo",
|
||||
prefix: "12",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fep, "foo", []int{0, 1, 5})
|
||||
testFilterMatchForColumns(t, columns, fep, "foo", []int{0, 1, 5, 9})
|
||||
|
||||
fep = &filterExactPrefix{
|
||||
fieldName: "foo",
|
||||
@ -413,6 +431,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -470,6 +490,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -495,7 +517,7 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
fieldName: "foo",
|
||||
prefix: "127.0.",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fep, "foo", []int{2, 4, 5, 7})
|
||||
testFilterMatchForColumns(t, columns, fep, "foo", []int{2, 4, 5, 6, 7})
|
||||
|
||||
fep = &filterExactPrefix{
|
||||
fieldName: "foo",
|
||||
@ -518,6 +540,8 @@ func TestFilterExactPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -5,7 +5,11 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterExact(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("single-row", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -43,6 +47,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -88,6 +94,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -131,6 +139,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -183,6 +193,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -236,6 +248,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -289,6 +303,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -342,6 +358,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -395,6 +413,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -470,6 +490,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -530,6 +552,8 @@ func TestFilterExact(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -7,7 +7,11 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterIn(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("single-row", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -75,6 +79,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -126,6 +132,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -181,6 +189,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -233,6 +243,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -298,6 +310,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -363,6 +377,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -428,6 +444,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -487,6 +505,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -568,6 +588,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -634,6 +656,8 @@ func TestFilterIn(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestMatchIPv4Range(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s string, minValue, maxValue uint32, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := matchIPv4Range(s, minValue, maxValue)
|
||||
@ -28,7 +30,11 @@ func TestMatchIPv4Range(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterIPv4Range(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -79,6 +85,8 @@ func TestFilterIPv4Range(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -134,6 +142,8 @@ func TestFilterIPv4Range(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -177,6 +187,8 @@ func TestFilterIPv4Range(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -206,6 +218,8 @@ func TestFilterIPv4Range(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -235,6 +249,8 @@ func TestFilterIPv4Range(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -264,6 +280,8 @@ func TestFilterIPv4Range(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -293,6 +311,8 @@ func TestFilterIPv4Range(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -322,6 +342,8 @@ func TestFilterIPv4Range(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -374,6 +396,8 @@ func TestFilterIPv4Range(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestMatchLenRange(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s string, minLen, maxLen uint64, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := matchLenRange(s, minLen, maxLen)
|
||||
@ -31,7 +33,11 @@ func TestMatchLenRange(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterLenRange(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -75,6 +81,8 @@ func TestFilterLenRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -115,6 +123,8 @@ func TestFilterLenRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -151,6 +161,8 @@ func TestFilterLenRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -176,7 +188,7 @@ func TestFilterLenRange(t *testing.T) {
|
||||
minLen: 2,
|
||||
maxLen: 2,
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{2, 3, 6})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 2, 5})
|
||||
|
||||
// mismatch
|
||||
fr = &filterLenRange{
|
||||
@ -195,6 +207,8 @@ func TestFilterLenRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -220,7 +234,7 @@ func TestFilterLenRange(t *testing.T) {
|
||||
minLen: 2,
|
||||
maxLen: 2,
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{2, 3, 6})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 2, 5})
|
||||
|
||||
// mismatch
|
||||
fr = &filterLenRange{
|
||||
@ -239,6 +253,8 @@ func TestFilterLenRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -264,7 +280,7 @@ func TestFilterLenRange(t *testing.T) {
|
||||
minLen: 2,
|
||||
maxLen: 2,
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{2, 3, 6})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 2, 5})
|
||||
|
||||
// mismatch
|
||||
fr = &filterLenRange{
|
||||
@ -283,6 +299,8 @@ func TestFilterLenRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -308,7 +326,7 @@ func TestFilterLenRange(t *testing.T) {
|
||||
minLen: 2,
|
||||
maxLen: 2,
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{2, 3, 6})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 2, 5})
|
||||
|
||||
// mismatch
|
||||
fr = &filterLenRange{
|
||||
@ -327,6 +345,8 @@ func TestFilterLenRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -364,6 +384,8 @@ func TestFilterLenRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -402,6 +424,8 @@ func TestFilterLenRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterNot(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterOr(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestMatchPhrase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s, phrase string, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := matchPhrase(s, phrase)
|
||||
@ -44,7 +46,11 @@ func TestMatchPhrase(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterPhrase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("single-row", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -118,6 +124,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "other-column",
|
||||
@ -227,6 +235,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -282,6 +292,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -352,6 +364,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -417,6 +431,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -481,6 +497,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -545,6 +563,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -608,6 +628,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -713,6 +735,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -809,6 +833,8 @@ func TestFilterPhrase(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestMatchPrefix(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s, prefix string, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := matchPrefix(s, prefix)
|
||||
@ -44,7 +46,11 @@ func TestMatchPrefix(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterPrefix(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("single-row", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -130,6 +136,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "other-column",
|
||||
@ -245,6 +253,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -300,6 +310,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -382,6 +394,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -447,6 +461,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -511,6 +527,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -575,6 +593,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -638,6 +658,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -749,6 +771,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -845,6 +869,8 @@ func TestFilterPrefix(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -5,7 +5,11 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterRange(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -77,6 +81,8 @@ func TestFilterRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -146,6 +152,8 @@ func TestFilterRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -210,6 +218,8 @@ func TestFilterRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -276,6 +286,8 @@ func TestFilterRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -341,6 +353,8 @@ func TestFilterRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -406,6 +420,8 @@ func TestFilterRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -478,6 +494,8 @@ func TestFilterRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -550,6 +568,8 @@ func TestFilterRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -570,16 +590,17 @@ func TestFilterRange(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
// range filter always mismatches ipv4
|
||||
fr := &filterRange{
|
||||
fieldName: "foo",
|
||||
minValue: -100,
|
||||
maxValue: 100,
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", nil)
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{1})
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -8,7 +8,11 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterRegexp(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -54,6 +58,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -92,6 +98,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -126,6 +134,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -161,6 +171,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -196,6 +208,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -231,6 +245,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -266,6 +282,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -301,6 +319,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -337,6 +357,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
@ -371,6 +393,8 @@ func TestFilterRegexp(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSkipFirstLastToken(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s, resultExpected string) {
|
||||
t.Helper()
|
||||
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestMatchSequence(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s string, phrases []string, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := matchSequence(s, phrases)
|
||||
@ -28,7 +30,11 @@ func TestMatchSequence(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterSequence(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("single-row", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -102,6 +108,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -153,6 +161,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -208,6 +218,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -278,6 +290,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -349,6 +363,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -420,6 +436,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -491,6 +509,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -562,6 +582,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -643,6 +665,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -739,6 +763,8 @@ func TestFilterSequence(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
84
lib/logstorage/filter_stream_id.go
Normal file
84
lib/logstorage/filter_stream_id.go
Normal file
@ -0,0 +1,84 @@
|
||||
package logstorage
|
||||
|
||||
import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
// filterStreamID is the filter for `_stream_id:id`
|
||||
type filterStreamID struct {
|
||||
streamIDStr string
|
||||
}
|
||||
|
||||
func (fs *filterStreamID) String() string {
|
||||
return "_stream_id:" + quoteTokenIfNeeded(fs.streamIDStr)
|
||||
}
|
||||
|
||||
func (fs *filterStreamID) updateNeededFields(neededFields fieldsSet) {
|
||||
neededFields.add("_stream_id")
|
||||
}
|
||||
|
||||
func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) {
|
||||
c := br.getColumnByName("_stream_id")
|
||||
if c.isConst {
|
||||
v := c.valuesEncoded[0]
|
||||
if fs.streamIDStr != v {
|
||||
bm.resetBits()
|
||||
}
|
||||
return
|
||||
}
|
||||
if c.isTime {
|
||||
bm.resetBits()
|
||||
return
|
||||
}
|
||||
|
||||
switch c.valueType {
|
||||
case valueTypeString:
|
||||
values := c.getValues(br)
|
||||
bm.forEachSetBit(func(idx int) bool {
|
||||
v := values[idx]
|
||||
return fs.streamIDStr == v
|
||||
})
|
||||
case valueTypeDict:
|
||||
bb := bbPool.Get()
|
||||
for _, v := range c.dictValues {
|
||||
c := byte(0)
|
||||
if fs.streamIDStr == v {
|
||||
c = 1
|
||||
}
|
||||
bb.B = append(bb.B, c)
|
||||
}
|
||||
valuesEncoded := c.getValuesEncoded(br)
|
||||
bm.forEachSetBit(func(idx int) bool {
|
||||
n := valuesEncoded[idx][0]
|
||||
return bb.B[n] == 1
|
||||
})
|
||||
bbPool.Put(bb)
|
||||
case valueTypeUint8:
|
||||
bm.resetBits()
|
||||
case valueTypeUint16:
|
||||
bm.resetBits()
|
||||
case valueTypeUint32:
|
||||
bm.resetBits()
|
||||
case valueTypeUint64:
|
||||
bm.resetBits()
|
||||
case valueTypeFloat64:
|
||||
bm.resetBits()
|
||||
case valueTypeIPv4:
|
||||
bm.resetBits()
|
||||
case valueTypeTimestampISO8601:
|
||||
bm.resetBits()
|
||||
default:
|
||||
logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *filterStreamID) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
|
||||
bb := bbPool.Get()
|
||||
bb.B = bs.bsw.bh.streamID.marshalString(bb.B)
|
||||
ok := fs.streamIDStr == string(bb.B)
|
||||
bbPool.Put(bb)
|
||||
if !ok {
|
||||
bm.resetBits()
|
||||
return
|
||||
}
|
||||
}
|
87
lib/logstorage/filter_stream_id_test.go
Normal file
87
lib/logstorage/filter_stream_id_test.go
Normal file
@ -0,0 +1,87 @@
|
||||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
)
|
||||
|
||||
func TestFilterStreamID(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// match
|
||||
ft := &filterStreamID{
|
||||
streamIDStr: "0000007b000001c8302bc96e02e54e5524b3a68ec271e55e",
|
||||
}
|
||||
testFilterMatchForStreamID(t, ft, []int{0, 3, 6, 9})
|
||||
|
||||
ft = &filterStreamID{
|
||||
streamIDStr: "0000007b000001c850d9950ea6196b1a4812081265faa1c7",
|
||||
}
|
||||
testFilterMatchForStreamID(t, ft, []int{1, 4, 7})
|
||||
|
||||
// mismatch
|
||||
ft = &filterStreamID{
|
||||
streamIDStr: "abc",
|
||||
}
|
||||
testFilterMatchForStreamID(t, ft, nil)
|
||||
}
|
||||
|
||||
func testFilterMatchForStreamID(t *testing.T, f filter, expectedRowIdxs []int) {
|
||||
t.Helper()
|
||||
|
||||
storagePath := t.Name()
|
||||
|
||||
cfg := &StorageConfig{
|
||||
Retention: 100 * 365 * time.Duration(nsecsPerDay),
|
||||
}
|
||||
s := MustOpenStorage(storagePath, cfg)
|
||||
|
||||
tenantID := TenantID{
|
||||
AccountID: 123,
|
||||
ProjectID: 456,
|
||||
}
|
||||
|
||||
getMsgValue := func(i int) string {
|
||||
return fmt.Sprintf("some message value %d", i)
|
||||
}
|
||||
|
||||
generateTestLogStreams(s, tenantID, getMsgValue, 10, 3)
|
||||
|
||||
expectedResults := make([]string, len(expectedRowIdxs))
|
||||
expectedTimestamps := make([]int64, len(expectedRowIdxs))
|
||||
for i, idx := range expectedRowIdxs {
|
||||
expectedResults[i] = getMsgValue(idx)
|
||||
expectedTimestamps[i] = int64(idx * 100)
|
||||
}
|
||||
|
||||
testFilterMatchForStorage(t, s, tenantID, f, "_msg", expectedResults, expectedTimestamps)
|
||||
|
||||
// Close and delete the test storage
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(storagePath)
|
||||
}
|
||||
|
||||
func generateTestLogStreams(s *Storage, tenantID TenantID, getMsgValue func(int) string, rowsCount, streamsCount int) {
|
||||
streamFields := []string{"host", "app"}
|
||||
lr := GetLogRows(streamFields, nil)
|
||||
var fields []Field
|
||||
for i := range rowsCount {
|
||||
fields = append(fields[:0], Field{
|
||||
Name: "_msg",
|
||||
Value: getMsgValue(i),
|
||||
}, Field{
|
||||
Name: "host",
|
||||
Value: fmt.Sprintf("host-%d", i%streamsCount),
|
||||
}, Field{
|
||||
Name: "app",
|
||||
Value: "foobar",
|
||||
})
|
||||
timestamp := int64(i * 100)
|
||||
lr.MustAdd(tenantID, timestamp, fields)
|
||||
}
|
||||
s.MustAddRows(lr)
|
||||
PutLogRows(lr)
|
||||
}
|
@ -5,6 +5,8 @@ import (
|
||||
)
|
||||
|
||||
func TestMatchStringRange(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s, minValue, maxValue string, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := matchStringRange(s, minValue, maxValue)
|
||||
@ -22,7 +24,11 @@ func TestMatchStringRange(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterStringRange(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("const-column", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -45,11 +51,18 @@ func TestFilterStringRange(t *testing.T) {
|
||||
fr = &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "127.0.0.1",
|
||||
maxValue: "127.0.0.1",
|
||||
maxValue: "127.0.0.2",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{0, 1, 2})
|
||||
|
||||
// mismatch
|
||||
fr = &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "127.0.0.1",
|
||||
maxValue: "127.0.0.1",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", nil)
|
||||
|
||||
fr = &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "",
|
||||
@ -73,6 +86,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("dict", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -100,9 +115,9 @@ func TestFilterStringRange(t *testing.T) {
|
||||
fr = &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "127",
|
||||
maxValue: "127.0.0.1",
|
||||
maxValue: "127.0.0.2",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 7})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 6, 7})
|
||||
|
||||
// mismatch
|
||||
fr = &filterStringRange{
|
||||
@ -128,6 +143,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("strings", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -135,8 +152,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
"A FOO",
|
||||
"a 10",
|
||||
"127.0.0.1",
|
||||
"20",
|
||||
"15.5",
|
||||
"200",
|
||||
"155.5",
|
||||
"-5",
|
||||
"a fooBaR",
|
||||
"a 127.0.0.1 dfff",
|
||||
@ -171,6 +188,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint8", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -194,9 +213,9 @@ func TestFilterStringRange(t *testing.T) {
|
||||
fr := &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "33",
|
||||
maxValue: "5",
|
||||
maxValue: "500",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
|
||||
|
||||
// mismatch
|
||||
fr = &filterStringRange{
|
||||
@ -222,6 +241,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint16", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -245,9 +266,9 @@ func TestFilterStringRange(t *testing.T) {
|
||||
fr := &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "33",
|
||||
maxValue: "5",
|
||||
maxValue: "555",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
|
||||
|
||||
// mismatch
|
||||
fr = &filterStringRange{
|
||||
@ -273,6 +294,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint32", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -296,9 +319,9 @@ func TestFilterStringRange(t *testing.T) {
|
||||
fr := &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "33",
|
||||
maxValue: "5",
|
||||
maxValue: "555",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
|
||||
|
||||
// mismatch
|
||||
fr = &filterStringRange{
|
||||
@ -324,6 +347,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("uint64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -347,9 +372,9 @@ func TestFilterStringRange(t *testing.T) {
|
||||
fr := &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "33",
|
||||
maxValue: "5",
|
||||
maxValue: "5555",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
|
||||
|
||||
// mismatch
|
||||
fr = &filterStringRange{
|
||||
@ -375,6 +400,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("float64", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -398,15 +425,9 @@ func TestFilterStringRange(t *testing.T) {
|
||||
fr := &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "33",
|
||||
maxValue: "5",
|
||||
maxValue: "555",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
|
||||
fr = &filterStringRange{
|
||||
fieldName: "foo",
|
||||
minValue: "-0",
|
||||
maxValue: "-1",
|
||||
}
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{6})
|
||||
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
|
||||
|
||||
// mismatch
|
||||
fr = &filterStringRange{
|
||||
@ -432,6 +453,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ipv4", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "foo",
|
||||
@ -491,6 +514,8 @@ func TestFilterStringRange(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("timestamp-iso8601", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
columns := []column{
|
||||
{
|
||||
name: "_msg",
|
||||
|
@ -2,7 +2,11 @@ package logstorage
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
)
|
||||
@ -155,8 +159,10 @@ func testFilterMatchForColumns(t *testing.T, columns []column, f filter, neededC
|
||||
t.Helper()
|
||||
|
||||
// Create the test storage
|
||||
const storagePath = "testFilterMatchForColumns"
|
||||
cfg := &StorageConfig{}
|
||||
storagePath := t.Name()
|
||||
cfg := &StorageConfig{
|
||||
Retention: time.Duration(100 * 365 * nsecsPerDay),
|
||||
}
|
||||
s := MustOpenStorage(storagePath, cfg)
|
||||
|
||||
// Generate rows
|
||||
@ -187,34 +193,58 @@ func testFilterMatchForColumns(t *testing.T, columns []column, f filter, neededC
|
||||
fs.MustRemoveAll(storagePath)
|
||||
}
|
||||
|
||||
func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f filter, neededColumnName string, expectedResults []string, expectedTimestamps []int64) {
|
||||
func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f filter, neededColumnName string, expectedValues []string, expectedTimestamps []int64) {
|
||||
t.Helper()
|
||||
|
||||
so := &genericSearchOptions{
|
||||
tenantIDs: []TenantID{tenantID},
|
||||
filter: f,
|
||||
neededColumnNames: []string{neededColumnName},
|
||||
neededColumnNames: []string{neededColumnName, "_time"},
|
||||
}
|
||||
workersCount := 3
|
||||
|
||||
type result struct {
|
||||
value string
|
||||
timestamp int64
|
||||
}
|
||||
var resultsMu sync.Mutex
|
||||
var results []result
|
||||
|
||||
const workersCount = 3
|
||||
s.search(workersCount, so, nil, func(_ uint, br *blockResult) {
|
||||
// Verify columns
|
||||
cs := br.getColumns()
|
||||
if len(cs) != 1 {
|
||||
t.Fatalf("unexpected number of columns in blockResult; got %d; want 1", len(cs))
|
||||
if len(cs) != 2 {
|
||||
t.Fatalf("unexpected number of columns in blockResult; got %d; want 2", len(cs))
|
||||
}
|
||||
results := cs[0].getValues(br)
|
||||
if !reflect.DeepEqual(results, expectedResults) {
|
||||
t.Fatalf("unexpected results matched;\ngot\n%q\nwant\n%q", results, expectedResults)
|
||||
}
|
||||
|
||||
// Verify timestamps
|
||||
if br.timestamps == nil {
|
||||
br.timestamps = []int64{}
|
||||
}
|
||||
if !reflect.DeepEqual(br.timestamps, expectedTimestamps) {
|
||||
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", br.timestamps, expectedTimestamps)
|
||||
values := cs[0].getValues(br)
|
||||
resultsMu.Lock()
|
||||
for i, v := range values {
|
||||
results = append(results, result{
|
||||
value: strings.Clone(v),
|
||||
timestamp: br.timestamps[i],
|
||||
})
|
||||
}
|
||||
resultsMu.Unlock()
|
||||
})
|
||||
|
||||
sort.Slice(results, func(i, j int) bool {
|
||||
return results[i].timestamp < results[j].timestamp
|
||||
})
|
||||
|
||||
timestamps := make([]int64, len(results))
|
||||
values := make([]string, len(results))
|
||||
|
||||
for i, r := range results {
|
||||
timestamps[i] = r.timestamp
|
||||
values[i] = r.value
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(timestamps, expectedTimestamps) {
|
||||
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, expectedTimestamps)
|
||||
}
|
||||
if !reflect.DeepEqual(values, expectedValues) {
|
||||
t.Fatalf("unexpected values;\ngot\n%q\nwant\n%q", values, expectedValues)
|
||||
}
|
||||
}
|
||||
|
||||
func generateRowsFromColumns(s *Storage, tenantID TenantID, columns []column) {
|
||||
|
@ -3,11 +3,14 @@ package logstorage
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
)
|
||||
|
||||
func TestFilterTime(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
timestamps := []int64{
|
||||
1,
|
||||
9,
|
||||
@ -89,8 +92,10 @@ func testFilterMatchForTimestamps(t *testing.T, timestamps []int64, f filter, ex
|
||||
t.Helper()
|
||||
|
||||
// Create the test storage
|
||||
const storagePath = "testFilterMatchForTimestamps"
|
||||
cfg := &StorageConfig{}
|
||||
storagePath := t.Name()
|
||||
cfg := &StorageConfig{
|
||||
Retention: 100 * 365 * time.Duration(nsecsPerDay),
|
||||
}
|
||||
s := MustOpenStorage(storagePath, cfg)
|
||||
|
||||
// Generate rows
|
||||
|
@ -117,12 +117,12 @@ func (fr *filterWeekRange) matchTimestampValue(timestamp int64) bool {
|
||||
}
|
||||
|
||||
func (fr *filterWeekRange) weekday(timestamp int64) time.Weekday {
|
||||
timestamp += fr.offset
|
||||
timestamp -= fr.offset
|
||||
return time.Unix(0, timestamp).UTC().Weekday()
|
||||
}
|
||||
|
||||
func (fr *filterWeekRange) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
|
||||
if fr.startDay > fr.endDay || fr.startDay > time.Saturday || fr.endDay < time.Monday {
|
||||
if fr.startDay > fr.endDay {
|
||||
bm.resetBits()
|
||||
return
|
||||
}
|
||||
|
@ -6,12 +6,15 @@ import (
|
||||
)
|
||||
|
||||
func TestFilterWeekRange(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
sunday := time.Date(2024, 6, 9, 1, 0, 0, 0, time.UTC).UnixNano()
|
||||
timestamps := []int64{
|
||||
0,
|
||||
1 * nsecsPerDay,
|
||||
2 * nsecsPerDay,
|
||||
4 * nsecsPerDay,
|
||||
6 * nsecsPerDay,
|
||||
sunday,
|
||||
sunday + 1*nsecsPerDay,
|
||||
sunday + 2*nsecsPerDay,
|
||||
sunday + 4*nsecsPerDay,
|
||||
sunday + 6*nsecsPerDay,
|
||||
}
|
||||
|
||||
// match
|
||||
@ -36,16 +39,16 @@ func TestFilterWeekRange(t *testing.T) {
|
||||
ft = &filterWeekRange{
|
||||
startDay: time.Monday,
|
||||
endDay: time.Monday,
|
||||
offset: 2 * nsecsPerDay,
|
||||
offset: 3 * nsecsPerDay,
|
||||
}
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, []int{2})
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, []int{3})
|
||||
|
||||
ft = &filterWeekRange{
|
||||
startDay: time.Monday,
|
||||
endDay: time.Monday,
|
||||
offset: -2 * nsecsPerDay,
|
||||
}
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, []int{1})
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, []int{4})
|
||||
|
||||
ft = &filterWeekRange{
|
||||
startDay: time.Sunday,
|
||||
@ -68,9 +71,9 @@ func TestFilterWeekRange(t *testing.T) {
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, nil)
|
||||
|
||||
ft = &filterWeekRange{
|
||||
startDay: time.Saturday,
|
||||
endDay: time.Saturday,
|
||||
offset: -2 * nsecsPerHour,
|
||||
startDay: time.Friday,
|
||||
endDay: time.Friday,
|
||||
offset: -1 * nsecsPerHour,
|
||||
}
|
||||
testFilterMatchForTimestamps(t, timestamps, ft, nil)
|
||||
}
|
||||
|
@ -9,7 +9,9 @@ import (
|
||||
)
|
||||
|
||||
func TestStorageSearchStreamIDs(t *testing.T) {
|
||||
const path = "TestStorageSearchStreamIDs"
|
||||
t.Parallel()
|
||||
|
||||
path := t.Name()
|
||||
const partitionName = "foobar"
|
||||
s := newTestStorage()
|
||||
mustCreateIndexdb(path)
|
||||
|
@ -3,6 +3,7 @@ package logstorage
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -234,6 +235,51 @@ func (q *Query) String() string {
|
||||
return s
|
||||
}
|
||||
|
||||
func (q *Query) getSortedStreamIDs() []streamID {
|
||||
switch t := q.f.(type) {
|
||||
case *filterAnd:
|
||||
for _, f := range t.filters {
|
||||
streamIDs, ok := getSortedStreamIDsFromFilterOr(f)
|
||||
if ok {
|
||||
return streamIDs
|
||||
}
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
streamIDs, _ := getSortedStreamIDsFromFilterOr(q.f)
|
||||
return streamIDs
|
||||
}
|
||||
}
|
||||
|
||||
func getSortedStreamIDsFromFilterOr(f filter) ([]streamID, bool) {
|
||||
switch t := f.(type) {
|
||||
case *filterOr:
|
||||
var streamIDs []streamID
|
||||
for _, f := range t.filters {
|
||||
fs, ok := f.(*filterStreamID)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
var sid streamID
|
||||
if sid.tryUnmarshalFromString(fs.streamIDStr) {
|
||||
streamIDs = append(streamIDs, sid)
|
||||
}
|
||||
}
|
||||
sort.Slice(streamIDs, func(i, j int) bool {
|
||||
return streamIDs[i].less(&streamIDs[j])
|
||||
})
|
||||
return streamIDs, len(streamIDs) > 0
|
||||
case *filterStreamID:
|
||||
var sid streamID
|
||||
if !sid.tryUnmarshalFromString(t.streamIDStr) {
|
||||
return nil, true
|
||||
}
|
||||
return []streamID{sid}, true
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
// AddCountByTimePipe adds '| stats by (_time:step offset off, field1, ..., fieldN) count() hits' to the end of q.
|
||||
func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
|
||||
{
|
||||
@ -773,6 +819,8 @@ func parseFilterForPhrase(lex *lexer, phrase, fieldName string) (filter, error)
|
||||
switch fieldName {
|
||||
case "_time":
|
||||
return parseFilterTimeGeneric(lex)
|
||||
case "_stream_id":
|
||||
return parseFilterStreamID(lex)
|
||||
case "_stream":
|
||||
return parseFilterStream(lex)
|
||||
default:
|
||||
@ -1782,6 +1830,17 @@ func stripTimezoneSuffix(s string) string {
|
||||
return s[:len(s)-len(tz)]
|
||||
}
|
||||
|
||||
func parseFilterStreamID(lex *lexer) (*filterStreamID, error) {
|
||||
s, err := getCompoundToken(lex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fs := &filterStreamID{
|
||||
streamIDStr: s,
|
||||
}
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
func parseFilterStream(lex *lexer) (*filterStream, error) {
|
||||
sf, err := parseStreamFilter(lex)
|
||||
if err != nil {
|
||||
|
@ -696,6 +696,10 @@ func TestParseQuerySuccess(t *testing.T) {
|
||||
// empty filter
|
||||
f(`"" or foo:"" and not bar:""`, `"" or foo:"" !bar:""`)
|
||||
|
||||
// _stream_id filter
|
||||
f(`_stream_id:foo`, `_stream_id:foo`)
|
||||
f(`_stream_id:foo-bar/b:az`, `_stream_id:"foo-bar/b:az"`)
|
||||
|
||||
// _stream filters
|
||||
f(`_stream:{}`, `_stream:{}`)
|
||||
f(`_stream:{foo="bar", baz=~"x" OR or!="b", "x=},"="d}{"}`, `_stream:{foo="bar",baz=~"x" or "or"!="b","x=},"="d}{"}`)
|
||||
@ -1238,6 +1242,9 @@ func TestParseQueryFailure(t *testing.T) {
|
||||
f(`'foo`)
|
||||
f("`foo")
|
||||
|
||||
// invalid _stream_id filters
|
||||
f("_stream_id:(foo)")
|
||||
|
||||
// invalid _stream filters
|
||||
f("_stream:")
|
||||
f("_stream:{")
|
||||
|
@ -10,7 +10,9 @@ import (
|
||||
)
|
||||
|
||||
func TestPartitionLifecycle(t *testing.T) {
|
||||
const path = "TestPartitionLifecycle"
|
||||
t.Parallel()
|
||||
|
||||
path := t.Name()
|
||||
var ddbStats DatadbStats
|
||||
|
||||
s := newTestStorage()
|
||||
@ -50,7 +52,9 @@ func TestPartitionLifecycle(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPartitionMustAddRowsSerial(t *testing.T) {
|
||||
const path = "TestPartitionMustAddRowsSerial"
|
||||
t.Parallel()
|
||||
|
||||
path := t.Name()
|
||||
var ddbStats DatadbStats
|
||||
|
||||
s := newTestStorage()
|
||||
@ -132,7 +136,9 @@ func TestPartitionMustAddRowsSerial(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPartitionMustAddRowsConcurrent(t *testing.T) {
|
||||
const path = "TestPartitionMustAddRowsConcurrent"
|
||||
t.Parallel()
|
||||
|
||||
path := t.Name()
|
||||
s := newTestStorage()
|
||||
|
||||
mustCreatePartition(path)
|
||||
|
@ -18,6 +18,10 @@ type genericSearchOptions struct {
|
||||
// tenantIDs must contain the list of tenantIDs for the search.
|
||||
tenantIDs []TenantID
|
||||
|
||||
// streamIDs is an optional sorted list of streamIDs for the search.
|
||||
// If it is empty, then the search is performed by tenantIDs
|
||||
streamIDs []streamID
|
||||
|
||||
// filter is the filter to use for the search
|
||||
filter filter
|
||||
|
||||
@ -101,9 +105,11 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
|
||||
}
|
||||
|
||||
func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error {
|
||||
streamIDs := q.getSortedStreamIDs()
|
||||
neededColumnNames, unneededColumnNames := q.getNeededColumns()
|
||||
so := &genericSearchOptions{
|
||||
tenantIDs: tenantIDs,
|
||||
streamIDs: streamIDs,
|
||||
filter: q.f,
|
||||
neededColumnNames: neededColumnNames,
|
||||
unneededColumnNames: unneededColumnNames,
|
||||
@ -653,6 +659,12 @@ func (pt *partition) search(minTimestamp, maxTimestamp int64, sf *StreamFilter,
|
||||
var streamIDs []streamID
|
||||
if sf != nil {
|
||||
streamIDs = pt.idb.searchStreamIDs(tenantIDs, sf)
|
||||
if len(so.streamIDs) > 0 {
|
||||
streamIDs = intersectStreamIDs(streamIDs, so.streamIDs)
|
||||
}
|
||||
tenantIDs = nil
|
||||
} else if len(so.streamIDs) > 0 {
|
||||
streamIDs = getStreamIDsForTenantIDs(so.streamIDs, tenantIDs)
|
||||
tenantIDs = nil
|
||||
}
|
||||
if hasStreamFilters(f) {
|
||||
@ -671,6 +683,36 @@ func (pt *partition) search(minTimestamp, maxTimestamp int64, sf *StreamFilter,
|
||||
return pt.ddb.search(soInternal, workCh, stopCh)
|
||||
}
|
||||
|
||||
func intersectStreamIDs(a, b []streamID) []streamID {
|
||||
m := make(map[streamID]struct{}, len(b))
|
||||
for _, streamID := range b {
|
||||
m[streamID] = struct{}{}
|
||||
}
|
||||
|
||||
result := make([]streamID, 0, len(a))
|
||||
for _, streamID := range a {
|
||||
if _, ok := m[streamID]; ok {
|
||||
result = append(result, streamID)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func getStreamIDsForTenantIDs(streamIDs []streamID, tenantIDs []TenantID) []streamID {
|
||||
m := make(map[TenantID]struct{}, len(tenantIDs))
|
||||
for _, tenantID := range tenantIDs {
|
||||
m[tenantID] = struct{}{}
|
||||
}
|
||||
|
||||
result := make([]streamID, 0, len(streamIDs))
|
||||
for _, streamID := range streamIDs {
|
||||
if _, ok := m[streamID.tenantID]; ok {
|
||||
result = append(result, streamID)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func hasStreamFilters(f filter) bool {
|
||||
visitFunc := func(f filter) bool {
|
||||
_, ok := f.(*filterStream)
|
||||
|
@ -14,7 +14,9 @@ import (
|
||||
)
|
||||
|
||||
func TestStorageRunQuery(t *testing.T) {
|
||||
const path = "TestStorageRunQuery"
|
||||
t.Parallel()
|
||||
|
||||
path := t.Name()
|
||||
|
||||
const tenantsCount = 11
|
||||
const streamsPerTenant = 3
|
||||
@ -322,6 +324,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||
resultExpected := []ValueWithHits{
|
||||
{"_msg", 1155},
|
||||
{"_stream", 1155},
|
||||
{"_stream_id", 1155},
|
||||
{"_time", 1155},
|
||||
{"instance", 1155},
|
||||
{"job", 1155},
|
||||
@ -343,6 +346,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||
resultExpected := []ValueWithHits{
|
||||
{"_msg", 385},
|
||||
{"_stream", 385},
|
||||
{"_stream_id", 385},
|
||||
{"_time", 385},
|
||||
{"instance", 385},
|
||||
{"job", 385},
|
||||
@ -635,7 +639,9 @@ func mustParseQuery(query string) *Query {
|
||||
}
|
||||
|
||||
func TestStorageSearch(t *testing.T) {
|
||||
const path = "TestStorageSearch"
|
||||
t.Parallel()
|
||||
|
||||
path := t.Name()
|
||||
|
||||
const tenantsCount = 11
|
||||
const streamsPerTenant = 3
|
||||
@ -962,6 +968,8 @@ func TestStorageSearch(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestParseStreamFieldsSuccess(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := func(s, resultExpected string) {
|
||||
t.Helper()
|
||||
|
||||
|
@ -7,8 +7,10 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
)
|
||||
|
||||
func TestStorageLifecycle(_ *testing.T) {
|
||||
const path = "TestStorageLifecycle"
|
||||
func TestStorageLifecycle(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
path := t.Name()
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
cfg := &StorageConfig{}
|
||||
@ -19,7 +21,9 @@ func TestStorageLifecycle(_ *testing.T) {
|
||||
}
|
||||
|
||||
func TestStorageMustAddRows(t *testing.T) {
|
||||
const path = "TestStorageMustAddRows"
|
||||
t.Parallel()
|
||||
|
||||
path := t.Name()
|
||||
|
||||
var sStats StorageStats
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package logstorage
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
@ -25,6 +26,27 @@ func (sid *streamID) reset() {
|
||||
*sid = streamID{}
|
||||
}
|
||||
|
||||
// marshalString returns _stream_id value for the given sid.
|
||||
func (sid *streamID) marshalString(dst []byte) []byte {
|
||||
bb := bbPool.Get()
|
||||
bb.B = sid.marshal(bb.B)
|
||||
dst = hex.AppendEncode(dst, bb.B)
|
||||
bbPool.Put(bb)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (sid *streamID) tryUnmarshalFromString(s string) bool {
|
||||
data, err := hex.DecodeString(s)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
tail, err := sid.unmarshal(data)
|
||||
if err != nil || len(tail) > 0 {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// String returns human-readable representation for sid.
|
||||
func (sid *streamID) String() string {
|
||||
return fmt.Sprintf("(tenant_id=%s, id=%s)", &sid.tenantID, &sid.id)
|
||||
|
@ -5,6 +5,36 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStreamIDMarshalUnmarshalString(t *testing.T) {
|
||||
f := func(sid *streamID) {
|
||||
t.Helper()
|
||||
|
||||
s := string(sid.marshalString(nil))
|
||||
|
||||
var sid2 streamID
|
||||
if !sid2.tryUnmarshalFromString(s) {
|
||||
t.Fatalf("cannot unmarshal streamID from %q", s)
|
||||
}
|
||||
|
||||
s2 := string(sid2.marshalString(nil))
|
||||
if s != s2 {
|
||||
t.Fatalf("unexpected marshaled streamID; got %s; want %s", s2, s)
|
||||
}
|
||||
}
|
||||
|
||||
f(&streamID{})
|
||||
f(&streamID{
|
||||
tenantID: TenantID{
|
||||
AccountID: 123,
|
||||
ProjectID: 456,
|
||||
},
|
||||
id: u128{
|
||||
lo: 89,
|
||||
hi: 344334,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamIDMarshalUnmarshal(t *testing.T) {
|
||||
f := func(sid *streamID, marshaledLen int) {
|
||||
t.Helper()
|
||||
|
Loading…
Reference in New Issue
Block a user