VictoriaMetrics/lib/logstorage/storage_search_test.go
Aliaksandr Valialkin 246c339e3d
lib/logstorage: read timestamps column when it is really needed during query execution
Previously timestamps column was read unconditionally on every query.
This could significantly slow down queries, which do not need reading this column
like in https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070 .
2024-09-25 19:18:37 +02:00

1062 lines
32 KiB
Go

package logstorage
import (
"context"
"fmt"
"math"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestStorageRunQuery(t *testing.T) {
t.Parallel()
path := t.Name()
const tenantsCount = 11
const streamsPerTenant = 3
const blocksPerStream = 5
const rowsPerBlock = 7
sc := &StorageConfig{
Retention: 24 * time.Hour,
}
s := MustOpenStorage(path, sc)
// fill the storage with data
var allTenantIDs []TenantID
baseTimestamp := time.Now().UnixNano() - 3600*1e9
var fields []Field
streamTags := []string{
"job",
"instance",
}
for i := 0; i < tenantsCount; i++ {
tenantID := TenantID{
AccountID: uint32(i),
ProjectID: uint32(10*i + 1),
}
allTenantIDs = append(allTenantIDs, tenantID)
for j := 0; j < streamsPerTenant; j++ {
streamIDValue := fmt.Sprintf("stream_id=%d", j)
for k := 0; k < blocksPerStream; k++ {
lr := GetLogRows(streamTags, nil)
for m := 0; m < rowsPerBlock; m++ {
timestamp := baseTimestamp + int64(m)*1e9 + int64(k)
// Append stream fields
fields = append(fields[:0], Field{
Name: "job",
Value: "foobar",
}, Field{
Name: "instance",
Value: fmt.Sprintf("host-%d:234", j),
})
// append the remaining fields
fields = append(fields, Field{
Name: "_msg",
Value: fmt.Sprintf("log message %d at block %d", m, k),
})
fields = append(fields, Field{
Name: "source-file",
Value: "/foo/bar/baz",
})
fields = append(fields, Field{
Name: "tenant.id",
Value: tenantID.String(),
})
fields = append(fields, Field{
Name: "stream-id",
Value: streamIDValue,
})
lr.MustAdd(tenantID, timestamp, fields)
}
s.MustAddRows(lr)
PutLogRows(lr)
}
}
}
s.debugFlush()
mustRunQuery := func(t *testing.T, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) {
t.Helper()
err := s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
if err != nil {
t.Fatalf("unexpected error returned from the query [%s]: %s", q, err)
}
}
// run tests on the storage data
t.Run("missing-tenant", func(t *testing.T) {
q := mustParseQuery(`"log message"`)
tenantID := TenantID{
AccountID: 0,
ProjectID: 0,
}
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(t, tenantIDs, q, writeBlock)
})
t.Run("missing-message-text", func(t *testing.T) {
q := mustParseQuery(`foobar`)
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(t, tenantIDs, q, writeBlock)
})
t.Run("matching-tenant-id", func(t *testing.T) {
q := mustParseQuery(`tenant.id:*`)
for i := 0; i < tenantsCount; i++ {
tenantID := TenantID{
AccountID: uint32(i),
ProjectID: uint32(10*i + 1),
}
expectedTenantID := tenantID.String()
var rowsCountTotal atomic.Uint32
writeBlock := func(_ uint, timestamps []int64, columns []BlockColumn) {
hasTenantIDColumn := false
var columnNames []string
for _, c := range columns {
if c.Name == "tenant.id" {
hasTenantIDColumn = true
if len(c.Values) != len(timestamps) {
panic(fmt.Errorf("unexpected number of rows in column %q; got %d; want %d", c.Name, len(c.Values), len(timestamps)))
}
for _, v := range c.Values {
if v != expectedTenantID {
panic(fmt.Errorf("unexpected tenant.id; got %s; want %s", v, expectedTenantID))
}
}
}
columnNames = append(columnNames, c.Name)
}
if !hasTenantIDColumn {
panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames))
}
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
}
}
})
t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
q := mustParseQuery(`"log message"`)
var rowsCountTotal atomic.Uint32
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
mustRunQuery(t, allTenantIDs, q, writeBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("matching-in-filter", func(t *testing.T) {
q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`)
var rowsCountTotal atomic.Uint32
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
mustRunQuery(t, allTenantIDs, q, writeBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("stream-filter-mismatch", func(t *testing.T) {
q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`)
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
mustRunQuery(t, allTenantIDs, q, writeBlock)
})
t.Run("matching-stream-id", func(t *testing.T) {
for i := 0; i < streamsPerTenant; i++ {
q := mustParseQuery(fmt.Sprintf(`log _stream:{job="foobar",instance="host-%d:234"} AND stream-id:*`, i))
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
expectedStreamID := fmt.Sprintf("stream_id=%d", i)
var rowsCountTotal atomic.Uint32
writeBlock := func(_ uint, timestamps []int64, columns []BlockColumn) {
hasStreamIDColumn := false
var columnNames []string
for _, c := range columns {
if c.Name == "stream-id" {
hasStreamIDColumn = true
if len(c.Values) != len(timestamps) {
panic(fmt.Errorf("unexpected number of rows for column %q; got %d; want %d", c.Name, len(c.Values), len(timestamps)))
}
for _, v := range c.Values {
if v != expectedStreamID {
panic(fmt.Errorf("unexpected stream-id; got %s; want %s", v, expectedStreamID))
}
}
}
columnNames = append(columnNames, c.Name)
}
if !hasStreamIDColumn {
panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames))
}
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows for stream %d; got %d; want %d", i, n, expectedRowsCount)
}
}
})
t.Run("matching-multiple-stream-ids-with-re-filter", func(t *testing.T) {
q := mustParseQuery(`_msg:log _stream:{job="foobar",instance=~"host-[^:]+:234"} and re("message [02] at")`)
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
var rowsCountTotal atomic.Uint32
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("matching-time-range", func(t *testing.T) {
minTimestamp := baseTimestamp + (rowsPerBlock-2)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock-1)*1e9 - 1
q := mustParseQuery(fmt.Sprintf(`_time:[%f,%f]`, float64(minTimestamp)/1e9, float64(maxTimestamp)/1e9))
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
var rowsCountTotal atomic.Uint32
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("matching-stream-id-with-time-range", func(t *testing.T) {
minTimestamp := baseTimestamp + (rowsPerBlock-2)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock-1)*1e9 - 1
q := mustParseQuery(fmt.Sprintf(`_time:[%f,%f] _stream:{job="foobar",instance="host-1:234"}`, float64(minTimestamp)/1e9, float64(maxTimestamp)/1e9))
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
var rowsCountTotal atomic.Uint32
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("matching-stream-id-missing-time-range", func(t *testing.T) {
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
q := mustParseQuery(fmt.Sprintf(`_stream:{job="foobar",instance="host-1:234"} _time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9))
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(t, tenantIDs, q, writeBlock)
})
t.Run("missing-time-range", func(t *testing.T) {
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
q := mustParseQuery(fmt.Sprintf(`_time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9))
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(t, tenantIDs, q, writeBlock)
})
t.Run("field_names-all", func(t *testing.T) {
q := mustParseQuery("*")
results, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultsExpected := []ValueWithHits{
{"_msg", 1155},
{"_stream", 1155},
{"_stream_id", 1155},
{"_time", 1155},
{"instance", 1155},
{"job", 1155},
{"source-file", 1155},
{"stream-id", 1155},
{"tenant.id", 1155},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("field_names-some", func(t *testing.T) {
q := mustParseQuery(`_stream:{instance=~"host-1:.+"}`)
results, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultsExpected := []ValueWithHits{
{"_msg", 385},
{"_stream", 385},
{"_stream_id", 385},
{"_time", 385},
{"instance", 385},
{"job", 385},
{"source-file", 385},
{"stream-id", 385},
{"tenant.id", 385},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("field_values-nolimit", func(t *testing.T) {
q := mustParseQuery("*")
results, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultsExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 385},
{`{instance="host-1:234",job="foobar"}`, 385},
{`{instance="host-2:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("field_values-limit", func(t *testing.T) {
q := mustParseQuery("*")
results, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultsExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 0},
{`{instance="host-1:234",job="foobar"}`, 0},
{`{instance="host-2:234",job="foobar"}`, 0},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("field_values-limit", func(t *testing.T) {
q := mustParseQuery("instance:='host-1:234'")
results, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 4)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultsExpected := []ValueWithHits{
{`{instance="host-1:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("stream_field_names", func(t *testing.T) {
q := mustParseQuery("*")
results, err := s.GetStreamFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultsExpected := []ValueWithHits{
{"instance", 1155},
{"job", 1155},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("stream_field_values-nolimit", func(t *testing.T) {
q := mustParseQuery("*")
results, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultsExpected := []ValueWithHits{
{`host-0:234`, 385},
{`host-1:234`, 385},
{`host-2:234`, 385},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("stream_field_values-limit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultsExpected := []ValueWithHits{
{`host-0:234`, 385},
{`host-1:234`, 385},
{`host-2:234`, 385},
}
if !reflect.DeepEqual(values, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultsExpected)
}
})
t.Run("streams", func(t *testing.T) {
q := mustParseQuery("*")
results, err := s.GetStreams(context.Background(), allTenantIDs, q, 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultsExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 385},
{`{instance="host-1:234",job="foobar"}`, 385},
{`{instance="host-2:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("stream_ids", func(t *testing.T) {
q := mustParseQuery("*")
results, err := s.GetStreamIDs(context.Background(), allTenantIDs, q, 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Verify the first 5 results with the smallest _stream_id value.
sort.Slice(results, func(i, j int) bool {
return results[i].Value < results[j].Value
})
results = results[:5]
resultsExpected := []ValueWithHits{
{"000000000000000140c1914be0226f8185f5b00551fb3b2d", 35},
{"000000000000000177edafcd46385c778b57476eb5b92233", 35},
{"0000000000000001f5b4cae620b5e85d6ef5f2107fe00274", 35},
{"000000010000000b40c1914be0226f8185f5b00551fb3b2d", 35},
{"000000010000000b77edafcd46385c778b57476eb5b92233", 35},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
// Run more complex tests
f := func(t *testing.T, query string, rowsExpected [][]Field) {
t.Helper()
q := mustParseQuery(query)
var resultRowsLock sync.Mutex
var resultRows [][]Field
writeBlock := func(_ uint, _ []int64, bcs []BlockColumn) {
if len(bcs) == 0 {
return
}
for i := 0; i < len(bcs[0].Values); i++ {
row := make([]Field, len(bcs))
for j, bc := range bcs {
row[j] = Field{
Name: strings.Clone(bc.Name),
Value: strings.Clone(bc.Values[i]),
}
}
resultRowsLock.Lock()
resultRows = append(resultRows, row)
resultRowsLock.Unlock()
}
}
mustRunQuery(t, allTenantIDs, q, writeBlock)
assertRowsEqual(t, resultRows, rowsExpected)
}
t.Run("stats-count-total", func(t *testing.T) {
f(t, `* | stats count() rows`, [][]Field{
{
{"rows", "1155"},
},
})
})
t.Run("_stream_id-filter", func(t *testing.T) {
f(t, `_stream_id:in(tenant.id:2 | fields _stream_id) | stats count() rows`, [][]Field{
{
{"rows", "105"},
},
})
})
t.Run("in-filter-with-subquery-match", func(t *testing.T) {
f(t, `tenant.id:in(tenant.id:2 | fields tenant.id) | stats count() rows`, [][]Field{
{
{"rows", "105"},
},
})
})
t.Run("in-filter-with-subquery-mismatch", func(t *testing.T) {
f(t, `tenant.id:in(tenant.id:23243 | fields tenant.id) | stats count() rows`, [][]Field{
{
{"rows", "0"},
},
})
})
t.Run("conditional-stats", func(t *testing.T) {
f(t, `* | stats
count() rows_total,
count() if (stream-id:0) stream_0_rows,
count() if (stream-id:1123) stream_x_rows
`, [][]Field{
{
{"rows_total", "1155"},
{"stream_0_rows", "385"},
{"stream_x_rows", "0"},
},
})
})
t.Run("in-filter-with-subquery-in-conditional-stats-mismatch", func(t *testing.T) {
f(t, `* | stats
count() rows_total,
count() if (tenant.id:in(tenant.id:3 | fields tenant.id)) rows_nonzero,
count() if (tenant.id:in(tenant.id:23243 | fields tenant.id)) rows_zero
`, [][]Field{
{
{"rows_total", "1155"},
{"rows_nonzero", "105"},
{"rows_zero", "0"},
},
})
})
t.Run("pipe-extract", func(t *testing.T) {
f(t, `* | extract "host-<host>:" from instance | uniq (host) with hits | sort by (host)`, [][]Field{
{
{"host", "0"},
{"hits", "385"},
},
{
{"host", "1"},
{"hits", "385"},
},
{
{"host", "2"},
{"hits", "385"},
},
})
})
t.Run("pipe-extract-if-filter-with-subquery", func(t *testing.T) {
f(t, `* | extract
if (tenant.id:in(tenant.id:(3 or 4) | fields tenant.id))
"host-<host>:" from instance
| filter host:~"1|2"
| uniq (tenant.id, host) with hits
| sort by (tenant.id, host)`, [][]Field{
{
{"tenant.id", "{accountID=3,projectID=31}"},
{"host", "1"},
{"hits", "35"},
},
{
{"tenant.id", "{accountID=3,projectID=31}"},
{"host", "2"},
{"hits", "35"},
},
{
{"tenant.id", "{accountID=4,projectID=41}"},
{"host", "1"},
{"hits", "35"},
},
{
{"tenant.id", "{accountID=4,projectID=41}"},
{"host", "2"},
{"hits", "35"},
},
})
})
t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(t *testing.T) {
f(t, `* | extract
if (tenant.id:in(tenant.id:3 | fields tenant.id))
"host-<host>:" from instance
| filter host:*
| uniq (host) with hits
| sort by (host)`, [][]Field{
{
{"host", "0"},
{"hits", "35"},
},
{
{"host", "1"},
{"hits", "35"},
},
{
{"host", "2"},
{"hits", "35"},
},
})
})
t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(t *testing.T) {
f(t, `* | extract
if (tenant.id:in(tenant.id:3 | fields tenant.id))
"host-<host>:" from instance
| filter host:""
| uniq (host) with hits
| sort by (host)`, [][]Field{
{
{"host", ""},
{"hits", "1050"},
},
})
})
t.Run("stream_context-noop-1", func(t *testing.T) {
f(t, `"message 3 at block 1"
| stream_context before 0
| stats count() rows`, [][]Field{
{
{"rows", "33"},
},
})
})
t.Run("stream_context-noop-2", func(t *testing.T) {
f(t, `"message 3 at block 1"
| stream_context before 0 after 0
| stats count() rows`, [][]Field{
{
{"rows", "33"},
},
})
})
t.Run("stream_context-before-1", func(t *testing.T) {
f(t, `"message 3 at block 1"
| stream_context before 1
| stats count() rows`, [][]Field{
{
{"rows", "66"},
},
})
})
t.Run("stream_context-after-1", func(t *testing.T) {
f(t, `"message 3 at block 1"
| stream_context after 1
| stats count() rows`, [][]Field{
{
{"rows", "66"},
},
})
})
t.Run("stream_context-before-after-1", func(t *testing.T) {
f(t, `"message 3 at block 1"
| stream_context before 1 after 1
| stats count() rows`, [][]Field{
{
{"rows", "99"},
},
})
})
t.Run("stream_context-before-1000", func(t *testing.T) {
f(t, `"message 4"
| stream_context before 1000
| stats count() rows`, [][]Field{
{
{"rows", "825"},
},
})
})
t.Run("stream_context-after-1000", func(t *testing.T) {
f(t, `"message 4"
| stream_context after 1000
| stats count() rows`, [][]Field{
{
{"rows", "495"},
},
})
})
t.Run("stream_context-before-after-1000", func(t *testing.T) {
f(t, `"message 4"
| stream_context before 1000 after 1000
| stats count() rows`, [][]Field{
{
{"rows", "1155"},
},
})
})
// Close the storage and delete its data
s.MustClose()
fs.MustRemoveAll(path)
}
func mustParseQuery(query string) *Query {
q, err := ParseQuery(query)
if err != nil {
panic(fmt.Errorf("BUG: cannot parse [%s]: %w", query, err))
}
return q
}
func TestStorageSearch(t *testing.T) {
t.Parallel()
path := t.Name()
const tenantsCount = 11
const streamsPerTenant = 3
const blocksPerStream = 5
const rowsPerBlock = 7
sc := &StorageConfig{
Retention: 24 * time.Hour,
}
s := MustOpenStorage(path, sc)
// fill the storage with data.
var allTenantIDs []TenantID
baseTimestamp := time.Now().UnixNano() - 3600*1e9
var fields []Field
streamTags := []string{
"job",
"instance",
}
for i := 0; i < tenantsCount; i++ {
tenantID := TenantID{
AccountID: uint32(i),
ProjectID: uint32(10*i + 1),
}
allTenantIDs = append(allTenantIDs, tenantID)
for j := 0; j < streamsPerTenant; j++ {
for k := 0; k < blocksPerStream; k++ {
lr := GetLogRows(streamTags, nil)
for m := 0; m < rowsPerBlock; m++ {
timestamp := baseTimestamp + int64(m)*1e9 + int64(k)
// Append stream fields
fields = append(fields[:0], Field{
Name: "job",
Value: "foobar",
}, Field{
Name: "instance",
Value: fmt.Sprintf("host-%d:234", j),
})
// append the remaining fields
fields = append(fields, Field{
Name: "_msg",
Value: fmt.Sprintf("log message %d at block %d", m, k),
})
fields = append(fields, Field{
Name: "source-file",
Value: "/foo/bar/baz",
})
lr.MustAdd(tenantID, timestamp, fields)
}
s.MustAddRows(lr)
PutLogRows(lr)
}
}
}
s.debugFlush()
// run tests on the filled storage
const workersCount = 3
getBaseFilter := func(minTimestamp, maxTimestamp int64, sf *StreamFilter) filter {
var filters []filter
filters = append(filters, &filterTime{
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
})
if sf != nil {
filters = append(filters, &filterStream{
f: sf,
})
}
return &filterAnd{
filters: filters,
}
}
t.Run("missing-tenant-smaller-than-existing", func(_ *testing.T) {
tenantID := TenantID{
AccountID: 0,
ProjectID: 0,
}
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
f := getBaseFilter(minTimestamp, maxTimestamp, nil)
so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
})
t.Run("missing-tenant-bigger-than-existing", func(_ *testing.T) {
tenantID := TenantID{
AccountID: tenantsCount + 1,
ProjectID: 0,
}
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
f := getBaseFilter(minTimestamp, maxTimestamp, nil)
so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
})
t.Run("missing-tenant-middle", func(_ *testing.T) {
tenantID := TenantID{
AccountID: 1,
ProjectID: 0,
}
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
f := getBaseFilter(minTimestamp, maxTimestamp, nil)
so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
})
t.Run("matching-tenant-id", func(t *testing.T) {
for i := 0; i < tenantsCount; i++ {
tenantID := TenantID{
AccountID: uint32(i),
ProjectID: uint32(10*i + 1),
}
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
f := getBaseFilter(minTimestamp, maxTimestamp, nil)
so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
}
}
})
t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
f := getBaseFilter(minTimestamp, maxTimestamp, nil)
so := newTestGenericSearchOptions(allTenantIDs, f, []string{"_msg"})
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("stream-filter-mismatch", func(_ *testing.T) {
sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-.+:2345"}`)
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
f := getBaseFilter(minTimestamp, maxTimestamp, sf)
so := newTestGenericSearchOptions(allTenantIDs, f, []string{"_msg"})
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
})
t.Run("matching-stream-id", func(t *testing.T) {
for i := 0; i < streamsPerTenant; i++ {
sf := mustNewTestStreamFilter(fmt.Sprintf(`{job="foobar",instance="host-%d:234"}`, i))
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
f := getBaseFilter(minTimestamp, maxTimestamp, sf)
so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
expectedRowsCount := blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
}
}
})
t.Run("matching-multiple-stream-ids", func(t *testing.T) {
sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-[^:]+:234"}`)
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
f := getBaseFilter(minTimestamp, maxTimestamp, sf)
so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("matching-multiple-stream-ids-with-re-filter", func(t *testing.T) {
sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-[^:]+:234"}`)
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
f := getBaseFilter(minTimestamp, maxTimestamp, sf)
f = &filterAnd{
filters: []filter{
f,
&filterRegexp{
fieldName: "_msg",
re: mustCompileRegex("message [02] at "),
},
},
}
so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("matching-stream-id-smaller-time-range", func(t *testing.T) {
sf := mustNewTestStreamFilter(`{job="foobar",instance="host-1:234"}`)
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
minTimestamp := baseTimestamp + (rowsPerBlock-2)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock-1)*1e9 - 1
f := getBaseFilter(minTimestamp, maxTimestamp, sf)
so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
var rowsCountTotal atomic.Uint32
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
expectedRowsCount := blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("matching-stream-id-missing-time-range", func(_ *testing.T) {
sf := mustNewTestStreamFilter(`{job="foobar",instance="host-1:234"}`)
tenantID := TenantID{
AccountID: 1,
ProjectID: 11,
}
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
f := getBaseFilter(minTimestamp, maxTimestamp, sf)
so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
})
s.MustClose()
fs.MustRemoveAll(path)
}
func TestParseStreamFieldsSuccess(t *testing.T) {
t.Parallel()
f := func(s, resultExpected string) {
t.Helper()
labels, err := parseStreamFields(nil, s)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
result := MarshalFieldsToJSON(nil, labels)
if string(result) != resultExpected {
t.Fatalf("unexpected result\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
f(`{}`, `{}`)
f(`{foo="bar"}`, `{"foo":"bar"}`)
f(`{a="b",c="d"}`, `{"a":"b","c":"d"}`)
f(`{a="a=,b\"c}",b="d"}`, `{"a":"a=,b\"c}","b":"d"}`)
}
func newTestGenericSearchOptions(tenantIDs []TenantID, f filter, neededColumns []string) *genericSearchOptions {
return &genericSearchOptions{
tenantIDs: tenantIDs,
minTimestamp: math.MinInt64,
maxTimestamp: math.MaxInt64,
filter: f,
neededColumnNames: neededColumns,
}
}