lib/promscrape/discovery/kuma: substitute blocking HTTP call with non-blocking HTTP call at discoveryutils.Client

This commit is contained in:
Aliaksandr Valialkin 2023-02-23 15:13:08 -08:00
parent c708ce1985
commit bb5a3dc153
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 16 additions and 14 deletions

View File

@ -164,7 +164,7 @@ func (cfg *apiConfig) updateTargetsLabels(ctx context.Context) error {
notModified = true
}
}
data, err := cfg.client.GetBlockingAPIResponseWithParamsCtx(ctx, cfg.apiPath, updateRequestFunc, inspectResponseFunc)
data, err := cfg.client.GetAPIResponseWithParamsCtx(ctx, cfg.apiPath, updateRequestFunc, inspectResponseFunc)
if err != nil {
cfg.fetchErrors.Inc()
return fmt.Errorf("error when reading Kuma discovery response: %w", err)

View File

@ -170,18 +170,23 @@ func (c *Client) Context() context.Context {
return c.clientCtx
}
// GetAPIResponseWithParamsCtx returns response for given absolute path with blocking client and optional callback for api response,
func (c *Client) GetAPIResponseWithParamsCtx(ctx context.Context, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) {
return c.getAPIResponseWithConcurrencyLimit(ctx, c.client, path, modifyRequest, inspectResponse)
}
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest RequestCallback) ([]byte, error) {
return c.getAPIResponse(path, modifyRequest)
return c.getAPIResponseWithConcurrencyLimit(c.clientCtx, c.client, path, modifyRequest, nil)
}
// GetAPIResponse returns response for the given absolute path.
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
return c.getAPIResponse(path, nil)
return c.getAPIResponseWithConcurrencyLimit(c.clientCtx, c.client, path, nil, nil)
}
// GetAPIResponse returns response for the given absolute path with optional callback for request.
func (c *Client) getAPIResponse(path string, modifyRequest RequestCallback) ([]byte, error) {
func (c *Client) getAPIResponseWithConcurrencyLimit(ctx context.Context, client *HTTPClient, path string,
modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) {
// Limit the number of concurrent API requests.
concurrencyLimitChOnce.Do(concurrencyLimitChInit)
t := timerpool.Get(*maxWaitTime)
@ -192,11 +197,13 @@ func (c *Client) getAPIResponse(path string, modifyRequest RequestCallback) ([]b
timerpool.Put(t)
return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d",
c.apiServer, *maxWaitTime, *maxConcurrency)
case <-ctx.Done():
timerpool.Put(t)
return nil, ctx.Err()
}
defer func() {
<-concurrencyLimitCh
}()
return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.client, path, modifyRequest, nil)
data, err := c.getAPIResponseWithParamsAndClientCtx(ctx, client, path, modifyRequest, inspectResponse)
<-concurrencyLimitCh
return data, err
}
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
@ -209,11 +216,6 @@ func (c *Client) GetBlockingAPIResponseCtx(ctx context.Context, path string, ins
return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, nil, inspectResponse)
}
// GetBlockingAPIResponseWithParamsCtx returns response for given absolute path with blocking client and optional callback for api response,
func (c *Client) GetBlockingAPIResponseWithParamsCtx(ctx context.Context, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) {
return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, modifyRequest, inspectResponse)
}
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response.
func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, client *HTTPClient, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) {
requestURL := c.apiServer + path