app/vmselect: make sure the request doesnt wait in pending queue more than the configured timeout

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
This commit is contained in:
Aliaksandr Valialkin 2020-09-22 01:21:20 +03:00
parent a69234ed18
commit 3b1e3a03e0
2 changed files with 24 additions and 4 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -23,7 +24,8 @@ var (
deleteAuthKey = flag.String("deleteAuthKey", "", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores. See also -search.maxQueueDuration")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests limit is reached")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
"limit is reached; see also -search.maxQueryDuration")
resetCacheAuthKey = flag.String("search.resetCacheAuthKey", "", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call")
)
@ -77,7 +79,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
default:
// Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc()
t := timerpool.Get(*maxQueueDuration)
d := searchutils.GetMaxQueryDuration(r)
if d > *maxQueueDuration {
d = *maxQueueDuration
}
t := timerpool.Get(d)
select {
case concurrencyCh <- struct{}{}:
timerpool.Put(t)
@ -87,8 +93,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot handle more than %d concurrent search requests during %s; possible solutions: "+
"increase `-search.maxQueueDuration`, increase `-search.maxConcurrentRequests`, increase server capacity",
*maxConcurrentRequests, *maxQueueDuration),
"increase `-search.maxQueueDuration`; increase `-search.maxQueryDuration`; increase `-search.maxConcurrentRequests`; "+
"increase server capacity",
*maxConcurrentRequests, d),
StatusCode: http.StatusServiceUnavailable,
}
httpserver.Errorf(w, r, "%s", err)

View File

@ -104,6 +104,19 @@ func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, err
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
// GetMaxQueryDuration returns the maximum duration for query from r.
func GetMaxQueryDuration(r *http.Request) time.Duration {
dms, err := GetDuration(r, "timeout", 0)
if err != nil {
dms = 0
}
d := time.Duration(dms) * time.Millisecond
if d <= 0 || d > *maxQueryDuration {
d = *maxQueryDuration
}
return d
}
// GetDeadlineForQuery returns deadline for the given query r.
func GetDeadlineForQuery(r *http.Request, startTime time.Time) Deadline {
dMax := maxQueryDuration.Milliseconds()