2020-04-29 16:27:08 +02:00
package discoveryutils
import (
2020-05-04 19:48:02 +02:00
"crypto/tls"
2020-05-19 16:35:47 +02:00
"flag"
2020-05-04 19:48:02 +02:00
"fmt"
"net"
2020-04-29 16:27:08 +02:00
"net/http"
2020-05-04 19:48:02 +02:00
"strings"
2020-05-19 16:35:47 +02:00
"sync"
2020-04-29 16:27:08 +02:00
"time"
2020-05-04 19:48:02 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2020-12-24 09:56:10 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
2020-05-19 16:35:47 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2020-05-04 19:48:02 +02:00
"github.com/VictoriaMetrics/fasthttp"
2021-02-01 19:02:51 +01:00
"github.com/VictoriaMetrics/metrics"
2020-04-29 16:27:08 +02:00
)
2020-05-19 16:35:47 +02:00
var (
2020-06-20 16:52:49 +02:00
maxConcurrency = flag . Int ( "promscrape.discovery.concurrency" , 100 , "The maximum number of concurrent requests to Prometheus autodiscovery API (Consul, Kubernetes, etc.)" )
2020-05-19 16:35:47 +02:00
maxWaitTime = flag . Duration ( "promscrape.discovery.concurrentWaitTime" , time . Minute , "The maximum duration for waiting to perform API requests " +
"if more than -promscrape.discovery.concurrency requests are simultaneously performed" )
)
2020-04-29 16:27:08 +02:00
var defaultClient = & http . Client {
Timeout : 30 * time . Second ,
}
// GetHTTPClient returns default client for http API requests.
func GetHTTPClient ( ) * http . Client {
return defaultClient
}
2020-05-04 19:48:02 +02:00
// Client is http client, which talks to the given apiServer.
type Client struct {
2020-12-03 18:50:50 +01:00
// hc is used for short requests.
2020-12-03 18:47:40 +01:00
hc * fasthttp . HostClient
2020-12-03 18:50:50 +01:00
// blockingClient is used for long-polling requests.
2020-12-03 18:47:40 +01:00
blockingClient * fasthttp . HostClient
2020-12-03 18:50:50 +01:00
apiServer string
2021-04-03 23:40:08 +02:00
2021-05-14 19:00:05 +02:00
hostPort string
getAuthHeader func ( ) string
getProxyAuthHeader func ( ) string
sendFullURL bool
2020-05-04 19:48:02 +02:00
}
2021-04-03 23:40:08 +02:00
// NewClient returns new Client for the given args.
func NewClient ( apiServer string , ac * promauth . Config , proxyURL proxy . URL , proxyAC * promauth . Config ) ( * Client , error ) {
var u fasthttp . URI
2020-05-04 19:48:02 +02:00
u . Update ( apiServer )
2020-10-12 12:38:21 +02:00
// special case for unix socket connection
2021-04-03 23:40:08 +02:00
var dialFunc fasthttp . DialFunc
2020-10-12 12:38:21 +02:00
if string ( u . Scheme ( ) ) == "unix" {
dialAddr := string ( u . Path ( ) )
apiServer = "http://"
dialFunc = func ( _ string ) ( net . Conn , error ) {
return net . Dial ( "unix" , dialAddr )
}
}
2020-12-24 09:52:37 +01:00
2020-05-04 19:48:02 +02:00
hostPort := string ( u . Host ( ) )
isTLS := string ( u . Scheme ( ) ) == "https"
2021-04-03 23:40:08 +02:00
var tlsCfg * tls . Config
2021-03-09 17:54:09 +01:00
if isTLS {
2020-05-04 19:48:02 +02:00
tlsCfg = ac . NewTLSConfig ( )
}
2021-04-03 23:40:08 +02:00
sendFullURL := ! isTLS && proxyURL . IsHTTPOrHTTPS ( )
2021-05-14 19:00:05 +02:00
getProxyAuthHeader := func ( ) string { return "" }
2021-04-03 23:40:08 +02:00
if sendFullURL {
// Send full urls in requests to a proxy host for non-TLS apiServer
// like net/http package from Go does.
// See https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers
pu := proxyURL . URL ( )
hostPort = pu . Host
isTLS = pu . Scheme == "https"
if isTLS {
tlsCfg = proxyAC . NewTLSConfig ( )
}
2021-05-14 19:00:05 +02:00
getProxyAuthHeader = func ( ) string {
return proxyURL . GetAuthHeader ( proxyAC )
}
2021-04-03 23:40:08 +02:00
proxyURL = proxy . URL { }
}
2020-05-04 19:48:02 +02:00
if ! strings . Contains ( hostPort , ":" ) {
port := "80"
if isTLS {
port = "443"
}
hostPort = net . JoinHostPort ( hostPort , port )
}
2020-12-24 09:56:10 +01:00
if dialFunc == nil {
2021-04-03 23:40:08 +02:00
var err error
dialFunc , err = proxyURL . NewDialFunc ( proxyAC )
2020-12-24 09:56:10 +01:00
if err != nil {
return nil , err
}
}
2020-05-04 19:48:02 +02:00
hc := & fasthttp . HostClient {
Addr : hostPort ,
Name : "vm_promscrape/discovery" ,
IsTLS : isTLS ,
TLSConfig : tlsCfg ,
ReadTimeout : time . Minute ,
WriteTimeout : 10 * time . Second ,
MaxResponseBodySize : 300 * 1024 * 1024 ,
2020-06-20 16:52:49 +02:00
MaxConns : 2 * * maxConcurrency ,
2020-10-12 12:38:21 +02:00
Dial : dialFunc ,
2020-05-04 19:48:02 +02:00
}
2020-12-03 18:50:50 +01:00
blockingClient := & fasthttp . HostClient {
2020-12-03 18:47:40 +01:00
Addr : hostPort ,
Name : "vm_promscrape/discovery" ,
IsTLS : isTLS ,
TLSConfig : tlsCfg ,
2020-12-11 16:22:37 +01:00
ReadTimeout : BlockingClientReadTimeout ,
2020-12-03 18:47:40 +01:00
WriteTimeout : 10 * time . Second ,
MaxResponseBodySize : 300 * 1024 * 1024 ,
2020-12-05 11:13:57 +01:00
MaxConns : 64 * 1024 ,
2020-12-03 18:47:40 +01:00
Dial : dialFunc ,
}
2021-05-14 19:00:05 +02:00
getAuthHeader := func ( ) string { return "" }
2021-04-03 23:40:08 +02:00
if ac != nil {
2021-05-14 19:00:05 +02:00
getAuthHeader = ac . GetAuthHeader
2021-04-03 23:40:08 +02:00
}
2020-05-04 19:48:02 +02:00
return & Client {
2021-05-14 19:00:05 +02:00
hc : hc ,
blockingClient : blockingClient ,
apiServer : apiServer ,
hostPort : hostPort ,
getAuthHeader : getAuthHeader ,
getProxyAuthHeader : getProxyAuthHeader ,
sendFullURL : sendFullURL ,
2020-05-04 19:48:02 +02:00
} , nil
}
2020-12-11 16:22:37 +01:00
// BlockingClientReadTimeout is the maximum duration for waiting the response from GetBlockingAPI*
const BlockingClientReadTimeout = 10 * time . Minute
2020-05-19 16:35:47 +02:00
var (
concurrencyLimitCh chan struct { }
concurrencyLimitChOnce sync . Once
)
func concurrencyLimitChInit ( ) {
concurrencyLimitCh = make ( chan struct { } , * maxConcurrency )
}
2020-12-03 18:50:50 +01:00
// Addr returns the address the client connects to.
func ( c * Client ) Addr ( ) string {
return c . hc . Addr
2020-12-03 18:47:40 +01:00
}
2021-06-22 12:33:37 +02:00
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
// modifyRequestParams should never reference data from request.
func ( c * Client ) GetAPIResponseWithReqParams ( path string , modifyRequestParams func ( request * fasthttp . Request ) ) ( [ ] byte , error ) {
return c . getAPIResponse ( path , modifyRequestParams )
}
2020-05-04 19:48:02 +02:00
// GetAPIResponse returns response for the given absolute path.
func ( c * Client ) GetAPIResponse ( path string ) ( [ ] byte , error ) {
2021-06-22 12:33:37 +02:00
return c . getAPIResponse ( path , nil )
}
// GetAPIResponse returns response for the given absolute path with optional callback for request.
func ( c * Client ) getAPIResponse ( path string , modifyRequest func ( request * fasthttp . Request ) ) ( [ ] byte , error ) {
2020-05-19 16:35:47 +02:00
// Limit the number of concurrent API requests.
concurrencyLimitChOnce . Do ( concurrencyLimitChInit )
t := timerpool . Get ( * maxWaitTime )
select {
case concurrencyLimitCh <- struct { } { } :
timerpool . Put ( t )
case <- t . C :
timerpool . Put ( t )
return nil , fmt . Errorf ( "too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d" ,
c . apiServer , * maxWaitTime , * maxConcurrency )
}
defer func ( ) { <- concurrencyLimitCh } ( )
2021-06-22 12:33:37 +02:00
return c . getAPIResponseWithParamsAndClient ( c . hc , path , modifyRequest , nil )
2020-12-03 18:47:40 +01:00
}
2020-05-19 16:35:47 +02:00
2020-12-03 18:47:40 +01:00
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
// inspectResponse - should never reference data from response.
func ( c * Client ) GetBlockingAPIResponse ( path string , inspectResponse func ( resp * fasthttp . Response ) ) ( [ ] byte , error ) {
2021-06-22 12:33:37 +02:00
return c . getAPIResponseWithParamsAndClient ( c . blockingClient , path , nil , inspectResponse )
2020-12-03 18:47:40 +01:00
}
2021-06-22 12:33:37 +02:00
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response.
func ( c * Client ) getAPIResponseWithParamsAndClient ( client * fasthttp . HostClient , path string , modifyRequest func ( req * fasthttp . Request ) , inspectResponse func ( resp * fasthttp . Response ) ) ( [ ] byte , error ) {
2020-05-04 19:48:02 +02:00
requestURL := c . apiServer + path
var u fasthttp . URI
u . Update ( requestURL )
var req fasthttp . Request
2021-04-03 23:40:08 +02:00
if c . sendFullURL {
req . SetRequestURIBytes ( u . FullURI ( ) )
} else {
req . SetRequestURIBytes ( u . RequestURI ( ) )
}
2021-04-04 00:18:24 +02:00
req . Header . SetHost ( c . hostPort )
2020-05-04 19:48:02 +02:00
req . Header . Set ( "Accept-Encoding" , "gzip" )
2021-05-14 19:00:05 +02:00
if ah := c . getAuthHeader ( ) ; ah != "" {
req . Header . Set ( "Authorization" , ah )
2021-04-03 23:40:08 +02:00
}
2021-05-14 19:00:05 +02:00
if ah := c . getProxyAuthHeader ( ) ; ah != "" {
req . Header . Set ( "Proxy-Authorization" , ah )
2020-05-04 19:48:02 +02:00
}
2021-06-22 12:33:37 +02:00
if modifyRequest != nil {
modifyRequest ( & req )
}
2020-12-03 18:47:40 +01:00
2020-05-04 19:48:02 +02:00
var resp fasthttp . Response
2020-12-03 18:47:40 +01:00
deadline := time . Now ( ) . Add ( client . ReadTimeout )
if err := doRequestWithPossibleRetry ( client , & req , & resp , deadline ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot fetch %q: %w" , requestURL , err )
2020-05-04 19:48:02 +02:00
}
var data [ ] byte
if ce := resp . Header . Peek ( "Content-Encoding" ) ; string ( ce ) == "gzip" {
dst , err := fasthttp . AppendGunzipBytes ( nil , resp . Body ( ) )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot ungzip response from %q: %w" , requestURL , err )
2020-05-04 19:48:02 +02:00
}
data = dst
} else {
data = append ( data [ : 0 ] , resp . Body ( ) ... )
}
2020-12-03 18:47:40 +01:00
if inspectResponse != nil {
inspectResponse ( & resp )
}
2020-05-04 19:48:02 +02:00
statusCode := resp . StatusCode ( )
if statusCode != fasthttp . StatusOK {
return nil , fmt . Errorf ( "unexpected status code returned from %q: %d; expecting %d; response body: %q" ,
requestURL , statusCode , fasthttp . StatusOK , data )
}
return data , nil
}
2020-08-13 21:31:42 +02:00
func doRequestWithPossibleRetry ( hc * fasthttp . HostClient , req * fasthttp . Request , resp * fasthttp . Response , deadline time . Time ) error {
2021-05-13 09:38:43 +02:00
sleepTime := time . Second
2021-02-01 19:02:51 +01:00
discoveryRequests . Inc ( )
2020-08-13 21:31:42 +02:00
for {
// Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline
// across multiple retries.
err := hc . DoDeadline ( req , resp , deadline )
if err == nil {
return nil
}
2021-01-22 12:22:20 +01:00
if err != fasthttp . ErrConnectionClosed && ! strings . Contains ( err . Error ( ) , "broken pipe" ) {
2020-08-13 21:31:42 +02:00
return err
}
// Retry request if the server closes the keep-alive connection unless deadline exceeds.
2021-05-13 09:38:43 +02:00
maxSleepTime := time . Until ( deadline )
if sleepTime > maxSleepTime {
2020-08-13 21:31:42 +02:00
return fmt . Errorf ( "the server closes all the connection attempts: %w" , err )
}
2021-05-13 09:38:43 +02:00
sleepTime += sleepTime
if sleepTime > maxSleepTime {
sleepTime = maxSleepTime
}
time . Sleep ( sleepTime )
2021-02-01 19:02:51 +01:00
discoveryRetries . Inc ( )
2020-08-13 21:31:42 +02:00
}
}
2021-02-01 19:02:51 +01:00
var (
discoveryRetries = metrics . NewCounter ( ` vm_promscrape_discovery_retries_total ` )
discoveryRequests = metrics . NewCounter ( ` vm_promscrape_discovery_requests_total ` )
)