From 2d674f98d4a736ec78b3bae989c881cb595d46c7 Mon Sep 17 00:00:00 2001 From: Dmytro Kozlov Date: Sun, 18 Feb 2024 21:58:47 +0100 Subject: [PATCH] Enable the `limit` query param for the `/select/logsql/query` (#5778) * app/vlselect: add limit for logs query * app/vlselect: CHANGELOG.md * app/vlselect: stop search process if limit is reached, update logic, remove default limit * app/vlselect: fix tests * app/vlselect: fix filter tests * app/vlselect: fix tests --- app/vlselect/logsql/logsql.go | 23 +++++++++-- app/vlstorage/main.go | 2 +- docs/CHANGELOG.md | 1 + lib/logstorage/filters_test.go | 3 +- lib/logstorage/storage_search.go | 40 +++++++++++++----- lib/logstorage/storage_search_test.go | 59 ++++++++++++++++----------- 6 files changed, 88 insertions(+), 40 deletions(-) diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index f525335bea..343593f44c 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -2,11 +2,13 @@ package logsql import ( "net/http" + "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" ) @@ -31,23 +33,36 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err) return } + limit, err := httputils.GetInt(r, "limit") + if err != nil { + httpserver.Errorf(w, r, "cannot parse limit from the request: %s", err) + return + } w.Header().Set("Content-Type", "application/stream+json; charset=utf-8") - sw := getSortWriter() sw.Init(w, maxSortBufferSize.IntN()) tenantIDs := []logstorage.TenantID{tenantID} - vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) { + + var mx sync.Mutex + vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) bool { if len(columns) == 0 { - return + return true } rowsCount := len(columns[0].Values) - + mx.Lock() + if rowsCount > limit { + rowsCount = limit + } + limit = limit - rowsCount + mx.Unlock() bb := blockResultPool.Get() for rowIdx := 0; rowIdx < rowsCount; rowIdx++ { WriteJSONRow(bb, columns, rowIdx) } sw.MustWrite(bb.B) blockResultPool.Put(bb) + + return limit == 0 }) sw.FinalFlush() putSortWriter(sw) diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index b1b55675c9..ed5eba88d9 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -100,7 +100,7 @@ func MustAddRows(lr *logstorage.LogRows) { } // RunQuery runs the given q and calls processBlock for the returned data blocks -func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn)) { +func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn) bool) { strg.RunQuery(tenantIDs, q, stopCh, processBlock) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 6a4bc7ec2a..d8e747b4ea 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -63,6 +63,7 @@ Released at 2024-02-14 * FEATURE: [dashboards/all](https://grafana.com/orgs/victoriametrics): add new panel `CPU spent on GC`. It should help identifying cases when too much CPU is spent on garbage collection, and advice users on how this can be addressed. * FEATURE: [vmalert](https://docs.victoriametrics.com/#vmalert): support [filtering](https://prometheus.io/docs/prometheus/2.49/querying/api/#rules) for `/api/v1/rules` API. See [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5749) by @victoramsantos. * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): support client-side TLS configuration for creating and deleting snapshots via `-snapshot.tls*` cmd-line flags. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5724). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5738). +* FEATURE: enable `limit` query param for `/select/logsql/query` VictoriaLogs API endpoint. By default this limit sets to 1000 lines, but it can be changed via query param. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce CPU usage when `-promscrape.dropOriginalLabels` command-line flag is set. This issue has been introduced in [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0) when addressing [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5389). * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): properly release memory during config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4690). diff --git a/lib/logstorage/filters_test.go b/lib/logstorage/filters_test.go index cf7d6e7827..a5d017ee47 100644 --- a/lib/logstorage/filters_test.go +++ b/lib/logstorage/filters_test.go @@ -9226,7 +9226,7 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi resultColumnNames: []string{resultColumnName}, } workersCount := 3 - s.search(workersCount, so, nil, func(workerID uint, br *blockResult) { + s.search(workersCount, so, nil, func(workerID uint, br *blockResult) bool { // Verify tenantID if !br.streamID.tenantID.equal(&tenantID) { t.Fatalf("unexpected tenantID in blockResult; got %s; want %s", &br.streamID.tenantID, &tenantID) @@ -9248,6 +9248,7 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi if !reflect.DeepEqual(br.timestamps, expectedTimestamps) { t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", br.timestamps, expectedTimestamps) } + return false }) } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index c728d9998a..136e2b8212 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -1,6 +1,7 @@ package logstorage import ( + "context" "math" "sort" "sync" @@ -43,7 +44,7 @@ type searchOptions struct { } // RunQuery runs the given q and calls processBlock for results -func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(columns []BlockColumn)) { +func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(columns []BlockColumn) bool) { resultColumnNames := q.getResultColumnNames() so := &genericSearchOptions{ tenantIDs: tenantIDs, @@ -51,7 +52,7 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{ resultColumnNames: resultColumnNames, } workersCount := cgroup.AvailableCPUs() - s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) { + s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) bool { brs := getBlockRows() cs := brs.cs @@ -61,10 +62,11 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{ Values: br.getColumnValues(i), }) } - processBlock(cs) + limitReached := processBlock(cs) brs.cs = cs putBlockRows(brs) + return limitReached }) } @@ -118,7 +120,7 @@ const blockSearchWorksPerBatch = 64 // searchResultFunc must process sr. // // The callback is called at the worker with the given workerID. -type searchResultFunc func(workerID uint, br *blockResult) +type searchResultFunc func(workerID uint, br *blockResult) bool // search searches for the matching rows according to so. // @@ -128,19 +130,34 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch var wg sync.WaitGroup workCh := make(chan []*blockSearchWork, workersCount) wg.Add(workersCount) + + ctx, cancelFn := context.WithCancel(context.Background()) + for i := 0; i < workersCount; i++ { go func(workerID uint) { + defer wg.Done() bs := getBlockSearch() - for bsws := range workCh { - for _, bsw := range bsws { - bs.search(bsw) - if bs.br.RowsCount() > 0 { - processBlockResult(workerID, &bs.br) + defer putBlockSearch(bs) + for { + select { + case bsws, ok := <-workCh: + if !ok { + return } + for _, bsw := range bsws { + bs.search(bsw) + if bs.br.RowsCount() > 0 { + limitReached := processBlockResult(workerID, &bs.br) + if limitReached { + cancelFn() + return + } + } + } + case <-ctx.Done(): + return } } - putBlockSearch(bs) - wg.Done() }(uint(i)) } @@ -178,6 +195,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch // Wait until workers finish their work close(workCh) wg.Wait() + cancelFn() // Decrement references to parts for _, pw := range pws { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 98b8f902a1..9a0440d82d 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -84,7 +84,7 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 0, ProjectID: 0, } - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { panic(fmt.Errorf("unexpected match")) } tenantIDs := []TenantID{tenantID} @@ -96,7 +96,7 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { panic(fmt.Errorf("unexpected match")) } tenantIDs := []TenantID{tenantID} @@ -111,7 +111,7 @@ func TestStorageRunQuery(t *testing.T) { } expectedTenantID := tenantID.String() rowsCount := uint32(0) - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { hasTenantIDColumn := false var columnNames []string for _, c := range columns { @@ -132,6 +132,7 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames)) } atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + return false } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) @@ -145,8 +146,9 @@ func TestStorageRunQuery(t *testing.T) { t.Run("matching-multiple-tenant-ids", func(t *testing.T) { q := mustParseQuery(`"log message"`) rowsCount := uint32(0) - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + return false } s.RunQuery(allTenantIDs, q, nil, processBlock) @@ -158,8 +160,9 @@ func TestStorageRunQuery(t *testing.T) { t.Run("matching-in-filter", func(t *testing.T) { q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`) rowsCount := uint32(0) - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + return false } s.RunQuery(allTenantIDs, q, nil, processBlock) @@ -170,7 +173,7 @@ func TestStorageRunQuery(t *testing.T) { }) t.Run("stream-filter-mismatch", func(t *testing.T) { q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`) - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { panic(fmt.Errorf("unexpected match")) } s.RunQuery(allTenantIDs, q, nil, processBlock) @@ -184,7 +187,7 @@ func TestStorageRunQuery(t *testing.T) { } expectedStreamID := fmt.Sprintf("stream_id=%d", i) rowsCount := uint32(0) - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { hasStreamIDColumn := false var columnNames []string for _, c := range columns { @@ -205,6 +208,7 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames)) } atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + return false } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) @@ -222,8 +226,9 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: 11, } rowsCount := uint32(0) - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + return false } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) @@ -242,8 +247,9 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: 11, } rowsCount := uint32(0) - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + return false } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) @@ -262,8 +268,9 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: 11, } rowsCount := uint32(0) - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + return false } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) @@ -281,7 +288,7 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { panic(fmt.Errorf("unexpected match")) } tenantIDs := []TenantID{tenantID} @@ -295,7 +302,7 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - processBlock := func(columns []BlockColumn) { + processBlock := func(columns []BlockColumn) bool { panic(fmt.Errorf("unexpected match")) } tenantIDs := []TenantID{tenantID} @@ -405,7 +412,7 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { panic(fmt.Errorf("unexpected match")) } s.search(workersCount, so, nil, processBlock) @@ -423,7 +430,7 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { panic(fmt.Errorf("unexpected match")) } s.search(workersCount, so, nil, processBlock) @@ -441,7 +448,7 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { panic(fmt.Errorf("unexpected match")) } s.search(workersCount, so, nil, processBlock) @@ -461,11 +468,12 @@ func TestStorageSearch(t *testing.T) { resultColumnNames: []string{"_msg"}, } rowsCount := uint32(0) - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + return false } s.search(workersCount, so, nil, processBlock) @@ -485,8 +493,9 @@ func TestStorageSearch(t *testing.T) { resultColumnNames: []string{"_msg"}, } rowsCount := uint32(0) - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + return false } s.search(workersCount, so, nil, processBlock) @@ -505,7 +514,7 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { panic(fmt.Errorf("unexpected match")) } s.search(workersCount, so, nil, processBlock) @@ -526,11 +535,12 @@ func TestStorageSearch(t *testing.T) { resultColumnNames: []string{"_msg"}, } rowsCount := uint32(0) - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + return false } s.search(workersCount, so, nil, processBlock) @@ -555,11 +565,12 @@ func TestStorageSearch(t *testing.T) { resultColumnNames: []string{"_msg"}, } rowsCount := uint32(0) - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + return false } s.search(workersCount, so, nil, processBlock) @@ -592,11 +603,12 @@ func TestStorageSearch(t *testing.T) { resultColumnNames: []string{"_msg"}, } rowsCount := uint32(0) - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + return false } s.search(workersCount, so, nil, processBlock) @@ -620,8 +632,9 @@ func TestStorageSearch(t *testing.T) { resultColumnNames: []string{"_msg"}, } rowsCount := uint32(0) - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + return false } s.search(workersCount, so, nil, processBlock) @@ -644,7 +657,7 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - processBlock := func(workerID uint, br *blockResult) { + processBlock := func(workerID uint, br *blockResult) bool { panic(fmt.Errorf("unexpected match")) } s.search(workersCount, so, nil, processBlock)