app/vmselect: take into account the time the requests wait in the queue if -search.maxConcurrentRequests is exceeded

This will prevent from excess CPU usage for timed out queries.
This commit is contained in:
Aliaksandr Valialkin 2020-02-04 16:13:59 +02:00
parent e6bf88a4d4
commit ccd3aa4f15
2 changed files with 25 additions and 34 deletions

View File

@ -111,6 +111,7 @@ var (
) )
func requestHandler(w http.ResponseWriter, r *http.Request) bool { func requestHandler(w http.ResponseWriter, r *http.Request) bool {
startTime := time.Now()
// Limit the number of concurrent queries. // Limit the number of concurrent queries.
select { select {
case concurrencyCh <- struct{}{}: case concurrencyCh <- struct{}{}:
@ -155,23 +156,23 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
} }
switch p.Prefix { switch p.Prefix {
case "select": case "select":
return selectHandler(w, r, p, at) return selectHandler(startTime, w, r, p, at)
case "delete": case "delete":
return deleteHandler(w, r, p, at) return deleteHandler(startTime, w, r, p, at)
default: default:
// This is not our link // This is not our link
return false return false
} }
} }
func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool { func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool {
if strings.HasPrefix(p.Suffix, "prometheus/api/v1/label/") { if strings.HasPrefix(p.Suffix, "prometheus/api/v1/label/") {
s := p.Suffix[len("prometheus/api/v1/label/"):] s := p.Suffix[len("prometheus/api/v1/label/"):]
if strings.HasSuffix(s, "/values") { if strings.HasSuffix(s, "/values") {
labelValuesRequests.Inc() labelValuesRequests.Inc()
labelName := s[:len(s)-len("/values")] labelName := s[:len(s)-len("/values")]
httpserver.EnableCORS(w, r) httpserver.EnableCORS(w, r)
if err := prometheus.LabelValuesHandler(at, labelName, w, r); err != nil { if err := prometheus.LabelValuesHandler(startTime, at, labelName, w, r); err != nil {
labelValuesErrors.Inc() labelValuesErrors.Inc()
sendPrometheusError(w, r, err) sendPrometheusError(w, r, err)
return true return true
@ -184,7 +185,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a
case "prometheus/api/v1/query": case "prometheus/api/v1/query":
queryRequests.Inc() queryRequests.Inc()
httpserver.EnableCORS(w, r) httpserver.EnableCORS(w, r)
if err := prometheus.QueryHandler(at, w, r); err != nil { if err := prometheus.QueryHandler(startTime, at, w, r); err != nil {
queryErrors.Inc() queryErrors.Inc()
sendPrometheusError(w, r, err) sendPrometheusError(w, r, err)
return true return true
@ -193,7 +194,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a
case "prometheus/api/v1/query_range": case "prometheus/api/v1/query_range":
queryRangeRequests.Inc() queryRangeRequests.Inc()
httpserver.EnableCORS(w, r) httpserver.EnableCORS(w, r)
if err := prometheus.QueryRangeHandler(at, w, r); err != nil { if err := prometheus.QueryRangeHandler(startTime, at, w, r); err != nil {
queryRangeErrors.Inc() queryRangeErrors.Inc()
sendPrometheusError(w, r, err) sendPrometheusError(w, r, err)
return true return true
@ -202,7 +203,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a
case "prometheus/api/v1/series": case "prometheus/api/v1/series":
seriesRequests.Inc() seriesRequests.Inc()
httpserver.EnableCORS(w, r) httpserver.EnableCORS(w, r)
if err := prometheus.SeriesHandler(at, w, r); err != nil { if err := prometheus.SeriesHandler(startTime, at, w, r); err != nil {
seriesErrors.Inc() seriesErrors.Inc()
sendPrometheusError(w, r, err) sendPrometheusError(w, r, err)
return true return true
@ -211,7 +212,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a
case "prometheus/api/v1/series/count": case "prometheus/api/v1/series/count":
seriesCountRequests.Inc() seriesCountRequests.Inc()
httpserver.EnableCORS(w, r) httpserver.EnableCORS(w, r)
if err := prometheus.SeriesCountHandler(at, w, r); err != nil { if err := prometheus.SeriesCountHandler(startTime, at, w, r); err != nil {
seriesCountErrors.Inc() seriesCountErrors.Inc()
sendPrometheusError(w, r, err) sendPrometheusError(w, r, err)
return true return true
@ -220,7 +221,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a
case "prometheus/api/v1/labels": case "prometheus/api/v1/labels":
labelsRequests.Inc() labelsRequests.Inc()
httpserver.EnableCORS(w, r) httpserver.EnableCORS(w, r)
if err := prometheus.LabelsHandler(at, w, r); err != nil { if err := prometheus.LabelsHandler(startTime, at, w, r); err != nil {
labelsErrors.Inc() labelsErrors.Inc()
sendPrometheusError(w, r, err) sendPrometheusError(w, r, err)
return true return true
@ -229,7 +230,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a
case "prometheus/api/v1/labels/count": case "prometheus/api/v1/labels/count":
labelsCountRequests.Inc() labelsCountRequests.Inc()
httpserver.EnableCORS(w, r) httpserver.EnableCORS(w, r)
if err := prometheus.LabelsCountHandler(at, w, r); err != nil { if err := prometheus.LabelsCountHandler(startTime, at, w, r); err != nil {
labelsCountErrors.Inc() labelsCountErrors.Inc()
sendPrometheusError(w, r, err) sendPrometheusError(w, r, err)
return true return true
@ -237,7 +238,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a
return true return true
case "prometheus/api/v1/export": case "prometheus/api/v1/export":
exportRequests.Inc() exportRequests.Inc()
if err := prometheus.ExportHandler(at, w, r); err != nil { if err := prometheus.ExportHandler(startTime, at, w, r); err != nil {
exportErrors.Inc() exportErrors.Inc()
httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
return true return true
@ -245,7 +246,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a
return true return true
case "prometheus/federate": case "prometheus/federate":
federateRequests.Inc() federateRequests.Inc()
if err := prometheus.FederateHandler(at, w, r); err != nil { if err := prometheus.FederateHandler(startTime, at, w, r); err != nil {
federateErrors.Inc() federateErrors.Inc()
httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
return true return true
@ -274,11 +275,11 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a
} }
} }
func deleteHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool { func deleteHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool {
switch p.Suffix { switch p.Suffix {
case "prometheus/api/v1/admin/tsdb/delete_series": case "prometheus/api/v1/admin/tsdb/delete_series":
deleteRequests.Inc() deleteRequests.Inc()
if err := prometheus.DeleteHandler(at, r); err != nil { if err := prometheus.DeleteHandler(startTime, at, r); err != nil {
deleteErrors.Inc() deleteErrors.Inc()
httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
return true return true

View File

@ -39,8 +39,7 @@ var (
const defaultStep = 5 * 60 * 1000 const defaultStep = 5 * 60 * 1000
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/ // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime() ct := currentTime()
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %s", err) return fmt.Errorf("cannot parse request form values: %s", err)
@ -116,8 +115,7 @@ func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) err
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`) var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
// ExportHandler exports data in raw format from /api/v1/export. // ExportHandler exports data in raw format from /api/v1/export.
func ExportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime() ct := currentTime()
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %s", err) return fmt.Errorf("cannot parse request form values: %s", err)
@ -215,8 +213,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, star
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request. // DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
// //
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series // See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
func DeleteHandler(at *auth.Token, r *http.Request) error { func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error {
startTime := time.Now()
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %s", err) return fmt.Errorf("cannot parse request form values: %s", err)
} }
@ -288,8 +285,7 @@ var httpClient = &http.Client{
// LabelValuesHandler processes /api/v1/label/<labelName>/values request. // LabelValuesHandler processes /api/v1/label/<labelName>/values request.
// //
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values // See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
func LabelValuesHandler(at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error { func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
deadline := getDeadlineForQuery(r) deadline := getDeadlineForQuery(r)
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
@ -392,8 +388,7 @@ func labelValuesWithMatches(at *auth.Token, labelName string, matches []string,
var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/label/{}/values"}`) var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/label/{}/values"}`)
// LabelsCountHandler processes /api/v1/labels/count request. // LabelsCountHandler processes /api/v1/labels/count request.
func LabelsCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { func LabelsCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
deadline := getDeadlineForQuery(r) deadline := getDeadlineForQuery(r)
labelEntries, isPartial, err := netstorage.GetLabelEntries(at, deadline) labelEntries, isPartial, err := netstorage.GetLabelEntries(at, deadline)
if err != nil { if err != nil {
@ -414,8 +409,7 @@ var labelsCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="
// LabelsHandler processes /api/v1/labels request. // LabelsHandler processes /api/v1/labels request.
// //
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names // See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
func LabelsHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
deadline := getDeadlineForQuery(r) deadline := getDeadlineForQuery(r)
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
@ -510,8 +504,7 @@ func labelsWithMatches(at *auth.Token, matches []string, start, end int64, deadl
var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels"}`) var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels"}`)
// SeriesCountHandler processes /api/v1/series/count request. // SeriesCountHandler processes /api/v1/series/count request.
func SeriesCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { func SeriesCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
deadline := getDeadlineForQuery(r) deadline := getDeadlineForQuery(r)
n, isPartial, err := netstorage.GetSeriesCount(at, deadline) n, isPartial, err := netstorage.GetSeriesCount(at, deadline)
if err != nil { if err != nil {
@ -532,8 +525,7 @@ var seriesCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="
// SeriesHandler processes /api/v1/series request. // SeriesHandler processes /api/v1/series request.
// //
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers // See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
func SeriesHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime() ct := currentTime()
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
@ -613,8 +605,7 @@ var seriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/
// QueryHandler processes /api/v1/query request. // QueryHandler processes /api/v1/query request.
// //
// See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries // See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime() ct := currentTime()
query := r.FormValue("query") query := r.FormValue("query")
@ -728,8 +719,7 @@ func parsePositiveDuration(s string, step int64) (int64, error) {
// QueryRangeHandler processes /api/v1/query_range request. // QueryRangeHandler processes /api/v1/query_range request.
// //
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries // See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime() ct := currentTime()
query := r.FormValue("query") query := r.FormValue("query")