lib/promscrape: fix cancelling in-flight scrape requests during configuration reload (#3853)

* lib/promscrape: fix cancelling in-flight scrape requests during configuration reload (see #3747)

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promscrape: fix order of params for `doRequestWithPossibleRetry` to follow codestyle

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promscrape: accept deadline explicitly and extend passed context for local use

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2023-02-22 20:05:16 +04:00 committed by Aliaksandr Valialkin
parent 3abd6b367e
commit 2c05066f19
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1

View File

@ -262,7 +262,8 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
// This should reduce memory uage when scraping big targets. // This should reduce memory uage when scraping big targets.
dst = resp.SwapBody(dst) dst = resp.SwapBody(dst)
} }
err := doRequestWithPossibleRetry(c.hc, req, resp, deadline)
err := doRequestWithPossibleRetry(c.ctx, c.hc, req, resp, deadline)
statusCode := resp.StatusCode() statusCode := resp.StatusCode()
redirectsCount := 0 redirectsCount := 0
for err == nil && isStatusRedirect(statusCode) { for err == nil && isStatusRedirect(statusCode) {
@ -282,7 +283,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
break break
} }
req.URI().UpdateBytes(location) req.URI().UpdateBytes(location)
err = doRequestWithPossibleRetry(c.hc, req, resp, deadline) err = doRequestWithPossibleRetry(c.ctx, c.hc, req, resp, deadline)
statusCode = resp.StatusCode() statusCode = resp.StatusCode()
redirectsCount++ redirectsCount++
} }
@ -349,13 +350,14 @@ var (
scrapeRetries = metrics.NewCounter(`vm_promscrape_scrape_retries_total`) scrapeRetries = metrics.NewCounter(`vm_promscrape_scrape_retries_total`)
) )
func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error { func doRequestWithPossibleRetry(ctx context.Context, hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
sleepTime := time.Second sleepTime := time.Second
scrapeRequests.Inc() scrapeRequests.Inc()
reqCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
for { for {
// Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline // Use DoCtx instead of Do in order to support context cancellation
// across multiple retries. err := hc.DoCtx(reqCtx, req, resp)
err := hc.DoDeadline(req, resp, deadline)
if err == nil { if err == nil {
statusCode := resp.StatusCode() statusCode := resp.StatusCode()
if statusCode != fasthttp.StatusTooManyRequests { if statusCode != fasthttp.StatusTooManyRequests {
@ -364,6 +366,7 @@ func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request,
} else if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") { } else if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
return err return err
} }
// Retry request after exponentially increased sleep. // Retry request after exponentially increased sleep.
maxSleepTime := time.Until(deadline) maxSleepTime := time.Until(deadline)
if sleepTime > maxSleepTime { if sleepTime > maxSleepTime {