lib/netutil: init implimentation of proxy protocol (#3687)

* lib/netutil: init implimentation of proxy protocol
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Nikolay 2023-01-27 08:08:35 +01:00 committed by Aliaksandr Valialkin
parent e8ee219fba
commit ebebaecd94
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
20 changed files with 336 additions and 50 deletions

View File

@ -44,16 +44,29 @@ import (
var (
httpListenAddr = flag.String("httpListenAddr", ":8429", "TCP address to listen for http connections. "+
"Set this flag to empty value in order to disable listening on any port. This mode may be useful for running multiple vmagent instances on the same server. "+
"Note that /targets and /metrics pages aren't available if -httpListenAddr=''")
"Note that /targets and /metrics pages aren't available if -httpListenAddr=''. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<vmagent>:8429/write")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty")
"This flag isn't needed when ingesting data over HTTP - just send it to http://<vmagent>:8429/write . "+
"See also -influxListenAddr.useProxyProtocol")
influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+
"See also -graphiteListenAddr.useProxyProtocol")
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
"Usually :4242 must be set. Doesn't work if empty. See also -opentsdbListenAddr.useProxyProtocol")
opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
)
@ -104,26 +117,26 @@ func main() {
remotewrite.Init()
common.StartUnmarshalWorkers()
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error {
return influx.InsertHandlerForReader(r, false)
})
}
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler)
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
}
if len(*opentsdbListenAddr) > 0 {
httpInsertHandler := getOpenTSDBHTTPInsertHandler()
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, httpInsertHandler)
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, httpInsertHandler)
}
if len(*opentsdbHTTPListenAddr) > 0 {
httpInsertHandler := getOpenTSDBHTTPInsertHandler()
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, httpInsertHandler)
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, httpInsertHandler)
}
promscrape.Init(remotewrite.Push)
if len(*httpListenAddr) > 0 {
go httpserver.Serve(*httpListenAddr, requestHandler)
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
}
logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds())

View File

