mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
lib/promscrape: add -promscrape.discovery.concurrency
and -promscrape.discovery.concurrentWaitTime
flags for tuning the number of concurrent requests to autodiscovery API servers at Consul or Kubernetes
This commit is contained in:
parent
7d46dd452a
commit
73ec5cf460
@ -2,17 +2,26 @@ package discoveryutils
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||||
"github.com/VictoriaMetrics/fasthttp"
|
"github.com/VictoriaMetrics/fasthttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
maxConcurrency = flag.Int("promscrape.discovery.concurrency", 500, "The maximum number of concurrent requests to Prometheus autodiscovery API (Consul, Kubernetes, etc.)")
|
||||||
|
maxWaitTime = flag.Duration("promscrape.discovery.concurrentWaitTime", time.Minute, "The maximum duration for waiting to perform API requests "+
|
||||||
|
"if more than -promscrape.discovery.concurrency requests are simultaneously performed")
|
||||||
|
)
|
||||||
|
|
||||||
var defaultClient = &http.Client{
|
var defaultClient = &http.Client{
|
||||||
Timeout: 30 * time.Second,
|
Timeout: 30 * time.Second,
|
||||||
}
|
}
|
||||||
@ -56,6 +65,7 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
|
|||||||
ReadTimeout: time.Minute,
|
ReadTimeout: time.Minute,
|
||||||
WriteTimeout: 10 * time.Second,
|
WriteTimeout: 10 * time.Second,
|
||||||
MaxResponseBodySize: 300 * 1024 * 1024,
|
MaxResponseBodySize: 300 * 1024 * 1024,
|
||||||
|
MaxConns: *maxConcurrency,
|
||||||
}
|
}
|
||||||
return &Client{
|
return &Client{
|
||||||
hc: hc,
|
hc: hc,
|
||||||
@ -65,8 +75,30 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
concurrencyLimitCh chan struct{}
|
||||||
|
concurrencyLimitChOnce sync.Once
|
||||||
|
)
|
||||||
|
|
||||||
|
func concurrencyLimitChInit() {
|
||||||
|
concurrencyLimitCh = make(chan struct{}, *maxConcurrency)
|
||||||
|
}
|
||||||
|
|
||||||
// GetAPIResponse returns response for the given absolute path.
|
// GetAPIResponse returns response for the given absolute path.
|
||||||
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
||||||
|
// Limit the number of concurrent API requests.
|
||||||
|
concurrencyLimitChOnce.Do(concurrencyLimitChInit)
|
||||||
|
t := timerpool.Get(*maxWaitTime)
|
||||||
|
select {
|
||||||
|
case concurrencyLimitCh <- struct{}{}:
|
||||||
|
timerpool.Put(t)
|
||||||
|
case <-t.C:
|
||||||
|
timerpool.Put(t)
|
||||||
|
return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d",
|
||||||
|
c.apiServer, *maxWaitTime, *maxConcurrency)
|
||||||
|
}
|
||||||
|
defer func() { <-concurrencyLimitCh }()
|
||||||
|
|
||||||
requestURL := c.apiServer + path
|
requestURL := c.apiServer + path
|
||||||
var u fasthttp.URI
|
var u fasthttp.URI
|
||||||
u.Update(requestURL)
|
u.Update(requestURL)
|
||||||
|
Loading…
Reference in New Issue
Block a user