diff --git a/app/vmagent/README.md b/app/vmagent/README.md index c7a8e36be0..e2307df656 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -193,9 +193,11 @@ VictoriaMetrics remote write protocol provides the following benefits comparing In this case `vmagent` buffers the incoming data to disk using the VictoriaMetrics remote write format. This reduces disk read/write IO and disk space usage by 2x-5x comparing to Prometheus remote write format. -`vmagent` automatically uses VictoriaMetrics remote write protocol when it sends data to VictoriaMetrics components such as other `vmagent` instances, +`vmagent` automatically switches to VictoriaMetrics remote write protocol when it sends data to VictoriaMetrics components such as other `vmagent` instances, [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) or `vminsert` at [cluster version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html). +It is possible to force switch to VictoriaMetrics remote write protocol by specifying `-remoteWrite.forceVMProto` +command-line flag for the corresponding `-remoteWrite.url`. `vmagent` automatically switches to Prometheus remote write protocol when it sends data to old versions of VictoriaMetrics components or to other Prometheus-compatible remote storage systems. It is possible to force switch to Prometheus remote write protocol @@ -1451,6 +1453,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -remoteWrite.forcePromProto array Whether to force Prometheus remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol Supports array of values separated by comma or specified via multiple flags. + -remoteWrite.forceVMProto array + Whether to force VictoriaMetrics remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol + Supports array of values separated by comma or specified via multiple flags. -remoteWrite.headers array Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2' Supports an array of values separated by comma or specified via multiple flags. diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 6993fd27f0..0ebff6203d 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -15,11 +15,17 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" ) var ( + forcePromProto = flagutil.NewArrayBool("remoteWrite.forcePromProto", "Whether to force Prometheus remote write protocol for sending data "+ + "to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol") + forceVMProto = flagutil.NewArrayBool("remoteWrite.forceVMProto", "Whether to force VictoriaMetrics remote write protocol for sending data "+ + "to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol") + rateLimit = flagutil.NewArrayInt("remoteWrite.rateLimit", "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. "+ "By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+ "is sent after temporary unavailability of the remote storage") @@ -67,11 +73,14 @@ var ( ) type client struct { - sanitizedURL string - remoteWriteURL string - isVMRemoteWrite bool - fq *persistentqueue.FastQueue - hc *http.Client + sanitizedURL string + remoteWriteURL string + + // Whether to use VictoriaMetrics remote write protocol for sending the data to remoteWriteURL + useVMProto bool + + fq *persistentqueue.FastQueue + hc *http.Client sendBlock func(block []byte) bool authCfg *promauth.Config @@ -93,7 +102,7 @@ type client struct { stopCh chan struct{} } -func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int, isVMRemoteWrite bool) *client { +func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client { authCfg, err := getAuthConfig(argIdx) if err != nil { logger.Panicf("FATAL: cannot initialize auth config for remoteWrite.url=%q: %s", remoteWriteURL, err) @@ -128,20 +137,38 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste Timeout: sendTimeout.GetOptionalArgOrDefault(argIdx, time.Minute), } c := &client{ - sanitizedURL: sanitizedURL, - remoteWriteURL: remoteWriteURL, - isVMRemoteWrite: isVMRemoteWrite, - authCfg: authCfg, - awsCfg: awsCfg, - fq: fq, - hc: hc, - stopCh: make(chan struct{}), + sanitizedURL: sanitizedURL, + remoteWriteURL: remoteWriteURL, + authCfg: authCfg, + awsCfg: awsCfg, + fq: fq, + hc: hc, + stopCh: make(chan struct{}), } c.sendBlock = c.sendBlockHTTP + return c } func (c *client) init(argIdx, concurrency int, sanitizedURL string) { + useVMProto := forceVMProto.GetOptionalArg(argIdx) + usePromProto := forcePromProto.GetOptionalArg(argIdx) + if useVMProto && usePromProto { + logger.Fatalf("-remoteWrite.useVMProto and -remoteWrite.usePromProto cannot be set simultaneously for -remoteWrite.url=%s", sanitizedURL) + } + if !useVMProto && !usePromProto { + // Auto-detect whether the remote storage supports VictoriaMetrics remote write protocol. + doRequest := func(url string) (*http.Response, error) { + return c.doRequest(url, nil) + } + useVMProto = common.HandleVMProtoClientHandshake(c.remoteWriteURL, doRequest) + if !useVMProto { + logger.Infof("the remote storage at %q doesn't support VictoriaMetrics remote write protocol. Switching to Prometheus remote write protocol. "+ + "See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol", sanitizedURL) + } + } + c.useVMProto = useVMProto + if bytesPerSec := rateLimit.GetOptionalArgOrDefault(argIdx, 0); bytesPerSec > 0 { logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL) c.rl.perSecondLimit = int64(bytesPerSec) @@ -294,6 +321,33 @@ func (c *client) runWorker() { } } +func (c *client) doRequest(url string, body []byte) (*http.Response, error) { + reqBody := bytes.NewBuffer(body) + req, err := http.NewRequest(http.MethodPost, url, reqBody) + if err != nil { + logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", url, err) + } + c.authCfg.SetHeaders(req, true) + h := req.Header + h.Set("User-Agent", "vmagent") + h.Set("Content-Type", "application/x-protobuf") + if c.useVMProto { + h.Set("Content-Encoding", "zstd") + h.Set("X-VictoriaMetrics-Remote-Write-Version", "1") + } else { + h.Set("Content-Encoding", "snappy") + h.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + } + if c.awsCfg != nil { + sigv4Hash := awsapi.HashHex(body) + if err := c.awsCfg.SignRequest(req, sigv4Hash); err != nil { + // there is no need in retry, request will be rejected by client.Do and retried by code below + logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err) + } + } + return c.hc.Do(req) +} + // sendBlockHTTP sends the given block to c.remoteWriteURL. // // The function returns false only if c.stopCh is closed. @@ -302,37 +356,10 @@ func (c *client) sendBlockHTTP(block []byte) bool { c.rl.register(len(block), c.stopCh) retryDuration := time.Second retriesCount := 0 - c.bytesSent.Add(len(block)) - c.blocksSent.Inc() - sigv4Hash := "" - if c.awsCfg != nil { - sigv4Hash = awsapi.HashHex(block) - } again: - req, err := http.NewRequest(http.MethodPost, c.remoteWriteURL, bytes.NewBuffer(block)) - if err != nil { - logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", c.sanitizedURL, err) - } - c.authCfg.SetHeaders(req, true) - h := req.Header - h.Set("User-Agent", "vmagent") - h.Set("Content-Type", "application/x-protobuf") - if c.isVMRemoteWrite { - h.Set("Content-Encoding", "zstd") - h.Set("X-VictoriaMetrics-Remote-Write-Version", "1") - } else { - h.Set("Content-Encoding", "snappy") - h.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - } - if c.awsCfg != nil { - if err := c.awsCfg.SignRequest(req, sigv4Hash); err != nil { - // there is no need in retry, request will be rejected by client.Do and retried by code below - logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err) - } - } startTime := time.Now() - resp, err := c.hc.Do(req) + resp, err := c.doRequest(c.remoteWriteURL, block) c.requestDuration.UpdateDuration(startTime) if err != nil { c.errorsCount.Inc() @@ -357,6 +384,8 @@ again: if statusCode/100 == 2 { _ = resp.Body.Close() c.requestsOKCount.Inc() + c.bytesSent.Add(len(block)) + c.blocksSent.Inc() return true } metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc() diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 9a9e4dfbc2..b27141769f 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -21,7 +21,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" @@ -35,8 +34,6 @@ var ( remoteWriteMultitenantURLs = flagutil.NewArrayString("remoteWrite.multitenantURL", "Base path for multitenant remote storage URL to write data to. "+ "See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://:8480 . "+ "Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url") - forcePromProto = flagutil.NewArrayBool("remoteWrite.forcePromProto", "Whether to force Prometheus remote write protocol for sending data "+ - "to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol") tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+ "See also -remoteWrite.maxDiskUsagePerURL") queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+ @@ -479,21 +476,10 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI return float64(fq.GetInmemoryQueueLen()) }) - // Auto-detect whether the remote storage supports VictoriaMetrics remote write protocol. - isVMRemoteWrite := false - usePromProto := forcePromProto.GetOptionalArg(argIdx) - if !usePromProto { - isVMRemoteWrite = common.HandleVMProtoClientHandshake(remoteWriteURL) - if !isVMRemoteWrite { - logger.Infof("the remote storage at %q doesn't support VictoriaMetrics remote write protocol. Switching to Prometheus remote write protocol. "+ - "See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol", sanitizedURL) - } - } - var c *client switch remoteWriteURL.Scheme { case "http", "https": - c = newHTTPClient(argIdx, remoteWriteURL.String(), sanitizedURL, fq, *queues, isVMRemoteWrite) + c = newHTTPClient(argIdx, remoteWriteURL.String(), sanitizedURL, fq, *queues) default: logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL) } @@ -510,7 +496,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI } pss := make([]*pendingSeries, pssLen) for i := range pss { - pss[i] = newPendingSeries(fq.MustWriteBlock, isVMRemoteWrite, sf, rd) + pss[i] = newPendingSeries(fq.MustWriteBlock, c.useVMProto, sf, rd) } rwctx := &remoteWriteCtx{ diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 8bd8be786d..feb35595a7 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): use the provided `-remoteWrite.*` auth options when determining whether the remote storage supports [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). Previously the auth options were ignored. This was preventing from automatic switch to VictoriaMetrics remote write protocol. + ## [v1.88.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.88.0) Released at 2023-02-24 diff --git a/docs/vmagent.md b/docs/vmagent.md index d19aa0865c..8fb77eb11d 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -197,9 +197,11 @@ VictoriaMetrics remote write protocol provides the following benefits comparing In this case `vmagent` buffers the incoming data to disk using the VictoriaMetrics remote write format. This reduces disk read/write IO and disk space usage by 2x-5x comparing to Prometheus remote write format. -`vmagent` automatically uses VictoriaMetrics remote write protocol when it sends data to VictoriaMetrics components such as other `vmagent` instances, +`vmagent` automatically switches to VictoriaMetrics remote write protocol when it sends data to VictoriaMetrics components such as other `vmagent` instances, [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) or `vminsert` at [cluster version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html). +It is possible to force switch to VictoriaMetrics remote write protocol by specifying `-remoteWrite.forceVMProto` +command-line flag for the corresponding `-remoteWrite.url`. `vmagent` automatically switches to Prometheus remote write protocol when it sends data to old versions of VictoriaMetrics components or to other Prometheus-compatible remote storage systems. It is possible to force switch to Prometheus remote write protocol @@ -1455,6 +1457,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -remoteWrite.forcePromProto array Whether to force Prometheus remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol Supports array of values separated by comma or specified via multiple flags. + -remoteWrite.forceVMProto array + Whether to force VictoriaMetrics remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol + Supports array of values separated by comma or specified via multiple flags. -remoteWrite.headers array Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2' Supports an array of values separated by comma or specified via multiple flags. diff --git a/lib/protoparser/common/vmproto_handshake.go b/lib/protoparser/common/vmproto_handshake.go index 2b28872940..edc299dc1c 100644 --- a/lib/protoparser/common/vmproto_handshake.go +++ b/lib/protoparser/common/vmproto_handshake.go @@ -3,16 +3,20 @@ package common import ( "io" "net/http" - "net/url" "strconv" + "strings" ) -func HandleVMProtoClientHandshake(remoteWriteURL *url.URL) bool { - u := *remoteWriteURL - q := u.Query() - q.Set("get_vm_proto_version", "1") - u.RawQuery = q.Encode() - resp, err := http.Get(u.String()) +// HandleVMProtoClientHashake returns true if the server at remoteWriteURL supports VictoriaMetrics remote write protocol. +func HandleVMProtoClientHandshake(remoteWriteURL string, doRequest func(handshakeURL string) (*http.Response, error)) bool { + u := remoteWriteURL + if strings.Contains(u, "?") { + u += "&" + } else { + u += "?" + } + u += "get_vm_proto_version=1" + resp, err := doRequest(u) if err != nil { return false }