package logstorage

import (
	"context"
	"fmt"
	"math"
	"reflect"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)

func TestStorageRunQuery(t *testing.T) {
	t.Parallel()

	path := t.Name()

	const tenantsCount = 11
	const streamsPerTenant = 3
	const blocksPerStream = 5
	const rowsPerBlock = 7

	sc := &StorageConfig{
		Retention: 24 * time.Hour,
	}
	s := MustOpenStorage(path, sc)

	// fill the storage with data
	var allTenantIDs []TenantID
	baseTimestamp := time.Now().UnixNano() - 3600*1e9
	var fields []Field
	streamTags := []string{
		"job",
		"instance",
	}
	for i := 0; i < tenantsCount; i++ {
		tenantID := TenantID{
			AccountID: uint32(i),
			ProjectID: uint32(10*i + 1),
		}
		allTenantIDs = append(allTenantIDs, tenantID)
		for j := 0; j < streamsPerTenant; j++ {
			streamIDValue := fmt.Sprintf("stream_id=%d", j)
			for k := 0; k < blocksPerStream; k++ {
				lr := GetLogRows(streamTags, nil)
				for m := 0; m < rowsPerBlock; m++ {
					timestamp := baseTimestamp + int64(m)*1e9 + int64(k)
					// Append stream fields
					fields = append(fields[:0], Field{
						Name:  "job",
						Value: "foobar",
					}, Field{
						Name:  "instance",
						Value: fmt.Sprintf("host-%d:234", j),
					})
					// append the remaining fields
					fields = append(fields, Field{
						Name:  "_msg",
						Value: fmt.Sprintf("log message %d at block %d", m, k),
					})
					fields = append(fields, Field{
						Name:  "source-file",
						Value: "/foo/bar/baz",
					})
					fields = append(fields, Field{
						Name:  "tenant.id",
						Value: tenantID.String(),
					})
					fields = append(fields, Field{
						Name:  "stream-id",
						Value: streamIDValue,
					})
					lr.MustAdd(tenantID, timestamp, fields)
				}
				s.MustAddRows(lr)
				PutLogRows(lr)
			}
		}
	}
	s.debugFlush()

	mustRunQuery := func(t *testing.T, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) {
		t.Helper()
		err := s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
		if err != nil {
			t.Fatalf("unexpected error returned from the query [%s]: %s", q, err)
		}
	}

	// run tests on the storage data
	t.Run("missing-tenant", func(t *testing.T) {
		q := mustParseQuery(`"log message"`)
		tenantID := TenantID{
			AccountID: 0,
			ProjectID: 0,
		}
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
		}
		tenantIDs := []TenantID{tenantID}
		mustRunQuery(t, tenantIDs, q, writeBlock)
	})
	t.Run("missing-message-text", func(t *testing.T) {
		q := mustParseQuery(`foobar`)
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
		}
		tenantIDs := []TenantID{tenantID}
		mustRunQuery(t, tenantIDs, q, writeBlock)
	})
	t.Run("matching-tenant-id", func(t *testing.T) {
		q := mustParseQuery(`tenant.id:*`)
		for i := 0; i < tenantsCount; i++ {
			tenantID := TenantID{
				AccountID: uint32(i),
				ProjectID: uint32(10*i + 1),
			}
			expectedTenantID := tenantID.String()
			var rowsCountTotal atomic.Uint32
			writeBlock := func(_ uint, timestamps []int64, columns []BlockColumn) {
				hasTenantIDColumn := false
				var columnNames []string
				for _, c := range columns {
					if c.Name == "tenant.id" {
						hasTenantIDColumn = true
						if len(c.Values) != len(timestamps) {
							panic(fmt.Errorf("unexpected number of rows in column %q; got %d; want %d", c.Name, len(c.Values), len(timestamps)))
						}
						for _, v := range c.Values {
							if v != expectedTenantID {
								panic(fmt.Errorf("unexpected tenant.id; got %s; want %s", v, expectedTenantID))
							}
						}
					}
					columnNames = append(columnNames, c.Name)
				}
				if !hasTenantIDColumn {
					panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames))
				}
				rowsCountTotal.Add(uint32(len(timestamps)))
			}
			tenantIDs := []TenantID{tenantID}
			mustRunQuery(t, tenantIDs, q, writeBlock)

			expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
			if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
				t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
			}
		}
	})
	t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
		q := mustParseQuery(`"log message"`)
		var rowsCountTotal atomic.Uint32
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			rowsCountTotal.Add(uint32(len(timestamps)))
		}
		mustRunQuery(t, allTenantIDs, q, writeBlock)

		expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
			t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
		}
	})
	t.Run("matching-in-filter", func(t *testing.T) {
		q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`)
		var rowsCountTotal atomic.Uint32
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			rowsCountTotal.Add(uint32(len(timestamps)))
		}
		mustRunQuery(t, allTenantIDs, q, writeBlock)

		expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
			t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
		}
	})
	t.Run("stream-filter-mismatch", func(t *testing.T) {
		q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`)
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
		}
		mustRunQuery(t, allTenantIDs, q, writeBlock)
	})
	t.Run("matching-stream-id", func(t *testing.T) {
		for i := 0; i < streamsPerTenant; i++ {
			q := mustParseQuery(fmt.Sprintf(`log _stream:{job="foobar",instance="host-%d:234"} AND stream-id:*`, i))
			tenantID := TenantID{
				AccountID: 1,
				ProjectID: 11,
			}
			expectedStreamID := fmt.Sprintf("stream_id=%d", i)
			var rowsCountTotal atomic.Uint32
			writeBlock := func(_ uint, timestamps []int64, columns []BlockColumn) {
				hasStreamIDColumn := false
				var columnNames []string
				for _, c := range columns {
					if c.Name == "stream-id" {
						hasStreamIDColumn = true
						if len(c.Values) != len(timestamps) {
							panic(fmt.Errorf("unexpected number of rows for column %q; got %d; want %d", c.Name, len(c.Values), len(timestamps)))
						}
						for _, v := range c.Values {
							if v != expectedStreamID {
								panic(fmt.Errorf("unexpected stream-id; got %s; want %s", v, expectedStreamID))
							}
						}
					}
					columnNames = append(columnNames, c.Name)
				}
				if !hasStreamIDColumn {
					panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames))
				}
				rowsCountTotal.Add(uint32(len(timestamps)))
			}
			tenantIDs := []TenantID{tenantID}
			mustRunQuery(t, tenantIDs, q, writeBlock)

			expectedRowsCount := blocksPerStream * rowsPerBlock
			if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
				t.Fatalf("unexpected number of rows for stream %d; got %d; want %d", i, n, expectedRowsCount)
			}
		}
	})
	t.Run("matching-multiple-stream-ids-with-re-filter", func(t *testing.T) {
		q := mustParseQuery(`_msg:log _stream:{job="foobar",instance=~"host-[^:]+:234"} and re("message [02] at")`)
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		var rowsCountTotal atomic.Uint32
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			rowsCountTotal.Add(uint32(len(timestamps)))
		}
		tenantIDs := []TenantID{tenantID}
		mustRunQuery(t, tenantIDs, q, writeBlock)

		expectedRowsCount := streamsPerTenant * blocksPerStream * 2
		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
		}
	})
	t.Run("matching-time-range", func(t *testing.T) {
		minTimestamp := baseTimestamp + (rowsPerBlock-2)*1e9
		maxTimestamp := baseTimestamp + (rowsPerBlock-1)*1e9 - 1
		q := mustParseQuery(fmt.Sprintf(`_time:[%f,%f]`, float64(minTimestamp)/1e9, float64(maxTimestamp)/1e9))
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		var rowsCountTotal atomic.Uint32
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			rowsCountTotal.Add(uint32(len(timestamps)))
		}
		tenantIDs := []TenantID{tenantID}
		mustRunQuery(t, tenantIDs, q, writeBlock)

		expectedRowsCount := streamsPerTenant * blocksPerStream
		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
		}
	})
	t.Run("matching-stream-id-with-time-range", func(t *testing.T) {
		minTimestamp := baseTimestamp + (rowsPerBlock-2)*1e9
		maxTimestamp := baseTimestamp + (rowsPerBlock-1)*1e9 - 1
		q := mustParseQuery(fmt.Sprintf(`_time:[%f,%f] _stream:{job="foobar",instance="host-1:234"}`, float64(minTimestamp)/1e9, float64(maxTimestamp)/1e9))
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		var rowsCountTotal atomic.Uint32
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			rowsCountTotal.Add(uint32(len(timestamps)))
		}
		tenantIDs := []TenantID{tenantID}
		mustRunQuery(t, tenantIDs, q, writeBlock)

		expectedRowsCount := blocksPerStream
		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
		}
	})
	t.Run("matching-stream-id-missing-time-range", func(t *testing.T) {
		minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
		maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
		q := mustParseQuery(fmt.Sprintf(`_stream:{job="foobar",instance="host-1:234"} _time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9))
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
		}
		tenantIDs := []TenantID{tenantID}
		mustRunQuery(t, tenantIDs, q, writeBlock)
	})
	t.Run("missing-time-range", func(t *testing.T) {
		minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
		maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
		q := mustParseQuery(fmt.Sprintf(`_time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9))
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
		}
		tenantIDs := []TenantID{tenantID}
		mustRunQuery(t, tenantIDs, q, writeBlock)
	})
	t.Run("field_names-all", func(t *testing.T) {
		q := mustParseQuery("*")
		results, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		resultsExpected := []ValueWithHits{
			{"_msg", 1155},
			{"_stream", 1155},
			{"_stream_id", 1155},
			{"_time", 1155},
			{"instance", 1155},
			{"job", 1155},
			{"source-file", 1155},
			{"stream-id", 1155},
			{"tenant.id", 1155},
		}
		if !reflect.DeepEqual(results, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
		}
	})
	t.Run("field_names-some", func(t *testing.T) {
		q := mustParseQuery(`_stream:{instance=~"host-1:.+"}`)
		results, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		resultsExpected := []ValueWithHits{
			{"_msg", 385},
			{"_stream", 385},
			{"_stream_id", 385},
			{"_time", 385},
			{"instance", 385},
			{"job", 385},
			{"source-file", 385},
			{"stream-id", 385},
			{"tenant.id", 385},
		}
		if !reflect.DeepEqual(results, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
		}
	})
	t.Run("field_values-nolimit", func(t *testing.T) {
		q := mustParseQuery("*")
		results, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 0)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		resultsExpected := []ValueWithHits{
			{`{instance="host-0:234",job="foobar"}`, 385},
			{`{instance="host-1:234",job="foobar"}`, 385},
			{`{instance="host-2:234",job="foobar"}`, 385},
		}
		if !reflect.DeepEqual(results, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
		}
	})
	t.Run("field_values-limit", func(t *testing.T) {
		q := mustParseQuery("*")
		results, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 3)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		resultsExpected := []ValueWithHits{
			{`{instance="host-0:234",job="foobar"}`, 0},
			{`{instance="host-1:234",job="foobar"}`, 0},
			{`{instance="host-2:234",job="foobar"}`, 0},
		}
		if !reflect.DeepEqual(results, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
		}
	})
	t.Run("field_values-limit", func(t *testing.T) {
		q := mustParseQuery("instance:='host-1:234'")
		results, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 4)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		resultsExpected := []ValueWithHits{
			{`{instance="host-1:234",job="foobar"}`, 385},
		}
		if !reflect.DeepEqual(results, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
		}
	})
	t.Run("stream_field_names", func(t *testing.T) {
		q := mustParseQuery("*")
		results, err := s.GetStreamFieldNames(context.Background(), allTenantIDs, q)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		resultsExpected := []ValueWithHits{
			{"instance", 1155},
			{"job", 1155},
		}
		if !reflect.DeepEqual(results, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
		}
	})
	t.Run("stream_field_values-nolimit", func(t *testing.T) {
		q := mustParseQuery("*")
		results, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 0)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		resultsExpected := []ValueWithHits{
			{`host-0:234`, 385},
			{`host-1:234`, 385},
			{`host-2:234`, 385},
		}
		if !reflect.DeepEqual(results, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
		}
	})
	t.Run("stream_field_values-limit", func(t *testing.T) {
		q := mustParseQuery("*")
		values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 3)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		resultsExpected := []ValueWithHits{
			{`host-0:234`, 385},
			{`host-1:234`, 385},
			{`host-2:234`, 385},
		}
		if !reflect.DeepEqual(values, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultsExpected)
		}
	})
	t.Run("streams", func(t *testing.T) {
		q := mustParseQuery("*")
		results, err := s.GetStreams(context.Background(), allTenantIDs, q, 0)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		resultsExpected := []ValueWithHits{
			{`{instance="host-0:234",job="foobar"}`, 385},
			{`{instance="host-1:234",job="foobar"}`, 385},
			{`{instance="host-2:234",job="foobar"}`, 385},
		}
		if !reflect.DeepEqual(results, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
		}
	})
	t.Run("stream_ids", func(t *testing.T) {
		q := mustParseQuery("*")
		results, err := s.GetStreamIDs(context.Background(), allTenantIDs, q, 0)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}

		// Verify the first 5 results with the smallest _stream_id value.
		sort.Slice(results, func(i, j int) bool {
			return results[i].Value < results[j].Value
		})
		results = results[:5]

		resultsExpected := []ValueWithHits{
			{"000000000000000140c1914be0226f8185f5b00551fb3b2d", 35},
			{"000000000000000177edafcd46385c778b57476eb5b92233", 35},
			{"0000000000000001f5b4cae620b5e85d6ef5f2107fe00274", 35},
			{"000000010000000b40c1914be0226f8185f5b00551fb3b2d", 35},
			{"000000010000000b77edafcd46385c778b57476eb5b92233", 35},
		}
		if !reflect.DeepEqual(results, resultsExpected) {
			t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
		}
	})

	// Run more complex tests
	f := func(t *testing.T, query string, rowsExpected [][]Field) {
		t.Helper()

		q := mustParseQuery(query)
		var resultRowsLock sync.Mutex
		var resultRows [][]Field
		writeBlock := func(_ uint, _ []int64, bcs []BlockColumn) {
			if len(bcs) == 0 {
				return
			}

			for i := 0; i < len(bcs[0].Values); i++ {
				row := make([]Field, len(bcs))
				for j, bc := range bcs {
					row[j] = Field{
						Name:  strings.Clone(bc.Name),
						Value: strings.Clone(bc.Values[i]),
					}
				}
				resultRowsLock.Lock()
				resultRows = append(resultRows, row)
				resultRowsLock.Unlock()
			}
		}
		mustRunQuery(t, allTenantIDs, q, writeBlock)

		assertRowsEqual(t, resultRows, rowsExpected)
	}

	t.Run("stats-count-total", func(t *testing.T) {
		f(t, `* | stats count() rows`, [][]Field{
			{
				{"rows", "1155"},
			},
		})
	})
	t.Run("_stream_id-filter", func(t *testing.T) {
		f(t, `_stream_id:in(tenant.id:2 | fields _stream_id) | stats count() rows`, [][]Field{
			{
				{"rows", "105"},
			},
		})
	})
	t.Run("in-filter-with-subquery-match", func(t *testing.T) {
		f(t, `tenant.id:in(tenant.id:2 | fields tenant.id) | stats count() rows`, [][]Field{
			{
				{"rows", "105"},
			},
		})
	})
	t.Run("in-filter-with-subquery-mismatch", func(t *testing.T) {
		f(t, `tenant.id:in(tenant.id:23243 | fields tenant.id) | stats count() rows`, [][]Field{
			{
				{"rows", "0"},
			},
		})
	})
	t.Run("conditional-stats", func(t *testing.T) {
		f(t, `* | stats
			count() rows_total,
			count() if (stream-id:0) stream_0_rows,
			count() if (stream-id:1123) stream_x_rows
		`, [][]Field{
			{
				{"rows_total", "1155"},
				{"stream_0_rows", "385"},
				{"stream_x_rows", "0"},
			},
		})
	})
	t.Run("in-filter-with-subquery-in-conditional-stats-mismatch", func(t *testing.T) {
		f(t, `* | stats
			count() rows_total,
			count() if (tenant.id:in(tenant.id:3 | fields tenant.id)) rows_nonzero,
			count() if (tenant.id:in(tenant.id:23243 | fields tenant.id)) rows_zero
		`, [][]Field{
			{
				{"rows_total", "1155"},
				{"rows_nonzero", "105"},
				{"rows_zero", "0"},
			},
		})
	})
	t.Run("pipe-extract", func(t *testing.T) {
		f(t, `* | extract "host-<host>:" from instance | uniq (host) with hits | sort by (host)`, [][]Field{
			{
				{"host", "0"},
				{"hits", "385"},
			},
			{
				{"host", "1"},
				{"hits", "385"},
			},
			{
				{"host", "2"},
				{"hits", "385"},
			},
		})
	})
	t.Run("pipe-extract-if-filter-with-subquery", func(t *testing.T) {
		f(t, `* | extract
				if (tenant.id:in(tenant.id:(3 or 4) | fields tenant.id))
				"host-<host>:" from instance
			| filter host:~"1|2"
			| uniq (tenant.id, host) with hits
			| sort by (tenant.id, host)`, [][]Field{
			{
				{"tenant.id", "{accountID=3,projectID=31}"},
				{"host", "1"},
				{"hits", "35"},
			},
			{
				{"tenant.id", "{accountID=3,projectID=31}"},
				{"host", "2"},
				{"hits", "35"},
			},
			{
				{"tenant.id", "{accountID=4,projectID=41}"},
				{"host", "1"},
				{"hits", "35"},
			},
			{
				{"tenant.id", "{accountID=4,projectID=41}"},
				{"host", "2"},
				{"hits", "35"},
			},
		})
	})
	t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(t *testing.T) {
		f(t, `* | extract
				if (tenant.id:in(tenant.id:3 | fields tenant.id))
				"host-<host>:" from instance
			| filter host:*
			| uniq (host) with hits
			| sort by (host)`, [][]Field{
			{
				{"host", "0"},
				{"hits", "35"},
			},
			{
				{"host", "1"},
				{"hits", "35"},
			},
			{
				{"host", "2"},
				{"hits", "35"},
			},
		})
	})
	t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(t *testing.T) {
		f(t, `* | extract
				if (tenant.id:in(tenant.id:3 | fields tenant.id))
				"host-<host>:" from instance
			| filter host:""
			| uniq (host) with hits
			| sort by (host)`, [][]Field{
			{
				{"host", ""},
				{"hits", "1050"},
			},
		})
	})
	t.Run("stream_context-noop-1", func(t *testing.T) {
		f(t, `"message 3 at block 1"
			| stream_context before 0
			| stats count() rows`, [][]Field{
			{
				{"rows", "33"},
			},
		})
	})
	t.Run("stream_context-noop-2", func(t *testing.T) {
		f(t, `"message 3 at block 1"
			| stream_context before 0 after 0
			| stats count() rows`, [][]Field{
			{
				{"rows", "33"},
			},
		})
	})
	t.Run("stream_context-before-1", func(t *testing.T) {
		f(t, `"message 3 at block 1"
			| stream_context before 1
			| stats count() rows`, [][]Field{
			{
				{"rows", "66"},
			},
		})
	})
	t.Run("stream_context-after-1", func(t *testing.T) {
		f(t, `"message 3 at block 1"
			| stream_context after 1
			| stats count() rows`, [][]Field{
			{
				{"rows", "66"},
			},
		})
	})
	t.Run("stream_context-before-after-1", func(t *testing.T) {
		f(t, `"message 3 at block 1"
			| stream_context before 1 after 1
			| stats count() rows`, [][]Field{
			{
				{"rows", "99"},
			},
		})
	})
	t.Run("stream_context-before-1000", func(t *testing.T) {
		f(t, `"message 4"
			| stream_context before 1000
			| stats count() rows`, [][]Field{
			{
				{"rows", "825"},
			},
		})
	})
	t.Run("stream_context-after-1000", func(t *testing.T) {
		f(t, `"message 4"
			| stream_context after 1000
			| stats count() rows`, [][]Field{
			{
				{"rows", "495"},
			},
		})
	})
	t.Run("stream_context-before-after-1000", func(t *testing.T) {
		f(t, `"message 4"
			| stream_context before 1000 after 1000
			| stats count() rows`, [][]Field{
			{
				{"rows", "1155"},
			},
		})
	})

	// Close the storage and delete its data
	s.MustClose()
	fs.MustRemoveAll(path)
}

