From aa37e6b438b4588c128392951882e873c4dd4817 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 16 Aug 2022 14:52:38 +0300 Subject: [PATCH] lib/promscrape: retry http requests if the server returns 429 status code The 429 status code means that the server is overwhelmed with requests. The client can retry the request after some wait time. Implement this strategy for service discovery and scrape requests. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2940 --- docs/CHANGELOG.md | 1 + lib/promscrape/client.go | 27 ++++--------------------- lib/promscrape/discoveryutils/client.go | 23 +++++++++++++-------- 3 files changed, 20 insertions(+), 31 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index db853ed95f..a0a24a9601 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -25,6 +25,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for MX record types in [dns_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dns_sd_configs) in the same way as Prometheus 2.38 [does](https://github.com/prometheus/prometheus/pull/10099). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `__meta_kubernetes_service_port_number` meta-label for `role: service` in [kubernetes_sd_configs](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) in the same way as Prometheus 2.38 [does](https://github.com/prometheus/prometheus/pull/11002). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `__meta_kubernetes_pod_container_image` meta-label for `role: pod` in [kubernetes_sd_configs](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) in the same way as Prometheus 2.38 [does](https://github.com/prometheus/prometheus/pull/11034). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): retry HTTP requests after some wait time during service discovery and during target scrapes if the server returns 429 HTTP status code (aka `Too many requests`). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2940). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add a legend in the top right corner for shortcut keys. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2813). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `toTime()` template function in the same way as Prometheus 2.38 [does](https://github.com/prometheus/prometheus/pull/10993). See [these docs](https://prometheus.io/docs/prometheus/latest/configuration/template_reference/#numbers). diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index 2051f2216e..568dcea63d 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/metrics" @@ -189,6 +190,7 @@ func (c *client) GetStreamReader() (*streamReader, error) { req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) c.setHeaders(req) c.setProxyHeaders(req) + scrapeRequests.Inc() resp, err := c.sc.Do(req) if err != nil { cancel() @@ -327,33 +329,12 @@ var ( scrapesOK = metrics.NewCounter(`vm_promscrape_scrapes_total{status_code="200"}`) scrapesGunzipped = metrics.NewCounter(`vm_promscrape_scrapes_gunziped_total`) scrapesGunzipFailed = metrics.NewCounter(`vm_promscrape_scrapes_gunzip_failed_total`) + scrapeRequests = metrics.NewCounter(`vm_promscrape_scrape_requests_total`) scrapeRetries = metrics.NewCounter(`vm_promscrape_scrape_retries_total`) ) func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error { - sleepTime := time.Second - for { - // Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline - // across multiple retries. - err := hc.DoDeadline(req, resp, deadline) - if err == nil { - return nil - } - if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") { - return err - } - // Retry request if the server closes the keep-alive connection unless deadline exceeds. - maxSleepTime := time.Until(deadline) - if sleepTime > maxSleepTime { - return fmt.Errorf("the server closes all the connection attempts: %w", err) - } - sleepTime += sleepTime - if sleepTime > maxSleepTime { - sleepTime = maxSleepTime - } - time.Sleep(sleepTime) - scrapeRetries.Inc() - } + return discoveryutils.DoRequestWithPossibleRetry(hc, req, resp, deadline, scrapeRequests, scrapeRetries) } type streamReader struct { diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 8cb9b1b729..842bda1bcd 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -240,20 +240,23 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *fasthttp.HostClient, return data, nil } -func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error { +// DoRequestWithPossibleRetry performs the given req at hc and stores the response at resp. +func DoRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time, requestCounter, retryCounter *metrics.Counter) error { sleepTime := time.Second - discoveryRequests.Inc() + requestCounter.Inc() for { // Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline // across multiple retries. err := hc.DoDeadline(req, resp, deadline) if err == nil { - return nil - } - if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") { + statusCode := resp.StatusCode() + if statusCode != fasthttp.StatusTooManyRequests { + return nil + } + } else if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") { return err } - // Retry request if the server closes the keep-alive connection unless deadline exceeds. + // Retry request after exponentially increased sleep. maxSleepTime := time.Until(deadline) if sleepTime > maxSleepTime { return fmt.Errorf("the server closes all the connection attempts: %w", err) @@ -263,11 +266,15 @@ func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, sleepTime = maxSleepTime } time.Sleep(sleepTime) - discoveryRetries.Inc() + retryCounter.Inc() } } +func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error { + return DoRequestWithPossibleRetry(hc, req, resp, deadline, discoveryRequests, discoveryRetries) +} + var ( - discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`) discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`) + discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`) )