diff --git a/lib/promscrape/discovery/kuma/api.go b/lib/promscrape/discovery/kuma/api.go index 32e5411d2e..f0a9f8096d 100644 --- a/lib/promscrape/discovery/kuma/api.go +++ b/lib/promscrape/discovery/kuma/api.go @@ -164,7 +164,7 @@ func (cfg *apiConfig) updateTargetsLabels(ctx context.Context) error { notModified = true } } - data, err := cfg.client.GetBlockingAPIResponseWithParamsCtx(ctx, cfg.apiPath, updateRequestFunc, inspectResponseFunc) + data, err := cfg.client.GetAPIResponseWithParamsCtx(ctx, cfg.apiPath, updateRequestFunc, inspectResponseFunc) if err != nil { cfg.fetchErrors.Inc() return fmt.Errorf("error when reading Kuma discovery response: %w", err) diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 83d7fed2cd..772339dc9c 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -170,18 +170,23 @@ func (c *Client) Context() context.Context { return c.clientCtx } +// GetAPIResponseWithParamsCtx returns response for given absolute path with blocking client and optional callback for api response, +func (c *Client) GetAPIResponseWithParamsCtx(ctx context.Context, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) { + return c.getAPIResponseWithConcurrencyLimit(ctx, c.client, path, modifyRequest, inspectResponse) +} + // GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request. func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest RequestCallback) ([]byte, error) { - return c.getAPIResponse(path, modifyRequest) + return c.getAPIResponseWithConcurrencyLimit(c.clientCtx, c.client, path, modifyRequest, nil) } // GetAPIResponse returns response for the given absolute path. func (c *Client) GetAPIResponse(path string) ([]byte, error) { - return c.getAPIResponse(path, nil) + return c.getAPIResponseWithConcurrencyLimit(c.clientCtx, c.client, path, nil, nil) } -// GetAPIResponse returns response for the given absolute path with optional callback for request. -func (c *Client) getAPIResponse(path string, modifyRequest RequestCallback) ([]byte, error) { +func (c *Client) getAPIResponseWithConcurrencyLimit(ctx context.Context, client *HTTPClient, path string, + modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) { // Limit the number of concurrent API requests. concurrencyLimitChOnce.Do(concurrencyLimitChInit) t := timerpool.Get(*maxWaitTime) @@ -192,11 +197,13 @@ func (c *Client) getAPIResponse(path string, modifyRequest RequestCallback) ([]b timerpool.Put(t) return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d", c.apiServer, *maxWaitTime, *maxConcurrency) + case <-ctx.Done(): + timerpool.Put(t) + return nil, ctx.Err() } - defer func() { - <-concurrencyLimitCh - }() - return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.client, path, modifyRequest, nil) + data, err := c.getAPIResponseWithParamsAndClientCtx(ctx, client, path, modifyRequest, inspectResponse) + <-concurrencyLimitCh + return data, err } // GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response, @@ -209,11 +216,6 @@ func (c *Client) GetBlockingAPIResponseCtx(ctx context.Context, path string, ins return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, nil, inspectResponse) } -// GetBlockingAPIResponseWithParamsCtx returns response for given absolute path with blocking client and optional callback for api response, -func (c *Client) GetBlockingAPIResponseWithParamsCtx(ctx context.Context, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) { - return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, modifyRequest, inspectResponse) -} - // getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response. func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, client *HTTPClient, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) { requestURL := c.apiServer + path