VictoriaMetrics/lib/promscrape/discoveryutils/client.go
Aliaksandr Valialkin d13906bf1f lib/promscrape: exponentially increase retry interval on unsuccessful requests to scrape targets or to service discovery services
This should reduce CPU load at vmagent and at remote side when the remote side doesn't accept HTTP requests.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1289
2021-05-13 10:47:07 +03:00

255 lines
7.8 KiB
Go

package discoveryutils
import (
"crypto/tls"
"flag"
"fmt"
"net"
"net/http"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/fasthttp"
"github.com/VictoriaMetrics/metrics"
)
// maxConcurrency bounds how many GetAPIResponse calls may run at the same time.
// It also sizes the connection pool of the regular (non-blocking) host client.
var maxConcurrency = flag.Int("promscrape.discovery.concurrency", 100, "The maximum number of concurrent requests to Prometheus autodiscovery API (Consul, Kubernetes, etc.)")

// maxWaitTime is how long GetAPIResponse waits for a free concurrency slot
// before giving up with an error.
var maxWaitTime = flag.Duration("promscrape.discovery.concurrentWaitTime", time.Minute, "The maximum duration for waiting to perform API requests "+
	"if more than -promscrape.discovery.concurrency requests are simultaneously performed")
// defaultClient is the shared net/http client handed out by GetHTTPClient.
// Every request made through it is bounded by a 30 second timeout.
var defaultClient = &http.Client{Timeout: 30 * time.Second}

// GetHTTPClient returns default client for http API requests.
//
// The same *http.Client instance is returned on every call, so callers share
// its connection pool and its 30 second timeout.
func GetHTTPClient() *http.Client {
	shared := defaultClient
	return shared
}
// Client is http client, which talks to the given apiServer.
//
// It holds two fasthttp host clients pointing at the same address: one for
// regular API calls and one with a long read timeout for long-polling calls.
type Client struct {
	// hc is used for short requests.
	hc *fasthttp.HostClient

	// blockingClient is used for long-polling requests.
	blockingClient *fasthttp.HostClient

	// apiServer is the base URL; request paths are appended to it.
	apiServer string

	// hostPort is the dial address (the proxy address when full URLs are sent
	// to an http proxy); it is also sent as the Host header.
	hostPort string

	// authHeader is the value for the Authorization header; empty means none.
	authHeader string

	// proxyAuthHeader is the value for the Proxy-Authorization header; empty means none.
	proxyAuthHeader string

	// sendFullURL makes requests carry the absolute URL instead of only the
	// request URI, as required when talking to an http proxy.
	sendFullURL bool
}
// NewClient returns new Client for the given args.
//
// apiServer is the base URL of the API server; request paths are appended to it.
// A "unix" scheme makes the client dial the unix socket located at the URL path.
// ac supplies the TLS config for an https apiServer and the Authorization header; it may be nil.
// If proxyURL is an http(s) proxy and apiServer is not https, requests are sent to the proxy
// with full URLs, and proxyAC supplies the proxy TLS config and Proxy-Authorization header.
// Otherwise connections are established via proxyURL.NewDialFunc(proxyAC).
func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL, proxyAC *promauth.Config) (*Client, error) {
	var u fasthttp.URI
	u.Update(apiServer)

	// special case for unix socket connection
	var dialFunc fasthttp.DialFunc
	if string(u.Scheme()) == "unix" {
		dialAddr := string(u.Path())
		apiServer = "http://"
		dialFunc = func(_ string) (net.Conn, error) {
			return net.Dial("unix", dialAddr)
		}
	}

	hostPort := string(u.Host())
	isTLS := string(u.Scheme()) == "https"
	var tlsCfg *tls.Config
	if isTLS {
		tlsCfg = ac.NewTLSConfig()
	}
	sendFullURL := !isTLS && proxyURL.IsHTTPOrHTTPS()
	proxyAuthHeader := ""
	if sendFullURL {
		// Send full urls in requests to a proxy host for non-TLS apiServer
		// like net/http package from Go does.
		// See https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers
		pu := proxyURL.URL()
		hostPort = pu.Host
		isTLS = pu.Scheme == "https"
		if isTLS {
			tlsCfg = proxyAC.NewTLSConfig()
		}
		proxyAuthHeader = proxyURL.GetAuthHeader(proxyAC)
		// hostPort now points at the proxy itself; reset proxyURL so the dial func
		// created below presumably connects to it directly instead of via the proxy again.
		proxyURL = proxy.URL{}
	}
	hostPort = addDefaultPortIfMissing(hostPort, isTLS)

	if dialFunc == nil {
		var err error
		dialFunc, err = proxyURL.NewDialFunc(proxyAC)
		if err != nil {
			return nil, err
		}
	}
	hc := &fasthttp.HostClient{
		Addr:                hostPort,
		Name:                "vm_promscrape/discovery",
		IsTLS:               isTLS,
		TLSConfig:           tlsCfg,
		ReadTimeout:         time.Minute,
		WriteTimeout:        10 * time.Second,
		MaxResponseBodySize: 300 * 1024 * 1024,
		MaxConns:            2 * *maxConcurrency,
		Dial:                dialFunc,
	}
	blockingClient := &fasthttp.HostClient{
		Addr:                hostPort,
		Name:                "vm_promscrape/discovery",
		IsTLS:               isTLS,
		TLSConfig:           tlsCfg,
		ReadTimeout:         BlockingClientReadTimeout,
		WriteTimeout:        10 * time.Second,
		MaxResponseBodySize: 300 * 1024 * 1024,
		MaxConns:            64 * 1024,
		Dial:                dialFunc,
	}
	authHeader := ""
	if ac != nil {
		authHeader = ac.Authorization
	}
	return &Client{
		hc:              hc,
		blockingClient:  blockingClient,
		apiServer:       apiServer,
		hostPort:        hostPort,
		authHeader:      authHeader,
		proxyAuthHeader: proxyAuthHeader,
		sendFullURL:     sendFullURL,
	}, nil
}

