diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 3bdde100c8..0fe0d8adf2 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -2,17 +2,26 @@ package discoveryutils import ( "crypto/tls" + "flag" "fmt" "net" "net/http" "strings" + "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/fasthttp" ) +var ( + maxConcurrency = flag.Int("promscrape.discovery.concurrency", 500, "The maximum number of concurrent requests to Prometheus autodiscovery API (Consul, Kubernetes, etc.)") + maxWaitTime = flag.Duration("promscrape.discovery.concurrentWaitTime", time.Minute, "The maximum duration for waiting to perform API requests "+ + "if more than -promscrape.discovery.concurrency requests are simultaneously performed") +) + var defaultClient = &http.Client{ Timeout: 30 * time.Second, } @@ -56,6 +65,7 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { ReadTimeout: time.Minute, WriteTimeout: 10 * time.Second, MaxResponseBodySize: 300 * 1024 * 1024, + MaxConns: *maxConcurrency, } return &Client{ hc: hc, @@ -65,8 +75,30 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { }, nil } +var ( + concurrencyLimitCh chan struct{} + concurrencyLimitChOnce sync.Once +) + +func concurrencyLimitChInit() { + concurrencyLimitCh = make(chan struct{}, *maxConcurrency) +} + // GetAPIResponse returns response for the given absolute path. func (c *Client) GetAPIResponse(path string) ([]byte, error) { + // Limit the number of concurrent API requests. + concurrencyLimitChOnce.Do(concurrencyLimitChInit) + t := timerpool.Get(*maxWaitTime) + select { + case concurrencyLimitCh <- struct{}{}: + timerpool.Put(t) + case <-t.C: + timerpool.Put(t) + return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d", + c.apiServer, *maxWaitTime, *maxConcurrency) + } + defer func() { <-concurrencyLimitCh }() + requestURL := c.apiServer + path var u fasthttp.URI u.Update(requestURL)