diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index b5f693c8bd..14298aca3d 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "net/http" "net/url" + "strconv" "strings" "sync" "time" @@ -20,6 +21,9 @@ import ( ) var ( + rateLimit = flagutil.NewArray("remoteWrite.rateLimit", "Optional rate limit in bytes per second for data sent to -remoteWrite.url. "+ + "By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+ + "is sent after temporary unavailability of the remote storage") sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", "Timeout for sending a single block of data to -remoteWrite.url") proxyURL = flagutil.NewArray("remoteWrite.proxyURL", "Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. "+ "Example: -remoteWrite.proxyURL=socks5://proxy:1234") @@ -49,6 +53,8 @@ type client struct { fq *persistentqueue.FastQueue hc *http.Client + rl rateLimiter + bytesSent *metrics.Counter blocksSent *metrics.Counter requestDuration *metrics.Histogram @@ -113,6 +119,18 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu }, stopCh: make(chan struct{}), } + if bytesPerSec := rateLimit.GetOptionalArg(argIdx); bytesPerSec != "" { + limit, err := strconv.ParseInt(bytesPerSec, 10, 64) + if err != nil { + logger.Fatalf("cannot parse -remoteWrite.rateLimit=%q for -remoteWrite.url=%q: %s", bytesPerSec, sanitizedURL, err) + } + if limit > 0 { + logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", limit, sanitizedURL) + c.rl.perSecondLimit = limit + } + } + c.rl.limitReached = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remote_write_rate_limit_reached_total{url=%q}`, c.sanitizedURL)) + c.bytesSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_bytes_sent_total{url=%q}`, c.sanitizedURL)) c.blocksSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_blocks_sent_total{url=%q}`, c.sanitizedURL)) c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.sanitizedURL)) @@ -189,6 +207,7 @@ func (c *client) runWorker() { } func (c *client) sendBlock(block []byte) { + c.rl.register(len(block), c.stopCh) retryDuration := time.Second retriesCount := 0 c.bytesSent.Add(len(block)) @@ -271,3 +290,38 @@ again: c.retriesCount.Inc() goto again } + +type rateLimiter struct { + perSecondLimit int64 + + // The current budget. It is increased by perSecondLimit every second. + budget int64 + + // The next deadline for increasing the budget by perSecondLimit + deadline time.Time + + limitReached *metrics.Counter +} + +func (rl *rateLimiter) register(dataLen int, stopCh <-chan struct{}) { + limit := rl.perSecondLimit + if limit <= 0 { + return + } + for rl.budget <= 0 { + now := time.Now() + if d := rl.deadline.Sub(now); d > 0 { + rl.limitReached.Inc() + t := time.NewTimer(retryDuration) + select { + case <-stopCh: + t.Stop() + return + case <-t.C: + } + } + rl.budget += limit + rl.deadline = now.Add(time.Second) + } + rl.budget -= int64(dataLen) +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index fae2dff955..c4b45ba4ec 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -5,6 +5,7 @@ * FEATURE: added `-loggerTimezone` command-line flag for adjusting time zone for timestamps in log messages. By default UTC is used. * FEATURE: added `-search.maxStepForPointsAdjustment` command-line flag, which can be used for disabling adjustment for points returned `/api/v1/query_range` handler if such points have timestamps closer than `-search.latencyOffset` to the current time. Such points may contain incomplete data, so they are substituted by the previous values for `step` query args smaller than one minute by default. * FEATURE: vmalert: added `-datasource.queryStep` command-line flag for passing `step` query arg to `/api/v1/query` endpoint. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1025 +* FEATURE: vmagent: added `-remoteWrite.rateLimit` command-line flag for limiting data transfer rate to `-remoteWrite.url`. This may be useful when big amounts of buffered data is sent after temporarily unavailability of the remote storage. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1035 * BUGFIX: vmagent: reduce the HTTP reconnection rate to scrape targets. Previously vmagent could errorneusly close HTTP keep-alive connections more often than needed. * BUGFIX: vmagent: retry scrape and service discovery requests when the remote server closes HTTP keep-alive connection. Previously `disable_keepalive: true` option could be used under `scrape_configs` section when working with such servers.