// addDefaultPortIfMissing returns hostPort unchanged if it already contains a port,
// otherwise it appends the default port for the scheme (443 if isTLS, 80 otherwise).
//
// net.SplitHostPort is used instead of a plain ":" check so that bracketed IPv6
// literals without a port (e.g. "[::1]") get a port too ("[::1]:80").
func addDefaultPortIfMissing(hostPort string, isTLS bool) string {
	if _, _, err := net.SplitHostPort(hostPort); err == nil {
		return hostPort
	}
	port := "80"
	if isTLS {
		port = "443"
	}
	// net.JoinHostPort adds brackets for IPv6 hosts itself, so strip any existing ones.
	host := strings.TrimSuffix(strings.TrimPrefix(hostPort, "["), "]")
	return net.JoinHostPort(host, port)
}
// BlockingClientReadTimeout is the maximum duration for waiting the response from GetBlockingAPI*.
//
// It is used as the read timeout of the long-polling host client and therefore also
// bounds the total time spent on retries of a single blocking request.
const BlockingClientReadTimeout time.Duration = 10 * time.Minute
// concurrencyLimitCh is a counting semaphore shared by all clients: GetAPIResponse
// puts a token into it before a request and removes it afterwards.
var concurrencyLimitCh chan struct{}

// concurrencyLimitChOnce guards the lazy creation of concurrencyLimitCh, presumably so that
// the -promscrape.discovery.concurrency value is read after flags are parsed.
var concurrencyLimitChOnce sync.Once

