app/vmselect: make sure the request doesnt wait in pending queue more than the configured timeout

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
This commit is contained in:
Aliaksandr Valialkin 2020-09-22 01:21:20 +03:00
parent 31e341371b
commit 69eb9783e6
2 changed files with 27 additions and 4 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@ -33,7 +34,8 @@ var (
cacheDataPath = flag.String("cacheDataPath", "", "Path to directory for cache files. Cache isn't saved if empty")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores. See also -search.maxQueueDuration")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests limit is reached")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
"limit is reached; see also -search.maxQueryDuration")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+
"This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+
"Deduplication is disabled if the -dedup.minScrapeInterval is 0")
@ -133,7 +135,11 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
default:
// Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc()
t := timerpool.Get(*maxQueueDuration)
d := searchutils.GetMaxQueryDuration(r)
if d > *maxQueueDuration {
d = *maxQueueDuration
}
t := timerpool.Get(d)
select {
case concurrencyCh <- struct{}{}:
timerpool.Put(t)
@ -143,8 +149,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot handle more than %d concurrent search requests during %s; possible solutions: "+
"increase `-search.maxQueueDuration`, increase `-search.maxConcurrentRequests`, increase server capacity",
*maxConcurrentRequests, *maxQueueDuration),
"increase `-search.maxQueueDuration`; increase `-search.maxQueryDuration`; increase `-search.maxConcurrentRequests`; "+
"increase server capacity",
*maxConcurrentRequests, d),
StatusCode: http.StatusServiceUnavailable,
}
httpserver.Errorf(w, r, "%s", err)

View File

@ -112,6 +112,22 @@ func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, err
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
// GetMaxQueryDuration returns the maximum duration for query from r.
func GetMaxQueryDuration(r *http.Request) time.Duration {
dms, err := GetDuration(r, "timeout", 0)
if err != nil {
dms = 0
}
d := time.Duration(dms) * time.Millisecond
if d <= 0 || d > *maxQueryDuration {
d = *maxQueryDuration
}
if *StorageTimeout > 0 && d > *StorageTimeout {
d = *StorageTimeout
}
return d
}
// GetDeadlineForQuery returns deadline for the given query r.
func GetDeadlineForQuery(r *http.Request, startTime time.Time) Deadline {
dMax := maxQueryDuration.Milliseconds()