2020-02-23 12:35:47 +01:00
package remotewrite
import (
2020-07-20 18:27:25 +02:00
"bytes"
2023-08-24 00:08:04 +02:00
"errors"
2020-02-23 12:35:47 +01:00
"fmt"
2022-08-21 23:13:44 +02:00
"io"
2020-07-20 18:27:25 +02:00
"net/http"
"net/url"
2020-02-23 12:35:47 +01:00
"strings"
"sync"
"time"
2022-05-04 19:24:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
2020-05-06 15:51:32 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
2020-05-12 16:20:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2023-02-26 21:07:30 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2021-01-26 23:23:10 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/metrics"
)
var (
2023-02-26 21:07:30 +01:00
forcePromProto = flagutil . NewArrayBool ( "remoteWrite.forcePromProto" , "Whether to force Prometheus remote write protocol for sending data " +
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol" )
forceVMProto = flagutil . NewArrayBool ( "remoteWrite.forceVMProto" , "Whether to force VictoriaMetrics remote write protocol for sending data " +
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol" )
2023-08-12 13:17:55 +02:00
rateLimit = flagutil . NewArrayInt ( "remoteWrite.rateLimit" , 0 , "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. " +
2023-05-10 09:50:41 +02:00
"By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data " +
2021-01-26 23:19:35 +01:00
"is sent after temporary unavailability of the remote storage" )
2023-08-12 13:17:55 +02:00
sendTimeout = flagutil . NewArrayDuration ( "remoteWrite.sendTimeout" , time . Minute , "Timeout for sending a single block of data to the corresponding -remoteWrite.url" )
proxyURL = flagutil . NewArrayString ( "remoteWrite.proxyURL" , "Optional proxy URL for writing data to the corresponding -remoteWrite.url. " +
2022-06-30 19:15:56 +02:00
"Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234" )
2020-02-23 12:35:47 +01:00
2022-06-30 19:15:56 +02:00
tlsInsecureSkipVerify = flagutil . NewArrayBool ( "remoteWrite.tlsInsecureSkipVerify" , "Whether to skip tls verification when connecting to the corresponding -remoteWrite.url" )
2022-10-01 17:26:05 +02:00
tlsCertFile = flagutil . NewArrayString ( "remoteWrite.tlsCertFile" , "Optional path to client-side TLS certificate file to use when connecting " +
2022-06-30 19:15:56 +02:00
"to the corresponding -remoteWrite.url" )
2022-10-01 17:26:05 +02:00
tlsKeyFile = flagutil . NewArrayString ( "remoteWrite.tlsKeyFile" , "Optional path to client-side TLS certificate key to use when connecting to the corresponding -remoteWrite.url" )
tlsCAFile = flagutil . NewArrayString ( "remoteWrite.tlsCAFile" , "Optional path to TLS CA file to use for verifying connections to the corresponding -remoteWrite.url. " +
2023-05-10 09:50:41 +02:00
"By default, system CA is used" )
2022-10-01 17:26:05 +02:00
tlsServerName = flagutil . NewArrayString ( "remoteWrite.tlsServerName" , "Optional TLS server name to use for connections to the corresponding -remoteWrite.url. " +
2023-05-10 09:50:41 +02:00
"By default, the server name from -remoteWrite.url is used" )
2020-02-23 12:35:47 +01:00
2022-10-01 17:26:05 +02:00
headers = flagutil . NewArrayString ( "remoteWrite.headers" , "Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. " +
2022-06-30 19:15:56 +02:00
"For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. " +
2022-06-30 19:00:03 +02:00
"Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'" )
2022-10-01 17:26:05 +02:00
basicAuthUsername = flagutil . NewArrayString ( "remoteWrite.basicAuth.username" , "Optional basic auth username to use for the corresponding -remoteWrite.url" )
basicAuthPassword = flagutil . NewArrayString ( "remoteWrite.basicAuth.password" , "Optional basic auth password to use for the corresponding -remoteWrite.url" )
basicAuthPasswordFile = flagutil . NewArrayString ( "remoteWrite.basicAuth.passwordFile" , "Optional path to basic auth password to use for the corresponding -remoteWrite.url. " +
2022-06-30 19:15:56 +02:00
"The file is re-read every second" )
2022-10-01 17:26:05 +02:00
bearerToken = flagutil . NewArrayString ( "remoteWrite.bearerToken" , "Optional bearer auth token to use for the corresponding -remoteWrite.url" )
bearerTokenFile = flagutil . NewArrayString ( "remoteWrite.bearerTokenFile" , "Optional path to bearer token file to use for the corresponding -remoteWrite.url. " +
2022-06-30 19:15:56 +02:00
"The token is re-read from the file every second" )
2021-05-22 15:20:18 +02:00
2022-10-01 17:26:05 +02:00
oauth2ClientID = flagutil . NewArrayString ( "remoteWrite.oauth2.clientID" , "Optional OAuth2 clientID to use for the corresponding -remoteWrite.url" )
oauth2ClientSecret = flagutil . NewArrayString ( "remoteWrite.oauth2.clientSecret" , "Optional OAuth2 clientSecret to use for the corresponding -remoteWrite.url" )
oauth2ClientSecretFile = flagutil . NewArrayString ( "remoteWrite.oauth2.clientSecretFile" , "Optional OAuth2 clientSecretFile to use for the corresponding -remoteWrite.url" )
2023-12-20 20:35:16 +01:00
oauth2EndpointParams = flagutil . NewArrayString ( "remoteWrite.oauth2.endpointParams" , "Optional OAuth2 endpoint parameters to use for the corresponding -remoteWrite.url . " +
` The endpoint parameters must be set in JSON format: { "param1":"value1",...,"paramN":"valueN"} ` )
oauth2TokenURL = flagutil . NewArrayString ( "remoteWrite.oauth2.tokenUrl" , "Optional OAuth2 tokenURL to use for the corresponding -remoteWrite.url" )
oauth2Scopes = flagutil . NewArrayString ( "remoteWrite.oauth2.scopes" , "Optional OAuth2 scopes to use for the corresponding -remoteWrite.url. Scopes must be delimited by ';'" )
2022-05-04 19:24:19 +02:00
2022-06-30 19:15:56 +02:00
awsUseSigv4 = flagutil . NewArrayBool ( "remoteWrite.aws.useSigv4" , "Enables SigV4 request signing for the corresponding -remoteWrite.url. " +
"It is expected that other -remoteWrite.aws.* command-line flags are set if sigv4 request signing is enabled" )
2022-10-01 17:26:05 +02:00
awsEC2Endpoint = flagutil . NewArrayString ( "remoteWrite.aws.ec2Endpoint" , "Optional AWS EC2 API endpoint to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsSTSEndpoint = flagutil . NewArrayString ( "remoteWrite.aws.stsEndpoint" , "Optional AWS STS API endpoint to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsRegion = flagutil . NewArrayString ( "remoteWrite.aws.region" , "Optional AWS region to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsRoleARN = flagutil . NewArrayString ( "remoteWrite.aws.roleARN" , "Optional AWS roleARN to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsAccessKey = flagutil . NewArrayString ( "remoteWrite.aws.accessKey" , "Optional AWS AccessKey to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsService = flagutil . NewArrayString ( "remoteWrite.aws.service" , "Optional AWS Service to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set. " +
2022-06-30 19:15:56 +02:00
"Defaults to \"aps\"" )
2022-10-01 17:26:05 +02:00
awsSecretKey = flagutil . NewArrayString ( "remoteWrite.aws.secretKey" , "Optional AWS SecretKey to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
2020-02-23 12:35:47 +01:00
)
type client struct {
2023-02-26 21:07:30 +01:00
sanitizedURL string
remoteWriteURL string
// Whether to use VictoriaMetrics remote write protocol for sending the data to remoteWriteURL
useVMProto bool
fq * persistentqueue . FastQueue
hc * http . Client
2020-02-23 12:35:47 +01:00
2021-09-28 23:52:07 +02:00
sendBlock func ( block [ ] byte ) bool
authCfg * promauth . Config
2022-05-04 19:24:19 +02:00
awsCfg * awsapi . Config
2021-05-22 15:20:18 +02:00
2021-01-26 23:19:35 +01:00
rl rateLimiter
2020-12-15 19:39:12 +01:00
bytesSent * metrics . Counter
blocksSent * metrics . Counter
2020-02-23 12:35:47 +01:00
requestDuration * metrics . Histogram
requestsOKCount * metrics . Counter
errorsCount * metrics . Counter
2020-11-01 23:43:51 +01:00
packetsDropped * metrics . Counter
2022-05-02 21:20:05 +02:00
rateLimit * metrics . Gauge
2020-02-23 12:35:47 +01:00
retriesCount * metrics . Counter
2021-08-15 12:32:40 +02:00
sendDuration * metrics . FloatCounter
2020-02-23 12:35:47 +01:00
wg sync . WaitGroup
stopCh chan struct { }
}
2023-02-26 21:07:30 +01:00
func newHTTPClient ( argIdx int , remoteWriteURL , sanitizedURL string , fq * persistentqueue . FastQueue , concurrency int ) * client {
2021-05-22 16:59:23 +02:00
authCfg , err := getAuthConfig ( argIdx )
2020-07-20 18:27:25 +02:00
if err != nil {
2023-10-25 23:19:33 +02:00
logger . Fatalf ( "cannot initialize auth config for -remoteWrite.url=%q: %s" , remoteWriteURL , err )
}
tlsCfg , err := authCfg . NewTLSConfig ( )
if err != nil {
logger . Fatalf ( "cannot initialize tls config for -remoteWrite.url=%q: %s" , remoteWriteURL , err )
2020-07-20 18:27:25 +02:00
}
2022-05-04 19:24:19 +02:00
awsCfg , err := getAWSAPIConfig ( argIdx )
if err != nil {
2023-10-25 23:19:33 +02:00
logger . Fatalf ( "cannot initialize AWS Config for -remoteWrite.url=%q: %s" , remoteWriteURL , err )
2022-05-04 19:24:19 +02:00
}
2020-07-20 18:27:25 +02:00
tr := & http . Transport {
2022-04-06 11:28:54 +02:00
DialContext : statDial ,
2020-07-20 18:27:25 +02:00
TLSClientConfig : tlsCfg ,
2022-04-06 11:28:54 +02:00
TLSHandshakeTimeout : 10 * time . Second ,
2020-07-20 18:27:25 +02:00
MaxConnsPerHost : 2 * concurrency ,
2020-08-04 19:59:55 +02:00
MaxIdleConnsPerHost : 2 * concurrency ,
IdleConnTimeout : time . Minute ,
WriteBufferSize : 64 * 1024 ,
2020-07-20 18:27:25 +02:00
}
pURL := proxyURL . GetOptionalArg ( argIdx )
if len ( pURL ) > 0 {
if ! strings . Contains ( pURL , "://" ) {
logger . Fatalf ( "cannot parse -remoteWrite.proxyURL=%q: it must start with `http://`, `https://` or `socks5://`" , pURL )
}
2021-10-26 20:21:08 +02:00
pu , err := url . Parse ( pURL )
2020-07-20 18:27:25 +02:00
if err != nil {
logger . Fatalf ( "cannot parse -remoteWrite.proxyURL=%q: %s" , pURL , err )
}
2021-10-26 20:21:08 +02:00
tr . Proxy = http . ProxyURL ( pu )
2020-07-20 18:27:25 +02:00
}
2023-02-24 02:36:52 +01:00
hc := & http . Client {
Transport : tr ,
2023-08-12 13:17:55 +02:00
Timeout : sendTimeout . GetOptionalArg ( argIdx ) ,
2023-02-24 02:36:52 +01:00
}
2020-02-23 12:35:47 +01:00
c := & client {
2023-02-26 21:07:30 +01:00
sanitizedURL : sanitizedURL ,
remoteWriteURL : remoteWriteURL ,
authCfg : authCfg ,
awsCfg : awsCfg ,
fq : fq ,
hc : hc ,
stopCh : make ( chan struct { } ) ,
2020-02-23 12:35:47 +01:00
}
2021-09-28 23:52:07 +02:00
c . sendBlock = c . sendBlockHTTP
2023-02-26 21:07:30 +01:00
useVMProto := forceVMProto . GetOptionalArg ( argIdx )
usePromProto := forcePromProto . GetOptionalArg ( argIdx )
if useVMProto && usePromProto {
logger . Fatalf ( "-remoteWrite.useVMProto and -remoteWrite.usePromProto cannot be set simultaneously for -remoteWrite.url=%s" , sanitizedURL )
}
app/vmagent/remotewrite: follow-up for e3a756d82869f8c357b072f6e635ebfc7d65dd2c
- Document the fix
- Move the detection of VictoriaMetrics remoteWrite protocol from client.init() to newHTTPClient()
This simplifies the fix to the following diff:
diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 099899c19..70b904af4 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -151,10 +151,6 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
c.sendBlock = c.sendBlockHTTP
- return c
-}
-
-func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
useVMProto := forceVMProto.GetOptionalArg(argIdx)
usePromProto := forcePromProto.GetOptionalArg(argIdx)
if useVMProto && usePromProto {
@@ -173,6 +169,10 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
}
c.useVMProto = useVMProto
+ return c
+}
+
+func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
2023-03-08 08:50:06 +01:00
if ! useVMProto && ! usePromProto {
2023-02-26 21:07:30 +01:00
// Auto-detect whether the remote storage supports VictoriaMetrics remote write protocol.
doRequest := func ( url string ) ( * http . Response , error ) {
return c . doRequest ( url , nil )
}
useVMProto = common . HandleVMProtoClientHandshake ( c . remoteWriteURL , doRequest )
if ! useVMProto {
logger . Infof ( "the remote storage at %q doesn't support VictoriaMetrics remote write protocol. Switching to Prometheus remote write protocol. " +
"See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol" , sanitizedURL )
}
}
app/vmagent/remotewrite: follow-up for e3a756d82869f8c357b072f6e635ebfc7d65dd2c
- Document the fix
- Move the detection of VictoriaMetrics remoteWrite protocol from client.init() to newHTTPClient()
This simplifies the fix to the following diff:
diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 099899c19..70b904af4 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -151,10 +151,6 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
c.sendBlock = c.sendBlockHTTP
- return c
-}
-
-func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
useVMProto := forceVMProto.GetOptionalArg(argIdx)
usePromProto := forcePromProto.GetOptionalArg(argIdx)
if useVMProto && usePromProto {
@@ -173,6 +169,10 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
}
c.useVMProto = useVMProto
+ return c
+}
+
+func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
2023-03-08 08:50:06 +01:00
c . useVMProto = useVMProto
2023-02-26 21:07:30 +01:00
app/vmagent/remotewrite: follow-up for e3a756d82869f8c357b072f6e635ebfc7d65dd2c
- Document the fix
- Move the detection of VictoriaMetrics remoteWrite protocol from client.init() to newHTTPClient()
This simplifies the fix to the following diff:
diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 099899c19..70b904af4 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -151,10 +151,6 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
c.sendBlock = c.sendBlockHTTP
- return c
-}
-
-func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
useVMProto := forceVMProto.GetOptionalArg(argIdx)
usePromProto := forcePromProto.GetOptionalArg(argIdx)
if useVMProto && usePromProto {
@@ -173,6 +169,10 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
}
c.useVMProto = useVMProto
+ return c
+}
+
+func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
2023-03-08 08:50:06 +01:00
return c
}
func ( c * client ) init ( argIdx , concurrency int , sanitizedURL string ) {
2023-08-12 13:17:55 +02:00
if bytesPerSec := rateLimit . GetOptionalArg ( argIdx ) ; bytesPerSec > 0 {
2021-02-01 13:27:05 +01:00
logger . Infof ( "applying %d bytes per second rate limit for -remoteWrite.url=%q" , bytesPerSec , sanitizedURL )
c . rl . perSecondLimit = int64 ( bytesPerSec )
2021-01-26 23:19:35 +01:00
}
2022-05-06 14:01:52 +02:00
c . rl . limitReached = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_rate_limit_reached_total { url=%q} ` , c . sanitizedURL ) )
2021-01-26 23:19:35 +01:00
2020-12-15 19:39:12 +01:00
c . bytesSent = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_bytes_sent_total { url=%q} ` , c . sanitizedURL ) )
c . blocksSent = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_blocks_sent_total { url=%q} ` , c . sanitizedURL ) )
2022-05-02 21:20:05 +02:00
c . rateLimit = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_rate_limit { url=%q} ` , c . sanitizedURL ) , func ( ) float64 {
2023-08-12 13:17:55 +02:00
return float64 ( rateLimit . GetOptionalArg ( argIdx ) )
2022-05-02 21:20:05 +02:00
} )
2020-09-16 21:34:01 +02:00
c . requestDuration = metrics . GetOrCreateHistogram ( fmt . Sprintf ( ` vmagent_remotewrite_duration_seconds { url=%q} ` , c . sanitizedURL ) )
c . requestsOKCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_requests_total { url=%q, status_code="2XX"} ` , c . sanitizedURL ) )
c . errorsCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_errors_total { url=%q} ` , c . sanitizedURL ) )
2020-11-01 23:43:51 +01:00
c . packetsDropped = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_packets_dropped_total { url=%q} ` , c . sanitizedURL ) )
2020-09-16 21:34:01 +02:00
c . retriesCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_retries_count_total { url=%q} ` , c . sanitizedURL ) )
2021-08-15 12:32:40 +02:00
c . sendDuration = metrics . GetOrCreateFloatCounter ( fmt . Sprintf ( ` vmagent_remotewrite_send_duration_seconds_total { url=%q} ` , c . sanitizedURL ) )
2022-07-18 13:31:35 +02:00
metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_queues { url=%q} ` , c . sanitizedURL ) , func ( ) float64 {
return float64 ( * queues )
} )
2020-03-03 12:08:17 +01:00
for i := 0 ; i < concurrency ; i ++ {
2020-02-23 12:35:47 +01:00
c . wg . Add ( 1 )
go func ( ) {
defer c . wg . Done ( )
c . runWorker ( )
} ( )
}
2020-09-16 21:34:01 +02:00
logger . Infof ( "initialized client for -remoteWrite.url=%q" , c . sanitizedURL )
2020-02-23 12:35:47 +01:00
}
func ( c * client ) MustStop ( ) {
close ( c . stopCh )
c . wg . Wait ( )
2020-09-16 21:34:01 +02:00
logger . Infof ( "stopped client for -remoteWrite.url=%q" , c . sanitizedURL )
2020-02-23 12:35:47 +01:00
}
2021-05-22 16:59:23 +02:00
func getAuthConfig ( argIdx int ) ( * promauth . Config , error ) {
2022-06-30 19:00:03 +02:00
headersValue := headers . GetOptionalArg ( argIdx )
2022-06-30 19:17:30 +02:00
var hdrs [ ] string
2022-06-30 19:00:03 +02:00
if headersValue != "" {
2022-06-30 19:17:30 +02:00
hdrs = strings . Split ( headersValue , "^^" )
2022-06-30 19:00:03 +02:00
}
2021-05-22 16:59:23 +02:00
username := basicAuthUsername . GetOptionalArg ( argIdx )
password := basicAuthPassword . GetOptionalArg ( argIdx )
passwordFile := basicAuthPasswordFile . GetOptionalArg ( argIdx )
var basicAuthCfg * promauth . BasicAuthConfig
if username != "" || password != "" || passwordFile != "" {
basicAuthCfg = & promauth . BasicAuthConfig {
Username : username ,
2021-11-05 13:41:14 +01:00
Password : promauth . NewSecret ( password ) ,
2021-05-22 16:59:23 +02:00
PasswordFile : passwordFile ,
}
}
token := bearerToken . GetOptionalArg ( argIdx )
tokenFile := bearerTokenFile . GetOptionalArg ( argIdx )
var oauth2Cfg * promauth . OAuth2Config
clientSecret := oauth2ClientSecret . GetOptionalArg ( argIdx )
clientSecretFile := oauth2ClientSecretFile . GetOptionalArg ( argIdx )
if clientSecretFile != "" || clientSecret != "" {
2023-12-20 20:35:16 +01:00
endpointParamsJSON := oauth2EndpointParams . GetOptionalArg ( argIdx )
endpointParams , err := flagutil . ParseJSONMap ( endpointParamsJSON )
if err != nil {
return nil , fmt . Errorf ( "cannot parse JSON for -remoteWrite.oauth2.endpointParams=%s: %w" , endpointParamsJSON , err )
}
2021-05-22 16:59:23 +02:00
oauth2Cfg = & promauth . OAuth2Config {
ClientID : oauth2ClientID . GetOptionalArg ( argIdx ) ,
2021-11-05 13:41:14 +01:00
ClientSecret : promauth . NewSecret ( clientSecret ) ,
2021-05-22 16:59:23 +02:00
ClientSecretFile : clientSecretFile ,
2023-12-20 20:35:16 +01:00
EndpointParams : endpointParams ,
2021-05-22 16:59:23 +02:00
TokenURL : oauth2TokenURL . GetOptionalArg ( argIdx ) ,
Scopes : strings . Split ( oauth2Scopes . GetOptionalArg ( argIdx ) , ";" ) ,
}
}
tlsCfg := & promauth . TLSConfig {
2020-05-12 16:20:55 +02:00
CAFile : tlsCAFile . GetOptionalArg ( argIdx ) ,
CertFile : tlsCertFile . GetOptionalArg ( argIdx ) ,
KeyFile : tlsKeyFile . GetOptionalArg ( argIdx ) ,
ServerName : tlsServerName . GetOptionalArg ( argIdx ) ,
2020-12-15 11:51:12 +01:00
InsecureSkipVerify : tlsInsecureSkipVerify . GetOptionalArg ( argIdx ) ,
2020-02-23 12:35:47 +01:00
}
2021-05-22 15:20:18 +02:00
2022-07-04 13:27:48 +02:00
opts := & promauth . Options {
BasicAuth : basicAuthCfg ,
BearerToken : token ,
BearerTokenFile : tokenFile ,
OAuth2 : oauth2Cfg ,
TLSConfig : tlsCfg ,
Headers : hdrs ,
}
authCfg , err := opts . NewConfig ( )
2021-05-22 15:20:18 +02:00
if err != nil {
2023-07-03 13:12:40 +02:00
return nil , fmt . Errorf ( "cannot populate auth config for remoteWrite idx: %d, err: %w" , argIdx , err )
2021-05-22 15:20:18 +02:00
}
return authCfg , nil
}
2022-05-04 19:24:19 +02:00
func getAWSAPIConfig ( argIdx int ) ( * awsapi . Config , error ) {
2022-05-04 19:39:38 +02:00
if ! awsUseSigv4 . GetOptionalArg ( argIdx ) {
2022-05-04 19:24:19 +02:00
return nil , nil
}
2022-08-05 17:50:00 +02:00
ec2Endpoint := awsEC2Endpoint . GetOptionalArg ( argIdx )
stsEndpoint := awsSTSEndpoint . GetOptionalArg ( argIdx )
2022-05-04 19:24:19 +02:00
region := awsRegion . GetOptionalArg ( argIdx )
roleARN := awsRoleARN . GetOptionalArg ( argIdx )
accessKey := awsAccessKey . GetOptionalArg ( argIdx )
secretKey := awsSecretKey . GetOptionalArg ( argIdx )
2022-05-18 14:58:31 +02:00
service := awsService . GetOptionalArg ( argIdx )
2022-08-05 17:50:00 +02:00
cfg , err := awsapi . NewConfig ( ec2Endpoint , stsEndpoint , region , roleARN , accessKey , secretKey , service )
2022-05-04 19:24:19 +02:00
if err != nil {
return nil , err
}
return cfg , nil
}
2020-02-23 12:35:47 +01:00
func ( c * client ) runWorker ( ) {
var ok bool
var block [ ] byte
2021-02-17 20:42:45 +01:00
ch := make ( chan bool , 1 )
2020-02-23 12:35:47 +01:00
for {
block , ok = c . fq . MustReadBlock ( block [ : 0 ] )
if ! ok {
return
}
go func ( ) {
2021-08-15 12:32:40 +02:00
startTime := time . Now ( )
2021-02-17 20:42:45 +01:00
ch <- c . sendBlock ( block )
2021-08-15 12:32:40 +02:00
c . sendDuration . Add ( time . Since ( startTime ) . Seconds ( ) )
2020-02-23 12:35:47 +01:00
} ( )
select {
2021-02-17 20:42:45 +01:00
case ok := <- ch :
if ok {
// The block has been sent successfully
continue
}
// Return unsent block to the queue.
2023-11-24 13:42:11 +01:00
c . fq . MustWriteBlockIgnoreDisabledPQ ( block )
2021-02-17 20:42:45 +01:00
return
2020-02-23 12:35:47 +01:00
case <- c . stopCh :
// c must be stopped. Wait for a while in the hope the block will be sent.
graceDuration := 5 * time . Second
select {
2021-02-17 20:42:45 +01:00
case ok := <- ch :
if ! ok {
// Return unsent block to the queue.
2023-11-24 13:42:11 +01:00
c . fq . MustWriteBlockIgnoreDisabledPQ ( block )
2021-02-17 20:42:45 +01:00
}
2020-02-23 12:35:47 +01:00
case <- time . After ( graceDuration ) :
2021-02-17 20:42:45 +01:00
// Return unsent block to the queue.
2023-11-24 13:42:11 +01:00
c . fq . MustWriteBlockIgnoreDisabledPQ ( block )
2020-02-23 12:35:47 +01:00
}
return
}
}
}
2023-02-26 21:07:30 +01:00
func ( c * client ) doRequest ( url string , body [ ] byte ) ( * http . Response , error ) {
2023-10-17 11:58:19 +02:00
req , err := c . newRequest ( url , body )
if err != nil {
return nil , err
}
2023-08-24 00:08:04 +02:00
resp , err := c . hc . Do ( req )
2023-10-25 23:19:33 +02:00
if err == nil {
return resp , nil
}
if ! errors . Is ( err , io . EOF ) && ! errors . Is ( err , io . ErrUnexpectedEOF ) {
return nil , err
}
// It is likely connection became stale or timed out during the first request.
// Make another attempt in hope request will succeed.
// If not, the error should be handled by the caller as usual.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139
req , err = c . newRequest ( url , body )
if err != nil {
return nil , fmt . Errorf ( "second attempt: %w" , err )
}
resp , err = c . hc . Do ( req )
if err != nil {
return nil , fmt . Errorf ( "second attempt: %w" , err )
2023-08-24 00:08:04 +02:00
}
2023-10-25 23:19:33 +02:00
return resp , nil
2023-08-24 00:08:04 +02:00
}
2023-10-17 11:58:19 +02:00
func ( c * client ) newRequest ( url string , body [ ] byte ) ( * http . Request , error ) {
2023-02-26 21:07:30 +01:00
reqBody := bytes . NewBuffer ( body )
req , err := http . NewRequest ( http . MethodPost , url , reqBody )
2020-07-20 18:27:25 +02:00
if err != nil {
2023-02-26 21:07:30 +01:00
logger . Panicf ( "BUG: unexpected error from http.NewRequest(%q): %s" , url , err )
2020-07-20 18:27:25 +02:00
}
2023-10-17 11:58:19 +02:00
err = c . authCfg . SetHeaders ( req , true )
if err != nil {
return nil , err
}
2020-07-20 18:27:25 +02:00
h := req . Header
h . Set ( "User-Agent" , "vmagent" )
h . Set ( "Content-Type" , "application/x-protobuf" )
2023-02-26 21:07:30 +01:00
if c . useVMProto {
2023-02-21 03:38:49 +01:00
h . Set ( "Content-Encoding" , "zstd" )
h . Set ( "X-VictoriaMetrics-Remote-Write-Version" , "1" )
} else {
h . Set ( "Content-Encoding" , "snappy" )
h . Set ( "X-Prometheus-Remote-Write-Version" , "0.1.0" )
}
2022-05-04 19:24:19 +02:00
if c . awsCfg != nil {
2023-02-26 21:07:30 +01:00
sigv4Hash := awsapi . HashHex ( body )
2022-05-18 14:58:31 +02:00
if err := c . awsCfg . SignRequest ( req , sigv4Hash ) ; err != nil {
2023-10-25 23:19:33 +02:00
return nil , fmt . Errorf ( "cannot sign remoteWrite request with AWS sigv4: %w" , err )
2022-05-04 19:24:19 +02:00
}
}
2023-10-17 11:58:19 +02:00
return req , nil
2023-02-26 21:07:30 +01:00
}
// sendBlockHTTP sends the given block to c.remoteWriteURL.
//
// The function returns false only if c.stopCh is closed.
// Otherwise it tries sending the block to remote storage indefinitely.
func ( c * client ) sendBlockHTTP ( block [ ] byte ) bool {
c . rl . register ( len ( block ) , c . stopCh )
retryDuration := time . Second
retriesCount := 0
again :
2020-02-23 12:35:47 +01:00
startTime := time . Now ( )
2023-02-26 21:07:30 +01:00
resp , err := c . doRequest ( c . remoteWriteURL , block )
2020-02-23 12:35:47 +01:00
c . requestDuration . UpdateDuration ( startTime )
if err != nil {
c . errorsCount . Inc ( )
retryDuration *= 2
if retryDuration > time . Minute {
retryDuration = time . Minute
}
2021-05-24 14:42:43 +02:00
logger . Warnf ( "couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds" ,
2020-09-16 21:34:01 +02:00
len ( block ) , c . sanitizedURL , err , retryDuration . Seconds ( ) )
2021-01-26 23:23:10 +01:00
t := timerpool . Get ( retryDuration )
2020-07-20 18:27:25 +02:00
select {
case <- c . stopCh :
2021-01-26 23:23:10 +01:00
timerpool . Put ( t )
2021-02-17 20:23:38 +01:00
return false
2020-07-20 18:27:25 +02:00
case <- t . C :
2021-01-26 23:23:10 +01:00
timerpool . Put ( t )
2020-07-20 18:27:25 +02:00
}
2020-02-23 12:35:47 +01:00
c . retriesCount . Inc ( )
goto again
}
2020-07-20 18:27:25 +02:00
statusCode := resp . StatusCode
2020-07-28 19:52:00 +02:00
if statusCode / 100 == 2 {
_ = resp . Body . Close ( )
c . requestsOKCount . Inc ( )
2023-02-26 21:07:30 +01:00
c . bytesSent . Add ( len ( block ) )
c . blocksSent . Inc ( )
2021-02-17 20:23:38 +01:00
return true
2020-07-28 19:52:00 +02:00
}
2020-11-01 23:43:51 +01:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_requests_total { url=%q, status_code="%d"} ` , c . sanitizedURL , statusCode ) ) . Inc ( )
2021-03-26 12:17:59 +01:00
if statusCode == 409 || statusCode == 400 {
2022-08-21 23:13:44 +02:00
body , err := io . ReadAll ( resp . Body )
2021-12-21 15:36:09 +01:00
_ = resp . Body . Close ( )
if err != nil {
2022-06-27 11:31:16 +02:00
remoteWriteRejectedLogger . Errorf ( "sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; " +
2021-12-21 15:36:09 +01:00
"failed to read response body: %s" ,
len ( block ) , c . sanitizedURL , statusCode , err )
} else {
2022-06-27 11:31:16 +02:00
remoteWriteRejectedLogger . Errorf ( "sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s" ,
2021-12-21 15:36:09 +01:00
len ( block ) , c . sanitizedURL , statusCode , string ( body ) )
}
2021-05-13 15:16:16 +02:00
// Just drop block on 409 and 400 status codes like Prometheus does.
2020-11-01 23:43:51 +01:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873
2021-05-13 15:16:16 +02:00
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149
2020-11-01 23:43:51 +01:00
_ = resp . Body . Close ( )
c . packetsDropped . Inc ( )
2021-02-17 20:23:38 +01:00
return true
2020-11-01 23:43:51 +01:00
}
2020-07-28 19:52:00 +02:00
// Unexpected status code returned
2020-08-30 20:39:45 +02:00
retriesCount ++
2020-07-28 19:52:00 +02:00
retryDuration *= 2
if retryDuration > time . Minute {
retryDuration = time . Minute
}
2022-08-21 23:13:44 +02:00
body , err := io . ReadAll ( resp . Body )
2020-07-28 19:52:00 +02:00
_ = resp . Body . Close ( )
if err != nil {
2020-09-16 21:34:01 +02:00
logger . Errorf ( "cannot read response body from %q during retry #%d: %s" , c . sanitizedURL , retriesCount , err )
2020-07-28 19:52:00 +02:00
} else {
2020-08-30 20:39:45 +02:00
logger . Errorf ( "unexpected status code received after sending a block with size %d bytes to %q during retry #%d: %d; response body=%q; " +
2020-09-16 21:34:01 +02:00
"re-sending the block in %.3f seconds" , len ( block ) , c . sanitizedURL , retriesCount , statusCode , body , retryDuration . Seconds ( ) )
2020-07-28 19:52:00 +02:00
}
2021-01-26 23:23:10 +01:00
t := timerpool . Get ( retryDuration )
2020-07-28 19:52:00 +02:00
select {
case <- c . stopCh :
2021-01-26 23:23:10 +01:00
timerpool . Put ( t )
2021-02-17 20:23:38 +01:00
return false
2020-07-28 19:52:00 +02:00
case <- t . C :
2021-01-26 23:23:10 +01:00
timerpool . Put ( t )
2020-02-23 12:35:47 +01:00
}
2020-07-28 19:52:00 +02:00
c . retriesCount . Inc ( )
goto again
2020-04-17 14:51:29 +02:00
}
2021-01-26 23:19:35 +01:00
2022-06-27 11:31:16 +02:00
// remoteWriteRejectedLogger is used by sendBlockHTTP for logging blocks rejected
// by the remote storage with 400 or 409 status codes.
// NOTE(review): presumably the throttler limits such messages to one per 5 seconds - confirm against logger.WithThrottler.
var remoteWriteRejectedLogger = logger . WithThrottler ( "remoteWriteRejected" , 5 * time . Second )
2021-01-26 23:19:35 +01:00
type rateLimiter struct {
perSecondLimit int64
2021-02-28 23:28:20 +01:00
// mu protects budget and deadline from concurrent access.
mu sync . Mutex
2021-01-26 23:19:35 +01:00
// The current budget. It is increased by perSecondLimit every second.
budget int64
// The next deadline for increasing the budget by perSecondLimit
deadline time . Time
limitReached * metrics . Counter
}
func ( rl * rateLimiter ) register ( dataLen int , stopCh <- chan struct { } ) {
limit := rl . perSecondLimit
if limit <= 0 {
return
}
2021-02-28 23:28:20 +01:00
rl . mu . Lock ( )
defer rl . mu . Unlock ( )
2021-01-26 23:19:35 +01:00
for rl . budget <= 0 {
2021-02-28 23:58:31 +01:00
if d := time . Until ( rl . deadline ) ; d > 0 {
2021-01-26 23:19:35 +01:00
rl . limitReached . Inc ( )
2021-01-26 23:23:10 +01:00
t := timerpool . Get ( d )
2021-01-26 23:19:35 +01:00
select {
case <- stopCh :
2021-01-26 23:23:10 +01:00
timerpool . Put ( t )
2021-01-26 23:19:35 +01:00
return
case <- t . C :
2021-01-26 23:23:10 +01:00
timerpool . Put ( t )
2021-01-26 23:19:35 +01:00
}
}
rl . budget += limit
2021-02-28 23:58:31 +01:00
rl . deadline = time . Now ( ) . Add ( time . Second )
2021-01-26 23:19:35 +01:00
}
rl . budget -= int64 ( dataLen )
}