app/vmauth: automatically retry failing GET requests on the remaining backends

This commit is contained in:
Aliaksandr Valialkin 2023-02-09 21:05:13 -08:00
parent c80fc8c77f
commit 769d7da25a
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
4 changed files with 68 additions and 31 deletions

View File

@ -107,11 +107,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
ui.requests.Inc()
targetURL, headers, err := createTargetURL(ui, r.URL)
if err != nil {
httpserver.Errorf(w, r, "cannot determine targetURL: %s", err)
return true
}
// Limit the concurrency of requests to backends
concurrencyLimitOnce.Do(concurrencyLimitInit)
@ -128,13 +123,34 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
handleConcurrencyLimitError(w, r, err)
return true
}
processRequest(w, r, targetURL, headers)
processRequest(w, r, ui)
ui.endConcurrencyLimit()
<-concurrencyLimitCh
return true
}
func processRequest(w http.ResponseWriter, r *http.Request, targetURL *url.URL, headers []Header) {
func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) {
u := normalizeURL(r.URL)
up, headers, err := ui.getURLPrefix(u)
if err != nil {
httpserver.Errorf(w, r, "cannot determine targetURL: %s", err)
return
}
maxAttempts := up.getBackendsCount()
for i := 0; i < maxAttempts; i++ {
targetURL := up.mergeURLs(u)
if tryProcessingRequest(w, r, targetURL, headers) {
return
}
}
err = &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("all the backends for the user %q are unavailable", ui.name()),
StatusCode: http.StatusServiceUnavailable,
}
httpserver.Errorf(w, r, "%s", err)
}
func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url.URL, headers []Header) bool {
// This code has been copied from net/http/httputil/reverseproxy.go
req := sanitizeRequestHeaders(r)
req.URL = targetURL
@ -144,12 +160,20 @@ func processRequest(w http.ResponseWriter, r *http.Request, targetURL *url.URL,
transportOnce.Do(transportInit)
res, err := transport.RoundTrip(req)
if err != nil {
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
if r.Method == "POST" || r.Method == "PUT" {
// It is impossible to retry POST and PUT requests,
// since we already proxied the request body to the backend.
err = &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("error when proxying the request to %q: %s", targetURL, err),
StatusCode: http.StatusBadGateway,
Err: fmt.Errorf("cannot proxy the request to %q: %w", targetURL, err),
StatusCode: http.StatusServiceUnavailable,
}
httpserver.Errorf(w, r, "%s", err)
return
return true
}
logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying the request to %q: %s", remoteAddr, requestURI, targetURL, err)
return false
}
removeHopHeaders(res.Header)
copyHeader(w.Header(), res.Header)
@ -164,8 +188,9 @@ func processRequest(w http.ResponseWriter, r *http.Request, targetURL *url.URL,
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying response body from %s: %s", remoteAddr, requestURI, targetURL, err)
return
return true
}
return true
}
var copyBufPool bytesutil.ByteBufferPool

View File

@ -12,6 +12,10 @@ func (up *URLPrefix) mergeURLs(requestURI *url.URL) *url.URL {
return mergeURLs(pu, requestURI)
}
func (up *URLPrefix) getBackendsCount() int {
return len(up.urls)
}
func mergeURLs(uiURL, requestURI *url.URL) *url.URL {
targetURL := *uiURL
targetURL.Path += requestURI.Path
@ -35,7 +39,22 @@ func mergeURLs(uiURL, requestURI *url.URL) *url.URL {
return &targetURL
}
func createTargetURL(ui *UserInfo, uOrig *url.URL) (*url.URL, []Header, error) {
func (ui *UserInfo) getURLPrefix(u *url.URL) (*URLPrefix, []Header, error) {
for _, e := range ui.URLMaps {
for _, sp := range e.SrcPaths {
if sp.match(u.Path) {
return e.URLPrefix, e.Headers, nil
}
}
}
if ui.URLPrefix != nil {
return ui.URLPrefix, ui.Headers, nil
}
missingRouteRequests.Inc()
return nil, nil, fmt.Errorf("missing route for %q", u.String())
}
func normalizeURL(uOrig *url.URL) *url.URL {
u := *uOrig
// Prevent from attacks with using `..` in r.URL.Path
u.Path = path.Clean(u.Path)
@ -52,16 +71,5 @@ func createTargetURL(ui *UserInfo, uOrig *url.URL) (*url.URL, []Header, error) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1554
u.Path = ""
}
for _, e := range ui.URLMaps {
for _, sp := range e.SrcPaths {
if sp.match(u.Path) {
return e.URLPrefix.mergeURLs(&u), e.Headers, nil
}
}
}
if ui.URLPrefix != nil {
return ui.URLPrefix.mergeURLs(&u), ui.Headers, nil
}
missingRouteRequests.Inc()
return nil, nil, fmt.Errorf("missing route for %q", u.String())
return &u
}

View File

@ -13,10 +13,12 @@ func TestCreateTargetURLSuccess(t *testing.T) {
if err != nil {
t.Fatalf("cannot parse %q: %s", requestURI, err)
}
target, headers, err := createTargetURL(ui, u)
u = normalizeURL(u)
up, headers, err := ui.getURLPrefix(u)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
target := up.mergeURLs(u)
if target.String() != expectedTarget {
t.Fatalf("unexpected target; got %q; want %q", target, expectedTarget)
}
@ -119,15 +121,16 @@ func TestCreateTargetURLFailure(t *testing.T) {
if err != nil {
t.Fatalf("cannot parse %q: %s", requestURI, err)
}
target, headers, err := createTargetURL(ui, u)
u = normalizeURL(u)
up, headers, err := ui.getURLPrefix(u)
if err == nil {
t.Fatalf("expecting non-nil error")
}
if target != nil {
t.Fatalf("unexpected target=%q; want empty string", target)
if up != nil {
t.Fatalf("unexpected non-empty up=%q", up)
}
if headers != nil {
t.Fatalf("unexpected headers=%q; want empty string", headers)
t.Fatalf("unexpected non-empty headers=%q", headers)
}
}
f(&UserInfo{}, "/foo/bar")

View File

@ -16,6 +16,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add the ability to limit the number of concurrent requests on a per-user basis via `max_concurrent_requests` option. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3346) and [these docs](https://docs.victoriametrics.com/vmauth.html#auth-config).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): automatically retry failing GET requests on all the configured backends.
* FEATURE: [vmalert enterprise](https://docs.victoriametrics.com/vmalert.html): add ability to read alerting and recording rules from S3, GCS or S3-compatible object storage. See [these docs](https://docs.victoriametrics.com/vmalert.html#reading-rules-from-object-storage).
## [v1.87.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.1)