mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
c939a8e8a2
* lib/promscrape/discovery/azure: remove API server from URL returned by azure * lib/promscrape/discovery/azure: validate nextLink contains same URL as apiServer
286 lines
9.4 KiB
Go
286 lines
9.4 KiB
Go
package discoveryutils
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"flag"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
|
"github.com/VictoriaMetrics/fasthttp"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// maxConcurrency is the -promscrape.discovery.concurrency command-line flag.
var maxConcurrency = flag.Int("promscrape.discovery.concurrency", 100, "The maximum number of concurrent requests to Prometheus autodiscovery API (Consul, Kubernetes, etc.)")

// maxWaitTime is the -promscrape.discovery.concurrentWaitTime command-line flag.
var maxWaitTime = flag.Duration("promscrape.discovery.concurrentWaitTime", time.Minute, "The maximum duration for waiting to perform API requests "+
	"if more than -promscrape.discovery.concurrency requests are simultaneously performed")
|
|
|
|
// defaultClient is the shared net/http client with a 30 second timeout per request.
var defaultClient = &http.Client{Timeout: 30 * time.Second}
|
|
|
|
// GetHTTPClient returns default client for http API requests.
|
|
func GetHTTPClient() *http.Client {
|
|
return defaultClient
|
|
}
|
|
|
|
// Client is http client, which talks to the given apiServer.
|
|
type Client struct {
|
|
// hc is used for short requests.
|
|
hc *fasthttp.HostClient
|
|
|
|
// blockingClient is used for long-polling requests.
|
|
blockingClient *fasthttp.HostClient
|
|
|
|
apiServer string
|
|
|
|
hostPort string
|
|
setFasthttpHeaders func(req *fasthttp.Request)
|
|
setFasthttpProxyHeaders func(req *fasthttp.Request)
|
|
sendFullURL bool
|
|
}
|
|
|
|
// addMissingPort appends the default port to addr if addr has no port.
//
// The default port is 443 if isTLS is set and 80 otherwise.
// addr is returned as is if it already contains a port.
func addMissingPort(addr string, isTLS bool) string {
	// A bracketed IPv6 literal such as "[::1]" contains colons, but it has no port.
	isBareIPv6 := strings.HasPrefix(addr, "[") && strings.HasSuffix(addr, "]")
	if !isBareIPv6 && strings.Contains(addr, ":") {
		return addr
	}
	if isTLS {
		return addr + ":443"
	}
	return addr + ":80"
}
|
|
|
|
// NewClient returns new Client for the given args.
|
|
func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxyAC *promauth.Config) (*Client, error) {
|
|
var u fasthttp.URI
|
|
u.Update(apiServer)
|
|
|
|
// special case for unix socket connection
|
|
var dialFunc fasthttp.DialFunc
|
|
if string(u.Scheme()) == "unix" {
|
|
dialAddr := string(u.Path())
|
|
apiServer = "http://"
|
|
dialFunc = func(_ string) (net.Conn, error) {
|
|
return net.Dial("unix", dialAddr)
|
|
}
|
|
}
|
|
|
|
hostPort := string(u.Host())
|
|
dialAddr := hostPort
|
|
isTLS := string(u.Scheme()) == "https"
|
|
var tlsCfg *tls.Config
|
|
if isTLS {
|
|
tlsCfg = ac.NewTLSConfig()
|
|
}
|
|
sendFullURL := !isTLS && proxyURL.IsHTTPOrHTTPS()
|
|
setFasthttpProxyHeaders := func(req *fasthttp.Request) {}
|
|
if sendFullURL {
|
|
// Send full urls in requests to a proxy host for non-TLS apiServer
|
|
// like net/http package from Go does.
|
|
// See https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers
|
|
pu := proxyURL.GetURL()
|
|
dialAddr = pu.Host
|
|
isTLS = pu.Scheme == "https"
|
|
if isTLS {
|
|
tlsCfg = proxyAC.NewTLSConfig()
|
|
}
|
|
proxyURLOrig := proxyURL
|
|
setFasthttpProxyHeaders = func(req *fasthttp.Request) {
|
|
proxyURLOrig.SetFasthttpHeaders(proxyAC, req)
|
|
}
|
|
proxyURL = &proxy.URL{}
|
|
}
|
|
hostPort = addMissingPort(hostPort, isTLS)
|
|
dialAddr = addMissingPort(dialAddr, isTLS)
|
|
if dialFunc == nil {
|
|
var err error
|
|
dialFunc, err = proxyURL.NewDialFunc(proxyAC)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
hc := &fasthttp.HostClient{
|
|
Addr: dialAddr,
|
|
Name: "vm_promscrape/discovery",
|
|
IsTLS: isTLS,
|
|
TLSConfig: tlsCfg,
|
|
ReadTimeout: time.Minute,
|
|
WriteTimeout: 10 * time.Second,
|
|
MaxResponseBodySize: 300 * 1024 * 1024,
|
|
MaxConns: 2 * *maxConcurrency,
|
|
Dial: dialFunc,
|
|
}
|
|
blockingClient := &fasthttp.HostClient{
|
|
Addr: dialAddr,
|
|
Name: "vm_promscrape/discovery",
|
|
IsTLS: isTLS,
|
|
TLSConfig: tlsCfg,
|
|
ReadTimeout: BlockingClientReadTimeout,
|
|
WriteTimeout: 10 * time.Second,
|
|
MaxResponseBodySize: 300 * 1024 * 1024,
|
|
MaxConns: 64 * 1024,
|
|
Dial: dialFunc,
|
|
}
|
|
setFasthttpHeaders := func(req *fasthttp.Request) {}
|
|
if ac != nil {
|
|
setFasthttpHeaders = func(req *fasthttp.Request) { ac.SetFasthttpHeaders(req, true) }
|
|
}
|
|
return &Client{
|
|
hc: hc,
|
|
blockingClient: blockingClient,
|
|
apiServer: apiServer,
|
|
hostPort: hostPort,
|
|
setFasthttpHeaders: setFasthttpHeaders,
|
|
setFasthttpProxyHeaders: setFasthttpProxyHeaders,
|
|
sendFullURL: sendFullURL,
|
|
}, nil
|
|
}
|
|
|
|
// BlockingClientReadTimeout is the maximum duration for waiting the response from GetBlockingAPI*
const BlockingClientReadTimeout time.Duration = 10 * time.Minute
|
|
|
|
// concurrencyLimitCh limits the number of concurrently executed API requests.
// It is created by concurrencyLimitChInit.
var concurrencyLimitCh chan struct{}

// concurrencyLimitChOnce guards the one-time creation of concurrencyLimitCh.
var concurrencyLimitChOnce sync.Once
|
|
|
|
func concurrencyLimitChInit() {
|
|
concurrencyLimitCh = make(chan struct{}, *maxConcurrency)
|
|
}
|
|
|
|
// Addr returns the address the client connects to.
|
|
func (c *Client) Addr() string {
|
|
return c.hc.Addr
|
|
}
|
|
|
|
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
|
|
// modifyRequestParams should never reference data from request.
|
|
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequestParams func(request *fasthttp.Request)) ([]byte, error) {
|
|
return c.getAPIResponse(path, modifyRequestParams)
|
|
}
|
|
|
|
// GetAPIResponse returns response for the given absolute path.
|
|
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
|
return c.getAPIResponse(path, nil)
|
|
}
|
|
|
|
// GetAPIResponse returns response for the given absolute path with optional callback for request.
|
|
func (c *Client) getAPIResponse(path string, modifyRequest func(request *fasthttp.Request)) ([]byte, error) {
|
|
// Limit the number of concurrent API requests.
|
|
concurrencyLimitChOnce.Do(concurrencyLimitChInit)
|
|
t := timerpool.Get(*maxWaitTime)
|
|
select {
|
|
case concurrencyLimitCh <- struct{}{}:
|
|
timerpool.Put(t)
|
|
case <-t.C:
|
|
timerpool.Put(t)
|
|
return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d",
|
|
c.apiServer, *maxWaitTime, *maxConcurrency)
|
|
}
|
|
defer func() { <-concurrencyLimitCh }()
|
|
return c.getAPIResponseWithParamsAndClient(c.hc, path, modifyRequest, nil)
|
|
}
|
|
|
|
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
|
|
// inspectResponse - should never reference data from response.
|
|
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *fasthttp.Response)) ([]byte, error) {
|
|
return c.getAPIResponseWithParamsAndClient(c.blockingClient, path, nil, inspectResponse)
|
|
}
|
|
|
|
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response.
|
|
func (c *Client) getAPIResponseWithParamsAndClient(client *fasthttp.HostClient, path string, modifyRequest func(req *fasthttp.Request), inspectResponse func(resp *fasthttp.Response)) ([]byte, error) {
|
|
requestURL := c.apiServer + path
|
|
var u fasthttp.URI
|
|
u.Update(requestURL)
|
|
var req fasthttp.Request
|
|
if c.sendFullURL {
|
|
req.SetRequestURIBytes(u.FullURI())
|
|
} else {
|
|
req.SetRequestURIBytes(u.RequestURI())
|
|
}
|
|
req.Header.SetHost(c.hostPort)
|
|
req.Header.Set("Accept-Encoding", "gzip")
|
|
c.setFasthttpHeaders(&req)
|
|
c.setFasthttpProxyHeaders(&req)
|
|
if modifyRequest != nil {
|
|
modifyRequest(&req)
|
|
}
|
|
|
|
var resp fasthttp.Response
|
|
deadline := time.Now().Add(client.ReadTimeout)
|
|
if err := doRequestWithPossibleRetry(client, &req, &resp, deadline); err != nil {
|
|
return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
|
|
}
|
|
var data []byte
|
|
if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" {
|
|
dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot ungzip response from %q: %w", requestURL, err)
|
|
}
|
|
data = dst
|
|
} else {
|
|
data = append(data[:0], resp.Body()...)
|
|
}
|
|
if inspectResponse != nil {
|
|
inspectResponse(&resp)
|
|
}
|
|
statusCode := resp.StatusCode()
|
|
if statusCode != fasthttp.StatusOK {
|
|
return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
|
|
requestURL, statusCode, fasthttp.StatusOK, data)
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
// APIServer returns the API server address
|
|
func (c *Client) APIServer() string {
|
|
return c.apiServer
|
|
}
|
|
|
|
// DoRequestWithPossibleRetry performs the given req at hc and stores the response at resp.
|
|
func DoRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time, requestCounter, retryCounter *metrics.Counter) error {
|
|
sleepTime := time.Second
|
|
requestCounter.Inc()
|
|
for {
|
|
// Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline
|
|
// across multiple retries.
|
|
err := hc.DoDeadline(req, resp, deadline)
|
|
if err == nil {
|
|
statusCode := resp.StatusCode()
|
|
if statusCode != fasthttp.StatusTooManyRequests {
|
|
return nil
|
|
}
|
|
} else if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
|
|
return err
|
|
}
|
|
// Retry request after exponentially increased sleep.
|
|
maxSleepTime := time.Until(deadline)
|
|
if sleepTime > maxSleepTime {
|
|
return fmt.Errorf("the server closes all the connection attempts: %w", err)
|
|
}
|
|
sleepTime += sleepTime
|
|
if sleepTime > maxSleepTime {
|
|
sleepTime = maxSleepTime
|
|
}
|
|
time.Sleep(sleepTime)
|
|
retryCounter.Inc()
|
|
}
|
|
}
|
|
|
|
func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
|
|
return DoRequestWithPossibleRetry(hc, req, resp, deadline, discoveryRequests, discoveryRetries)
|
|
}
|
|
|
|
var (
|
|
discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`)
|
|
discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)
|
|
)
|