app/vmauth: do not use net/http/httputil.ReverseProxy

This allows better controlling requests to backends and providing better error logging.
For example, if the backend was unavailable, then the ReverseProxy was logging the error
message without client ip and the initial request uri. This could harden debugging.

This is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3486
This commit is contained in:
Aliaksandr Valialkin 2023-01-27 13:38:13 -08:00
parent 1b81d8f542
commit 372b1688d7
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 119 additions and 47 deletions

View File

@ -1,22 +1,24 @@
package main
import (
"context"
"flag"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"net/textproto"
"os"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/metrics"
@ -27,6 +29,7 @@ var (
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host")
responseTimeout = flag.Duration("responseTimeout", 5*time.Minute, "The timeout for receiving a response from backend")
reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+
`Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`)
@ -86,11 +89,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
ui := ac[authToken]
if ui == nil {
invalidAuthTokenRequests.Inc()
err := fmt.Errorf("cannot find the provided auth token %q in config", authToken)
if *logInvalidAuthTokens {
httpserver.Errorf(w, r, "cannot find the provided auth token %q in config", authToken)
err = &httpserver.ErrorWithStatusCode{
Err: err,
StatusCode: http.StatusUnauthorized,
}
httpserver.Errorf(w, r, "%s", err)
} else {
errStr := fmt.Sprintf("cannot find the provided auth token %q in config", authToken)
http.Error(w, errStr, http.StatusBadRequest)
http.Error(w, err.Error(), http.StatusUnauthorized)
}
return true
}
@ -100,28 +107,103 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
httpserver.Errorf(w, r, "cannot determine targetURL: %s", err)
return true
}
// This code has been copied from net/http/httputil/reverseproxy.go
req := sanitizeRequestHeaders(r)
req.URL = targetURL
for _, h := range headers {
r.Header.Set(h.Name, h.Value)
req.Header.Set(h.Name, h.Value)
}
ctx := context.WithValue(r.Context(), targetURLKey, targetURL)
r = r.WithContext(ctx)
proxyRequest(w, r)
transportOnce.Do(transportInit)
res, err := transport.RoundTrip(req)
if err != nil {
err = &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("error when proxying the request to %q: %s", targetURL, err),
StatusCode: http.StatusBadGateway,
}
httpserver.Errorf(w, r, "%s", err)
return true
}
removeHopHeaders(res.Header)
copyHeader(w.Header(), res.Header)
w.WriteHeader(res.StatusCode)
copyBuf := copyBufPool.Get()
copyBuf.B = bytesutil.ResizeNoCopyNoOverallocate(copyBuf.B, 16*1024)
_, err = io.CopyBuffer(w, res.Body, copyBuf.B)
copyBufPool.Put(copyBuf)
_ = res.Body.Close()
if err != nil && !netutil.IsTrivialNetworkError(err) {
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying response body from %s: %s", remoteAddr, requestURI, targetURL, err)
return true
}
return true
}
func proxyRequest(w http.ResponseWriter, r *http.Request) {
reverseProxyOnce.Do(initReverseProxy)
defer func() {
err := recover()
if err == nil || err == http.ErrAbortHandler {
// Suppress http.ErrAbortHandler panic.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
return
var copyBufPool bytesutil.ByteBufferPool
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
// Forward other panics to the caller.
panic(err)
}()
reverseProxy.ServeHTTP(w, r)
}
}
func sanitizeRequestHeaders(r *http.Request) *http.Request {
// This code has been copied from net/http/httputil/reverseproxy.go
req := r.Clone(r.Context())
removeHopHeaders(req.Header)
if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
// If we aren't the first proxy retain prior
// X-Forwarded-For information as a comma+space
// separated list and fold multiple headers into one.
prior := req.Header["X-Forwarded-For"]
if len(prior) > 0 {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
req.Header.Set("X-Forwarded-For", clientIP)
}
return req
}
func removeHopHeaders(h http.Header) {
// remove hop-by-hop headers listed in the "Connection" header of h.
// See RFC 7230, section 6.1
for _, f := range h["Connection"] {
for _, sf := range strings.Split(f, ",") {
if sf = textproto.TrimString(sf); sf != "" {
h.Del(sf)
}
}
}
// Remove hop-by-hop headers to the backend. Especially
// important is "Connection" because we want a persistent
// connection, regardless of what the client sent to us.
for _, key := range hopHeaders {
h.Del(key)
}
}
// Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the
// obsoleted RFC 2616 (section 13.5.1) and are used for backward
// compatibility.
var hopHeaders = []string{
"Connection",
"Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
"Keep-Alive",
"Proxy-Authenticate",
"Proxy-Authorization",
"Te", // canonicalized version of "TE"
"Trailer", // not Trailers per URL above; https://www.rfc-editor.org/errata_search.php?eid=4522
"Transfer-Encoding",
"Upgrade",
}
var (
@ -131,36 +213,24 @@ var (
)
var (
reverseProxy *httputil.ReverseProxy
reverseProxyOnce sync.Once
transport *http.Transport
transportOnce sync.Once
)
// initReverseProxy must be called after flag.Parse(), since it uses command-line flags.
func initReverseProxy() {
reverseProxy = &httputil.ReverseProxy{
Director: func(r *http.Request) {
targetURL := r.Context().Value(targetURLKey).(*url.URL)
r.URL = targetURL
},
Transport: func() *http.Transport {
tr := http.DefaultTransport.(*http.Transport).Clone()
// Automatic compression must be disabled in order to fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/535
tr.DisableCompression = true
// Disable HTTP/2.0, since VictoriaMetrics components don't support HTTP/2.0 (because there is no sense in this).
tr.ForceAttemptHTTP2 = false
tr.MaxIdleConnsPerHost = *maxIdleConnsPerBackend
if tr.MaxIdleConns != 0 && tr.MaxIdleConns < tr.MaxIdleConnsPerHost {
tr.MaxIdleConns = tr.MaxIdleConnsPerHost
}
return tr
}(),
FlushInterval: time.Second,
ErrorLog: logger.StdErrorLogger(),
func transportInit() {
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.ResponseHeaderTimeout = *responseTimeout
// Automatic compression must be disabled in order to fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/535
tr.DisableCompression = true
// Disable HTTP/2.0, since VictoriaMetrics components don't support HTTP/2.0 (because there is no sense in this).
tr.ForceAttemptHTTP2 = false
tr.MaxIdleConnsPerHost = *maxIdleConnsPerBackend
if tr.MaxIdleConns != 0 && tr.MaxIdleConns < tr.MaxIdleConnsPerHost {
tr.MaxIdleConns = tr.MaxIdleConnsPerHost
}
transport = tr
}
var targetURLKey interface{} = "vm-target-url"
func usage() {
const s = `
vmauth authenticates and authorizes incoming requests and proxies them to VictoriaMetrics.

View File

@ -27,6 +27,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): propagate all the timeout-related errors from `vmstorage` to `vmselect` when `vmstorage`. Previously some timeout errors weren't returned from `vmselect` to `vmstorage`. Instead, `vmstorage` could log the error and close the connection to `vmselect`, so `vmselect` was logging cryptic errors such as `cannot execute funcName="..." on vmstorage "...": EOF`.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add support for time zone selection for older versions of browsers. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3680).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): update API version for [ec2_sd_configs](https://docs.victoriametrics.com/sd_configs.html#ec2_sd_configs) to fix [the issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3700) with missing `__meta_ec2_availability_zone_id` attribute.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): allow re-entering authorization info in the web browser if the entered info was incorrect. Previously it was non-trivial to do via the web browser, since `vmauth` was returning `400 Bad Request` instead of `401 Unauthorized` http response code.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): always log the client address and the requested URL on proxying errors. Previously some errors could miss this information.
## [v1.86.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.86.2)