func mustParseQuery(query string) *Query {
	q, err := ParseQuery(query)
	if err != nil {
		panic(fmt.Errorf("BUG: cannot parse [%s]: %w", query, err))
	}
	return q
}

func TestStorageSearch(t *testing.T) {
	t.Parallel()

	path := t.Name()

	const tenantsCount = 11
	const streamsPerTenant = 3
	const blocksPerStream = 5
	const rowsPerBlock = 7

	sc := &StorageConfig{
		Retention: 24 * time.Hour,
	}
	s := MustOpenStorage(path, sc)

	// fill the storage with data.
	var allTenantIDs []TenantID
	baseTimestamp := time.Now().UnixNano() - 3600*1e9
	var fields []Field
	streamTags := []string{
		"job",
		"instance",
	}
	for i := 0; i < tenantsCount; i++ {
		tenantID := TenantID{
			AccountID: uint32(i),
			ProjectID: uint32(10*i + 1),
		}
		allTenantIDs = append(allTenantIDs, tenantID)
		for j := 0; j < streamsPerTenant; j++ {
			for k := 0; k < blocksPerStream; k++ {
				lr := GetLogRows(streamTags, nil)
				for m := 0; m < rowsPerBlock; m++ {
					timestamp := baseTimestamp + int64(m)*1e9 + int64(k)
					// Append stream fields
					fields = append(fields[:0], Field{
						Name:  "job",
						Value: "foobar",
					}, Field{
						Name:  "instance",
						Value: fmt.Sprintf("host-%d:234", j),
					})
					// append the remaining fields
					fields = append(fields, Field{
						Name:  "_msg",
						Value: fmt.Sprintf("log message %d at block %d", m, k),
					})
					fields = append(fields, Field{
						Name:  "source-file",
						Value: "/foo/bar/baz",
					})
					lr.MustAdd(tenantID, timestamp, fields)
				}
				s.MustAddRows(lr)
				PutLogRows(lr)
			}
		}
	}
	s.debugFlush()

	// run tests on the filled storage
	const workersCount = 3

	getBaseFilter := func(minTimestamp, maxTimestamp int64, sf *StreamFilter) filter {
		var filters []filter
		filters = append(filters, &filterTime{
			minTimestamp: minTimestamp,
			maxTimestamp: maxTimestamp,
		})
		if sf != nil {
			filters = append(filters, &filterStream{
				f: sf,
			})
		}
		return &filterAnd{
			filters: filters,
		}
	}

	t.Run("missing-tenant-smaller-than-existing", func(_ *testing.T) {
		tenantID := TenantID{
			AccountID: 0,
			ProjectID: 0,
		}
		minTimestamp := baseTimestamp
		maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
		f := getBaseFilter(minTimestamp, maxTimestamp, nil)
		so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
		processBlock := func(_ uint, _ *blockResult) {
			panic(fmt.Errorf("unexpected match"))
		}
		s.search(workersCount, so, nil, processBlock)
	})
	t.Run("missing-tenant-bigger-than-existing", func(_ *testing.T) {
		tenantID := TenantID{
			AccountID: tenantsCount + 1,
			ProjectID: 0,
		}
		minTimestamp := baseTimestamp
		maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
		f := getBaseFilter(minTimestamp, maxTimestamp, nil)
		so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
		processBlock := func(_ uint, _ *blockResult) {
			panic(fmt.Errorf("unexpected match"))
		}
		s.search(workersCount, so, nil, processBlock)
	})
	t.Run("missing-tenant-middle", func(_ *testing.T) {
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 0,
		}
		minTimestamp := baseTimestamp
		maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
		f := getBaseFilter(minTimestamp, maxTimestamp, nil)
		so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
		processBlock := func(_ uint, _ *blockResult) {
			panic(fmt.Errorf("unexpected match"))
		}
		s.search(workersCount, so, nil, processBlock)
	})
	t.Run("matching-tenant-id", func(t *testing.T) {
		for i := 0; i < tenantsCount; i++ {
			tenantID := TenantID{
				AccountID: uint32(i),
				ProjectID: uint32(10*i + 1),
			}
			minTimestamp := baseTimestamp
			maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
			f := getBaseFilter(minTimestamp, maxTimestamp, nil)
			so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
			var rowsCountTotal atomic.Uint32
			processBlock := func(_ uint, br *blockResult) {
				rowsCountTotal.Add(uint32(len(br.timestamps)))
			}
			s.search(workersCount, so, nil, processBlock)

			expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
			if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
				t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
			}
		}
	})
	t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
		minTimestamp := baseTimestamp
		maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
		f := getBaseFilter(minTimestamp, maxTimestamp, nil)
		so := newTestGenericSearchOptions(allTenantIDs, f, []string{"_msg"})
		var rowsCountTotal atomic.Uint32
		processBlock := func(_ uint, br *blockResult) {
			rowsCountTotal.Add(uint32(len(br.timestamps)))
		}
		s.search(workersCount, so, nil, processBlock)

		expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
			t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
		}
	})
	t.Run("stream-filter-mismatch", func(_ *testing.T) {
		sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-.+:2345"}`)
		minTimestamp := baseTimestamp
		maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
		f := getBaseFilter(minTimestamp, maxTimestamp, sf)
		so := newTestGenericSearchOptions(allTenantIDs, f, []string{"_msg"})
		processBlock := func(_ uint, _ *blockResult) {
			panic(fmt.Errorf("unexpected match"))
		}
		s.search(workersCount, so, nil, processBlock)
	})
	t.Run("matching-stream-id", func(t *testing.T) {
		for i := 0; i < streamsPerTenant; i++ {
			sf := mustNewTestStreamFilter(fmt.Sprintf(`{job="foobar",instance="host-%d:234"}`, i))
			tenantID := TenantID{
				AccountID: 1,
				ProjectID: 11,
			}
			minTimestamp := baseTimestamp
			maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
			f := getBaseFilter(minTimestamp, maxTimestamp, sf)
			so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
			var rowsCountTotal atomic.Uint32
			processBlock := func(_ uint, br *blockResult) {
				rowsCountTotal.Add(uint32(len(br.timestamps)))
			}
			s.search(workersCount, so, nil, processBlock)

			expectedRowsCount := blocksPerStream * rowsPerBlock
			if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
				t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
			}
		}
	})
	t.Run("matching-multiple-stream-ids", func(t *testing.T) {
		sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-[^:]+:234"}`)
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		minTimestamp := baseTimestamp
		maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
		f := getBaseFilter(minTimestamp, maxTimestamp, sf)
		so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
		var rowsCountTotal atomic.Uint32
		processBlock := func(_ uint, br *blockResult) {
			rowsCountTotal.Add(uint32(len(br.timestamps)))
		}
		s.search(workersCount, so, nil, processBlock)

		expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
		}
	})
	t.Run("matching-multiple-stream-ids-with-re-filter", func(t *testing.T) {
		sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-[^:]+:234"}`)
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		minTimestamp := baseTimestamp
		maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
		f := getBaseFilter(minTimestamp, maxTimestamp, sf)
		f = &filterAnd{
			filters: []filter{
				f,
				&filterRegexp{
					fieldName: "_msg",
					re:        mustCompileRegex("message [02] at "),
				},
			},
		}
		so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
		var rowsCountTotal atomic.Uint32
		processBlock := func(_ uint, br *blockResult) {
			rowsCountTotal.Add(uint32(len(br.timestamps)))
		}
		s.search(workersCount, so, nil, processBlock)

		expectedRowsCount := streamsPerTenant * blocksPerStream * 2
		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
		}
	})
	t.Run("matching-stream-id-smaller-time-range", func(t *testing.T) {
		sf := mustNewTestStreamFilter(`{job="foobar",instance="host-1:234"}`)
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		minTimestamp := baseTimestamp + (rowsPerBlock-2)*1e9
		maxTimestamp := baseTimestamp + (rowsPerBlock-1)*1e9 - 1
		f := getBaseFilter(minTimestamp, maxTimestamp, sf)
		so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
		var rowsCountTotal atomic.Uint32
		processBlock := func(_ uint, br *blockResult) {
			rowsCountTotal.Add(uint32(len(br.timestamps)))
		}
		s.search(workersCount, so, nil, processBlock)

		expectedRowsCount := blocksPerStream
		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
		}
	})
	t.Run("matching-stream-id-missing-time-range", func(_ *testing.T) {
		sf := mustNewTestStreamFilter(`{job="foobar",instance="host-1:234"}`)
		tenantID := TenantID{
			AccountID: 1,
			ProjectID: 11,
		}
		minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
		maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
		f := getBaseFilter(minTimestamp, maxTimestamp, sf)
		so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"})
		processBlock := func(_ uint, _ *blockResult) {
			panic(fmt.Errorf("unexpected match"))
		}
		s.search(workersCount, so, nil, processBlock)
	})

	s.MustClose()
	fs.MustRemoveAll(path)
}

func TestParseStreamFieldsSuccess(t *testing.T) {
	t.Parallel()

	f := func(s, resultExpected string) {
		t.Helper()

		labels, err := parseStreamFields(nil, s)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
		result := MarshalFieldsToJSON(nil, labels)
		if string(result) != resultExpected {
			t.Fatalf("unexpected result\ngot\n%s\nwant\n%s", result, resultExpected)
		}
	}

	f(`{}`, `{}`)
	f(`{foo="bar"}`, `{"foo":"bar"}`)
	f(`{a="b",c="d"}`, `{"a":"b","c":"d"}`)
	f(`{a="a=,b\"c}",b="d"}`, `{"a":"a=,b\"c}","b":"d"}`)
}

func newTestGenericSearchOptions(tenantIDs []TenantID, f filter, neededColumns []string) *genericSearchOptions {
	return &genericSearchOptions{
		tenantIDs:         tenantIDs,
		minTimestamp:      math.MinInt64,
		maxTimestamp:      math.MaxInt64,
		filter:            f,
		neededColumnNames: neededColumns,
	}
}