diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 71e1baef62..e5c0d5ee8f 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -274,6 +274,25 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, return true } return true + case "prometheus/api/v1/status/queries/avg_duration": + if err := prometheus.QueryStatsHandler(startTime, w, r, "avg_duration"); err != nil { + sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) + return true + } + return true + case "prometheus/api/v1/status/queries/duration": + if err := prometheus.QueryStatsHandler(startTime, w, r, "duration"); err != nil { + sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) + return true + } + return true + case "prometheus/api/v1/status/queries/frequency": + if err := prometheus.QueryStatsHandler(startTime, w, r, "frequency"); err != nil { + sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) + return true + } + return true + case "prometheus/api/v1/status/tsdb": statusTSDBRequests.Inc() if err := prometheus.TSDBStatusHandler(startTime, at, w, r); err != nil { diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index c7c6f6aaa6..d30c3864bc 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -1326,3 +1326,37 @@ func getLatencyOffsetMilliseconds() int64 { } return d } + +// QueryStatsHandler - returns statistics for queries executions with given aggregate func name. +// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 +func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, aggregateBy string) error { + if err := r.ParseForm(); err != nil { + return fmt.Errorf("cannot parse form values: %w", err) + } + topN := 10 + topNStr := r.FormValue("topN") + if len(topNStr) > 0 { + n, err := strconv.Atoi(topNStr) + if err != nil { + return fmt.Errorf("cannot parse `topN` arg %q: %w", topNStr, err) + } + if n <= 0 { + n = 1 + } + if n > 1000 { + n = 1000 + } + topN = n + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + promql.WriteQueryStatsResponse(bw, topN, aggregateBy) + if err := bw.Flush(); err != nil { + return err + } + queryStatsDuration.UpdateDuration(startTime) + return nil +} + +var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/queries"}`) diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index 8acb8a77e6..091630a9c9 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -39,6 +39,13 @@ func Exec(ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result, } }() } + if *maxQueryStatsTrackerItemsCount > 0 { + start := time.Now() + defer func() { + tr := ec.End - ec.Start + InsertQueryStat(q, tr, start, time.Since(start)) + }() + } ec.validate() diff --git a/app/vmselect/promql/query_stats.go b/app/vmselect/promql/query_stats.go new file mode 100644 index 0000000000..f7496ea798 --- /dev/null +++ b/app/vmselect/promql/query_stats.go @@ -0,0 +1,254 @@ +package promql + +import ( + "flag" + "fmt" + "io" + "sort" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" +) + +var ( + maxQueryStatsRecordLifeTime = flag.Duration("search.MaxQueryStatsRecordLifeTime", 10*time.Minute, "Limits maximum lifetime for query stats record. With minimum 10 seconds") + maxQueryStatsTrackerItemsCount = flag.Int("search.MaxQueryStatsItems", 1000, "Limits count for distinct query stat records, keyed by query name and query time range. "+ + "With Maximum 5000 records. Zero value disables query stats recording") +) + +var ( + shrinkQueryStatsCalls = metrics.NewCounter(`vm_query_stats_shrink_calls_total`) + globalQueryStatsTracker *queryStatsTracker + gQSTOnce sync.Once +) + +// InsertQueryStat - inserts query stats record to global query stats tracker +// with given query name, query time-range, execution time and its duration. +func InsertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) { + gQSTOnce.Do(func() { + initQueryStatsTracker() + }) + globalQueryStatsTracker.insertQueryStat(query, tr, execTime, duration) +} + +// WriteQueryStatsResponse - writes query stats to given writer in json format with given aggregate key. +func WriteQueryStatsResponse(w io.Writer, topN int, aggregateBy string) { + gQSTOnce.Do(func() { + initQueryStatsTracker() + }) + writeJSONQueryStats(w, globalQueryStatsTracker, topN, aggregateBy) +} + +// queryStatsTracker - hold statistics for all queries, +// query name and query range is a group key. +type queryStatsTracker struct { + maxQueryLogRecordTime time.Duration + limit int + queryStatsLocker sync.Mutex + qs []queryStats +} + +// queryStats - represent single query statistic. +type queryStats struct { + query string + queryRange int64 + queryLastSeen int64 + queryStatRecords []queryStatRecord +} + +// queryStatRecord - one record of query stat. +type queryStatRecord struct { + // end-start + duration time.Duration + // in seconds as unix_ts. + execTime int64 +} + +func initQueryStatsTracker() { + limit := *maxQueryStatsTrackerItemsCount + if limit > 5000 { + limit = 5000 + } + qlt := *maxQueryStatsRecordLifeTime + if qlt == 0 { + qlt = time.Second * 10 + } + logger.Infof("enabled query stats tracking, max records count: %d, max query record lifetime: %s", limit, qlt) + qst := queryStatsTracker{ + limit: limit, + maxQueryLogRecordTime: qlt, + } + go func() { + for { + time.Sleep(time.Second * 10) + qst.dropOutdatedRecords() + } + }() + globalQueryStatsTracker = &qst +} + +func formatJSONQueryStats(queries []queryStats) string { + var s strings.Builder + for i, q := range queries { + fmt.Fprintf(&s, `{"query": %q,`, q.query) + fmt.Fprintf(&s, `"query_time_range": %q,`, time.Duration(q.queryRange*1e6)) + fmt.Fprintf(&s, `"cumalative_duration": %q,`, q.Duration()) + if len(q.queryStatRecords) > 0 { + fmt.Fprintf(&s, `"avg_duration": %q,`, q.Duration()/time.Duration(len(q.queryStatRecords))) + } + fmt.Fprintf(&s, `"requests_count": "%d"`, len(q.queryStatRecords)) + s.WriteString(`}`) + if i != len(queries)-1 { + s.WriteString(`,`) + } + + } + return s.String() +} + +func writeJSONQueryStats(w io.Writer, ql *queryStatsTracker, topN int, aggregateBy string) { + fmt.Fprintf(w, `{"top_n": "%d",`, topN) + fmt.Fprintf(w, `"stats_max_duration": %q,`, maxQueryStatsRecordLifeTime.String()) + fmt.Fprint(w, `"top": [`) + switch aggregateBy { + case "frequency": + fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByRecordCount(ql, topN))) + case "duration": + fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByDuration(ql, topN))) + case "avg_duration": + fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByAvgDuration(ql, topN))) + default: + logger.Errorf("invalid aggregation key=%q, report bug", aggregateBy) + fmt.Fprintf(w, `{"error": "invalid aggregateBy value=%s"}`, aggregateBy) + } + fmt.Fprint(w, `]`) + fmt.Fprint(w, `}`) +} + +// drops query stats records less then given time in seconds. +// no need to sort +// its added in chronological order. +// must be called with mutex. +func (qs *queryStats) dropOutDatedRecords(t int64) { + // fast path + // compare time with last elem. + if len(qs.queryStatRecords) > 0 && qs.queryStatRecords[len(qs.queryStatRecords)-1].execTime < t { + qs.queryStatRecords = qs.queryStatRecords[:0] + return + } + // remove all elements by default. + shrinkIndex := len(qs.queryStatRecords) + for i, v := range qs.queryStatRecords { + if t < v.execTime { + shrinkIndex = i + break + } + } + if shrinkIndex > 0 { + qs.queryStatRecords = qs.queryStatRecords[shrinkIndex:] + } +} + +// calculates cumulative duration for query. +func (qs *queryStats) Duration() time.Duration { + var cnt time.Duration + for _, v := range qs.queryStatRecords { + cnt += v.duration + } + return cnt +} + +// must be called with mutex, +// shrinks slice by the last added query with given shrinkSize. +func (qst *queryStatsTracker) shrink(shrinkSize int) { + if len(qst.qs) < shrinkSize { + return + } + sort.Slice(qst.qs, func(i, j int) bool { + return qst.qs[i].queryLastSeen < qst.qs[j].queryLastSeen + }) + qst.qs = qst.qs[shrinkSize:] +} + +// drop outdated keys. +func (qst *queryStatsTracker) dropOutdatedRecords() { + qst.queryStatsLocker.Lock() + defer qst.queryStatsLocker.Unlock() + t := time.Now().Add(-qst.maxQueryLogRecordTime).Unix() + var i int + for _, v := range qst.qs { + v.dropOutDatedRecords(t) + if len(v.queryStatRecords) > 0 { + qst.qs[i] = v + i++ + } + } + if i == len(qst.qs) { + return + } + qst.qs = qst.qs[:i] +} + +func (qst *queryStatsTracker) insertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) { + qst.queryStatsLocker.Lock() + defer qst.queryStatsLocker.Unlock() + // shrink old queries. + if len(qst.qs) > qst.limit { + shrinkQueryStatsCalls.Inc() + qst.shrink(1) + } + // add record to exist stats, keyed by query string and time-range. + for i, v := range qst.qs { + if v.query == query && v.queryRange == tr { + v.queryLastSeen = execTime.Unix() + v.queryStatRecords = append(v.queryStatRecords, queryStatRecord{execTime: execTime.Unix(), duration: duration}) + qst.qs[i] = v + return + } + } + qst.qs = append(qst.qs, queryStats{ + queryStatRecords: []queryStatRecord{{execTime: execTime.Unix(), duration: duration}}, + queryLastSeen: execTime.Unix(), + query: query, + queryRange: tr, + }) + +} + +func getTopNQueriesByAvgDuration(qst *queryStatsTracker, top int) []queryStats { + return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { + lenI := len(qst.qs[i].queryStatRecords) + lenJ := len(qst.qs[j].queryStatRecords) + if lenI == 0 || lenJ == 0 { + return false + } + return qst.qs[i].Duration()/time.Duration(lenI) > qst.qs[j].Duration()/time.Duration(lenJ) + }) +} + +func getTopNQueriesByRecordCount(qst *queryStatsTracker, top int) []queryStats { + return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { + return len(qst.qs[i].queryStatRecords) > len(qst.qs[j].queryStatRecords) + }) +} + +func getTopNQueriesByDuration(qst *queryStatsTracker, top int) []queryStats { + return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { + return qst.qs[i].Duration() > qst.qs[j].Duration() + }) +} + +func getTopNQueryStatsItemsWithFilter(qst *queryStatsTracker, top int, filterFunc func(i, j int) bool) []queryStats { + qst.queryStatsLocker.Lock() + defer qst.queryStatsLocker.Unlock() + if top > len(qst.qs) { + top = len(qst.qs) + } + sort.Slice(qst.qs, filterFunc) + result := make([]queryStats, 0, top) + result = append(result, qst.qs[:top]...) + return result +} diff --git a/app/vmselect/promql/query_stats_test.go b/app/vmselect/promql/query_stats_test.go new file mode 100644 index 0000000000..eec36c0faa --- /dev/null +++ b/app/vmselect/promql/query_stats_test.go @@ -0,0 +1,144 @@ +package promql + +import ( + "fmt" + "reflect" + "strings" + "testing" + "time" +) + +func TestQueryLoggerShrink(t *testing.T) { + f := func(addItemCount, limit, expectedLen int) { + t.Helper() + qst := &queryStatsTracker{ + limit: limit, + maxQueryLogRecordTime: time.Second * 5, + } + for i := 0; i < addItemCount; i++ { + qst.insertQueryStat(fmt.Sprintf("random-n-%d", i), int64(i), time.Now().Add(-time.Second), 500+time.Duration(i)) + } + if len(qst.qs) != expectedLen { + t.Fatalf("unxpected len got=%d, for queryStats slice, want=%d", len(qst.qs), expectedLen) + } + } + f(10, 5, 6) + f(30, 10, 11) + f(15, 15, 15) +} + +func TestGetTopNQueriesByDuration(t *testing.T) { + f := func(topN int, expectedQueryStats []queryStats) { + t.Helper() + ql := &queryStatsTracker{ + limit: 25, + maxQueryLogRecordTime: time.Second * 5, + } + queriesDurations := []int{16, 4, 5, 10} + for i, v := range queriesDurations { + ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) + } + got := getTopNQueriesByAvgDuration(ql, topN) + + if len(got) != len(expectedQueryStats) { + t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) + } + for i, gotR := range got { + if gotR.query != expectedQueryStats[i].query { + t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) + } + } + } + f(1, []queryStats{{query: "query-n-0"}}) + f(2, []queryStats{{query: "query-n-0"}, {query: "query-n-3"}}) +} + +func TestGetTopNQueriesByCount(t *testing.T) { + f := func(topN int, expectedQueryStats []queryStats) { + t.Helper() + ql := &queryStatsTracker{ + limit: 25, + maxQueryLogRecordTime: time.Second * 5, + } + queriesCounts := []int{1, 4, 5, 11} + for i, v := range queriesCounts { + for ic := 0; ic < v; ic++ { + ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) + } + } + + got := getTopNQueriesByRecordCount(ql, topN) + + if len(got) != len(expectedQueryStats) { + t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) + } + for i, gotR := range got { + if gotR.query != expectedQueryStats[i].query { + t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) + } + } + } + f(1, []queryStats{{query: "query-n-3"}}) + f(2, []queryStats{{query: "query-n-3"}, {query: "query-n-2"}}) +} + +func TestGetTopNQueriesByAverageDuration(t *testing.T) { + f := func(topN int, expectedQueryStats []queryStats) { + t.Helper() + ql := &queryStatsTracker{ + limit: 25, + maxQueryLogRecordTime: time.Second * 5, + } + queriesQurations := []int{4, 25, 14, 10} + for i, v := range queriesQurations { + ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) + } + + got := getTopNQueriesByAvgDuration(ql, topN) + + if len(got) != len(expectedQueryStats) { + t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) + } + for i, gotR := range got { + if gotR.query != expectedQueryStats[i].query { + t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) + } + } + } + f(1, []queryStats{{query: "query-n-1"}}) + f(2, []queryStats{{query: "query-n-1"}, {query: "query-n-2"}}) +} + +func TestWriteJSONQueryStats(t *testing.T) { + qst := queryStatsTracker{ + limit: 100, + maxQueryLogRecordTime: time.Minute * 5, + } + t1 := time.Now() + qst.insertQueryStat("sum(rate(rps_total)[1m]) by(service)", 360, t1, time.Microsecond*100) + qst.insertQueryStat("up", 360, t1, time.Microsecond) + qst.insertQueryStat("up", 360, t1, time.Microsecond) + qst.insertQueryStat("up", 360, t1, time.Microsecond) + + f := func(t *testing.T, wantResp, aggregateBy string) { + var got strings.Builder + writeJSONQueryStats(&got, &qst, 5, aggregateBy) + if !reflect.DeepEqual(got.String(), wantResp) { + t.Fatalf("unexpected response, \ngot: %s,\nwant: %s", got.String(), wantResp) + } + } + + t.Run("aggregateByDuration", func(t *testing.T) { + f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`, + "duration") + }) + t.Run("aggregateByfrequency", func(t *testing.T) { + f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"},{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"}]}`, + "frequency") + }) + t.Run("aggregateByDuration", func(t *testing.T) { + f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`, + "avg_duration") + }) + +}