// concurrencyLimitChInit creates concurrencyLimitCh with capacity -promscrape.discovery.concurrency.
func concurrencyLimitChInit() {
	capacity := *maxConcurrency
	concurrencyLimitCh = make(chan struct{}, capacity)
}
// Addr returns the address the client connects to.
//
// This is the host:port the regular host client dials; when requests are sent
// to an http proxy with full URLs, it is the proxy address.
func (c *Client) Addr() string {
	regular := c.hc
	return regular.Addr
}
// GetAPIResponse returns response for the given absolute path.
//
// At most -promscrape.discovery.concurrency such calls run simultaneously across all clients.
// A call waits up to -promscrape.discovery.concurrentWaitTime for a free slot and returns
// an error if none becomes available in time.
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
	concurrencyLimitChOnce.Do(concurrencyLimitChInit)

	waitTimer := timerpool.Get(*maxWaitTime)
	acquired := false
	select {
	case concurrencyLimitCh <- struct{}{}:
		acquired = true
	case <-waitTimer.C:
	}
	timerpool.Put(waitTimer)

	if !acquired {
		return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d",
			c.apiServer, *maxWaitTime, *maxConcurrency)
	}
	// Release the slot once the request is finished, whatever its outcome.
	defer func() { <-concurrencyLimitCh }()
	return c.getAPIResponseWithParamsAndClient(c.hc, path, nil)
}
// GetBlockingAPIResponse returns response for the given absolute path using the blocking
// (long-polling) client, whose read timeout is BlockingClientReadTimeout.
//
// Unlike GetAPIResponse, it is not limited by -promscrape.discovery.concurrency.
// inspectResponse is an optional callback invoked with the raw response. It must not keep
// references to data from the response, because the response is not retained after the call.
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *fasthttp.Response)) ([]byte, error) {
	longPoll := c.blockingClient
	return c.getAPIResponseWithParamsAndClient(longPoll, path, inspectResponse)
}
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for response.
//
// The request is sent via client with "Accept-Encoding: gzip" plus the configured
// Authorization / Proxy-Authorization headers. A gzip-encoded body is decompressed;
// any other body is copied out of the response. If inspectResponse is non-nil, it is
// called after the body is decoded and before the status code is checked.
// An error is returned if the request fails, if the gzip body cannot be decoded,
// or if the status code is not 200.
func (c *Client) getAPIResponseWithParamsAndClient(client *fasthttp.HostClient, path string, inspectResponse func(resp *fasthttp.Response)) ([]byte, error) {
	requestURL := c.apiServer + path
	var uri fasthttp.URI
	uri.Update(requestURL)

	var req fasthttp.Request
	var reqURI []byte
	switch {
	case c.sendFullURL:
		// http proxies expect the absolute URL in the request line.
		reqURI = uri.FullURI()
	default:
		reqURI = uri.RequestURI()
	}
	req.SetRequestURIBytes(reqURI)
	req.Header.SetHost(c.hostPort)
	req.Header.Set("Accept-Encoding", "gzip")
	if auth := c.authHeader; auth != "" {
		req.Header.Set("Authorization", auth)
	}
	if proxyAuth := c.proxyAuthHeader; proxyAuth != "" {
		req.Header.Set("Proxy-Authorization", proxyAuth)
	}

	var resp fasthttp.Response
	if err := doRequestWithPossibleRetry(client, &req, &resp, time.Now().Add(client.ReadTimeout)); err != nil {
		return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
	}

	body := resp.Body()
	var data []byte
	if string(resp.Header.Peek("Content-Encoding")) == "gzip" {
		decoded, err := fasthttp.AppendGunzipBytes(nil, body)
		if err != nil {
			return nil, fmt.Errorf("cannot ungzip response from %q: %w", requestURL, err)
		}
		data = decoded
	} else {
		// Copy the body, since resp owns its buffer.
		data = append([]byte(nil), body...)
	}

	if inspectResponse != nil {
		inspectResponse(&resp)
	}
	if statusCode := resp.StatusCode(); statusCode != fasthttp.StatusOK {
		return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
			requestURL, statusCode, fasthttp.StatusOK, data)
	}
	return data, nil
}
// doRequestWithPossibleRetry performs req via hc, storing the result in resp.
//
// When the server closes the connection (fasthttp.ErrConnectionClosed or a "broken pipe"
// error), the request is retried after exponentially growing pauses: 1s, 2s, 4s, and so on.
// A retry is only attempted if the pause still leaves time before deadline. Without that check,
// sleeping until the deadline would end in an attempt that is guaranteed to time out.
// All attempts share the same deadline. Any other error is returned immediately.
func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
	sleepTime := time.Second
	discoveryRequests.Inc()
	for {
		// Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline
		// across multiple retries.
		err := hc.DoDeadline(req, resp, deadline)
		if err == nil {
			return nil
		}
		if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
			return err
		}
		// Retry request if the server closes the keep-alive connection, but only while
		// sleeping still leaves time for another attempt before the deadline.
		if sleepTime >= time.Until(deadline) {
			return fmt.Errorf("the server closes all the connection attempts: %w", err)
		}
		time.Sleep(sleepTime)
		discoveryRetries.Inc()
		// sleepTime stays below the remaining time to deadline, so doubling cannot overflow.
		sleepTime *= 2
	}
}
// discoveryRetries counts retries performed by doRequestWithPossibleRetry after the server closed the connection.
var discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)

// discoveryRequests counts requests started by doRequestWithPossibleRetry; retries are not included.
var discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`)