From e6deb390642adbac11124e15a8a68601e28f44e3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 25 Dec 2020 16:44:26 +0200 Subject: [PATCH] app/vmselect: refactor `/api/v1/status/top_queries` Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 --- README.md | 2 + app/vmselect/main.go | 31 ++- app/vmselect/prometheus/prometheus.go | 22 +- app/vmselect/promql/exec.go | 9 +- app/vmselect/promql/query_stats.go | 254 ---------------------- app/vmselect/promql/query_stats_test.go | 144 ------------- app/vmselect/querystats/querystats.go | 271 ++++++++++++++++++++++++ docs/CHANGELOG.md | 1 + docs/Single-server-VictoriaMetrics.md | 12 +- 9 files changed, 310 insertions(+), 436 deletions(-) delete mode 100644 app/vmselect/promql/query_stats.go delete mode 100644 app/vmselect/promql/query_stats_test.go create mode 100644 app/vmselect/querystats/querystats.go diff --git a/README.md b/README.md index 6d798c3175..7281b22c14 100644 --- a/README.md +++ b/README.md @@ -214,6 +214,8 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr - `tags/autoComplete/values` - returns tag values matching the given `valuePrefix` and/or `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#auto-complete-support). - `tags/delSeries` - deletes series matching the given `path`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#removing-series-from-the-tagdb). +* URL for query stats across all tenants: `http://:8481/api/v1/status/top_queries`. It returns lists with the most frequently executed queries and the queries that took the most time to execute. + * URL for time series deletion: `http://:8481/delete//prometheus/api/v1/admin/tsdb/delete_series?match[]=`. Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't be used on a regular basis, since it carries non-zero overhead. 
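For illustration, here is a minimal Go sketch of calling the new cluster endpoint; the `vmselect` hostname is a placeholder, and `topN`/`maxLifetime` are the optional query args handled by `QueryStatsHandler` in the hunks below:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// "vmselect" is a placeholder host for a vmselect node in the cluster.
	// topN limits each returned list to 5 entries; maxLifetime drops queries
	// registered more than 30 seconds ago.
	resp, err := http.Get("http://vmselect:8481/api/v1/status/top_queries?topN=5&maxLifetime=30s")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", body)
}
```
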
diff --git a/app/vmselect/main.go b/app/vmselect/main.go index e5c0d5ee8f..af8e78b85a 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -171,6 +171,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { promql.ResetRollupResultCache() return true } + if path == "/api/v1/status/top_queries" { + topQueriesRequests.Inc() + if err := prometheus.QueryStatsHandler(startTime, w, r); err != nil { + topQueriesErrors.Inc() + sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) + return true + } + return true + } p, err := httpserver.ParsePath(path) if err != nil { @@ -274,25 +283,6 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, return true } return true - case "prometheus/api/v1/status/queries/avg_duration": - if err := prometheus.QueryStatsHandler(startTime, w, r, "avg_duration"); err != nil { - sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) - return true - } - return true - case "prometheus/api/v1/status/queries/duration": - if err := prometheus.QueryStatsHandler(startTime, w, r, "duration"); err != nil { - sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) - return true - } - return true - case "prometheus/api/v1/status/queries/frequency": - if err := prometheus.QueryStatsHandler(startTime, w, r, "frequency"); err != nil { - sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) - return true - } - return true - case "prometheus/api/v1/status/tsdb": statusTSDBRequests.Inc() if err := prometheus.TSDBStatusHandler(startTime, at, w, r); err != nil { @@ -511,6 +501,9 @@ var ( statusTSDBRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/status/tsdb"}`) statusTSDBErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/api/v1/status/tsdb"}`) + topQueriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/status/top_queries"}`) + topQueriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/status/top_queries"}`) + statusActiveQueriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}prometheus/api/v1/status/active_queries"}`) deleteRequests = metrics.NewCounter(`vm_http_requests_total{path="/delete/{}/prometheus/api/v1/admin/tsdb/delete_series"}`) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index d30c3864bc..879ac2e69c 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -1327,31 +1328,28 @@ func getLatencyOffsetMilliseconds() int64 { return d } -// QueryStatsHandler - returns statistics for queries executions with given aggregate func name. 
-// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 -func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, aggregateBy string) error { +// QueryStatsHandler returns query stats at `/api/v1/status/top_queries` +func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } - topN := 10 + topN := 20 topNStr := r.FormValue("topN") if len(topNStr) > 0 { n, err := strconv.Atoi(topNStr) if err != nil { return fmt.Errorf("cannot parse `topN` arg %q: %w", topNStr, err) } - if n <= 0 { - n = 1 - } - if n > 1000 { - n = 1000 - } topN = n } + maxLifetimeMsecs, err := searchutils.GetDuration(r, "maxLifetime", 10*60*1000) + if err != nil { + return fmt.Errorf("cannot parse `maxLifetime` arg: %w", err) + } w.Header().Set("Content-Type", "application/json; charset=utf-8") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) - promql.WriteQueryStatsResponse(bw, topN, aggregateBy) + querystats.WriteJSONQueryStats(bw, topN, time.Duration(maxLifetimeMsecs)*time.Millisecond) if err := bw.Flush(); err != nil { return err } @@ -1359,4 +1357,4 @@ func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque return nil } -var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/queries"}`) +var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/top_queries"}`) diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index 091630a9c9..437fc003f4 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -11,6 +11,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metricsql" @@ -39,11 +40,11 @@ func Exec(ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result, } }() } - if *maxQueryStatsTrackerItemsCount > 0 { - start := time.Now() + if querystats.Enabled() { + startTime := time.Now() + ac := ec.AuthToken defer func() { - tr := ec.End - ec.Start - InsertQueryStat(q, tr, start, time.Since(start)) + querystats.RegisterQuery(ac.AccountID, ac.ProjectID, q, ec.End-ec.Start, startTime) }() } diff --git a/app/vmselect/promql/query_stats.go b/app/vmselect/promql/query_stats.go deleted file mode 100644 index f7496ea798..0000000000 --- a/app/vmselect/promql/query_stats.go +++ /dev/null @@ -1,254 +0,0 @@ -package promql - -import ( - "flag" - "fmt" - "io" - "sort" - "strings" - "sync" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/metrics" -) - -var ( - maxQueryStatsRecordLifeTime = flag.Duration("search.MaxQueryStatsRecordLifeTime", 10*time.Minute, "Limits maximum lifetime for query stats record. With minimum 10 seconds") - maxQueryStatsTrackerItemsCount = flag.Int("search.MaxQueryStatsItems", 1000, "Limits count for distinct query stat records, keyed by query name and query time range. "+ - "With Maximum 5000 records. 
Zero value disables query stats recording") -) - -var ( - shrinkQueryStatsCalls = metrics.NewCounter(`vm_query_stats_shrink_calls_total`) - globalQueryStatsTracker *queryStatsTracker - gQSTOnce sync.Once -) - -// InsertQueryStat - inserts query stats record to global query stats tracker -// with given query name, query time-range, execution time and its duration. -func InsertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) { - gQSTOnce.Do(func() { - initQueryStatsTracker() - }) - globalQueryStatsTracker.insertQueryStat(query, tr, execTime, duration) -} - -// WriteQueryStatsResponse - writes query stats to given writer in json format with given aggregate key. -func WriteQueryStatsResponse(w io.Writer, topN int, aggregateBy string) { - gQSTOnce.Do(func() { - initQueryStatsTracker() - }) - writeJSONQueryStats(w, globalQueryStatsTracker, topN, aggregateBy) -} - -// queryStatsTracker - hold statistics for all queries, -// query name and query range is a group key. -type queryStatsTracker struct { - maxQueryLogRecordTime time.Duration - limit int - queryStatsLocker sync.Mutex - qs []queryStats -} - -// queryStats - represent single query statistic. -type queryStats struct { - query string - queryRange int64 - queryLastSeen int64 - queryStatRecords []queryStatRecord -} - -// queryStatRecord - one record of query stat. -type queryStatRecord struct { - // end-start - duration time.Duration - // in seconds as unix_ts. - execTime int64 -} - -func initQueryStatsTracker() { - limit := *maxQueryStatsTrackerItemsCount - if limit > 5000 { - limit = 5000 - } - qlt := *maxQueryStatsRecordLifeTime - if qlt == 0 { - qlt = time.Second * 10 - } - logger.Infof("enabled query stats tracking, max records count: %d, max query record lifetime: %s", limit, qlt) - qst := queryStatsTracker{ - limit: limit, - maxQueryLogRecordTime: qlt, - } - go func() { - for { - time.Sleep(time.Second * 10) - qst.dropOutdatedRecords() - } - }() - globalQueryStatsTracker = &qst -} - -func formatJSONQueryStats(queries []queryStats) string { - var s strings.Builder - for i, q := range queries { - fmt.Fprintf(&s, `{"query": %q,`, q.query) - fmt.Fprintf(&s, `"query_time_range": %q,`, time.Duration(q.queryRange*1e6)) - fmt.Fprintf(&s, `"cumalative_duration": %q,`, q.Duration()) - if len(q.queryStatRecords) > 0 { - fmt.Fprintf(&s, `"avg_duration": %q,`, q.Duration()/time.Duration(len(q.queryStatRecords))) - } - fmt.Fprintf(&s, `"requests_count": "%d"`, len(q.queryStatRecords)) - s.WriteString(`}`) - if i != len(queries)-1 { - s.WriteString(`,`) - } - - } - return s.String() -} - -func writeJSONQueryStats(w io.Writer, ql *queryStatsTracker, topN int, aggregateBy string) { - fmt.Fprintf(w, `{"top_n": "%d",`, topN) - fmt.Fprintf(w, `"stats_max_duration": %q,`, maxQueryStatsRecordLifeTime.String()) - fmt.Fprint(w, `"top": [`) - switch aggregateBy { - case "frequency": - fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByRecordCount(ql, topN))) - case "duration": - fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByDuration(ql, topN))) - case "avg_duration": - fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByAvgDuration(ql, topN))) - default: - logger.Errorf("invalid aggregation key=%q, report bug", aggregateBy) - fmt.Fprintf(w, `{"error": "invalid aggregateBy value=%s"}`, aggregateBy) - } - fmt.Fprint(w, `]`) - fmt.Fprint(w, `}`) -} - -// drops query stats records less then given time in seconds. -// no need to sort -// its added in chronological order. -// must be called with mutex. 
-func (qs *queryStats) dropOutDatedRecords(t int64) { - // fast path - // compare time with last elem. - if len(qs.queryStatRecords) > 0 && qs.queryStatRecords[len(qs.queryStatRecords)-1].execTime < t { - qs.queryStatRecords = qs.queryStatRecords[:0] - return - } - // remove all elements by default. - shrinkIndex := len(qs.queryStatRecords) - for i, v := range qs.queryStatRecords { - if t < v.execTime { - shrinkIndex = i - break - } - } - if shrinkIndex > 0 { - qs.queryStatRecords = qs.queryStatRecords[shrinkIndex:] - } -} - -// calculates cumulative duration for query. -func (qs *queryStats) Duration() time.Duration { - var cnt time.Duration - for _, v := range qs.queryStatRecords { - cnt += v.duration - } - return cnt -} - -// must be called with mutex, -// shrinks slice by the last added query with given shrinkSize. -func (qst *queryStatsTracker) shrink(shrinkSize int) { - if len(qst.qs) < shrinkSize { - return - } - sort.Slice(qst.qs, func(i, j int) bool { - return qst.qs[i].queryLastSeen < qst.qs[j].queryLastSeen - }) - qst.qs = qst.qs[shrinkSize:] -} - -// drop outdated keys. -func (qst *queryStatsTracker) dropOutdatedRecords() { - qst.queryStatsLocker.Lock() - defer qst.queryStatsLocker.Unlock() - t := time.Now().Add(-qst.maxQueryLogRecordTime).Unix() - var i int - for _, v := range qst.qs { - v.dropOutDatedRecords(t) - if len(v.queryStatRecords) > 0 { - qst.qs[i] = v - i++ - } - } - if i == len(qst.qs) { - return - } - qst.qs = qst.qs[:i] -} - -func (qst *queryStatsTracker) insertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) { - qst.queryStatsLocker.Lock() - defer qst.queryStatsLocker.Unlock() - // shrink old queries. - if len(qst.qs) > qst.limit { - shrinkQueryStatsCalls.Inc() - qst.shrink(1) - } - // add record to exist stats, keyed by query string and time-range. - for i, v := range qst.qs { - if v.query == query && v.queryRange == tr { - v.queryLastSeen = execTime.Unix() - v.queryStatRecords = append(v.queryStatRecords, queryStatRecord{execTime: execTime.Unix(), duration: duration}) - qst.qs[i] = v - return - } - } - qst.qs = append(qst.qs, queryStats{ - queryStatRecords: []queryStatRecord{{execTime: execTime.Unix(), duration: duration}}, - queryLastSeen: execTime.Unix(), - query: query, - queryRange: tr, - }) - -} - -func getTopNQueriesByAvgDuration(qst *queryStatsTracker, top int) []queryStats { - return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { - lenI := len(qst.qs[i].queryStatRecords) - lenJ := len(qst.qs[j].queryStatRecords) - if lenI == 0 || lenJ == 0 { - return false - } - return qst.qs[i].Duration()/time.Duration(lenI) > qst.qs[j].Duration()/time.Duration(lenJ) - }) -} - -func getTopNQueriesByRecordCount(qst *queryStatsTracker, top int) []queryStats { - return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { - return len(qst.qs[i].queryStatRecords) > len(qst.qs[j].queryStatRecords) - }) -} - -func getTopNQueriesByDuration(qst *queryStatsTracker, top int) []queryStats { - return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { - return qst.qs[i].Duration() > qst.qs[j].Duration() - }) -} - -func getTopNQueryStatsItemsWithFilter(qst *queryStatsTracker, top int, filterFunc func(i, j int) bool) []queryStats { - qst.queryStatsLocker.Lock() - defer qst.queryStatsLocker.Unlock() - if top > len(qst.qs) { - top = len(qst.qs) - } - sort.Slice(qst.qs, filterFunc) - result := make([]queryStats, 0, top) - result = append(result, qst.qs[:top]...) 
- return result -} diff --git a/app/vmselect/promql/query_stats_test.go b/app/vmselect/promql/query_stats_test.go deleted file mode 100644 index eec36c0faa..0000000000 --- a/app/vmselect/promql/query_stats_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package promql - -import ( - "fmt" - "reflect" - "strings" - "testing" - "time" -) - -func TestQueryLoggerShrink(t *testing.T) { - f := func(addItemCount, limit, expectedLen int) { - t.Helper() - qst := &queryStatsTracker{ - limit: limit, - maxQueryLogRecordTime: time.Second * 5, - } - for i := 0; i < addItemCount; i++ { - qst.insertQueryStat(fmt.Sprintf("random-n-%d", i), int64(i), time.Now().Add(-time.Second), 500+time.Duration(i)) - } - if len(qst.qs) != expectedLen { - t.Fatalf("unxpected len got=%d, for queryStats slice, want=%d", len(qst.qs), expectedLen) - } - } - f(10, 5, 6) - f(30, 10, 11) - f(15, 15, 15) -} - -func TestGetTopNQueriesByDuration(t *testing.T) { - f := func(topN int, expectedQueryStats []queryStats) { - t.Helper() - ql := &queryStatsTracker{ - limit: 25, - maxQueryLogRecordTime: time.Second * 5, - } - queriesDurations := []int{16, 4, 5, 10} - for i, v := range queriesDurations { - ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) - } - got := getTopNQueriesByAvgDuration(ql, topN) - - if len(got) != len(expectedQueryStats) { - t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) - } - for i, gotR := range got { - if gotR.query != expectedQueryStats[i].query { - t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) - } - } - } - f(1, []queryStats{{query: "query-n-0"}}) - f(2, []queryStats{{query: "query-n-0"}, {query: "query-n-3"}}) -} - -func TestGetTopNQueriesByCount(t *testing.T) { - f := func(topN int, expectedQueryStats []queryStats) { - t.Helper() - ql := &queryStatsTracker{ - limit: 25, - maxQueryLogRecordTime: time.Second * 5, - } - queriesCounts := []int{1, 4, 5, 11} - for i, v := range queriesCounts { - for ic := 0; ic < v; ic++ { - ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) - } - } - - got := getTopNQueriesByRecordCount(ql, topN) - - if len(got) != len(expectedQueryStats) { - t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) - } - for i, gotR := range got { - if gotR.query != expectedQueryStats[i].query { - t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) - } - } - } - f(1, []queryStats{{query: "query-n-3"}}) - f(2, []queryStats{{query: "query-n-3"}, {query: "query-n-2"}}) -} - -func TestGetTopNQueriesByAverageDuration(t *testing.T) { - f := func(topN int, expectedQueryStats []queryStats) { - t.Helper() - ql := &queryStatsTracker{ - limit: 25, - maxQueryLogRecordTime: time.Second * 5, - } - queriesQurations := []int{4, 25, 14, 10} - for i, v := range queriesQurations { - ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) - } - - got := getTopNQueriesByAvgDuration(ql, topN) - - if len(got) != len(expectedQueryStats) { - t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) - } - for i, gotR := range got { - if gotR.query != expectedQueryStats[i].query { - t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) - } - } - } - f(1, []queryStats{{query: "query-n-1"}}) - f(2, []queryStats{{query: "query-n-1"}, 
{query: "query-n-2"}}) -} - -func TestWriteJSONQueryStats(t *testing.T) { - qst := queryStatsTracker{ - limit: 100, - maxQueryLogRecordTime: time.Minute * 5, - } - t1 := time.Now() - qst.insertQueryStat("sum(rate(rps_total)[1m]) by(service)", 360, t1, time.Microsecond*100) - qst.insertQueryStat("up", 360, t1, time.Microsecond) - qst.insertQueryStat("up", 360, t1, time.Microsecond) - qst.insertQueryStat("up", 360, t1, time.Microsecond) - - f := func(t *testing.T, wantResp, aggregateBy string) { - var got strings.Builder - writeJSONQueryStats(&got, &qst, 5, aggregateBy) - if !reflect.DeepEqual(got.String(), wantResp) { - t.Fatalf("unexpected response, \ngot: %s,\nwant: %s", got.String(), wantResp) - } - } - - t.Run("aggregateByDuration", func(t *testing.T) { - f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`, - "duration") - }) - t.Run("aggregateByfrequency", func(t *testing.T) { - f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"},{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"}]}`, - "frequency") - }) - t.Run("aggregateByDuration", func(t *testing.T) { - f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`, - "avg_duration") - }) - -} diff --git a/app/vmselect/querystats/querystats.go b/app/vmselect/querystats/querystats.go new file mode 100644 index 0000000000..6a34684da8 --- /dev/null +++ b/app/vmselect/querystats/querystats.go @@ -0,0 +1,271 @@ +package querystats + +import ( + "flag" + "fmt" + "io" + "sort" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +var ( + lastQueriesCount = flag.Int("search.queryStats.lastQueriesCount", 100000, "Query stats for `/api/v1/status/top_queries` is tracked on this number of last queries. "+ + "Zero value disables query stats tracking") + minQueryDuration = flag.Duration("search.queryStats.minQueryDuration", 0, "The minimum duration for queries to track in query stats at `/api/v1/status/top_queries`. "+ + "Queries with lower duration are ignored in query stats") +) + +var ( + qsTracker *queryStatsTracker + initOnce sync.Once +) + +// Enabled returns true of query stats tracking is enabled. +func Enabled() bool { + return *lastQueriesCount > 0 +} + +// RegisterQuery registers the query on the given timeRangeMsecs, which has been started at startTime. +// +// RegisterQuery must be called when the query is finished. +func RegisterQuery(accountID, projectID uint32, query string, timeRangeMsecs int64, startTime time.Time) { + initOnce.Do(initQueryStats) + qsTracker.registerQuery(accountID, projectID, query, timeRangeMsecs, startTime) +} + +// WriteJSONQueryStats writes query stats to given writer in json format. 
+func WriteJSONQueryStats(w io.Writer, topN int, maxLifetime time.Duration) { + initOnce.Do(initQueryStats) + qsTracker.writeJSONQueryStats(w, topN, maxLifetime) +} + +// queryStatsTracker holds statistics for queries +type queryStatsTracker struct { + mu sync.Mutex + a []queryStatRecord + nextIdx uint +} + +type queryStatRecord struct { + accountID uint32 + projectID uint32 + query string + timeRangeSecs int64 + registerTime time.Time + duration time.Duration +} + +type queryStatKey struct { + accountID uint32 + projectID uint32 + query string + timeRangeSecs int64 +} + +func initQueryStats() { + recordsCount := *lastQueriesCount + if recordsCount <= 0 { + recordsCount = 1 + } else { + logger.Infof("enabled query stats tracking at `/api/v1/status/top_queries` with -search.queryStats.lastQueriesCount=%d, -search.queryStats.minQueryDuration=%s", + *lastQueriesCount, *minQueryDuration) + } + qsTracker = &queryStatsTracker{ + a: make([]queryStatRecord, recordsCount), + } +} + +func (qst *queryStatsTracker) writeJSONQueryStats(w io.Writer, topN int, maxLifetime time.Duration) { + fmt.Fprintf(w, `{"topN":"%d","maxLifetime":%q,`, topN, maxLifetime) + fmt.Fprintf(w, `"search.queryStats.lastQueriesCount":%d,`, *lastQueriesCount) + fmt.Fprintf(w, `"search.queryStats.minQueryDuration":%q,`, *minQueryDuration) + fmt.Fprintf(w, `"topByCount":[`) + topByCount := qst.getTopByCount(topN, maxLifetime) + for i, r := range topByCount { + fmt.Fprintf(w, `{"accountID":%d,"projectID":%d,"query":%q,"timeRangeSeconds":%d,"count":%d}`, r.accountID, r.projectID, r.query, r.timeRangeSecs, r.count) + if i+1 < len(topByCount) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `],"topByAvgDuration":[`) + topByAvgDuration := qst.getTopByAvgDuration(topN, maxLifetime) + for i, r := range topByAvgDuration { + fmt.Fprintf(w, `{"accountID":%d,"projectID":%d,"query":%q,"timeRangeSeconds":%d,"avgDurationSeconds":%.3f}`, + r.accountID, r.projectID, r.query, r.timeRangeSecs, r.duration.Seconds()) + if i+1 < len(topByAvgDuration) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `],"topBySumDuration":[`) + topBySumDuration := qst.getTopBySumDuration(topN, maxLifetime) + for i, r := range topBySumDuration { + fmt.Fprintf(w, `{"accountID":%d,"projectID":%d,"query":%q,"timeRangeSeconds":%d,"sumDurationSeconds":%.3f}`, + r.accountID, r.projectID, r.query, r.timeRangeSecs, r.duration.Seconds()) + if i+1 < len(topBySumDuration) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `]}`) +} + +func (qst *queryStatsTracker) registerQuery(accountID, projectID uint32, query string, timeRangeMsecs int64, startTime time.Time) { + registerTime := time.Now() + duration := registerTime.Sub(startTime) + if duration < *minQueryDuration { + return + } + + qst.mu.Lock() + defer qst.mu.Unlock() + + a := qst.a + idx := qst.nextIdx + if idx >= uint(len(a)) { + idx = 0 + } + qst.nextIdx = idx + 1 + r := &a[idx] + r.accountID = accountID + r.projectID = projectID + r.query = query + r.timeRangeSecs = timeRangeMsecs / 1000 + r.registerTime = registerTime + r.duration = duration +} + +func (qst *queryStatsTracker) getTopByCount(topN int, maxLifetime time.Duration) []queryStatByCount { + currentTime := time.Now() + qst.mu.Lock() + m := make(map[queryStatKey]int) + for _, r := range qst.a { + if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { + continue + } + k := queryStatKey{ + accountID: r.accountID, + projectID: r.projectID, + query: r.query, + timeRangeSecs: r.timeRangeSecs, + } + m[k] = m[k] + 1 + } + qst.mu.Unlock() + + var a 
[]queryStatByCount + for k, count := range m { + a = append(a, queryStatByCount{ + accountID: k.accountID, + projectID: k.projectID, + query: k.query, + timeRangeSecs: k.timeRangeSecs, + count: count, + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].count > a[j].count + }) + if len(a) > topN { + a = a[:topN] + } + return a +} + +type queryStatByCount struct { + accountID uint32 + projectID uint32 + query string + timeRangeSecs int64 + count int +} + +func (qst *queryStatsTracker) getTopByAvgDuration(topN int, maxLifetime time.Duration) []queryStatByDuration { + currentTime := time.Now() + qst.mu.Lock() + type countSum struct { + count int + sum time.Duration + } + m := make(map[queryStatKey]countSum) + for _, r := range qst.a { + if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { + continue + } + k := queryStatKey{ + accountID: r.accountID, + projectID: r.projectID, + query: r.query, + timeRangeSecs: r.timeRangeSecs, + } + ks := m[k] + ks.count++ + ks.sum += r.duration + m[k] = ks + } + qst.mu.Unlock() + + var a []queryStatByDuration + for k, ks := range m { + a = append(a, queryStatByDuration{ + accountID: k.accountID, + projectID: k.projectID, + query: k.query, + timeRangeSecs: k.timeRangeSecs, + duration: ks.sum / time.Duration(ks.count), + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].duration > a[j].duration + }) + if len(a) > topN { + a = a[:topN] + } + return a +} + +type queryStatByDuration struct { + accountID uint32 + projectID uint32 + query string + timeRangeSecs int64 + duration time.Duration +} + +func (qst *queryStatsTracker) getTopBySumDuration(topN int, maxLifetime time.Duration) []queryStatByDuration { + currentTime := time.Now() + qst.mu.Lock() + m := make(map[queryStatKey]time.Duration) + for _, r := range qst.a { + if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { + continue + } + k := queryStatKey{ + accountID: r.accountID, + projectID: r.projectID, + query: r.query, + timeRangeSecs: r.timeRangeSecs, + } + m[k] = m[k] + r.duration + } + qst.mu.Unlock() + + var a []queryStatByDuration + for k, d := range m { + a = append(a, queryStatByDuration{ + accountID: k.accountID, + projectID: k.projectID, + query: k.query, + timeRangeSecs: k.timeRangeSecs, + duration: d, + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].duration > a[j].duration + }) + if len(a) > topN { + a = a[:topN] + } + return a +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a03f6fe085..df7d27cc90 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,7 @@ # tip +* FEATURE: add `/api/v1/status/top_queries` handler, which returns the most frequently executed queries and queries that took the most time for execution. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 * FEATURE: vmagent: add support for `proxy_url` config option in Prometheus scrape configs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503 * FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month. * FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`. 
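The tracker introduced in `querystats.go` above boils down to a fixed-size ring buffer of recent query records plus on-demand aggregation over it. The following self-contained sketch (simplified: tenancy dropped, by-count aggregation only) illustrates the same approach:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// record mirrors queryStatRecord from the patch, minus the tenancy fields.
type record struct {
	query        string
	registerTime time.Time
	duration     time.Duration
}

type tracker struct {
	mu      sync.Mutex
	a       []record // fixed-size ring buffer of the last len(a) queries
	nextIdx int
}

func newTracker(n int) *tracker { return &tracker{a: make([]record, n)} }

// register overwrites the oldest slot, like registerQuery in the patch.
func (t *tracker) register(query string, startTime time.Time) {
	now := time.Now()
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.nextIdx >= len(t.a) {
		t.nextIdx = 0
	}
	t.a[t.nextIdx] = record{query: query, registerTime: now, duration: now.Sub(startTime)}
	t.nextIdx++
}

// topByCount aggregates the buffer on demand, like getTopByCount in the patch.
func (t *tracker) topByCount(topN int, maxLifetime time.Duration) []string {
	now := time.Now()
	m := make(map[string]int)
	t.mu.Lock()
	for _, r := range t.a {
		if r.query == "" || now.Sub(r.registerTime) > maxLifetime {
			continue // empty slot or outdated record
		}
		m[r.query]++
	}
	t.mu.Unlock()
	queries := make([]string, 0, len(m))
	for q := range m {
		queries = append(queries, q)
	}
	sort.Slice(queries, func(i, j int) bool { return m[queries[i]] > m[queries[j]] })
	if len(queries) > topN {
		queries = queries[:topN]
	}
	return queries
}

func main() {
	t := newTracker(1000)
	start := time.Now()
	t.register(`up`, start)
	t.register(`up`, start)
	t.register(`rate(requests_total[5m])`, start)
	fmt.Println(t.topByCount(10, time.Hour)) // [up rate(requests_total[5m])]
}
```

Keeping a bounded buffer and aggregating lazily caps memory usage at `-search.queryStats.lastQueriesCount` records regardless of query cardinality, at the cost of recomputing the top lists on every `/api/v1/status/top_queries` request.
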
diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index a8544b656d..d57e825713 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -535,11 +535,17 @@ See [this feature request](https://github.com/prometheus/prometheus/issues/6178) Additionally VictoriaMetrics provides the following handlers: -* `/api/v1/series/count` - it returns the total number of time series in the database. Some notes: +* `/api/v1/series/count` - returns the total number of time series in the database. Some notes: * the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series; * the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions; -* `/api/v1/labels/count` - it returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. -* `/api/v1/status/active_queries` - it returns a list of currently running queries. +* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. +* `/api/v1/status/active_queries` - returns a list of currently running queries. +* `/api/v1/status/top_queries` - returns the following query lists: + * the most frequently executed queries - `topByCount` + * queries with the biggest average execution duration - `topByAvgDuration` + * queries that took the most time for execution - `topBySumDuration` + The number of returned queries can be limited via `topN` query arg. Old queries can be filtered out with `maxLifetime` query arg. + For example, a request to `/api/v1/status/top_queries?topN=5&maxLifetime=30s` returns up to 5 queries per list that were executed during the last 30 seconds. ## Graphite API usage
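For reference, a minimal sketch of consuming the endpoint from Go, with struct types derived from the JSON emitted by `writeJSONQueryStats` in this patch; the address is a placeholder, and the flag-echo fields (`search.queryStats.lastQueriesCount`, `search.queryStats.minQueryDuration`) present in the response are simply ignored by the decoder:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// queryStat holds the per-entry fields emitted by writeJSONQueryStats.
// Count, AvgDurationSeconds and SumDurationSeconds are each populated only
// in the corresponding list; the other fields stay at their zero values.
type queryStat struct {
	AccountID          uint32  `json:"accountID"`
	ProjectID          uint32  `json:"projectID"`
	Query              string  `json:"query"`
	TimeRangeSeconds   int64   `json:"timeRangeSeconds"`
	Count              int     `json:"count"`
	AvgDurationSeconds float64 `json:"avgDurationSeconds"`
	SumDurationSeconds float64 `json:"sumDurationSeconds"`
}

type topQueriesResponse struct {
	TopN             string      `json:"topN"` // emitted as a quoted number
	MaxLifetime      string      `json:"maxLifetime"`
	TopByCount       []queryStat `json:"topByCount"`
	TopByAvgDuration []queryStat `json:"topByAvgDuration"`
	TopBySumDuration []queryStat `json:"topBySumDuration"`
}

func main() {
	// Placeholder address for a single-node instance; a cluster setup would
	// query a vmselect node at port 8481 instead.
	resp, err := http.Get("http://localhost:8428/api/v1/status/top_queries?topN=5&maxLifetime=30s")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	var tq topQueriesResponse
	if err := json.NewDecoder(resp.Body).Decode(&tq); err != nil {
		panic(err)
	}
	for _, s := range tq.TopByCount {
		fmt.Printf("%d executions: %s\n", s.Count, s.Query)
	}
}
```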