mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 22:32:20 +01:00
lib/promscrape: follow-up for 43e104a83f
- Return immediately on context cancel during the backoff sleep. This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747 - Add a comment describing why the second attempt to obtain the response from remote side is perfromed immediately after the first attempt. - Remove fasthttp dependency from lib/promscrape/discoveryutils - Set context deadline before calling doRequestWithPossibleRetry(). This simplifies the doRequestWithPossibleRetry() a bit. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293
This commit is contained in:
parent
5ea6d71cb3
commit
aed2dbe45e
@ -38,11 +38,11 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||||||
* FEATURE: allow setting zero value for `-search.latencyOffset` command-line flag. This may be needed in [some cases](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2061#issuecomment-1299109836). Previously the minimum supported value for `-search.latencyOffset` command-line flag was `1s`.
|
* FEATURE: allow setting zero value for `-search.latencyOffset` command-line flag. This may be needed in [some cases](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2061#issuecomment-1299109836). Previously the minimum supported value for `-search.latencyOffset` command-line flag was `1s`.
|
||||||
|
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): immediately cancel in-flight scrape requests during configuration reload when [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) is disabled. Previously `vmagent` could wait for long time until all the in-flight requests are completed before reloading the configuration. This could significantly slow down configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747).
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): immediately cancel in-flight scrape requests during configuration reload when [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) is disabled. Previously `vmagent` could wait for long time until all the in-flight requests are completed before reloading the configuration. This could significantly slow down configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747).
|
||||||
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not wait for 2 seconds after the first unsuccessful attempt to scrape the target before performing the next attempt. This should improve scrape scrape speed when the target closes [http keep-alive connection](https://en.wikipedia.org/wiki/HTTP_persistent_connection) between scrapes. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747) issues.
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix [Azure service discovery](https://docs.victoriametrics.com/sd_configs.html#azure_sd_configs) inside [Azure Container App](https://learn.microsoft.com/en-us/azure/container-apps/overview). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3830). Thanks to @MattiasAng for the fix!
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix [Azure service discovery](https://docs.victoriametrics.com/sd_configs.html#azure_sd_configs) inside [Azure Container App](https://learn.microsoft.com/en-us/azure/container-apps/overview). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3830). Thanks to @MattiasAng for the fix!
|
||||||
* BUGFIX: prevent from possible data ingestion slowdown and query performance slowdown during [background merges of big parts](https://docs.victoriametrics.com/#storage) on systems with small number of CPU cores (1 or 2 CPU cores). The issue has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790).
|
* BUGFIX: prevent from possible data ingestion slowdown and query performance slowdown during [background merges of big parts](https://docs.victoriametrics.com/#storage) on systems with small number of CPU cores (1 or 2 CPU cores). The issue has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790).
|
||||||
* BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).
|
* BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).
|
||||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816).
|
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816).
|
||||||
* BUGFIX: do not use exponential backoff for first retry of scrape request. Previously the first retry of scrape request was delayed by 2 seconds, which could lead to increased delays in case of connection being closed by short keep-alive timeout. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293).
|
|
||||||
|
|
||||||
## [v1.87.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.1)
|
## [v1.87.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.1)
|
||||||
|
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
"github.com/VictoriaMetrics/fasthttp"
|
"github.com/VictoriaMetrics/fasthttp"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
@ -263,7 +264,10 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
|
|||||||
dst = resp.SwapBody(dst)
|
dst = resp.SwapBody(dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := doRequestWithPossibleRetry(c.ctx, c.hc, req, resp, deadline)
|
ctx, cancel := context.WithDeadline(c.ctx, deadline)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
err := doRequestWithPossibleRetry(ctx, c.hc, req, resp)
|
||||||
statusCode := resp.StatusCode()
|
statusCode := resp.StatusCode()
|
||||||
redirectsCount := 0
|
redirectsCount := 0
|
||||||
for err == nil && isStatusRedirect(statusCode) {
|
for err == nil && isStatusRedirect(statusCode) {
|
||||||
@ -283,7 +287,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
req.URI().UpdateBytes(location)
|
req.URI().UpdateBytes(location)
|
||||||
err = doRequestWithPossibleRetry(c.ctx, c.hc, req, resp, deadline)
|
err = doRequestWithPossibleRetry(ctx, c.hc, req, resp)
|
||||||
statusCode = resp.StatusCode()
|
statusCode = resp.StatusCode()
|
||||||
redirectsCount++
|
redirectsCount++
|
||||||
}
|
}
|
||||||
@ -350,16 +354,14 @@ var (
|
|||||||
scrapeRetries = metrics.NewCounter(`vm_promscrape_scrape_retries_total`)
|
scrapeRetries = metrics.NewCounter(`vm_promscrape_scrape_retries_total`)
|
||||||
)
|
)
|
||||||
|
|
||||||
func doRequestWithPossibleRetry(ctx context.Context, hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
|
func doRequestWithPossibleRetry(ctx context.Context, hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) error {
|
||||||
scrapeRequests.Inc()
|
scrapeRequests.Inc()
|
||||||
reqCtx, cancel := context.WithDeadline(ctx, deadline)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
var reqErr error
|
var reqErr error
|
||||||
// Return true if the request execution is completed and retry is not required
|
// Return true if the request execution is completed and retry is not required
|
||||||
attempt := func() bool {
|
attempt := func() bool {
|
||||||
// Use DoCtx instead of Do in order to support context cancellation
|
// Use DoCtx instead of Do in order to support context cancellation
|
||||||
reqErr = hc.DoCtx(reqCtx, req, resp)
|
reqErr = hc.DoCtx(ctx, req, resp)
|
||||||
if reqErr == nil {
|
if reqErr == nil {
|
||||||
statusCode := resp.StatusCode()
|
statusCode := resp.StatusCode()
|
||||||
if statusCode != fasthttp.StatusTooManyRequests {
|
if statusCode != fasthttp.StatusTooManyRequests {
|
||||||
@ -368,7 +370,6 @@ func doRequestWithPossibleRetry(ctx context.Context, hc *fasthttp.HostClient, re
|
|||||||
} else if reqErr != fasthttp.ErrConnectionClosed && !strings.Contains(reqErr.Error(), "broken pipe") {
|
} else if reqErr != fasthttp.ErrConnectionClosed && !strings.Contains(reqErr.Error(), "broken pipe") {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -376,23 +377,22 @@ func doRequestWithPossibleRetry(ctx context.Context, hc *fasthttp.HostClient, re
|
|||||||
return reqErr
|
return reqErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The first attempt was unsuccessful. Use exponential backoff for further attempts.
|
||||||
|
// Perform the second attempt immediately after the first attempt - this should help
|
||||||
|
// in cases when the remote side closes the keep-alive connection before the first attempt.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293
|
||||||
sleepTime := time.Second
|
sleepTime := time.Second
|
||||||
maxSleepTime := time.Until(deadline)
|
// It is expected that the deadline is already set to ctx, so the loop below
|
||||||
|
// should eventually finish if all the attempt() calls are unsuccessful.
|
||||||
for {
|
for {
|
||||||
scrapeRetries.Inc()
|
scrapeRetries.Inc()
|
||||||
if attempt() {
|
if attempt() {
|
||||||
return reqErr
|
return reqErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retry request after exponentially increased sleep.
|
|
||||||
if sleepTime > maxSleepTime {
|
|
||||||
return fmt.Errorf("the server closes all the connection attempts: %w", reqErr)
|
|
||||||
}
|
|
||||||
sleepTime += sleepTime
|
sleepTime += sleepTime
|
||||||
if sleepTime > maxSleepTime {
|
if !discoveryutils.SleepCtx(ctx, sleepTime) {
|
||||||
sleepTime = maxSleepTime
|
return reqErr
|
||||||
}
|
}
|
||||||
time.Sleep(sleepTime)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||||
"github.com/VictoriaMetrics/fasthttp"
|
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -239,7 +238,7 @@ func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, clien
|
|||||||
modifyRequest(req)
|
modifyRequest(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := doRequestWithPossibleRetry(client, req, deadline)
|
resp, err := doRequestWithPossibleRetry(client, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
|
return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
|
||||||
}
|
}
|
||||||
@ -270,7 +269,7 @@ func (c *Client) Stop() {
|
|||||||
c.clientCancel()
|
c.clientCancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, deadline time.Time) (*http.Response, error) {
|
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {
|
||||||
discoveryRequests.Inc()
|
discoveryRequests.Inc()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -279,17 +278,15 @@ func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, deadline time
|
|||||||
)
|
)
|
||||||
// Return true if the request execution is completed and retry is not required
|
// Return true if the request execution is completed and retry is not required
|
||||||
attempt := func() bool {
|
attempt := func() bool {
|
||||||
// Use DoCtx instead of Do in order to support context cancellation
|
|
||||||
resp, reqErr = hc.client.Do(req)
|
resp, reqErr = hc.client.Do(req)
|
||||||
if reqErr == nil {
|
if reqErr == nil {
|
||||||
statusCode := resp.StatusCode
|
statusCode := resp.StatusCode
|
||||||
if statusCode != fasthttp.StatusTooManyRequests {
|
if statusCode != http.StatusTooManyRequests {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
} else if reqErr != fasthttp.ErrConnectionClosed && !strings.Contains(reqErr.Error(), "broken pipe") {
|
} else if reqErr != net.ErrClosed && !strings.Contains(reqErr.Error(), "broken pipe") {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -297,22 +294,23 @@ func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, deadline time
|
|||||||
return resp, reqErr
|
return resp, reqErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The first attempt was unsuccessful. Use exponential backoff for further attempts.
|
||||||
|
// Perform the second attempt immediately after the first attempt - this should help
|
||||||
|
// in cases when the remote side closes the keep-alive connection before the first attempt.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293
|
||||||
sleepTime := time.Second
|
sleepTime := time.Second
|
||||||
maxSleepTime := time.Until(deadline)
|
// It is expected that the deadline is already set to req.Context(), so the loop below
|
||||||
|
// should eventually finish if all the attempt() calls are unsuccessful.
|
||||||
|
ctx := req.Context()
|
||||||
for {
|
for {
|
||||||
discoveryRetries.Inc()
|
discoveryRetries.Inc()
|
||||||
if attempt() {
|
if attempt() {
|
||||||
return resp, reqErr
|
return resp, reqErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if sleepTime > maxSleepTime {
|
|
||||||
return nil, fmt.Errorf("the server closes all the connection attempts: %w", reqErr)
|
|
||||||
}
|
|
||||||
sleepTime += sleepTime
|
sleepTime += sleepTime
|
||||||
if sleepTime > maxSleepTime {
|
if !SleepCtx(ctx, sleepTime) {
|
||||||
sleepTime = maxSleepTime
|
return resp, reqErr
|
||||||
}
|
}
|
||||||
time.Sleep(sleepTime)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -320,3 +318,17 @@ var (
|
|||||||
discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`)
|
discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`)
|
||||||
discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)
|
discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// SleepCtx sleeps for sleepDuration.
|
||||||
|
//
|
||||||
|
// It immediately returns false on ctx cancel or deadline, without waiting for sleepDuration.
|
||||||
|
func SleepCtx(ctx context.Context, sleepDuration time.Duration) bool {
|
||||||
|
t := timerpool.Get(sleepDuration)
|
||||||
|
defer timerpool.Put(t)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false
|
||||||
|
case <-t.C:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user