diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 49b92b7bd5..5354962aad 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -7,12 +7,11 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
 
-	"github.com/VictoriaMetrics/metrics"
-
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -23,6 +22,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
+	"github.com/VictoriaMetrics/metrics"
 )
 
 var (
@@ -463,10 +463,10 @@ again:
 
 	// Unexpected status code returned
 	retriesCount++
-	retryDuration *= 2
-	if retryDuration > maxRetryDuration {
-		retryDuration = maxRetryDuration
-	}
+	retryAfterHeader := parseRetryAfterHeader(resp.Header.Get("Retry-After"))
+	retryDuration = getRetryDuration(retryAfterHeader, retryDuration, maxRetryDuration)
+
+	// Handle response
 	body, err := io.ReadAll(resp.Body)
 	_ = resp.Body.Close()
 	if err != nil {
@@ -488,3 +488,49 @@ again:
 }
 
 var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
+
+// getRetryDuration returns the duration to wait before the next retry attempt.
+// retryAfterDuration has the highest priority.
+// If retryAfterDuration is not set, retryDuration is doubled.
+// retryDuration can't exceed maxRetryDuration.
+//
+// Also see: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6097
+func getRetryDuration(retryAfterDuration, retryDuration, maxRetryDuration time.Duration) time.Duration {
+	// retryAfterDuration has the highest priority
+	if retryAfterDuration > 0 {
+		return timeutil.AddJitterToDuration(retryAfterDuration)
+	}
+
+	// default backoff retry policy
+	retryDuration *= 2
+	if retryDuration > maxRetryDuration {
+		retryDuration = maxRetryDuration
+	}
+
+	return retryDuration
+}
+
+// parseRetryAfterHeader parses the `Retry-After` value from an HTTP response header.
+// retryAfterString must be either an HTTP-date or a number of seconds.
+// It returns time.Duration(0) if retryAfterString doesn't follow RFC 7231.
+func parseRetryAfterHeader(retryAfterString string) (retryAfterDuration time.Duration) {
+	if retryAfterString == "" {
+		return retryAfterDuration
+	}
+
+	defer func() {
+		v := retryAfterDuration.Seconds()
+		logger.Infof("'Retry-After: %s' parsed into %.2f second(s)", retryAfterString, v)
+	}()
+
+	// Retry-After could be in "Mon, 02 Jan 2006 15:04:05 GMT" format.
+	if parsedTime, err := time.Parse(http.TimeFormat, retryAfterString); err == nil {
+		return time.Duration(time.Until(parsedTime).Seconds()) * time.Second
+	}
+	// Retry-After could be in seconds.
+	if seconds, err := strconv.Atoi(retryAfterString); err == nil {
+		return time.Duration(seconds) * time.Second
+	}
+
+	return 0
+}
diff --git a/app/vmagent/remotewrite/client_test.go b/app/vmagent/remotewrite/client_test.go
new file mode 100644
index 0000000000..fda56e2f85
--- /dev/null
+++ b/app/vmagent/remotewrite/client_test.go
@@ -0,0 +1,99 @@
+package remotewrite
+
+import (
+	"math"
+	"net/http"
+	"testing"
+	"time"
+)
+
+func TestGetRetryDuration(t *testing.T) {
+	// f calls getRetryDuration n times
+	// and checks that the result is:
+	// 1. >= expectMinDuration
+	// 2. <= expectMinDuration + 10% (see timeutil.AddJitterToDuration)
+	f := func(retryAfterDuration, retryDuration time.Duration, n int, expectMinDuration time.Duration) {
+		t.Helper()
+
+		for i := 0; i < n; i++ {
+			retryDuration = getRetryDuration(retryAfterDuration, retryDuration, time.Minute)
+		}
+
+		expectMaxDuration := maxJitteredDuration(expectMinDuration)
+		expectMinDuration = expectMinDuration - (1000 * time.Millisecond) // Avoid edge case when calculating time.Until(now)
+
+		if !(retryDuration >= expectMinDuration && retryDuration <= expectMaxDuration) {
+			t.Fatalf(
+				"incorrect retry duration, want (ms): [%d, %d], got (ms): %d",
+				expectMinDuration.Milliseconds(), expectMaxDuration.Milliseconds(),
+				retryDuration.Milliseconds(),
+			)
+		}
+	}
+
+	// Call getRetryDuration once.
+	{
+		// default backoff policy
+		f(0, time.Second, 1, 2*time.Second)
+		// default backoff policy exceeds the max limit
+		f(0, 10*time.Minute, 1, time.Minute)
+
+		// Retry-After > default backoff policy
+		f(10*time.Second, 1*time.Second, 1, 10*time.Second)
+		// Retry-After < default backoff policy
+		f(1*time.Second, 10*time.Second, 1, 1*time.Second)
+		// Retry-After invalid, fall back to the default backoff policy
+		f(0, time.Second, 1, 2*time.Second)
+	}
+
+	// Call getRetryDuration multiple times.
+	{
+		// default backoff policy applied 2 times
+		f(0, time.Second, 2, 4*time.Second)
+		// default backoff policy applied 3 times
+		f(0, time.Second, 3, 8*time.Second)
+		// default backoff policy applied N times exceeds the max limit
+		f(0, time.Second, 10, time.Minute)
+
+		// Retry-After 120s applied 1 time
+		f(120*time.Second, time.Second, 1, 120*time.Second)
+		// Retry-After 120s applied 2 times
+		f(120*time.Second, time.Second, 2, 120*time.Second)
+	}
+}
+
+func TestParseRetryAfterHeader(t *testing.T) {
+	f := func(retryAfterString string, expectResult time.Duration) {
+		t.Helper()
+
+		result := parseRetryAfterHeader(retryAfterString)
+		// expect expectResult == result when retryAfterString is in seconds or invalid;
+		// otherwise expect the difference between result and expectResult to be lower than 10%
+		if !(expectResult == result || math.Abs(float64(expectResult-result))/float64(expectResult) < 0.10) {
+			t.Fatalf(
+				"incorrect retry after duration, want (ms): %d, got (ms): %d",
+				expectResult.Milliseconds(), result.Milliseconds(),
+			)
+		}
+	}
+
+	// Retry-After header in seconds
+	f("10", 10*time.Second)
+	// Retry-After header as an HTTP-date
+	f(time.Now().Add(30*time.Second).UTC().Format(http.TimeFormat), 30*time.Second)
+	// invalid Retry-After header
+	f("invalid-retry-after", 0)
+	// Retry-After header not in GMT
+	f(time.Now().Add(10*time.Second).Format("Mon, 02 Jan 2006 15:04:05 FAKETZ"), 0)
+}
+
+// maxJitteredDuration returns the maximum possible duration produced by
+// timeutil.AddJitterToDuration for the given duration d.
+func maxJitteredDuration(d time.Duration) time.Duration {
+	dv := d / 10
+	if dv > 10*time.Second {
+		dv = 10 * time.Second
+	}
+
+	return d + dv
+}
diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md
index 4440233946..0eefe18dcb 100644
--- a/docs/changelog/CHANGELOG.md
+++ b/docs/changelog/CHANGELOG.md
@@ -59,6 +59,7 @@ Released at 2024-08-28
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): reduce memory usage when scraping targets with big response body. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6759).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): stop adding default port 80/443 for scrape URLs without a port. The value in `instance` label will still carry port for backward compatibility. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6792).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add flags `-remoteWrite.retryMinInterval` and `-remoteWrite.retryMaxTime` for adjusting remote-write requests retry policy. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5486). Thanks to @yorik for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6289).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): respect the `Retry-After` header when retrying to push data to the remote destination via the remote write protocol. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6097).
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): add command-line flag `-notifier.headers` to allow configuring additional headers for all requests sent to the corresponding `-notifier.url`.
 * FEATURE: [vmalert-tool](https://docs.victoriametrics.com/vmalert-tool/): add `-external.label` and `-external.url` command-line flags, in the same way as these flags are supported by vmalert. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6735).
 * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmrestore](https://docs.victoriametrics.com/vmrestore/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): use exponential backoff for retries when uploading or downloading data from S3. This should reduce the number of failed uploads and downloads when S3 is temporarily unavailable. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6732).
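
Below is a minimal, self-contained sketch of the retry policy this diff introduces. `parseRetryAfter` and `nextRetry` are simplified stand-ins for the patch's `parseRetryAfterHeader` and `getRetryDuration` (the jitter added by `timeutil.AddJitterToDuration` is omitted for brevity):

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// parseRetryAfter mirrors parseRetryAfterHeader: it accepts either an
// HTTP-date or a number of seconds and returns 0 for values that don't
// follow RFC 7231.
func parseRetryAfter(s string) time.Duration {
	if t, err := time.Parse(http.TimeFormat, s); err == nil {
		return time.Duration(time.Until(t).Seconds()) * time.Second
	}
	if seconds, err := strconv.Atoi(s); err == nil {
		return time.Duration(seconds) * time.Second
	}
	return 0
}

// nextRetry mirrors getRetryDuration without jitter: a positive
// Retry-After value takes priority; otherwise the delay doubles,
// capped at max.
func nextRetry(retryAfter, cur, max time.Duration) time.Duration {
	if retryAfter > 0 {
		return retryAfter
	}
	cur *= 2
	if cur > max {
		cur = max
	}
	return cur
}

func main() {
	cur := time.Second
	// The server asked to back off via "Retry-After: 30".
	fmt.Println(nextRetry(parseRetryAfter("30"), cur, time.Minute)) // 30s
	// No Retry-After header: exponential backoff kicks in.
	fmt.Println(nextRetry(parseRetryAfter(""), cur, time.Minute)) // 2s
	// The backoff never exceeds the configured maximum.
	fmt.Println(nextRetry(parseRetryAfter(""), 45*time.Second, time.Minute)) // 1m0s
}
```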