From ebebaecd94dc763a0c163c398a4b999624098d3d Mon Sep 17 00:00:00 2001 From: Nikolay Date: Fri, 27 Jan 2023 08:08:35 +0100 Subject: [PATCH] lib/netutil: init implimentation of proxy protocol (#3687) * lib/netutil: init implimentation of proxy protocol https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335 * wip Co-authored-by: Aliaksandr Valialkin --- app/vmagent/main.go | 37 +++++--- app/vmalert/main.go | 6 +- app/vmauth/main.go | 6 +- app/vmbackup/main.go | 2 +- app/vminsert/main.go | 36 ++++--- app/vmrestore/main.go | 2 +- app/vmselect/main.go | 6 +- app/vmstorage/main.go | 8 +- app/vmstorage/servers/vminsert.go | 2 +- docs/CHANGELOG.md | 1 + lib/httpserver/httpserver.go | 7 +- lib/ingestserver/clusternative/server.go | 2 +- lib/ingestserver/graphite/server.go | 7 +- lib/ingestserver/influx/server.go | 7 +- lib/ingestserver/opentsdb/server.go | 7 +- lib/ingestserver/opentsdbhttp/server.go | 7 +- lib/netutil/proxyprotocol.go | 116 +++++++++++++++++++++++ lib/netutil/proxyprotocol_test.go | 107 +++++++++++++++++++++ lib/netutil/tcplistener.go | 18 +++- lib/vmselectapi/server.go | 2 +- 20 files changed, 336 insertions(+), 50 deletions(-) create mode 100644 lib/netutil/proxyprotocol.go create mode 100644 lib/netutil/proxyprotocol_test.go diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 8593401782..4ffca3f7dd 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -44,16 +44,29 @@ import ( var ( httpListenAddr = flag.String("httpListenAddr", ":8429", "TCP address to listen for http connections. "+ "Set this flag to empty value in order to disable listening on any port. This mode may be useful for running multiple vmagent instances on the same server. "+ - "Note that /targets and /metrics pages aren't available if -httpListenAddr=''") + "Note that /targets and /metrics pages aren't available if -httpListenAddr=''. See also -httpListenAddr.useProxyProtocol") + useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+ - "This flag isn't needed when ingesting data over HTTP - just send it to http://:8429/write") - graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty") + "This flag isn't needed when ingesting data over HTTP - just send it to http://:8429/write . "+ + "See also -influxListenAddr.useProxyProtocol") + influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") + graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+ + "See also -graphiteListenAddr.useProxyProtocol") + graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+ "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ - "Usually :4242 must be set. Doesn't work if empty") - opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty") - configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg") - dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+ + "Usually :4242 must be set. Doesn't work if empty. See also -opentsdbListenAddr.useProxyProtocol") + opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") + opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+ + "See also -opentsdbHTTPListenAddr.useProxyProtocol") + opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+ + "at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") + configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg") + dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+ "-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+ "Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag") ) @@ -104,26 +117,26 @@ func main() { remotewrite.Init() common.StartUnmarshalWorkers() if len(*influxListenAddr) > 0 { - influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error { + influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error { return influx.InsertHandlerForReader(r, false) }) } if len(*graphiteListenAddr) > 0 { - graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) + graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler) } if len(*opentsdbListenAddr) > 0 { httpInsertHandler := getOpenTSDBHTTPInsertHandler() - opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, httpInsertHandler) + opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, httpInsertHandler) } if len(*opentsdbHTTPListenAddr) > 0 { httpInsertHandler := getOpenTSDBHTTPInsertHandler() - opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, httpInsertHandler) + opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, httpInsertHandler) } promscrape.Init(remotewrite.Push) if len(*httpListenAddr) > 0 { - go httpserver.Serve(*httpListenAddr, requestHandler) + go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) } logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds()) diff --git a/app/vmalert/main.go b/app/vmalert/main.go index 451948eec3..da9a62f5ee 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -49,7 +49,9 @@ absolute path to all .tpl files in root.`) configCheckInterval = flag.Duration("configCheckInterval", 0, "Interval for checking for changes in '-rule' or '-notifier.config' files. "+ "By default the checking is disabled. Send SIGHUP signal in order to force config check for changes.") - httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections") + httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol") + useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules") validateTemplates = flag.Bool("rule.validateTemplates", true, "Whether to validate annotation and label templates") @@ -170,7 +172,7 @@ func main() { go configReload(ctx, manager, groupsCfg, sighupCh) rh := &requestHandler{m: manager} - go httpserver.Serve(*httpListenAddr, rh.handler) + go httpserver.Serve(*httpListenAddr, *useProxyProtocol, rh.handler) sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) diff --git a/app/vmauth/main.go b/app/vmauth/main.go index c79173d3e0..b60823c643 100644 --- a/app/vmauth/main.go +++ b/app/vmauth/main.go @@ -22,7 +22,9 @@ import ( ) var ( - httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections") + httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol") + useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host") reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...") logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+ @@ -41,7 +43,7 @@ func main() { logger.Infof("starting vmauth at %q...", *httpListenAddr) startTime := time.Now() initAuthConfig() - go httpserver.Serve(*httpListenAddr, requestHandler) + go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds()) sig := procutil.WaitForSigterm() diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index e7daa43b68..8479ed37f3 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -93,7 +93,7 @@ func main() { logger.Fatalf("invalid -snapshotName=%q: %s", *snapshotName, err) } - go httpserver.Serve(*httpListenAddr, nil) + go httpserver.Serve(*httpListenAddr, false, nil) srcFS, err := newSrcFS() if err != nil { diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 28bdbbc169..bc376f1b55 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -47,14 +47,28 @@ import ( var ( clusternativeListenAddr = flag.String("clusternativeListenAddr", "", "TCP address to listen for data from other vminsert nodes in multi-level cluster setup. "+ "See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup . Usually :8400 should be set to match default vmstorage port for vminsert. Disabled work if empty") - graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty") - influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+ - "This flag isn't needed when ingesting data over HTTP - just send it to http://:8428/write") + graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+ + "See also -graphiteListenAddr.useProxyProtocol") + graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") + influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+ + "This flag isn't needed when ingesting data over HTTP - just send it to http://:8428/write . "+ + "See also -influxListenAddr.useProxyProtocol") + influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+ "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ - "Usually :4242 must be set. Doesn't work if empty") - opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty") - httpListenAddr = flag.String("httpListenAddr", ":8480", "Address to listen for http connections") + "Usually :4242 must be set. Doesn't work if empty. "+ + "See also -opentsdbListenAddr.useProxyProtocol") + opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") + opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+ + "See also -opentsdbHTTPListenAddr.useProxyProtocol") + opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+ + "at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") + httpListenAddr = flag.String("httpListenAddr", ":8480", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol") + useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superfluous labels are dropped. In this case the vm_metrics_with_dropped_labels_total metric at /metrics page is incremented") maxLabelValueLen = flag.Int("maxLabelValueLen", 16*1024, "The maximum length of label values in the accepted time series. Longer label values are truncated. In this case the vm_too_long_label_values_total metric at /metrics page is incremented") storageNodes = flagutil.NewArrayString("storageNode", "Comma-separated addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1,...,vmstorage-hostN . "+ @@ -110,26 +124,26 @@ func main() { }) } if len(*graphiteListenAddr) > 0 { - graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, func(r io.Reader) error { + graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, func(r io.Reader) error { return graphite.InsertHandler(nil, r) }) } if len(*influxListenAddr) > 0 { - influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error { + influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error { return influx.InsertHandlerForReader(nil, r) }) } if len(*opentsdbListenAddr) > 0 { - opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error { + opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, func(r io.Reader) error { return opentsdb.InsertHandler(nil, r) }, opentsdbhttp.InsertHandler) } if len(*opentsdbHTTPListenAddr) > 0 { - opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler) + opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, opentsdbhttp.InsertHandler) } go func() { - httpserver.Serve(*httpListenAddr, requestHandler) + httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) }() sig := procutil.WaitForSigterm() diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go index b7510c762e..cb2efcd79d 100644 --- a/app/vmrestore/main.go +++ b/app/vmrestore/main.go @@ -38,7 +38,7 @@ func main() { logger.Init() pushmetrics.Init() - go httpserver.Serve(*httpListenAddr, nil) + go httpserver.Serve(*httpListenAddr, false, nil) srcFS, err := newSrcFS() if err != nil { diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 0984f6ec1b..145e5cb954 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -38,7 +38,9 @@ import ( ) var ( - httpListenAddr = flag.String("httpListenAddr", ":8481", "Address to listen for http connections") + httpListenAddr = flag.String("httpListenAddr", ":8481", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol") + useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") cacheDataPath = flag.String("cacheDataPath", "", "Path to directory for cache files. Cache isn't saved if empty") maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+ "It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+ @@ -127,7 +129,7 @@ func main() { } go func() { - httpserver.Serve(*httpListenAddr, requestHandler) + httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) }() sig := procutil.WaitForSigterm() diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 84a99fd9ac..4ff97744f9 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -25,8 +25,10 @@ import ( ) var ( - retentionPeriod = flagutil.NewDuration("retentionPeriod", "1", "Data with timestamps outside the retentionPeriod is automatically deleted. See also -retentionFilter") - httpListenAddr = flag.String("httpListenAddr", ":8482", "Address to listen for http connections") + retentionPeriod = flagutil.NewDuration("retentionPeriod", "1", "Data with timestamps outside the retentionPeriod is automatically deleted. See also -retentionFilter") + httpListenAddr = flag.String("httpListenAddr", ":8482", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol") + useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") storageDataPath = flag.String("storageDataPath", "vmstorage-data", "Path to storage data") vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services") vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") @@ -123,7 +125,7 @@ func main() { requestHandler := newRequestHandler(strg) go func() { - httpserver.Serve(*httpListenAddr, requestHandler) + httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) }() sig := procutil.WaitForSigterm() diff --git a/app/vmstorage/servers/vminsert.go b/app/vmstorage/servers/vminsert.go index 017b34d977..335b6da1f1 100644 --- a/app/vmstorage/servers/vminsert.go +++ b/app/vmstorage/servers/vminsert.go @@ -40,7 +40,7 @@ type VMInsertServer struct { // NewVMInsertServer starts VMInsertServer at the given addr serving the given storage. func NewVMInsertServer(addr string, storage *storage.Storage) (*VMInsertServer, error) { - ln, err := netutil.NewTCPListener("vminsert", addr, nil) + ln, err := netutil.NewTCPListener("vminsert", addr, false, nil) if err != nil { return nil, fmt.Errorf("unable to listen vminsertAddr %s: %w", addr, err) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b30cfc2034..363b6aa9ec 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when sending stale markers for targets, which expose big number of metrics. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675) issues. +* FEATURE: allow using VictoriaMetrics components behind proxies, which communicate with the backend via [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335). For example, [vmauth](https://docs.victoriametrics.com/vmauth.html) accepts proxy protocol connections when it starts with `-httpListenAddr.useProxyProtocol` command-line flag. * FEATURE: add `-internStringMaxLen` command-line flag, which can be used for fine-tuning RAM vs CPU usage in certain workloads. For example, if the stored time series contain long labels, then it may be useful reducing the `-internStringMaxLen` in order to reduce memory usage at the cost of increased CPU usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692). * BUGFIX: fix a bug, which could prevent background merges for the previous partitions until restart if the storage didn't have enough disk space for final deduplication and down-sampling. diff --git a/lib/httpserver/httpserver.go b/lib/httpserver/httpserver.go index 9e154276c9..1668489efa 100644 --- a/lib/httpserver/httpserver.go +++ b/lib/httpserver/httpserver.go @@ -79,7 +79,10 @@ type RequestHandler func(w http.ResponseWriter, r *http.Request) bool // by calling DisableResponseCompression before writing the first byte to w. // // The compression is also disabled if -http.disableResponseCompression flag is set. -func Serve(addr string, rh RequestHandler) { +// +// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol. +// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt +func Serve(addr string, useProxyProtocol bool, rh RequestHandler) { if rh == nil { rh = func(w http.ResponseWriter, r *http.Request) bool { return false @@ -103,7 +106,7 @@ func Serve(addr string, rh RequestHandler) { } tlsConfig = tc } - ln, err := netutil.NewTCPListener(scheme, addr, tlsConfig) + ln, err := netutil.NewTCPListener(scheme, addr, useProxyProtocol, tlsConfig) if err != nil { logger.Fatalf("cannot start http server at %s: %s", addr, err) } diff --git a/lib/ingestserver/clusternative/server.go b/lib/ingestserver/clusternative/server.go index 8827e7ee58..273f11452f 100644 --- a/lib/ingestserver/clusternative/server.go +++ b/lib/ingestserver/clusternative/server.go @@ -33,7 +33,7 @@ type Server struct { // MustStop must be called on the returned server when it is no longer needed. func MustStart(addr string, insertHandler func(c net.Conn) error) *Server { logger.Infof("starting TCP clusternative server at %q", addr) - lnTCP, err := netutil.NewTCPListener("clusternative", addr, nil) + lnTCP, err := netutil.NewTCPListener("clusternative", addr, false, nil) if err != nil { logger.Fatalf("cannot start TCP clusternative server at %q: %s", addr, err) } diff --git a/lib/ingestserver/graphite/server.go b/lib/ingestserver/graphite/server.go index 4608ba635a..2042d7a236 100644 --- a/lib/ingestserver/graphite/server.go +++ b/lib/ingestserver/graphite/server.go @@ -37,10 +37,13 @@ type Server struct { // // The incoming connections are processed with insertHandler. // +// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol. +// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt +// // MustStop must be called on the returned server when it is no longer needed. -func MustStart(addr string, insertHandler func(r io.Reader) error) *Server { +func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server { logger.Infof("starting TCP Graphite server at %q", addr) - lnTCP, err := netutil.NewTCPListener("graphite", addr, nil) + lnTCP, err := netutil.NewTCPListener("graphite", addr, useProxyProtocol, nil) if err != nil { logger.Fatalf("cannot start TCP Graphite server at %q: %s", addr, err) } diff --git a/lib/ingestserver/influx/server.go b/lib/ingestserver/influx/server.go index bb172c68e0..53cfe65925 100644 --- a/lib/ingestserver/influx/server.go +++ b/lib/ingestserver/influx/server.go @@ -37,10 +37,13 @@ type Server struct { // // The incoming connections are processed with insertHandler. // +// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol. +// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt +// // MustStop must be called on the returned server when it is no longer needed. -func MustStart(addr string, insertHandler func(r io.Reader) error) *Server { +func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server { logger.Infof("starting TCP InfluxDB server at %q", addr) - lnTCP, err := netutil.NewTCPListener("influx", addr, nil) + lnTCP, err := netutil.NewTCPListener("influx", addr, useProxyProtocol, nil) if err != nil { logger.Fatalf("cannot start TCP InfluxDB server at %q: %s", addr, err) } diff --git a/lib/ingestserver/opentsdb/server.go b/lib/ingestserver/opentsdb/server.go index 46518e3f99..f950763585 100644 --- a/lib/ingestserver/opentsdb/server.go +++ b/lib/ingestserver/opentsdb/server.go @@ -40,10 +40,13 @@ type Server struct { // MustStart starts OpenTSDB collector on the given addr. // +// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol. +// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt +// // MustStop must be called on the returned server when it is no longer needed. -func MustStart(addr string, telnetInsertHandler func(r io.Reader) error, httpInsertHandler func(req *http.Request) error) *Server { +func MustStart(addr string, useProxyProtocol bool, telnetInsertHandler func(r io.Reader) error, httpInsertHandler func(req *http.Request) error) *Server { logger.Infof("starting TCP OpenTSDB collector at %q", addr) - lnTCP, err := netutil.NewTCPListener("opentsdb", addr, nil) + lnTCP, err := netutil.NewTCPListener("opentsdb", addr, useProxyProtocol, nil) if err != nil { logger.Fatalf("cannot start TCP OpenTSDB collector at %q: %s", addr, err) } diff --git a/lib/ingestserver/opentsdbhttp/server.go b/lib/ingestserver/opentsdbhttp/server.go index 34d71ff63c..3e55ef4cfd 100644 --- a/lib/ingestserver/opentsdbhttp/server.go +++ b/lib/ingestserver/opentsdbhttp/server.go @@ -27,10 +27,13 @@ type Server struct { // MustStart starts HTTP OpenTSDB server on the given addr. // +// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol. +// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt +// // MustStop must be called on the returned server when it is no longer needed. -func MustStart(addr string, insertHandler func(r *http.Request) error) *Server { +func MustStart(addr string, useProxyProtocol bool, insertHandler func(r *http.Request) error) *Server { logger.Infof("starting HTTP OpenTSDB server at %q", addr) - lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr, nil) + lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr, useProxyProtocol, nil) if err != nil { logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err) } diff --git a/lib/netutil/proxyprotocol.go b/lib/netutil/proxyprotocol.go new file mode 100644 index 0000000000..d6ffc21797 --- /dev/null +++ b/lib/netutil/proxyprotocol.go @@ -0,0 +1,116 @@ +package netutil + +import ( + "encoding/binary" + "fmt" + "io" + "net" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +type proxyProtocolConn struct { + net.Conn + remoteAddr net.Addr +} + +func newProxyProtocolConn(c net.Conn) (net.Conn, error) { + remoteAddr, err := readProxyProto(c) + if err != nil { + return nil, fmt.Errorf("proxy protocol error: %w", err) + } + if remoteAddr == nil { + remoteAddr = c.RemoteAddr() + } + return &proxyProtocolConn{ + Conn: c, + remoteAddr: remoteAddr, + }, nil +} + +func (ppc *proxyProtocolConn) RemoteAddr() net.Addr { + return ppc.remoteAddr +} + +func readProxyProto(r io.Reader) (net.Addr, error) { + bb := bbPool.Get() + defer bbPool.Put(bb) + + // Read the first 16 bytes of proxy protocol header: + // - bytes 0-11: v2Identifier + // - byte 12: version and command + // - byte 13: family and protocol + // - bytes 14-15: payload length + // + // See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt + bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, 16) + if _, err := io.ReadFull(r, bb.B); err != nil { + return nil, fmt.Errorf("cannot read proxy protocol header: %w", err) + } + ident := bb.B[:12] + if string(ident) != v2Identifier { + return nil, fmt.Errorf("unexpected proxy protocol header: %q; want %q", ident, v2Identifier) + } + version := bb.B[12] >> 4 + command := bb.B[12] & 0x0f + family := bb.B[13] >> 4 + proto := bb.B[13] & 0x0f + if version != 2 { + return nil, fmt.Errorf("unsupported proxy protocol version, only v2 protocol version is supported, got: %d", version) + } + if proto != 1 { + // Only TCP is supported (aka STREAM). + return nil, fmt.Errorf("the proxy protocol implementation doesn't support proto %d; expecting 1", proto) + } + // The length of the remainder of the header including any TLVs in network byte order + // 0, 1, 2 + blockLen := int(binary.BigEndian.Uint16(bb.B[14:16])) + // in general RFC doesn't limit block length, but for sanity check lets limit it to 2kb + // in theory TLVs may occupy some space + if blockLen > 2048 { + return nil, fmt.Errorf("too big proxy protocol block length: %d; it mustn't exceed 2048 bytes", blockLen) + } + + // Read the protocol block itself + bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, blockLen) + if _, err := io.ReadFull(r, bb.B); err != nil { + return nil, fmt.Errorf("cannot read proxy protocol block with the lehgth %d bytes: %w", blockLen, err) + } + switch command { + case 0: + // Proxy LOCAL command. Ignore the protocol block. The real sender address should be used. + return nil, nil + case 1: + // Parse the protocol block according to the family. + switch family { + case 1: + // ipv4 (aka AF_INET) + if len(bb.B) < 12 { + return nil, fmt.Errorf("cannot ipv4 address from proxy protocol block with the length %d bytes; expected at least 12 bytes", len(bb.B)) + } + remoteAddr := &net.TCPAddr{ + IP: net.IPv4(bb.B[0], bb.B[1], bb.B[2], bb.B[3]), + Port: int(binary.BigEndian.Uint16(bb.B[8:10])), + } + return remoteAddr, nil + case 2: + // ipv6 (aka AF_INET6) + if len(bb.B) < 36 { + return nil, fmt.Errorf("cannot read ipv6 address from proxy protocol block with the length %d bytes; expected at least 36 bytes", len(bb.B)) + } + remoteAddr := &net.TCPAddr{ + IP: bb.B[0:16], + Port: int(binary.BigEndian.Uint16(bb.B[32:34])), + } + return remoteAddr, nil + default: + return nil, fmt.Errorf("the proxy protocol implementation doesn't support protocol family %d; supported values: 1, 2", family) + } + default: + return nil, fmt.Errorf("the proxy protocol implementation doesn't support command %d; suppoted values: 0, 1", command) + } +} + +const v2Identifier = "\r\n\r\n\x00\r\nQUIT\n" + +var bbPool bytesutil.ByteBufferPool diff --git a/lib/netutil/proxyprotocol_test.go b/lib/netutil/proxyprotocol_test.go new file mode 100644 index 0000000000..0e86b21cd9 --- /dev/null +++ b/lib/netutil/proxyprotocol_test.go @@ -0,0 +1,107 @@ +package netutil + +import ( + "bytes" + "io" + "net" + "reflect" + "testing" +) + +func TestParseProxyProtocolSuccess(t *testing.T) { + f := func(body, wantTail []byte, wantAddr net.Addr) { + t.Helper() + r := bytes.NewBuffer(body) + gotAddr, err := readProxyProto(r) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !reflect.DeepEqual(gotAddr, wantAddr) { + t.Fatalf("ip not match, got: %v, want: %v", gotAddr, wantAddr) + } + gotTail, err := io.ReadAll(r) + if err != nil { + t.Fatalf("cannot read tail: %s", err) + } + if !bytes.Equal(gotTail, wantTail) { + t.Fatalf("unexpected tail after parsing proxy protocol\ngot:\n%q\nwant:\n%q", gotTail, wantTail) + } + } + // LOCAL addr + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x20, 0x11, 0x00, 0x0C, + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}, nil, + nil) + // ipv4 + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C, + // ip data srcid,dstip,srcport,dstport + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}, nil, + &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 80}) + // ipv4 with payload + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C, + // ip data + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0, + // some payload + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0, + }, []byte{0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}, + &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 80}) + // ipv6 + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x24, + // src and dst ipv6 + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + // ports + 0, 80, 0, 0}, nil, + &net.TCPAddr{IP: net.ParseIP("::1"), Port: 80}) +} + +func TestParseProxyProtocolFail(t *testing.T) { + f := func(body []byte) { + t.Helper() + r := bytes.NewBuffer(body) + gotAddr, err := readProxyProto(r) + if err == nil { + t.Fatalf("expected error at input %v", body) + } + if gotAddr != nil { + t.Fatalf("expected ip to be nil, got: %v", gotAddr) + } + } + // too short protocol prefix + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A}) + // broken protocol prefix + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21}) + // invalid header + f([]byte{0x0D, 0x1A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C}) + // invalid version + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x31, 0x11, 0x00, 0x0C}) + // too long block + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0xff, 0x0C}) + // missing bytes in address + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C, + // ip data srcid,dstip,srcport + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80}) + // too short address length + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x08, + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0}) + // unsupported family + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x31, 0x00, 0x0C, + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}) + // unsupported command + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x22, 0x11, 0x00, 0x0C, + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}) + // mimatch ipv6 and ipv4 + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x0C, + // ip data srcid,dstip,srcport + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}) + // ipv4 udp isn't supported + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x12, 0x00, 0x0C, + // ip data srcid,dstip,srcport,dstport + 0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}) + // ipv6 udp isn't supported + f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x22, 0x00, 0x24, + // src and dst ipv6 + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + // ports + 0, 80, 0, 0}) +} diff --git a/lib/netutil/tcplistener.go b/lib/netutil/tcplistener.go index f179eac18c..0a0fb40fdc 100644 --- a/lib/netutil/tcplistener.go +++ b/lib/netutil/tcplistener.go @@ -16,8 +16,11 @@ var enableTCP6 = flag.Bool("enableTCP6", false, "Whether to enable IPv6 for list // NewTCPListener returns new TCP listener for the given addr and optional tlsConfig. // -// name is used for metrics registered in ms. Each listener in the program must have distinct name. -func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, error) { +// name is used for metrics. Each listener in the program must have a distinct name. +// +// If useProxyProtocol is set to true, then the returned listener accepts TCP connections via proxy protocol. +// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt +func NewTCPListener(name, addr string, useProxyProtocol bool, tlsConfig *tls.Config) (*TCPListener, error) { network := GetTCPNetwork() ln, err := net.Listen(network, addr) if err != nil { @@ -28,7 +31,8 @@ func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, err } ms := metrics.GetDefaultSet() tln := &TCPListener{ - Listener: ln, + Listener: ln, + useProxyProtocol: useProxyProtocol, accepts: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_accepts_total{name=%q, addr=%q}`, name, addr)), acceptErrors: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_errors_total{name=%q, addr=%q, type="accept"}`, name, addr)), @@ -69,6 +73,8 @@ type TCPListener struct { accepts *metrics.Counter acceptErrors *metrics.Counter + useProxyProtocol bool + connMetrics } @@ -87,6 +93,12 @@ func (ln *TCPListener) Accept() (net.Conn, error) { ln.acceptErrors.Inc() return nil, err } + if ln.useProxyProtocol { + conn, err = newProxyProtocolConn(conn) + if err != nil { + return nil, err + } + } ln.conns.Inc() sc := &statConn{ Conn: conn, diff --git a/lib/vmselectapi/server.go b/lib/vmselectapi/server.go index b12620be5a..20b30462b6 100644 --- a/lib/vmselectapi/server.go +++ b/lib/vmselectapi/server.go @@ -102,7 +102,7 @@ type Limits struct { // // If disableResponseCompression is set to true, then the returned server doesn't compress responses. func NewServer(addr string, api API, limits Limits, disableResponseCompression bool) (*Server, error) { - ln, err := netutil.NewTCPListener("vmselect", addr, nil) + ln, err := netutil.NewTCPListener("vmselect", addr, false, nil) if err != nil { return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", addr, err) }