From 52929c060a2bd5045d345780a44d1d6372d18cc3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 8 Nov 2024 13:33:40 +0100 Subject: [PATCH] app/vlselect/logsql: call Query.Optimize() inside parseCommonArgs(), which is called et every /select/logsql/* endpoint. This reduces the probability of forgotten call to Query.Optimize(). (cherry picked from commit 055009380216eddba2c2e14b24e22684c1ceff8b) --- app/vlselect/logsql/logsql.go | 28 +++------------------------- lib/logstorage/parser.go | 12 +++++++++++- 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index 6113918bd6..a6a1cfc891 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -73,7 +73,6 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ } // Prepare the query for hits count. - q.Optimize() q.DropAllPipes() q.AddCountByTimePipe(int64(step), int64(offset), fields) @@ -204,7 +203,6 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt } // Obtain field names for the given query - q.Optimize() fieldNames, err := vlstorage.GetFieldNames(ctx, tenantIDs, q) if err != nil { httpserver.Errorf(w, r, "cannot obtain field names: %s", err) @@ -244,7 +242,6 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht } // Obtain unique values for the given field - q.Optimize() values, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit)) if err != nil { httpserver.Errorf(w, r, "cannot obtain values for field %q: %s", fieldName, err) @@ -267,7 +264,6 @@ func ProcessStreamFieldNamesRequest(ctx context.Context, w http.ResponseWriter, } // Obtain stream field names for the given query - q.Optimize() names, err := vlstorage.GetStreamFieldNames(ctx, tenantIDs, q) if err != nil { httpserver.Errorf(w, r, "cannot obtain stream field names: %s", err) @@ -306,7 +302,6 @@ func ProcessStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter, } // Obtain stream field values for the given query and the given fieldName - q.Optimize() values, err := vlstorage.GetStreamFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit)) if err != nil { httpserver.Errorf(w, r, "cannot obtain stream field values: %s", err) @@ -338,7 +333,6 @@ func ProcessStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http } // Obtain streamIDs for the given query - q.Optimize() streamIDs, err := vlstorage.GetStreamIDs(ctx, tenantIDs, q, uint64(limit)) if err != nil { httpserver.Errorf(w, r, "cannot obtain stream_ids: %s", err) @@ -370,7 +364,6 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R } // Obtain streams for the given query - q.Optimize() streams, err := vlstorage.GetStreams(ctx, tenantIDs, q, uint64(limit)) if err != nil { httpserver.Errorf(w, r, "cannot obtain streams: %s", err) @@ -398,7 +391,6 @@ func ProcessLiveTailRequest(ctx context.Context, w http.ResponseWriter, r *http. "see https://docs.victoriametrics.com/victorialogs/querying/#live-tailing for details", q) return } - q.Optimize() refreshIntervalMsecs, err := httputils.GetDuration(r, "refresh_interval", 1000) if err != nil { @@ -429,12 +421,7 @@ func ProcessLiveTailRequest(ctx context.Context, w http.ResponseWriter, r *http. } qOrig := q for { - q = qOrig.Clone(end) - q.AddTimeFilter(start, end) - // q.Optimize() call is needed for converting '*' into filterNoop. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785#issuecomment-2358547733 - q.Optimize() - + q = qOrig.CloneWithTimeFilter(end, start, end) if err := vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, tp.writeBlock); err != nil { httpserver.Errorf(w, r, "cannot execute tail query [%s]: %s", q, err) return @@ -613,8 +600,6 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r return } - q.Optimize() - m := make(map[string]*statsSeries) var mLock sync.Mutex @@ -725,8 +710,6 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt return } - q.Optimize() - var rows []statsRow var rowsLock sync.Mutex @@ -826,7 +809,6 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req q.AddPipeLimit(uint64(limit)) } - q.Optimize() writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { if len(columns) == 0 || len(columns[0].Values) == 0 { @@ -857,7 +839,6 @@ type row struct { func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) { limitUpper := 2 * limit q.AddPipeLimit(uint64(limitUpper)) - q.Optimize() rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper) if err != nil { @@ -877,11 +858,7 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, qOrig := q for { timestamp := qOrig.GetTimestamp() - q = qOrig.Clone(timestamp) - q.AddTimeFilter(start, end) - // q.Optimize() call is needed for converting '*' into filterNoop. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785#issuecomment-2358547733 - q.Optimize() + q = qOrig.CloneWithTimeFilter(timestamp, start, end) rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper) if err != nil { return nil, err @@ -1005,6 +982,7 @@ func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID, if err != nil { return nil, nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err) } + q.Optimize() // Parse optional start and end args start, okStart, err := getTimeNsec(r, "start") diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index b183bdd462..6282334cda 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -335,7 +335,7 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) { } } -// Clone returns a copy of q. +// Clone returns a copy of q at the given timestamp. func (q *Query) Clone(timestamp int64) *Query { qStr := q.String() qCopy, err := ParseQueryAtTimestamp(qStr, timestamp) @@ -345,6 +345,16 @@ func (q *Query) Clone(timestamp int64) *Query { return qCopy } +// CloneWithTimeFilter clones q at the given timestamp and adds _time:[start, end] filter to the cloned q. +func (q *Query) CloneWithTimeFilter(timestamp, start, end int64) *Query { + q = q.Clone(timestamp) + q.AddTimeFilter(start, end) + // q.Optimize() call is needed for converting '*' into filterNoop. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785#issuecomment-2358547733 + q.Optimize() + return q +} + // CanReturnLastNResults returns true if time range filter at q can be adjusted for returning the last N results. func (q *Query) CanReturnLastNResults() bool { for _, p := range q.pipes {