mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-24 22:07:23 +01:00
02bccd1eb9
* [vmagent] added ingestion rate limiting with new flag `-maxIngestionRate`. This flag can be used to limit the number of samples ingested by vmagent per second. If the limit is exceeded, the ingestion rate will be throttled. * fix changelog * fix review comment
538 lines
22 KiB
Go
538 lines
22 KiB
Go
package remotewrite
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
forcePromProto = flagutil.NewArrayBool("remoteWrite.forcePromProto", "Whether to force Prometheus remote write protocol for sending data "+
|
|
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol")
|
|
forceVMProto = flagutil.NewArrayBool("remoteWrite.forceVMProto", "Whether to force VictoriaMetrics remote write protocol for sending data "+
|
|
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol")
|
|
|
|
rateLimit = flagutil.NewArrayInt("remoteWrite.rateLimit", 0, "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. "+
|
|
"By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+
|
|
"is sent after temporary unavailability of the remote storage")
|
|
sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to the corresponding -remoteWrite.url")
|
|
proxyURL = flagutil.NewArrayString("remoteWrite.proxyURL", "Optional proxy URL for writing data to the corresponding -remoteWrite.url. "+
|
|
"Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234")
|
|
|
|
tlsHandshakeTimeout = flagutil.NewArrayDuration("remoteWrite.tlsHandshakeTimeout", 20*time.Second, "The timeout for estabilishing tls connections to the corresponding -remoteWrite.url")
|
|
tlsInsecureSkipVerify = flagutil.NewArrayBool("remoteWrite.tlsInsecureSkipVerify", "Whether to skip tls verification when connecting to the corresponding -remoteWrite.url")
|
|
tlsCertFile = flagutil.NewArrayString("remoteWrite.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting "+
|
|
"to the corresponding -remoteWrite.url")
|
|
tlsKeyFile = flagutil.NewArrayString("remoteWrite.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to the corresponding -remoteWrite.url")
|
|
tlsCAFile = flagutil.NewArrayString("remoteWrite.tlsCAFile", "Optional path to TLS CA file to use for verifying connections to the corresponding -remoteWrite.url. "+
|
|
"By default, system CA is used")
|
|
tlsServerName = flagutil.NewArrayString("remoteWrite.tlsServerName", "Optional TLS server name to use for connections to the corresponding -remoteWrite.url. "+
|
|
"By default, the server name from -remoteWrite.url is used")
|
|
|
|
headers = flagutil.NewArrayString("remoteWrite.headers", "Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. "+
|
|
"For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. "+
|
|
"Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'")
|
|
|
|
basicAuthUsername = flagutil.NewArrayString("remoteWrite.basicAuth.username", "Optional basic auth username to use for the corresponding -remoteWrite.url")
|
|
basicAuthPassword = flagutil.NewArrayString("remoteWrite.basicAuth.password", "Optional basic auth password to use for the corresponding -remoteWrite.url")
|
|
basicAuthPasswordFile = flagutil.NewArrayString("remoteWrite.basicAuth.passwordFile", "Optional path to basic auth password to use for the corresponding -remoteWrite.url. "+
|
|
"The file is re-read every second")
|
|
bearerToken = flagutil.NewArrayString("remoteWrite.bearerToken", "Optional bearer auth token to use for the corresponding -remoteWrite.url")
|
|
bearerTokenFile = flagutil.NewArrayString("remoteWrite.bearerTokenFile", "Optional path to bearer token file to use for the corresponding -remoteWrite.url. "+
|
|
"The token is re-read from the file every second")
|
|
|
|
oauth2ClientID = flagutil.NewArrayString("remoteWrite.oauth2.clientID", "Optional OAuth2 clientID to use for the corresponding -remoteWrite.url")
|
|
oauth2ClientSecret = flagutil.NewArrayString("remoteWrite.oauth2.clientSecret", "Optional OAuth2 clientSecret to use for the corresponding -remoteWrite.url")
|
|
oauth2ClientSecretFile = flagutil.NewArrayString("remoteWrite.oauth2.clientSecretFile", "Optional OAuth2 clientSecretFile to use for the corresponding -remoteWrite.url")
|
|
oauth2EndpointParams = flagutil.NewArrayString("remoteWrite.oauth2.endpointParams", "Optional OAuth2 endpoint parameters to use for the corresponding -remoteWrite.url . "+
|
|
`The endpoint parameters must be set in JSON format: {"param1":"value1",...,"paramN":"valueN"}`)
|
|
oauth2TokenURL = flagutil.NewArrayString("remoteWrite.oauth2.tokenUrl", "Optional OAuth2 tokenURL to use for the corresponding -remoteWrite.url")
|
|
oauth2Scopes = flagutil.NewArrayString("remoteWrite.oauth2.scopes", "Optional OAuth2 scopes to use for the corresponding -remoteWrite.url. Scopes must be delimited by ';'")
|
|
|
|
awsUseSigv4 = flagutil.NewArrayBool("remoteWrite.aws.useSigv4", "Enables SigV4 request signing for the corresponding -remoteWrite.url. "+
|
|
"It is expected that other -remoteWrite.aws.* command-line flags are set if sigv4 request signing is enabled")
|
|
awsEC2Endpoint = flagutil.NewArrayString("remoteWrite.aws.ec2Endpoint", "Optional AWS EC2 API endpoint to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
|
|
awsSTSEndpoint = flagutil.NewArrayString("remoteWrite.aws.stsEndpoint", "Optional AWS STS API endpoint to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
|
|
awsRegion = flagutil.NewArrayString("remoteWrite.aws.region", "Optional AWS region to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
|
|
awsRoleARN = flagutil.NewArrayString("remoteWrite.aws.roleARN", "Optional AWS roleARN to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
|
|
awsAccessKey = flagutil.NewArrayString("remoteWrite.aws.accessKey", "Optional AWS AccessKey to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
|
|
awsService = flagutil.NewArrayString("remoteWrite.aws.service", "Optional AWS Service to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set. "+
|
|
"Defaults to \"aps\"")
|
|
awsSecretKey = flagutil.NewArrayString("remoteWrite.aws.secretKey", "Optional AWS SecretKey to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
|
|
)
|
|
|
|
type client struct {
|
|
sanitizedURL string
|
|
remoteWriteURL string
|
|
|
|
// Whether to use VictoriaMetrics remote write protocol for sending the data to remoteWriteURL
|
|
useVMProto bool
|
|
|
|
fq *persistentqueue.FastQueue
|
|
hc *http.Client
|
|
|
|
sendBlock func(block []byte) bool
|
|
authCfg *promauth.Config
|
|
awsCfg *awsapi.Config
|
|
|
|
rl *rateLimiter
|
|
|
|
bytesSent *metrics.Counter
|
|
blocksSent *metrics.Counter
|
|
requestDuration *metrics.Histogram
|
|
requestsOKCount *metrics.Counter
|
|
errorsCount *metrics.Counter
|
|
packetsDropped *metrics.Counter
|
|
rateLimit *metrics.Gauge
|
|
retriesCount *metrics.Counter
|
|
sendDuration *metrics.FloatCounter
|
|
|
|
wg sync.WaitGroup
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client {
|
|
authCfg, err := getAuthConfig(argIdx)
|
|
if err != nil {
|
|
logger.Fatalf("cannot initialize auth config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
|
}
|
|
tlsCfg, err := authCfg.NewTLSConfig()
|
|
if err != nil {
|
|
logger.Fatalf("cannot initialize tls config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
|
}
|
|
awsCfg, err := getAWSAPIConfig(argIdx)
|
|
if err != nil {
|
|
logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
|
}
|
|
tr := &http.Transport{
|
|
DialContext: statDial,
|
|
TLSClientConfig: tlsCfg,
|
|
TLSHandshakeTimeout: tlsHandshakeTimeout.GetOptionalArg(argIdx),
|
|
MaxConnsPerHost: 2 * concurrency,
|
|
MaxIdleConnsPerHost: 2 * concurrency,
|
|
IdleConnTimeout: time.Minute,
|
|
WriteBufferSize: 64 * 1024,
|
|
}
|
|
pURL := proxyURL.GetOptionalArg(argIdx)
|
|
if len(pURL) > 0 {
|
|
if !strings.Contains(pURL, "://") {
|
|
logger.Fatalf("cannot parse -remoteWrite.proxyURL=%q: it must start with `http://`, `https://` or `socks5://`", pURL)
|
|
}
|
|
pu, err := url.Parse(pURL)
|
|
if err != nil {
|
|
logger.Fatalf("cannot parse -remoteWrite.proxyURL=%q: %s", pURL, err)
|
|
}
|
|
tr.Proxy = http.ProxyURL(pu)
|
|
}
|
|
hc := &http.Client{
|
|
Transport: tr,
|
|
Timeout: sendTimeout.GetOptionalArg(argIdx),
|
|
}
|
|
c := &client{
|
|
sanitizedURL: sanitizedURL,
|
|
remoteWriteURL: remoteWriteURL,
|
|
authCfg: authCfg,
|
|
awsCfg: awsCfg,
|
|
fq: fq,
|
|
hc: hc,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
c.sendBlock = c.sendBlockHTTP
|
|
|
|
useVMProto := forceVMProto.GetOptionalArg(argIdx)
|
|
usePromProto := forcePromProto.GetOptionalArg(argIdx)
|
|
if useVMProto && usePromProto {
|
|
logger.Fatalf("-remoteWrite.useVMProto and -remoteWrite.usePromProto cannot be set simultaneously for -remoteWrite.url=%s", sanitizedURL)
|
|
}
|
|
if !useVMProto && !usePromProto {
|
|
// Auto-detect whether the remote storage supports VictoriaMetrics remote write protocol.
|
|
doRequest := func(url string) (*http.Response, error) {
|
|
return c.doRequest(url, nil)
|
|
}
|
|
useVMProto = common.HandleVMProtoClientHandshake(c.remoteWriteURL, doRequest)
|
|
if !useVMProto {
|
|
logger.Infof("the remote storage at %q doesn't support VictoriaMetrics remote write protocol. Switching to Prometheus remote write protocol. "+
|
|
"See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol", sanitizedURL)
|
|
}
|
|
}
|
|
c.useVMProto = useVMProto
|
|
|
|
return c
|
|
}
|
|
|
|
func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
|
|
limitReached := metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rate_limit_reached_total{url=%q}`, c.sanitizedURL))
|
|
if bytesPerSec := rateLimit.GetOptionalArg(argIdx); bytesPerSec > 0 {
|
|
logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL)
|
|
c.rl = newRateLimiter(time.Second, int64(bytesPerSec), limitReached, c.stopCh)
|
|
}
|
|
c.bytesSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_bytes_sent_total{url=%q}`, c.sanitizedURL))
|
|
c.blocksSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_blocks_sent_total{url=%q}`, c.sanitizedURL))
|
|
c.rateLimit = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_rate_limit{url=%q}`, c.sanitizedURL), func() float64 {
|
|
return float64(rateLimit.GetOptionalArg(argIdx))
|
|
})
|
|
c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.sanitizedURL))
|
|
c.requestsOKCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.sanitizedURL))
|
|
c.errorsCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_errors_total{url=%q}`, c.sanitizedURL))
|
|
c.packetsDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_packets_dropped_total{url=%q}`, c.sanitizedURL))
|
|
c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.sanitizedURL))
|
|
c.sendDuration = metrics.GetOrCreateFloatCounter(fmt.Sprintf(`vmagent_remotewrite_send_duration_seconds_total{url=%q}`, c.sanitizedURL))
|
|
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
|
|
return float64(*queues)
|
|
})
|
|
for i := 0; i < concurrency; i++ {
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.runWorker()
|
|
}()
|
|
}
|
|
logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL)
|
|
}
|
|
|
|
func (c *client) MustStop() {
|
|
close(c.stopCh)
|
|
c.wg.Wait()
|
|
logger.Infof("stopped client for -remoteWrite.url=%q", c.sanitizedURL)
|
|
}
|
|
|
|
func getAuthConfig(argIdx int) (*promauth.Config, error) {
|
|
headersValue := headers.GetOptionalArg(argIdx)
|
|
var hdrs []string
|
|
if headersValue != "" {
|
|
hdrs = strings.Split(headersValue, "^^")
|
|
}
|
|
username := basicAuthUsername.GetOptionalArg(argIdx)
|
|
password := basicAuthPassword.GetOptionalArg(argIdx)
|
|
passwordFile := basicAuthPasswordFile.GetOptionalArg(argIdx)
|
|
var basicAuthCfg *promauth.BasicAuthConfig
|
|
if username != "" || password != "" || passwordFile != "" {
|
|
basicAuthCfg = &promauth.BasicAuthConfig{
|
|
Username: username,
|
|
Password: promauth.NewSecret(password),
|
|
PasswordFile: passwordFile,
|
|
}
|
|
}
|
|
|
|
token := bearerToken.GetOptionalArg(argIdx)
|
|
tokenFile := bearerTokenFile.GetOptionalArg(argIdx)
|
|
|
|
var oauth2Cfg *promauth.OAuth2Config
|
|
clientSecret := oauth2ClientSecret.GetOptionalArg(argIdx)
|
|
clientSecretFile := oauth2ClientSecretFile.GetOptionalArg(argIdx)
|
|
if clientSecretFile != "" || clientSecret != "" {
|
|
endpointParamsJSON := oauth2EndpointParams.GetOptionalArg(argIdx)
|
|
endpointParams, err := flagutil.ParseJSONMap(endpointParamsJSON)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse JSON for -remoteWrite.oauth2.endpointParams=%s: %w", endpointParamsJSON, err)
|
|
}
|
|
oauth2Cfg = &promauth.OAuth2Config{
|
|
ClientID: oauth2ClientID.GetOptionalArg(argIdx),
|
|
ClientSecret: promauth.NewSecret(clientSecret),
|
|
ClientSecretFile: clientSecretFile,
|
|
EndpointParams: endpointParams,
|
|
TokenURL: oauth2TokenURL.GetOptionalArg(argIdx),
|
|
Scopes: strings.Split(oauth2Scopes.GetOptionalArg(argIdx), ";"),
|
|
}
|
|
}
|
|
|
|
tlsCfg := &promauth.TLSConfig{
|
|
CAFile: tlsCAFile.GetOptionalArg(argIdx),
|
|
CertFile: tlsCertFile.GetOptionalArg(argIdx),
|
|
KeyFile: tlsKeyFile.GetOptionalArg(argIdx),
|
|
ServerName: tlsServerName.GetOptionalArg(argIdx),
|
|
InsecureSkipVerify: tlsInsecureSkipVerify.GetOptionalArg(argIdx),
|
|
}
|
|
|
|
opts := &promauth.Options{
|
|
BasicAuth: basicAuthCfg,
|
|
BearerToken: token,
|
|
BearerTokenFile: tokenFile,
|
|
OAuth2: oauth2Cfg,
|
|
TLSConfig: tlsCfg,
|
|
Headers: hdrs,
|
|
}
|
|
authCfg, err := opts.NewConfig()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot populate auth config for remoteWrite idx: %d, err: %w", argIdx, err)
|
|
}
|
|
return authCfg, nil
|
|
}
|
|
|
|
func getAWSAPIConfig(argIdx int) (*awsapi.Config, error) {
|
|
if !awsUseSigv4.GetOptionalArg(argIdx) {
|
|
return nil, nil
|
|
}
|
|
ec2Endpoint := awsEC2Endpoint.GetOptionalArg(argIdx)
|
|
stsEndpoint := awsSTSEndpoint.GetOptionalArg(argIdx)
|
|
region := awsRegion.GetOptionalArg(argIdx)
|
|
roleARN := awsRoleARN.GetOptionalArg(argIdx)
|
|
accessKey := awsAccessKey.GetOptionalArg(argIdx)
|
|
secretKey := awsSecretKey.GetOptionalArg(argIdx)
|
|
service := awsService.GetOptionalArg(argIdx)
|
|
cfg, err := awsapi.NewConfig(ec2Endpoint, stsEndpoint, region, roleARN, accessKey, secretKey, service)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
func (c *client) runWorker() {
|
|
var ok bool
|
|
var block []byte
|
|
ch := make(chan bool, 1)
|
|
for {
|
|
block, ok = c.fq.MustReadBlock(block[:0])
|
|
if !ok {
|
|
return
|
|
}
|
|
go func() {
|
|
startTime := time.Now()
|
|
ch <- c.sendBlock(block)
|
|
c.sendDuration.Add(time.Since(startTime).Seconds())
|
|
}()
|
|
select {
|
|
case ok := <-ch:
|
|
if ok {
|
|
// The block has been sent successfully
|
|
continue
|
|
}
|
|
// Return unsent block to the queue.
|
|
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
|
return
|
|
case <-c.stopCh:
|
|
// c must be stopped. Wait for a while in the hope the block will be sent.
|
|
graceDuration := 5 * time.Second
|
|
select {
|
|
case ok := <-ch:
|
|
if !ok {
|
|
// Return unsent block to the queue.
|
|
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
|
}
|
|
case <-time.After(graceDuration):
|
|
// Return unsent block to the queue.
|
|
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *client) doRequest(url string, body []byte) (*http.Response, error) {
|
|
req, err := c.newRequest(url, body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
resp, err := c.hc.Do(req)
|
|
if err == nil {
|
|
return resp, nil
|
|
}
|
|
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
|
return nil, err
|
|
}
|
|
// It is likely connection became stale or timed out during the first request.
|
|
// Make another attempt in hope request will succeed.
|
|
// If not, the error should be handled by the caller as usual.
|
|
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139
|
|
req, err = c.newRequest(url, body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("second attempt: %w", err)
|
|
}
|
|
resp, err = c.hc.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("second attempt: %w", err)
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
|
|
reqBody := bytes.NewBuffer(body)
|
|
req, err := http.NewRequest(http.MethodPost, url, reqBody)
|
|
if err != nil {
|
|
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", url, err)
|
|
}
|
|
err = c.authCfg.SetHeaders(req, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
h := req.Header
|
|
h.Set("User-Agent", "vmagent")
|
|
h.Set("Content-Type", "application/x-protobuf")
|
|
if c.useVMProto {
|
|
h.Set("Content-Encoding", "zstd")
|
|
h.Set("X-VictoriaMetrics-Remote-Write-Version", "1")
|
|
} else {
|
|
h.Set("Content-Encoding", "snappy")
|
|
h.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
|
}
|
|
if c.awsCfg != nil {
|
|
sigv4Hash := awsapi.HashHex(body)
|
|
if err := c.awsCfg.SignRequest(req, sigv4Hash); err != nil {
|
|
return nil, fmt.Errorf("cannot sign remoteWrite request with AWS sigv4: %w", err)
|
|
}
|
|
}
|
|
return req, nil
|
|
}
|
|
|
|
// sendBlockHTTP sends the given block to c.remoteWriteURL.
|
|
//
|
|
// The function returns false only if c.stopCh is closed.
|
|
// Otherwise it tries sending the block to remote storage indefinitely.
|
|
func (c *client) sendBlockHTTP(block []byte) bool {
|
|
c.rl.register(len(block))
|
|
maxRetryDuration := timeutil.AddJitterToDuration(time.Minute)
|
|
retryDuration := timeutil.AddJitterToDuration(time.Second)
|
|
retriesCount := 0
|
|
|
|
again:
|
|
startTime := time.Now()
|
|
resp, err := c.doRequest(c.remoteWriteURL, block)
|
|
c.requestDuration.UpdateDuration(startTime)
|
|
if err != nil {
|
|
c.errorsCount.Inc()
|
|
retryDuration *= 2
|
|
if retryDuration > maxRetryDuration {
|
|
retryDuration = maxRetryDuration
|
|
}
|
|
logger.Warnf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds",
|
|
len(block), c.sanitizedURL, err, retryDuration.Seconds())
|
|
t := timerpool.Get(retryDuration)
|
|
select {
|
|
case <-c.stopCh:
|
|
timerpool.Put(t)
|
|
return false
|
|
case <-t.C:
|
|
timerpool.Put(t)
|
|
}
|
|
c.retriesCount.Inc()
|
|
goto again
|
|
}
|
|
statusCode := resp.StatusCode
|
|
if statusCode/100 == 2 {
|
|
_ = resp.Body.Close()
|
|
c.requestsOKCount.Inc()
|
|
c.bytesSent.Add(len(block))
|
|
c.blocksSent.Inc()
|
|
return true
|
|
}
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
|
|
if statusCode == 409 || statusCode == 400 {
|
|
body, err := io.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
if err != nil {
|
|
remoteWriteRejectedLogger.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+
|
|
"failed to read response body: %s",
|
|
len(block), c.sanitizedURL, statusCode, err)
|
|
} else {
|
|
remoteWriteRejectedLogger.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s",
|
|
len(block), c.sanitizedURL, statusCode, string(body))
|
|
}
|
|
// Just drop block on 409 and 400 status codes like Prometheus does.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873
|
|
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149
|
|
_ = resp.Body.Close()
|
|
c.packetsDropped.Inc()
|
|
return true
|
|
}
|
|
|
|
// Unexpected status code returned
|
|
retriesCount++
|
|
retryDuration *= 2
|
|
if retryDuration > maxRetryDuration {
|
|
retryDuration = maxRetryDuration
|
|
}
|
|
body, err := io.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
if err != nil {
|
|
logger.Errorf("cannot read response body from %q during retry #%d: %s", c.sanitizedURL, retriesCount, err)
|
|
} else {
|
|
logger.Errorf("unexpected status code received after sending a block with size %d bytes to %q during retry #%d: %d; response body=%q; "+
|
|
"re-sending the block in %.3f seconds", len(block), c.sanitizedURL, retriesCount, statusCode, body, retryDuration.Seconds())
|
|
}
|
|
t := timerpool.Get(retryDuration)
|
|
select {
|
|
case <-c.stopCh:
|
|
timerpool.Put(t)
|
|
return false
|
|
case <-t.C:
|
|
timerpool.Put(t)
|
|
}
|
|
c.retriesCount.Inc()
|
|
goto again
|
|
}
|
|
|
|
// remoteWriteRejectedLogger is used by sendBlockHTTP for reporting blocks
// rejected by remote storage with 400 or 409 status codes.
// It is created with the "remoteWriteRejected" throttler and 5 seconds interval.
var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
|
|
|
|
type rateLimiter struct {
|
|
interval time.Duration
|
|
perIntervalLimit int64
|
|
stopCh <-chan struct{}
|
|
|
|
// mu protects budget and deadline from concurrent access.
|
|
mu sync.Mutex
|
|
|
|
// The current budget. It is increased by perIntervalLimit every interval.
|
|
budget int64
|
|
|
|
// The next deadline for increasing the budget by perIntervalLimit
|
|
deadline time.Time
|
|
|
|
limitReached *metrics.Counter
|
|
}
|
|
|
|
func newRateLimiter(interval time.Duration, perIntervalLimit int64, limitReached *metrics.Counter, stopCh <-chan struct{}) *rateLimiter {
|
|
return &rateLimiter{
|
|
interval: interval,
|
|
perIntervalLimit: perIntervalLimit,
|
|
stopCh: stopCh,
|
|
limitReached: limitReached,
|
|
}
|
|
}
|
|
|
|
func (rl *rateLimiter) register(count int) {
|
|
if rl == nil {
|
|
return
|
|
}
|
|
|
|
limit := rl.perIntervalLimit
|
|
if limit <= 0 {
|
|
return
|
|
}
|
|
|
|
rl.mu.Lock()
|
|
defer rl.mu.Unlock()
|
|
|
|
for rl.budget <= 0 {
|
|
if d := time.Until(rl.deadline); d > 0 {
|
|
rl.limitReached.Inc()
|
|
t := timerpool.Get(d)
|
|
select {
|
|
case <-rl.stopCh:
|
|
timerpool.Put(t)
|
|
return
|
|
case <-t.C:
|
|
timerpool.Put(t)
|
|
}
|
|
}
|
|
rl.budget += limit
|
|
rl.deadline = time.Now().Add(rl.interval)
|
|
}
|
|
rl.budget -= int64(count)
|
|
}
|