2020-04-29 16:27:08 +02:00
package discoveryutils
import (
2023-01-06 04:34:47 +01:00
"context"
2020-05-04 19:48:02 +02:00
"crypto/tls"
2023-06-09 09:26:33 +02:00
"errors"
2020-05-19 16:35:47 +02:00
"flag"
2020-05-04 19:48:02 +02:00
"fmt"
2023-01-06 04:34:47 +01:00
"io"
2020-05-04 19:48:02 +02:00
"net"
2020-04-29 16:27:08 +02:00
"net/http"
2023-01-06 04:34:47 +01:00
"net/url"
2020-05-04 19:48:02 +02:00
"strings"
2020-05-19 16:35:47 +02:00
"sync"
2020-04-29 16:27:08 +02:00
"time"
2020-05-04 19:48:02 +02:00
2023-06-05 15:56:49 +02:00
"golang.org/x/net/http2"
2020-05-04 19:48:02 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2020-12-24 09:56:10 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
2020-05-19 16:35:47 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2021-02-01 19:02:51 +01:00
"github.com/VictoriaMetrics/metrics"
2020-04-29 16:27:08 +02:00
)
2020-05-19 16:35:47 +02:00
var (
2020-06-20 16:52:49 +02:00
maxConcurrency = flag . Int ( "promscrape.discovery.concurrency" , 100 , "The maximum number of concurrent requests to Prometheus autodiscovery API (Consul, Kubernetes, etc.)" )
2020-05-19 16:35:47 +02:00
maxWaitTime = flag . Duration ( "promscrape.discovery.concurrentWaitTime" , time . Minute , "The maximum duration for waiting to perform API requests " +
"if more than -promscrape.discovery.concurrency requests are simultaneously performed" )
)
2020-04-29 16:27:08 +02:00
var defaultClient = & http . Client {
Timeout : 30 * time . Second ,
}
2023-01-06 04:34:47 +01:00
var (
concurrencyLimitCh chan struct { }
concurrencyLimitChOnce sync . Once
)
const (
// BlockingClientReadTimeout is the maximum duration for waiting the response from GetBlockingAPI*
BlockingClientReadTimeout = 10 * time . Minute
// DefaultClientReadTimeout is the maximum duration for waiting the response from GetAPI*
DefaultClientReadTimeout = time . Minute
)
2023-02-23 02:05:49 +01:00
// RequestCallback is called on the request before sending the request to the server.
type RequestCallback func ( req * http . Request )
// ResponseCallback is called on the response before validating and returning the response to the caller.
type ResponseCallback func ( resp * http . Response )
2023-02-22 13:59:56 +01:00
2023-01-06 04:34:47 +01:00
func concurrencyLimitChInit ( ) {
concurrencyLimitCh = make ( chan struct { } , * maxConcurrency )
}
2020-04-29 16:27:08 +02:00
// GetHTTPClient returns default client for http API requests.
func GetHTTPClient ( ) * http . Client {
return defaultClient
}
2020-05-04 19:48:02 +02:00
2023-01-06 06:13:02 +01:00
// Client is http client, which talks to the given apiServer passed to NewClient().
2020-05-04 19:48:02 +02:00
type Client struct {
2023-01-06 04:34:47 +01:00
// client is used for short requests.
client * HTTPClient
2020-12-03 18:50:50 +01:00
// blockingClient is used for long-polling requests.
2023-01-06 04:34:47 +01:00
blockingClient * HTTPClient
2020-12-03 18:50:50 +01:00
apiServer string
2021-04-03 23:40:08 +02:00
2023-01-06 04:34:47 +01:00
setHTTPHeaders func ( req * http . Request )
setHTTPProxyHeaders func ( req * http . Request )
clientCtx context . Context
clientCancel context . CancelFunc
}
// HTTPClient is a wrapper around http.Client with timeouts.
type HTTPClient struct {
2023-01-07 10:26:31 +01:00
client * http . Client
ReadTimeout time . Duration
2020-05-04 19:48:02 +02:00
}
2023-01-06 06:13:02 +01:00
var defaultDialer = & net . Dialer { }
2022-07-07 01:25:31 +02:00
2021-04-03 23:40:08 +02:00
// NewClient returns new Client for the given args.
2023-06-05 15:56:49 +02:00
func NewClient ( apiServer string , ac * promauth . Config , proxyURL * proxy . URL , proxyAC * promauth . Config , httpCfg promauth . HTTPClientConfig ) ( * Client , error ) {
2023-01-06 04:34:47 +01:00
u , err := url . Parse ( apiServer )
if err != nil {
2023-01-06 06:13:02 +01:00
return nil , fmt . Errorf ( "cannot parse apiServer=%q: %w" , apiServer , err )
2023-01-06 04:34:47 +01:00
}
2020-10-12 12:38:21 +02:00
2023-01-06 06:13:02 +01:00
dialFunc := defaultDialer . DialContext
if u . Scheme == "unix" {
// special case for unix socket connection
2023-01-06 04:34:47 +01:00
dialAddr := u . Path
2023-01-06 06:13:02 +01:00
apiServer = "http://unix"
dialFunc = func ( ctx context . Context , _ , _ string ) ( net . Conn , error ) {
return defaultDialer . DialContext ( ctx , "unix" , dialAddr )
2020-10-12 12:38:21 +02:00
}
}
2020-12-24 09:52:37 +01:00
2023-01-06 06:13:02 +01:00
isTLS := u . Scheme == "https"
2021-04-03 23:40:08 +02:00
var tlsCfg * tls . Config
2021-03-09 17:54:09 +01:00
if isTLS {
2020-05-04 19:48:02 +02:00
tlsCfg = ac . NewTLSConfig ( )
}
2023-01-06 04:34:47 +01:00
2023-01-06 06:13:02 +01:00
var proxyURLFunc func ( * http . Request ) ( * url . URL , error )
if pu := proxyURL . GetURL ( ) ; pu != nil {
proxyURLFunc = http . ProxyURL ( pu )
2020-12-24 09:56:10 +01:00
}
2023-01-06 04:34:47 +01:00
2023-01-06 06:13:02 +01:00
client := & http . Client {
Timeout : DefaultClientReadTimeout ,
Transport : & http . Transport {
TLSClientConfig : tlsCfg ,
Proxy : proxyURLFunc ,
TLSHandshakeTimeout : 10 * time . Second ,
MaxIdleConnsPerHost : * maxConcurrency ,
ResponseHeaderTimeout : DefaultClientReadTimeout ,
DialContext : dialFunc ,
2023-01-06 04:34:47 +01:00
} ,
}
blockingClient := & http . Client {
2023-01-06 06:13:02 +01:00
Timeout : BlockingClientReadTimeout ,
Transport : & http . Transport {
TLSClientConfig : tlsCfg ,
Proxy : proxyURLFunc ,
TLSHandshakeTimeout : 10 * time . Second ,
MaxIdleConnsPerHost : 1000 ,
ResponseHeaderTimeout : BlockingClientReadTimeout ,
DialContext : dialFunc ,
} ,
2021-04-03 23:40:08 +02:00
}
2020-05-04 19:48:02 +02:00
2023-01-06 04:34:47 +01:00
setHTTPHeaders := func ( req * http . Request ) { }
if ac != nil {
2023-01-06 06:13:02 +01:00
setHTTPHeaders = func ( req * http . Request ) {
ac . SetHeaders ( req , true )
}
}
2023-06-05 15:56:49 +02:00
if httpCfg . FollowRedirects != nil && ! * httpCfg . FollowRedirects {
2023-07-06 19:31:35 +02:00
checkRedirect := func ( req * http . Request , via [ ] * http . Request ) error {
2023-05-26 09:39:45 +02:00
return http . ErrUseLastResponse
}
2023-07-06 19:31:35 +02:00
client . CheckRedirect = checkRedirect
blockingClient . CheckRedirect = checkRedirect
2023-05-26 09:39:45 +02:00
}
2023-01-06 06:13:02 +01:00
setHTTPProxyHeaders := func ( req * http . Request ) { }
if proxyAC != nil {
setHTTPProxyHeaders = func ( req * http . Request ) {
proxyURL . SetHeaders ( proxyAC , req )
}
2023-01-06 04:34:47 +01:00
}
2023-06-05 15:56:49 +02:00
if httpCfg . EnableHTTP2 != nil && * httpCfg . EnableHTTP2 {
_ , err := http2 . ConfigureTransports ( client . Transport . ( * http . Transport ) )
if err != nil {
return nil , fmt . Errorf ( "failed to configure HTTP/2 transport: %s" , err )
}
}
2023-01-06 04:34:47 +01:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
2020-05-19 16:35:47 +02:00
2023-01-06 06:13:02 +01:00
c := & Client {
client : & HTTPClient {
2023-01-07 10:26:31 +01:00
client : client ,
ReadTimeout : DefaultClientReadTimeout ,
2023-01-06 06:13:02 +01:00
} ,
blockingClient : & HTTPClient {
2023-01-07 10:26:31 +01:00
client : blockingClient ,
ReadTimeout : BlockingClientReadTimeout ,
2023-01-06 06:13:02 +01:00
} ,
2023-01-06 04:34:47 +01:00
apiServer : apiServer ,
setHTTPHeaders : setHTTPHeaders ,
setHTTPProxyHeaders : setHTTPProxyHeaders ,
clientCtx : ctx ,
clientCancel : cancel ,
2023-01-06 06:13:02 +01:00
}
return c , nil
2020-12-03 18:47:40 +01:00
}
2023-01-18 06:47:11 +01:00
// Context returns context for the client requests.
func ( c * Client ) Context ( ) context . Context {
return c . clientCtx
}
2023-02-24 00:13:08 +01:00
// GetAPIResponseWithParamsCtx returns response for given absolute path with blocking client and optional callback for api response,
func ( c * Client ) GetAPIResponseWithParamsCtx ( ctx context . Context , path string , modifyRequest RequestCallback , inspectResponse ResponseCallback ) ( [ ] byte , error ) {
return c . getAPIResponseWithConcurrencyLimit ( ctx , c . client , path , modifyRequest , inspectResponse )
}
2021-06-22 12:33:37 +02:00
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
2023-02-22 13:59:56 +01:00
func ( c * Client ) GetAPIResponseWithReqParams ( path string , modifyRequest RequestCallback ) ( [ ] byte , error ) {
2023-02-24 00:13:08 +01:00
return c . getAPIResponseWithConcurrencyLimit ( c . clientCtx , c . client , path , modifyRequest , nil )
2021-06-22 12:33:37 +02:00
}
2020-05-04 19:48:02 +02:00
// GetAPIResponse returns response for the given absolute path.
func ( c * Client ) GetAPIResponse ( path string ) ( [ ] byte , error ) {
2023-02-24 00:13:08 +01:00
return c . getAPIResponseWithConcurrencyLimit ( c . clientCtx , c . client , path , nil , nil )
2021-06-22 12:33:37 +02:00
}
2023-02-24 00:13:08 +01:00
func ( c * Client ) getAPIResponseWithConcurrencyLimit ( ctx context . Context , client * HTTPClient , path string ,
2023-05-26 09:39:45 +02:00
modifyRequest RequestCallback , inspectResponse ResponseCallback ,
) ( [ ] byte , error ) {
2020-05-19 16:35:47 +02:00
// Limit the number of concurrent API requests.
concurrencyLimitChOnce . Do ( concurrencyLimitChInit )
t := timerpool . Get ( * maxWaitTime )
select {
case concurrencyLimitCh <- struct { } { } :
timerpool . Put ( t )
case <- t . C :
timerpool . Put ( t )
return nil , fmt . Errorf ( "too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d" ,
c . apiServer , * maxWaitTime , * maxConcurrency )
2023-02-24 00:13:08 +01:00
case <- ctx . Done ( ) :
timerpool . Put ( t )
return nil , ctx . Err ( )
2020-05-19 16:35:47 +02:00
}
2023-02-24 00:13:08 +01:00
data , err := c . getAPIResponseWithParamsAndClientCtx ( ctx , client , path , modifyRequest , inspectResponse )
<- concurrencyLimitCh
return data , err
2020-12-03 18:47:40 +01:00
}
2020-05-19 16:35:47 +02:00
2020-12-03 18:47:40 +01:00
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
2023-02-22 13:59:56 +01:00
func ( c * Client ) GetBlockingAPIResponse ( path string , inspectResponse ResponseCallback ) ( [ ] byte , error ) {
2023-01-18 06:47:11 +01:00
return c . getAPIResponseWithParamsAndClientCtx ( c . clientCtx , c . blockingClient , path , nil , inspectResponse )
}
// GetBlockingAPIResponseCtx returns response for given absolute path with blocking client and optional callback for api response,
2023-02-22 13:59:56 +01:00
func ( c * Client ) GetBlockingAPIResponseCtx ( ctx context . Context , path string , inspectResponse ResponseCallback ) ( [ ] byte , error ) {
2023-01-18 06:47:11 +01:00
return c . getAPIResponseWithParamsAndClientCtx ( ctx , c . blockingClient , path , nil , inspectResponse )
2020-12-03 18:47:40 +01:00
}
2021-06-22 12:33:37 +02:00
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response.
2023-02-22 13:59:56 +01:00
func ( c * Client ) getAPIResponseWithParamsAndClientCtx ( ctx context . Context , client * HTTPClient , path string , modifyRequest RequestCallback , inspectResponse ResponseCallback ) ( [ ] byte , error ) {
2020-05-04 19:48:02 +02:00
requestURL := c . apiServer + path
2023-01-06 04:34:47 +01:00
u , err := url . Parse ( requestURL )
if err != nil {
return nil , fmt . Errorf ( "cannot parse %q: %w" , requestURL , err )
}
2023-01-07 10:13:03 +01:00
deadline := time . Now ( ) . Add ( client . ReadTimeout )
2023-01-18 06:47:11 +01:00
ctx , cancel := context . WithDeadline ( ctx , deadline )
2023-01-06 04:34:47 +01:00
defer cancel ( )
2023-02-23 03:58:44 +01:00
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , u . String ( ) , nil )
2023-01-06 04:34:47 +01:00
if err != nil {
return nil , fmt . Errorf ( "cannot create request for %q: %w" , requestURL , err )
2021-04-03 23:40:08 +02:00
}
2023-01-06 04:34:47 +01:00
c . setHTTPHeaders ( req )
c . setHTTPProxyHeaders ( req )
2021-06-22 12:33:37 +02:00
if modifyRequest != nil {
2023-01-06 04:34:47 +01:00
modifyRequest ( req )
2021-06-22 12:33:37 +02:00
}
2020-12-03 18:47:40 +01:00
2023-02-24 21:11:44 +01:00
resp , err := doRequestWithPossibleRetry ( client , req )
2023-01-06 04:34:47 +01:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot fetch %q: %w" , requestURL , err )
2020-05-04 19:48:02 +02:00
}
2023-01-06 06:13:02 +01:00
data , err := io . ReadAll ( resp . Body )
_ = resp . Body . Close ( )
2023-01-06 04:34:47 +01:00
if err != nil {
2023-01-06 06:13:02 +01:00
return nil , fmt . Errorf ( "cannot read response from %q: %w" , requestURL , err )
2023-01-06 04:34:47 +01:00
}
2020-12-03 18:47:40 +01:00
if inspectResponse != nil {
2023-01-06 04:34:47 +01:00
inspectResponse ( resp )
2020-12-03 18:47:40 +01:00
}
2023-01-06 04:34:47 +01:00
statusCode := resp . StatusCode
if statusCode != http . StatusOK {
2020-05-04 19:48:02 +02:00
return nil , fmt . Errorf ( "unexpected status code returned from %q: %d; expecting %d; response body: %q" ,
2023-01-06 04:34:47 +01:00
requestURL , statusCode , http . StatusOK , data )
2020-05-04 19:48:02 +02:00
}
return data , nil
}
2020-08-13 21:31:42 +02:00
2022-12-09 03:29:10 +01:00
// APIServer returns the API server address
func ( c * Client ) APIServer ( ) string {
return c . apiServer
}
2023-01-06 04:34:47 +01:00
// Stop cancels all in-flight requests
func ( c * Client ) Stop ( ) {
c . clientCancel ( )
}
2023-02-24 21:11:44 +01:00
func doRequestWithPossibleRetry ( hc * HTTPClient , req * http . Request ) ( * http . Response , error ) {
2023-01-06 06:13:02 +01:00
discoveryRequests . Inc ( )
2023-01-06 04:34:47 +01:00
2023-02-24 20:39:56 +01:00
var (
reqErr error
resp * http . Response
)
// Return true if the request execution is completed and retry is not required
attempt := func ( ) bool {
resp , reqErr = hc . client . Do ( req )
if reqErr == nil {
2023-01-06 04:34:47 +01:00
statusCode := resp . StatusCode
2023-02-24 21:11:44 +01:00
if statusCode != http . StatusTooManyRequests {
2023-02-24 20:39:56 +01:00
return true
2022-08-16 13:52:38 +02:00
}
2023-06-09 09:26:33 +02:00
} else if ! errors . Is ( reqErr , net . ErrClosed ) && ! strings . Contains ( reqErr . Error ( ) , "broken pipe" ) {
2023-02-24 20:39:56 +01:00
return true
2020-08-13 21:31:42 +02:00
}
2023-02-24 20:39:56 +01:00
return false
}
if attempt ( ) {
return resp , reqErr
}
2023-02-24 21:11:44 +01:00
// The first attempt was unsuccessful. Use exponential backoff for further attempts.
// Perform the second attempt immediately after the first attempt - this should help
// in cases when the remote side closes the keep-alive connection before the first attempt.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293
2023-02-24 20:39:56 +01:00
sleepTime := time . Second
2023-02-24 21:11:44 +01:00
// It is expected that the deadline is already set to req.Context(), so the loop below
// should eventually finish if all the attempt() calls are unsuccessful.
ctx := req . Context ( )
2023-02-24 20:39:56 +01:00
for {
discoveryRetries . Inc ( )
if attempt ( ) {
return resp , reqErr
}
2021-05-13 09:38:43 +02:00
sleepTime += sleepTime
2023-02-24 21:11:44 +01:00
if ! SleepCtx ( ctx , sleepTime ) {
return resp , reqErr
2021-05-13 09:38:43 +02:00
}
2020-08-13 21:31:42 +02:00
}
}
2021-02-01 19:02:51 +01:00
var (
discoveryRequests = metrics . NewCounter ( ` vm_promscrape_discovery_requests_total ` )
2022-08-16 13:52:38 +02:00
discoveryRetries = metrics . NewCounter ( ` vm_promscrape_discovery_retries_total ` )
2021-02-01 19:02:51 +01:00
)
2023-02-24 21:11:44 +01:00
// SleepCtx sleeps for sleepDuration.
//
// It immediately returns false on ctx cancel or deadline, without waiting for sleepDuration.
func SleepCtx ( ctx context . Context , sleepDuration time . Duration ) bool {
t := timerpool . Get ( sleepDuration )
defer timerpool . Put ( t )
select {
case <- ctx . Done ( ) :
return false
case <- t . C :
return true
}
}