@ -49,7 +49,9 @@ absolute path to all .tpl files in root.`)
configCheckInterval = flag.Duration("configCheckInterval", 0, "Interval for checking for changes in '-rule' or '-notifier.config' files. "+
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes.")
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections")
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules")
validateTemplates = flag.Bool("rule.validateTemplates", true, "Whether to validate annotation and label templates")
@ -170,7 +172,7 @@ func main() {
go configReload(ctx, manager, groupsCfg, sighupCh)
rh := &requestHandler{m: manager}
go httpserver.Serve(*httpListenAddr, rh.handler)
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, rh.handler)
sig := procutil.WaitForSigterm()
logger.Infof("service received signal %s", sig)

View File

@ -22,7 +22,9 @@ import (
)
var (
httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections")
httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host")
reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+
@ -41,7 +43,7 @@ func main() {
logger.Infof("starting vmauth at %q...", *httpListenAddr)
startTime := time.Now()
initAuthConfig()
go httpserver.Serve(*httpListenAddr, requestHandler)
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds())
sig := procutil.WaitForSigterm()

View File

@ -93,7 +93,7 @@ func main() {
logger.Fatalf("invalid -snapshotName=%q: %s", *snapshotName, err)
}
go httpserver.Serve(*httpListenAddr, nil)
go httpserver.Serve(*httpListenAddr, false, nil)
srcFS, err := newSrcFS()
if err != nil {

View File

@ -47,14 +47,28 @@ import (
var (
clusternativeListenAddr = flag.String("clusternativeListenAddr", "", "TCP address to listen for data from other vminsert nodes in multi-level cluster setup. "+
"See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup . Usually :8400 should be set to match default vmstorage port for vminsert. Disabled work if empty")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+
"See also -graphiteListenAddr.useProxyProtocol")
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write . "+
"See also -influxListenAddr.useProxyProtocol")
influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
httpListenAddr = flag.String("httpListenAddr", ":8480", "Address to listen for http connections")
"Usually :4242 must be set. Doesn't work if empty. "+
"See also -opentsdbListenAddr.useProxyProtocol")
opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
httpListenAddr = flag.String("httpListenAddr", ":8480", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superfluous labels are dropped. In this case the vm_metrics_with_dropped_labels_total metric at /metrics page is incremented")
maxLabelValueLen = flag.Int("maxLabelValueLen", 16*1024, "The maximum length of label values in the accepted time series. Longer label values are truncated. In this case the vm_too_long_label_values_total metric at /metrics page is incremented")
storageNodes = flagutil.NewArrayString("storageNode", "Comma-separated addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1,...,vmstorage-hostN . "+
@ -110,26 +124,26 @@ func main() {
})
}
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, func(r io.Reader) error {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, func(r io.Reader) error {
return graphite.InsertHandler(nil, r)
})
}
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error {
return influx.InsertHandlerForReader(nil, r)
})
}
if len(*opentsdbListenAddr) > 0 {
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error {
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, func(r io.Reader) error {
return opentsdb.InsertHandler(nil, r)
}, opentsdbhttp.InsertHandler)
}
if len(*opentsdbHTTPListenAddr) > 0 {
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler)
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, opentsdbhttp.InsertHandler)
}
go func() {
httpserver.Serve(*httpListenAddr, requestHandler)
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
}()
sig := procutil.WaitForSigterm()

View File

@ -38,7 +38,7 @@ func main() {
logger.Init()
pushmetrics.Init()
go httpserver.Serve(*httpListenAddr, nil)
go httpserver.Serve(*httpListenAddr, false, nil)
srcFS, err := newSrcFS()
if err != nil {

View File

@ -38,7 +38,9 @@ import (
)
var (
httpListenAddr = flag.String("httpListenAddr", ":8481", "Address to listen for http connections")
httpListenAddr = flag.String("httpListenAddr", ":8481", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
cacheDataPath = flag.String("cacheDataPath", "", "Path to directory for cache files. Cache isn't saved if empty")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
@ -127,7 +129,7 @@ func main() {
}
go func() {
httpserver.Serve(*httpListenAddr, requestHandler)
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
}()
sig := procutil.WaitForSigterm()

View File

@ -25,8 +25,10 @@ import (
)
var (
retentionPeriod = flagutil.NewDuration("retentionPeriod", "1", "Data with timestamps outside the retentionPeriod is automatically deleted. See also -retentionFilter")
httpListenAddr = flag.String("httpListenAddr", ":8482", "Address to listen for http connections")
retentionPeriod = flagutil.NewDuration("retentionPeriod", "1", "Data with timestamps outside the retentionPeriod is automatically deleted. See also -retentionFilter")
httpListenAddr = flag.String("httpListenAddr", ":8482", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
storageDataPath = flag.String("storageDataPath", "vmstorage-data", "Path to storage data")
vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services")
vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
@ -123,7 +125,7 @@ func main() {
requestHandler := newRequestHandler(strg)
go func() {
httpserver.Serve(*httpListenAddr, requestHandler)
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
}()
sig := procutil.WaitForSigterm()

View File

@ -40,7 +40,7 @@ type VMInsertServer struct {
// NewVMInsertServer starts VMInsertServer at the given addr serving the given storage.
func NewVMInsertServer(addr string, storage *storage.Storage) (*VMInsertServer, error) {
ln, err := netutil.NewTCPListener("vminsert", addr, nil)
ln, err := netutil.NewTCPListener("vminsert", addr, false, nil)
if err != nil {
return nil, fmt.Errorf("unable to listen vminsertAddr %s: %w", addr, err)
}

View File

@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when sending stale markers for targets, which expose big number of metrics. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675) issues.
* FEATURE: allow using VictoriaMetrics components behind proxies, which communicate with the backend via [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335). For example, [vmauth](https://docs.victoriametrics.com/vmauth.html) accepts proxy protocol connections when it starts with `-httpListenAddr.useProxyProtocol` command-line flag.
* FEATURE: add `-internStringMaxLen` command-line flag, which can be used for fine-tuning RAM vs CPU usage in certain workloads. For example, if the stored time series contain long labels, then it may be useful reducing the `-internStringMaxLen` in order to reduce memory usage at the cost of increased CPU usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692).
* BUGFIX: fix a bug, which could prevent background merges for the previous partitions until restart if the storage didn't have enough disk space for final deduplication and down-sampling.

View File

@ -79,7 +79,10 @@ type RequestHandler func(w http.ResponseWriter, r *http.Request) bool
// by calling DisableResponseCompression before writing the first byte to w.
//
// The compression is also disabled if -http.disableResponseCompression flag is set.
func Serve(addr string, rh RequestHandler) {
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
func Serve(addr string, useProxyProtocol bool, rh RequestHandler) {
if rh == nil {
rh = func(w http.ResponseWriter, r *http.Request) bool {
return false
@ -103,7 +106,7 @@ func Serve(addr string, rh RequestHandler) {
}
tlsConfig = tc
}
ln, err := netutil.NewTCPListener(scheme, addr, tlsConfig)
ln, err := netutil.NewTCPListener(scheme, addr, useProxyProtocol, tlsConfig)
if err != nil {
logger.Fatalf("cannot start http server at %s: %s", addr, err)
}

View File

@ -33,7 +33,7 @@ type Server struct {
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(c net.Conn) error) *Server {
logger.Infof("starting TCP clusternative server at %q", addr)
lnTCP, err := netutil.NewTCPListener("clusternative", addr, nil)
lnTCP, err := netutil.NewTCPListener("clusternative", addr, false, nil)
if err != nil {
logger.Fatalf("cannot start TCP clusternative server at %q: %s", addr, err)
}

View File

@ -37,10 +37,13 @@ type Server struct {
//
// The incoming connections are processed with insertHandler.
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r io.Reader) error) *Server {
func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server {
logger.Infof("starting TCP Graphite server at %q", addr)
lnTCP, err := netutil.NewTCPListener("graphite", addr, nil)
lnTCP, err := netutil.NewTCPListener("graphite", addr, useProxyProtocol, nil)
if err != nil {
logger.Fatalf("cannot start TCP Graphite server at %q: %s", addr, err)
}

View File

@ -37,10 +37,13 @@ type Server struct {
//
// The incoming connections are processed with insertHandler.
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r io.Reader) error) *Server {
func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server {
logger.Infof("starting TCP InfluxDB server at %q", addr)
lnTCP, err := netutil.NewTCPListener("influx", addr, nil)
lnTCP, err := netutil.NewTCPListener("influx", addr, useProxyProtocol, nil)
if err != nil {
logger.Fatalf("cannot start TCP InfluxDB server at %q: %s", addr, err)
}

View File

@ -40,10 +40,13 @@ type Server struct {
// MustStart starts OpenTSDB collector on the given addr.
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, telnetInsertHandler func(r io.Reader) error, httpInsertHandler func(req *http.Request) error) *Server {
func MustStart(addr string, useProxyProtocol bool, telnetInsertHandler func(r io.Reader) error, httpInsertHandler func(req *http.Request) error) *Server {
logger.Infof("starting TCP OpenTSDB collector at %q", addr)
lnTCP, err := netutil.NewTCPListener("opentsdb", addr, nil)
lnTCP, err := netutil.NewTCPListener("opentsdb", addr, useProxyProtocol, nil)
if err != nil {
logger.Fatalf("cannot start TCP OpenTSDB collector at %q: %s", addr, err)
}

View File

@ -27,10 +27,13 @@ type Server struct {
// MustStart starts HTTP OpenTSDB server on the given addr.
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r *http.Request) error) *Server {
func MustStart(addr string, useProxyProtocol bool, insertHandler func(r *http.Request) error) *Server {
logger.Infof("starting HTTP OpenTSDB server at %q", addr)
lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr, nil)
lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr, useProxyProtocol, nil)
if err != nil {
logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err)
}

View File

@ -0,0 +1,116 @@
package netutil
import (
"encoding/binary"
"fmt"
"io"
"net"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
type proxyProtocolConn struct {
net.Conn
remoteAddr net.Addr
}
func newProxyProtocolConn(c net.Conn) (net.Conn, error) {
remoteAddr, err := readProxyProto(c)
if err != nil {
return nil, fmt.Errorf("proxy protocol error: %w", err)
}
if remoteAddr == nil {
remoteAddr = c.RemoteAddr()
}
return &proxyProtocolConn{
Conn: c,
remoteAddr: remoteAddr,
}, nil
}
func (ppc *proxyProtocolConn) RemoteAddr() net.Addr {
return ppc.remoteAddr
}
func readProxyProto(r io.Reader) (net.Addr, error) {
bb := bbPool.Get()
defer bbPool.Put(bb)
// Read the first 16 bytes of proxy protocol header:
// - bytes 0-11: v2Identifier
// - byte 12: version and command
// - byte 13: family and protocol
// - bytes 14-15: payload length
//
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, 16)
if _, err := io.ReadFull(r, bb.B); err != nil {
return nil, fmt.Errorf("cannot read proxy protocol header: %w", err)
}
ident := bb.B[:12]
if string(ident) != v2Identifier {
return nil, fmt.Errorf("unexpected proxy protocol header: %q; want %q", ident, v2Identifier)
}
version := bb.B[12] >> 4
command := bb.B[12] & 0x0f
family := bb.B[13] >> 4
proto := bb.B[13] & 0x0f
if version != 2 {
return nil, fmt.Errorf("unsupported proxy protocol version, only v2 protocol version is supported, got: %d", version)
}
if proto != 1 {
// Only TCP is supported (aka STREAM).
return nil, fmt.Errorf("the proxy protocol implementation doesn't support proto %d; expecting 1", proto)
}
// The length of the remainder of the header including any TLVs in network byte order
// 0, 1, 2
blockLen := int(binary.BigEndian.Uint16(bb.B[14:16]))
// in general RFC doesn't limit block length, but for sanity check lets limit it to 2kb
// in theory TLVs may occupy some space
if blockLen > 2048 {
return nil, fmt.Errorf("too big proxy protocol block length: %d; it mustn't exceed 2048 bytes", blockLen)
}
// Read the protocol block itself
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, blockLen)
if _, err := io.ReadFull(r, bb.B); err != nil {
return nil, fmt.Errorf("cannot read proxy protocol block with the lehgth %d bytes: %w", blockLen, err)
}
switch command {
case 0:
// Proxy LOCAL command. Ignore the protocol block. The real sender address should be used.
return nil, nil
case 1:
// Parse the protocol block according to the family.
switch family {
case 1:
// ipv4 (aka AF_INET)
if len(bb.B) < 12 {
return nil, fmt.Errorf("cannot ipv4 address from proxy protocol block with the length %d bytes; expected at least 12 bytes", len(bb.B))
}
remoteAddr := &net.TCPAddr{
IP: net.IPv4(bb.B[0], bb.B[1], bb.B[2], bb.B[3]),
Port: int(binary.BigEndian.Uint16(bb.B[8:10])),
}
return remoteAddr, nil
case 2:
// ipv6 (aka AF_INET6)
if len(bb.B) < 36 {
return nil, fmt.Errorf("cannot read ipv6 address from proxy protocol block with the length %d bytes; expected at least 36 bytes", len(bb.B))
}
remoteAddr := &net.TCPAddr{
IP: bb.B[0:16],
Port: int(binary.BigEndian.Uint16(bb.B[32:34])),
}
return remoteAddr, nil
default:
return nil, fmt.Errorf("the proxy protocol implementation doesn't support protocol family %d; supported values: 1, 2", family)
}
default:
return nil, fmt.Errorf("the proxy protocol implementation doesn't support command %d; suppoted values: 0, 1", command)
}
}
const v2Identifier = "\r\n\r\n\x00\r\nQUIT\n"
var bbPool bytesutil.ByteBufferPool

View File

@ -0,0 +1,107 @@
package netutil
import (
"bytes"
"io"
"net"
"reflect"
"testing"
)
func TestParseProxyProtocolSuccess(t *testing.T) {
f := func(body, wantTail []byte, wantAddr net.Addr) {
t.Helper()
r := bytes.NewBuffer(body)
gotAddr, err := readProxyProto(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !reflect.DeepEqual(gotAddr, wantAddr) {
t.Fatalf("ip not match, got: %v, want: %v", gotAddr, wantAddr)
}
gotTail, err := io.ReadAll(r)
if err != nil {
t.Fatalf("cannot read tail: %s", err)
}
if !bytes.Equal(gotTail, wantTail) {
t.Fatalf("unexpected tail after parsing proxy protocol\ngot:\n%q\nwant:\n%q", gotTail, wantTail)
}
}
// LOCAL addr
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x20, 0x11, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}, nil,
nil)
// ipv4
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C,
// ip data srcid,dstip,srcport,dstport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}, nil,
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 80})
// ipv4 with payload
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C,
// ip data
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0,
// some payload
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0,
}, []byte{0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0},
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 80})
// ipv6
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x24,
// src and dst ipv6
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
// ports
0, 80, 0, 0}, nil,
&net.TCPAddr{IP: net.ParseIP("::1"), Port: 80})
}
func TestParseProxyProtocolFail(t *testing.T) {
f := func(body []byte) {
t.Helper()
r := bytes.NewBuffer(body)
gotAddr, err := readProxyProto(r)
if err == nil {
t.Fatalf("expected error at input %v", body)
}
if gotAddr != nil {
t.Fatalf("expected ip to be nil, got: %v", gotAddr)
}
}
// too short protocol prefix
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A})
// broken protocol prefix
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21})
// invalid header
f([]byte{0x0D, 0x1A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C})
// invalid version
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x31, 0x11, 0x00, 0x0C})
// too long block
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0xff, 0x0C})
// missing bytes in address
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C,
// ip data srcid,dstip,srcport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80})
// too short address length
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x08,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0})
// unsupported family
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x31, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// unsupported command
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x22, 0x11, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// mimatch ipv6 and ipv4
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x0C,
// ip data srcid,dstip,srcport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// ipv4 udp isn't supported
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x12, 0x00, 0x0C,
// ip data srcid,dstip,srcport,dstport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// ipv6 udp isn't supported
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x22, 0x00, 0x24,
// src and dst ipv6
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
// ports
0, 80, 0, 0})
}

View File

@ -16,8 +16,11 @@ var enableTCP6 = flag.Bool("enableTCP6", false, "Whether to enable IPv6 for list
// NewTCPListener returns new TCP listener for the given addr and optional tlsConfig.
//
// name is used for metrics registered in ms. Each listener in the program must have distinct name.
func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, error) {
// name is used for metrics. Each listener in the program must have a distinct name.
//
// If useProxyProtocol is set to true, then the returned listener accepts TCP connections via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
func NewTCPListener(name, addr string, useProxyProtocol bool, tlsConfig *tls.Config) (*TCPListener, error) {
network := GetTCPNetwork()
ln, err := net.Listen(network, addr)
if err != nil {
@ -28,7 +31,8 @@ func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, err
}
ms := metrics.GetDefaultSet()
tln := &TCPListener{
Listener: ln,
Listener: ln,
useProxyProtocol: useProxyProtocol,
accepts: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_accepts_total{name=%q, addr=%q}`, name, addr)),
acceptErrors: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_errors_total{name=%q, addr=%q, type="accept"}`, name, addr)),
@ -69,6 +73,8 @@ type TCPListener struct {
accepts *metrics.Counter
acceptErrors *metrics.Counter
useProxyProtocol bool
connMetrics
}
@ -87,6 +93,12 @@ func (ln *TCPListener) Accept() (net.Conn, error) {
ln.acceptErrors.Inc()
return nil, err
}
if ln.useProxyProtocol {
conn, err = newProxyProtocolConn(conn)
if err != nil {
return nil, err
}
}
ln.conns.Inc()
sc := &statConn{
Conn: conn,

View File

@ -102,7 +102,7 @@ type Limits struct {
//
// If disableResponseCompression is set to true, then the returned server doesn't compress responses.
func NewServer(addr string, api API, limits Limits, disableResponseCompression bool) (*Server, error) {
ln, err := netutil.NewTCPListener("vmselect", addr, nil)
ln, err := netutil.NewTCPListener("vmselect", addr, false, nil)
if err != nil {
return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", addr, err)
}