app/vmauth: limit the number of concurrent requests served by vmauth with the -maxConcurrentRequests command-line flag

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3346

This commit is based on the https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3486
This commit is contained in:
Aliaksandr Valialkin 2023-01-27 14:06:42 -08:00
parent 372b1688d7
commit ff39a91147
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 49 additions and 8 deletions

View File

@ -7,6 +7,7 @@ import (
"net" "net"
"net/http" "net/http"
"net/textproto" "net/textproto"
"net/url"
"os" "os"
"strings" "strings"
"sync" "sync"
@ -28,8 +29,11 @@ var (
httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol") httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+ useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host") maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host. "+
"See also -maxConcurrentRequests")
responseTimeout = flag.Duration("responseTimeout", 5*time.Minute, "The timeout for receiving a response from backend") responseTimeout = flag.Duration("responseTimeout", 5*time.Minute, "The timeout for receiving a response from backend")
maxConcurrentRequests = flag.Int("maxConcurrentRequests", 1000, "The maximum number of concurrent requests vmauth can process. Other requests are rejected with "+
"'429 Too Many Requests' http status code. See also -maxIdleConnsPerBackend")
reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...") reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+ logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+
`Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`) `Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`)
@ -85,6 +89,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
// See https://docs.influxdata.com/influxdb/v2.0/api/ // See https://docs.influxdata.com/influxdb/v2.0/api/
authToken = strings.Replace(authToken, "Token", "Bearer", 1) authToken = strings.Replace(authToken, "Token", "Bearer", 1)
} }
ac := authConfig.Load().(map[string]*UserInfo) ac := authConfig.Load().(map[string]*UserInfo)
ui := ac[authToken] ui := ac[authToken]
if ui == nil { if ui == nil {
@ -108,6 +113,26 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true return true
} }
// Limit the concurrency of requests to backends
concurrencyLimitOnce.Do(concurrencyLimitInit)
select {
case concurrencyLimitCh <- struct{}{}:
default:
concurrentRequestsLimitReachedTotal.Inc()
w.Header().Add("Retry-After", "10")
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot serve more than -maxConcurrentRequests=%d concurrent requests", cap(concurrencyLimitCh)),
StatusCode: http.StatusTooManyRequests,
}
httpserver.Errorf(w, r, "%s", err)
return true
}
processRequest(w, r, targetURL, headers)
<-concurrencyLimitCh
return true
}
func processRequest(w http.ResponseWriter, r *http.Request, targetURL *url.URL, headers []Header) {
// This code has been copied from net/http/httputil/reverseproxy.go // This code has been copied from net/http/httputil/reverseproxy.go
req := sanitizeRequestHeaders(r) req := sanitizeRequestHeaders(r)
req.URL = targetURL req.URL = targetURL
@ -122,7 +147,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
StatusCode: http.StatusBadGateway, StatusCode: http.StatusBadGateway,
} }
httpserver.Errorf(w, r, "%s", err) httpserver.Errorf(w, r, "%s", err)
return true return
} }
removeHopHeaders(res.Header) removeHopHeaders(res.Header)
copyHeader(w.Header(), res.Header) copyHeader(w.Header(), res.Header)
@ -137,10 +162,8 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
remoteAddr := httpserver.GetQuotedRemoteAddr(r) remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r) requestURI := httpserver.GetRequestURI(r)
logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying response body from %s: %s", remoteAddr, requestURI, targetURL, err) logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying response body from %s: %s", remoteAddr, requestURI, targetURL, err)
return true return
} }
return true
} }
var copyBufPool bytesutil.ByteBufferPool var copyBufPool bytesutil.ByteBufferPool
@ -231,6 +254,23 @@ func transportInit() {
transport = tr transport = tr
} }
var (
concurrencyLimitCh chan struct{}
concurrencyLimitOnce sync.Once
)
func concurrencyLimitInit() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
_ = metrics.NewGauge("vmauth_concurrent_requests_capacity", func() float64 {
return float64(*maxConcurrentRequests)
})
_ = metrics.NewGauge("vmauth_concurrent_requests_current", func() float64 {
return float64(len(concurrencyLimitCh))
})
}
var concurrentRequestsLimitReachedTotal = metrics.NewCounter("vmauth_concurrent_requests_limit_reached_total")
func usage() { func usage() {
const s = ` const s = `
vmauth authenticates and authorizes incoming requests and proxies them to VictoriaMetrics. vmauth authenticates and authorizes incoming requests and proxies them to VictoriaMetrics.

View File

@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when sending stale markers for targets, which expose big number of metrics. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675) issues. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when sending stale markers for targets, which expose big number of metrics. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675) issues.
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): allow limiting the number of concurrent requests sent to `vmauth` via `-maxConcurrentRequests` command-line flag. This allows controlling memory usage of `vmauth` and the resource usage of backends behind `vmauth`. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3346). Thanks to @dmitryk-dk for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3486).
* FEATURE: allow using VictoriaMetrics components behind proxies, which communicate with the backend via [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335). For example, [vmauth](https://docs.victoriametrics.com/vmauth.html) accepts proxy protocol connections when it starts with `-httpListenAddr.useProxyProtocol` command-line flag. * FEATURE: allow using VictoriaMetrics components behind proxies, which communicate with the backend via [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335). For example, [vmauth](https://docs.victoriametrics.com/vmauth.html) accepts proxy protocol connections when it starts with `-httpListenAddr.useProxyProtocol` command-line flag.
* FEATURE: add `-internStringMaxLen` command-line flag, which can be used for fine-tuning RAM vs CPU usage in certain workloads. For example, if the stored time series contain long labels, then it may be useful reducing the `-internStringMaxLen` in order to reduce memory usage at the cost of increased CPU usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692). * FEATURE: add `-internStringMaxLen` command-line flag, which can be used for fine-tuning RAM vs CPU usage in certain workloads. For example, if the stored time series contain long labels, then it may be useful reducing the `-internStringMaxLen` in order to reduce memory usage at the cost of increased CPU usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692).