app: add vm_concurrent_ metrics for visibility in concurrency limiters for vminsert and vmselect

This commit is contained in:
Aliaksandr Valialkin 2019-08-05 18:27:50 +03:00
parent 05713469c3
commit 5c83f8e203
2 changed files with 52 additions and 11 deletions

View File

@ -32,6 +32,17 @@ func Init() {
func Do(f func() error) error {
// Limit the number of conurrent f calls in order to prevent from excess
// memory usage and CPU trashing.
select {
case ch <- struct{}{}:
err := f()
<-ch
return err
default:
}
// All the workers are busy.
// Sleep for up to waitDuration.
concurrencyLimitReached.Inc()
t := timerpool.Get(waitDuration)
select {
case ch <- struct{}{}:
@ -41,9 +52,19 @@ func Do(f func() error) error {
return err
case <-t.C:
timerpool.Put(t)
concurrencyLimitErrors.Inc()
concurrencyLimitTimeout.Inc()
return fmt.Errorf("the server is overloaded with %d concurrent inserts; either increase -maxConcurrentInserts or reduce the load", cap(ch))
}
}
var concurrencyLimitErrors = metrics.NewCounter(`vm_concurrency_limit_errors_total`)
var (
concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`)
concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)
_ = metrics.NewGauge(`vm_concurrent_insert_capacity`, func() float64 {
return float64(cap(ch))
})
_ = metrics.NewGauge(`vm_concurrent_insert_current`, func() float64 {
return float64(len(ch))
})
)

View File

@ -30,20 +30,38 @@ func Init() {
fs.RemoveDirContents(tmpDirPath)
netstorage.InitTmpBlocksDir(tmpDirPath)
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
concurrencyCh = make(chan struct{}, *maxConcurrentRequests)
}
var concurrencyCh chan struct{}
// Stop stops vmselect
func Stop() {
promql.StopRollupResultCache()
}
var concurrencyCh chan struct{}
var (
concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_select_limit_reached_total`)
concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_select_limit_timeout_total`)
_ = metrics.NewGauge(`vm_concurrent_select_capacity`, func() float64 {
return float64(cap(concurrencyCh))
})
_ = metrics.NewGauge(`vm_concurrent_select_current`, func() float64 {
return float64(len(concurrencyCh))
})
)
// RequestHandler handles remote read API requests for Prometheus
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
// Limit the number of concurrent queries.
select {
case concurrencyCh <- struct{}{}:
defer func() { <-concurrencyCh }()
default:
// Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc()
t := timerpool.Get(*maxQueueDuration)
select {
case concurrencyCh <- struct{}{}:
@ -51,9 +69,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
defer func() { <-concurrencyCh }()
case <-t.C:
timerpool.Put(t)
concurrencyLimitTimeout.Inc()
httpserver.Errorf(w, "cannot handle more than %d concurrent requests", cap(concurrencyCh))
return true
}
}
path := strings.Replace(r.URL.Path, "//", "/", -1)
if strings.HasPrefix(path, "/api/v1/label/") {