mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
lib/promscrape: follow-up after bced9fb978
- Document the bugfix at docs/CHANGELOG.md - Wait until all the worker goroutines are done in consulWatcher.mustStop() - Do not log `context canceled` errors when discovering consul serviceNames - Removed explicit handling of gzipped responses at lib/promscrape/discoveryutils.Client, since this handling is automatically performed by net/http.Transport. See DisableCompression option at https://pkg.go.dev/net/http#Transport . - Remove explicit handling of the proxyURL, since it is automatically handled by net/http.Transport. See Proxy option at https://pkg.go.dev/net/http#Transport . - Expliticly set MaxIdleConnsPerHost, since its default value equals to 2. Such a small value may result in excess tcp connection churn when more than 2 concurrent requests are processed by lib/promscrape/discoveryutils.Client. - Do not set explicitly the `Host` request header, since it is automatically set by net/http.Client. - Backport the bugfix to the recently added nomad_sd_configs - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468
This commit is contained in:
parent
de5aad2cde
commit
54410bf51b
@ -31,6 +31,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly parse durations with uppercase suffixes such as `10S`, `5MS`, `1W`, etc. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3589).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix a panic during target discovery when `vmagent` runs with `-promscrape.dropOriginalLabels` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3580). The bug has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): properly encode `filters` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix possible resource leak after hot reload of the updated [consul_sd_configs](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468).
|
||||
|
||||
|
||||
## [v1.85.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.3)
|
||||
|
@ -34,6 +34,7 @@ type consulWatcher struct {
|
||||
servicesLock sync.Mutex
|
||||
services map[string]*serviceWatcher
|
||||
|
||||
servicesWG sync.WaitGroup
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
}
|
||||
@ -73,7 +74,11 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
initCh := make(chan struct{})
|
||||
go cw.watchForServicesUpdates(initCh)
|
||||
cw.wg.Add(1)
|
||||
go func() {
|
||||
cw.watchForServicesUpdates(initCh)
|
||||
cw.wg.Done()
|
||||
}()
|
||||
// wait for initialization to complete
|
||||
<-initCh
|
||||
return cw
|
||||
@ -82,6 +87,7 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
|
||||
func (cw *consulWatcher) mustStop() {
|
||||
close(cw.stopCh)
|
||||
cw.client.Stop()
|
||||
cw.wg.Wait()
|
||||
}
|
||||
|
||||
func (cw *consulWatcher) updateServices(serviceNames []string) {
|
||||
@ -98,14 +104,14 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
cw.services[serviceName] = sw
|
||||
cw.wg.Add(1)
|
||||
cw.servicesWG.Add(1)
|
||||
serviceWatchersCreated.Inc()
|
||||
initWG.Add(1)
|
||||
go func() {
|
||||
serviceWatchersCount.Inc()
|
||||
sw.watchForServiceNodesUpdates(cw, &initWG)
|
||||
serviceWatchersCount.Dec()
|
||||
cw.wg.Done()
|
||||
cw.servicesWG.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
@ -136,11 +142,13 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
|
||||
// watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done.
|
||||
func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
index := int64(0)
|
||||
clientAddr := cw.client.Addr()
|
||||
apiServer := cw.client.APIServer()
|
||||
f := func() {
|
||||
serviceNames, newIndex, err := cw.getBlockingServiceNames(index)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot obtain Consul serviceNames from %q: %s", clientAddr, err)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("cannot obtain Consul serviceNames from %q: %s", apiServer, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if index == newIndex {
|
||||
@ -151,7 +159,7 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
index = newIndex
|
||||
}
|
||||
|
||||
logger.Infof("started Consul service watcher for %q", clientAddr)
|
||||
logger.Infof("started Consul service watcher for %q", apiServer)
|
||||
f()
|
||||
|
||||
// send signal that initialization is complete
|
||||
@ -165,15 +173,15 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-cw.stopCh:
|
||||
logger.Infof("stopping Consul service watchers for %q", clientAddr)
|
||||
logger.Infof("stopping Consul service watchers for %q", apiServer)
|
||||
startTime := time.Now()
|
||||
cw.servicesLock.Lock()
|
||||
for _, sw := range cw.services {
|
||||
close(sw.stopCh)
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
cw.wg.Wait()
|
||||
logger.Infof("stopped Consul service watcher for %q in %.3f seconds", clientAddr, time.Since(startTime).Seconds())
|
||||
cw.servicesWG.Wait()
|
||||
logger.Infof("stopped Consul service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -219,16 +227,15 @@ func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64,
|
||||
//
|
||||
// watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done.
|
||||
func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG *sync.WaitGroup) {
|
||||
clientAddr := cw.client.Addr()
|
||||
apiServer := cw.client.APIServer()
|
||||
index := int64(0)
|
||||
path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs
|
||||
f := func() {
|
||||
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
|
||||
}
|
||||
logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
|
||||
return
|
||||
}
|
||||
if index == newIndex {
|
||||
@ -237,7 +244,7 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG
|
||||
}
|
||||
sns, err := parseServiceNodes(data)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
|
||||
logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package nomad
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -11,7 +12,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/fasthttp"
|
||||
)
|
||||
|
||||
var waitTime = flag.Duration("promscrape.nomad.waitTime", 0, "Wait time used by Nomad service discovery. Default value is used if not set")
|
||||
@ -138,13 +138,13 @@ func maxWaitTime() time.Duration {
|
||||
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
|
||||
path += "&index=" + strconv.FormatInt(index, 10)
|
||||
path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds()))
|
||||
getMeta := func(resp *fasthttp.Response) {
|
||||
ind := resp.Header.Peek("X-Nomad-Index")
|
||||
getMeta := func(resp *http.Response) {
|
||||
ind := resp.Header.Get("X-Nomad-Index")
|
||||
if len(ind) == 0 {
|
||||
logger.Errorf("cannot find X-Nomad-Index header in response from %q", path)
|
||||
return
|
||||
}
|
||||
newIndex, err := strconv.ParseInt(string(ind), 10, 64)
|
||||
newIndex, err := strconv.ParseInt(ind, 10, 64)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot parse X-Nomad-Index header value in response from %q: %s", path, err)
|
||||
return
|
||||
|
@ -1,7 +1,9 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/url"
|
||||
@ -31,6 +33,7 @@ type nomadWatcher struct {
|
||||
servicesLock sync.Mutex
|
||||
services map[string]*serviceWatcher
|
||||
|
||||
servicesWG sync.WaitGroup
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
}
|
||||
@ -61,7 +64,11 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
initCh := make(chan struct{})
|
||||
go cw.watchForServicesUpdates(initCh)
|
||||
cw.wg.Add(1)
|
||||
go func() {
|
||||
cw.watchForServicesUpdates(initCh)
|
||||
cw.wg.Done()
|
||||
}()
|
||||
// wait for initialization to complete
|
||||
<-initCh
|
||||
return cw
|
||||
@ -69,9 +76,8 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n
|
||||
|
||||
func (cw *nomadWatcher) mustStop() {
|
||||
close(cw.stopCh)
|
||||
// Do not wait for the watcher to stop, since it may take
|
||||
// up to discoveryutils.BlockingClientReadTimeout to complete.
|
||||
// TODO: add ability to cancel blocking requests.
|
||||
cw.client.Stop()
|
||||
cw.wg.Wait()
|
||||
}
|
||||
|
||||
func (cw *nomadWatcher) updateServices(serviceNames []string) {
|
||||
@ -88,14 +94,14 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
cw.services[serviceName] = sw
|
||||
cw.wg.Add(1)
|
||||
cw.servicesWG.Add(1)
|
||||
serviceWatchersCreated.Inc()
|
||||
initWG.Add(1)
|
||||
go func() {
|
||||
serviceWatchersCount.Inc()
|
||||
sw.watchForServiceAddressUpdates(cw, &initWG)
|
||||
serviceWatchersCount.Dec()
|
||||
cw.wg.Done()
|
||||
cw.servicesWG.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
@ -126,11 +132,13 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
|
||||
// watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done.
|
||||
func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
index := int64(0)
|
||||
clientAddr := cw.client.Addr()
|
||||
apiServer := cw.client.APIServer()
|
||||
f := func() {
|
||||
serviceNames, newIndex, err := cw.getBlockingServiceNames(index)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot obtain Nomad serviceNames from %q: %s", clientAddr, err)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("cannot obtain Nomad serviceNames from %q: %s", apiServer, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if index == newIndex {
|
||||
@ -141,7 +149,7 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
index = newIndex
|
||||
}
|
||||
|
||||
logger.Infof("started Nomad service watcher for %q", clientAddr)
|
||||
logger.Infof("started Nomad service watcher for %q", apiServer)
|
||||
f()
|
||||
|
||||
// send signal that initialization is complete
|
||||
@ -155,15 +163,15 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-cw.stopCh:
|
||||
logger.Infof("stopping Nomad service watchers for %q", clientAddr)
|
||||
logger.Infof("stopping Nomad service watchers for %q", apiServer)
|
||||
startTime := time.Now()
|
||||
cw.servicesLock.Lock()
|
||||
for _, sw := range cw.services {
|
||||
close(sw.stopCh)
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
cw.wg.Wait()
|
||||
logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", clientAddr, time.Since(startTime).Seconds())
|
||||
cw.servicesWG.Wait()
|
||||
logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -225,14 +233,16 @@ func (cw *nomadWatcher) getServiceSnapshot() map[string][]Service {
|
||||
//
|
||||
// watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done.
|
||||
func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG *sync.WaitGroup) {
|
||||
clientAddr := nw.client.Addr()
|
||||
apiServer := nw.client.APIServer()
|
||||
index := int64(0)
|
||||
// TODO: Maybe use a different query arg.
|
||||
path := "/v1/service/" + sw.serviceName + nw.serviceNamesQueryArgs
|
||||
f := func() {
|
||||
data, newIndex, err := getBlockingAPIResponse(nw.client, path, index)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if index == newIndex {
|
||||
@ -241,7 +251,7 @@ func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG
|
||||
}
|
||||
sns, err := parseServices(data)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot parse Nomad services response for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
|
||||
logger.Errorf("cannot parse Nomad services response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,6 @@
|
||||
package discoveryutils
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"flag"
|
||||
@ -55,7 +54,7 @@ func GetHTTPClient() *http.Client {
|
||||
return defaultClient
|
||||
}
|
||||
|
||||
// Client is http client, which talks to the given apiServer.
|
||||
// Client is http client, which talks to the given apiServer passed to NewClient().
|
||||
type Client struct {
|
||||
// client is used for short requests.
|
||||
client *HTTPClient
|
||||
@ -65,8 +64,6 @@ type Client struct {
|
||||
|
||||
apiServer string
|
||||
|
||||
dialAddr string
|
||||
|
||||
setHTTPHeaders func(req *http.Request)
|
||||
setHTTPProxyHeaders func(req *http.Request)
|
||||
|
||||
@ -81,110 +78,96 @@ type HTTPClient struct {
|
||||
WriteTimeout time.Duration
|
||||
}
|
||||
|
||||
func addMissingPort(addr string, isTLS bool) string {
|
||||
if strings.Contains(addr, ":") {
|
||||
return addr
|
||||
}
|
||||
if isTLS {
|
||||
return addr + ":443"
|
||||
}
|
||||
return addr + ":80"
|
||||
}
|
||||
var defaultDialer = &net.Dialer{}
|
||||
|
||||
// NewClient returns new Client for the given args.
|
||||
func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxyAC *promauth.Config) (*Client, error) {
|
||||
u, err := url.Parse(apiServer)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse provided url %q: %w", apiServer, err)
|
||||
return nil, fmt.Errorf("cannot parse apiServer=%q: %w", apiServer, err)
|
||||
}
|
||||
|
||||
dialFunc := defaultDialer.DialContext
|
||||
if u.Scheme == "unix" {
|
||||
// special case for unix socket connection
|
||||
var dialFunc func(addr string) (net.Conn, error)
|
||||
if string(u.Scheme) == "unix" {
|
||||
dialAddr := u.Path
|
||||
apiServer = "http://"
|
||||
dialFunc = func(_ string) (net.Conn, error) {
|
||||
return net.Dial("unix", dialAddr)
|
||||
apiServer = "http://unix"
|
||||
dialFunc = func(ctx context.Context, _, _ string) (net.Conn, error) {
|
||||
return defaultDialer.DialContext(ctx, "unix", dialAddr)
|
||||
}
|
||||
}
|
||||
|
||||
dialAddr := u.Host
|
||||
isTLS := string(u.Scheme) == "https"
|
||||
isTLS := u.Scheme == "https"
|
||||
var tlsCfg *tls.Config
|
||||
if isTLS {
|
||||
tlsCfg = ac.NewTLSConfig()
|
||||
}
|
||||
|
||||
setHTTPProxyHeaders := func(req *http.Request) {}
|
||||
|
||||
dialAddr = addMissingPort(dialAddr, isTLS)
|
||||
if dialFunc == nil {
|
||||
var err error
|
||||
dialFunc, err = proxyURL.NewDialFunc(proxyAC)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var proxyURLFunc func(*http.Request) (*url.URL, error)
|
||||
if pu := proxyURL.GetURL(); pu != nil {
|
||||
proxyURLFunc = http.ProxyURL(pu)
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: DefaultClientReadTimeout,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
Proxy: proxyURLFunc,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
MaxIdleConnsPerHost: *maxConcurrency,
|
||||
ResponseHeaderTimeout: DefaultClientReadTimeout,
|
||||
DialContext: dialFunc,
|
||||
},
|
||||
}
|
||||
blockingClient := &http.Client{
|
||||
Timeout: BlockingClientReadTimeout,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
Proxy: proxyURLFunc,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
MaxIdleConnsPerHost: 1000,
|
||||
ResponseHeaderTimeout: BlockingClientReadTimeout,
|
||||
DialContext: dialFunc,
|
||||
},
|
||||
}
|
||||
|
||||
setHTTPHeaders := func(req *http.Request) {}
|
||||
if ac != nil {
|
||||
setHTTPHeaders = func(req *http.Request) {
|
||||
ac.SetHeaders(req, true)
|
||||
}
|
||||
}
|
||||
setHTTPProxyHeaders := func(req *http.Request) {}
|
||||
if proxyAC != nil {
|
||||
setHTTPProxyHeaders = func(req *http.Request) {
|
||||
proxyURL.SetHeaders(proxyAC, req)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
hcTransport := &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
MaxConnsPerHost: 2 * *maxConcurrency,
|
||||
ResponseHeaderTimeout: *maxWaitTime,
|
||||
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return dialFunc(dialAddr)
|
||||
},
|
||||
}
|
||||
|
||||
hc := &http.Client{
|
||||
Timeout: DefaultClientReadTimeout,
|
||||
Transport: hcTransport,
|
||||
}
|
||||
|
||||
blockingTransport := &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
MaxConnsPerHost: 64 * 1024,
|
||||
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return dialFunc(dialAddr)
|
||||
},
|
||||
}
|
||||
blockingClient := &http.Client{
|
||||
Timeout: BlockingClientReadTimeout,
|
||||
Transport: blockingTransport,
|
||||
}
|
||||
|
||||
setHTTPHeaders := func(req *http.Request) {}
|
||||
if ac != nil {
|
||||
setHTTPHeaders = func(req *http.Request) { ac.SetHeaders(req, true) }
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &Client{
|
||||
client: &HTTPClient{client: hc, ReadTimeout: DefaultClientReadTimeout, WriteTimeout: DefaultClientWriteTimeout},
|
||||
blockingClient: &HTTPClient{client: blockingClient, ReadTimeout: BlockingClientReadTimeout, WriteTimeout: DefaultClientWriteTimeout},
|
||||
c := &Client{
|
||||
client: &HTTPClient{
|
||||
client: client,
|
||||
ReadTimeout: DefaultClientReadTimeout,
|
||||
WriteTimeout: DefaultClientWriteTimeout,
|
||||
},
|
||||
blockingClient: &HTTPClient{
|
||||
client: blockingClient,
|
||||
ReadTimeout: BlockingClientReadTimeout,
|
||||
WriteTimeout: DefaultClientWriteTimeout,
|
||||
},
|
||||
apiServer: apiServer,
|
||||
dialAddr: dialAddr,
|
||||
setHTTPHeaders: setHTTPHeaders,
|
||||
setHTTPProxyHeaders: setHTTPProxyHeaders,
|
||||
clientCtx: ctx,
|
||||
clientCancel: cancel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Addr returns the address the client connects to.
|
||||
func (c *Client) Addr() string {
|
||||
return c.dialAddr
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
|
||||
// modifyRequestParams should never reference data from request.
|
||||
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequestParams func(request *http.Request)) ([]byte, error) {
|
||||
return c.getAPIResponse(path, modifyRequestParams)
|
||||
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest func(request *http.Request)) ([]byte, error) {
|
||||
return c.getAPIResponse(path, modifyRequest)
|
||||
}
|
||||
|
||||
// GetAPIResponse returns response for the given absolute path.
|
||||
@ -205,12 +188,13 @@ func (c *Client) getAPIResponse(path string, modifyRequest func(request *http.Re
|
||||
return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d",
|
||||
c.apiServer, *maxWaitTime, *maxConcurrency)
|
||||
}
|
||||
defer func() { <-concurrencyLimitCh }()
|
||||
defer func() {
|
||||
<-concurrencyLimitCh
|
||||
}()
|
||||
return c.getAPIResponseWithParamsAndClient(c.client, path, modifyRequest, nil)
|
||||
}
|
||||
|
||||
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
|
||||
// inspectResponse - should never reference data from response.
|
||||
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *http.Response)) ([]byte, error) {
|
||||
return c.getAPIResponseWithParamsAndClient(c.blockingClient, path, nil, inspectResponse)
|
||||
}
|
||||
@ -222,7 +206,6 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse %q: %w", requestURL, err)
|
||||
}
|
||||
u.Host = c.dialAddr
|
||||
|
||||
deadline := time.Now().Add(client.WriteTimeout)
|
||||
ctx, cancel := context.WithDeadline(c.clientCtx, deadline)
|
||||
@ -232,8 +215,6 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri
|
||||
return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err)
|
||||
}
|
||||
|
||||
req.Header.Set("Host", c.dialAddr)
|
||||
req.Header.Set("Accept-Encoding", "gzip")
|
||||
c.setHTTPHeaders(req)
|
||||
c.setHTTPProxyHeaders(req)
|
||||
if modifyRequest != nil {
|
||||
@ -244,20 +225,11 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
|
||||
}
|
||||
|
||||
reader := resp.Body
|
||||
if resp.Header.Get("Content-Encoding") == "gzip" {
|
||||
reader, err = gzip.NewReader(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create gzip reader for %q: %w", requestURL, err)
|
||||
}
|
||||
}
|
||||
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot ungzip response from %q: %w", requestURL, err)
|
||||
}
|
||||
data, err := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read response from %q: %w", requestURL, err)
|
||||
}
|
||||
|
||||
if inspectResponse != nil {
|
||||
inspectResponse(resp)
|
||||
@ -280,10 +252,9 @@ func (c *Client) Stop() {
|
||||
c.clientCancel()
|
||||
}
|
||||
|
||||
// DoRequestWithPossibleRetry performs the given req at client and stores the response at resp.
|
||||
func DoRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, requestCounter, retryCounter *metrics.Counter) (*http.Response, error) {
|
||||
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {
|
||||
sleepTime := time.Second
|
||||
requestCounter.Inc()
|
||||
discoveryRequests.Inc()
|
||||
deadline, ok := req.Context().Deadline()
|
||||
if !ok {
|
||||
deadline = time.Now().Add(hc.WriteTimeout)
|
||||
@ -309,14 +280,10 @@ func DoRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, requestCounte
|
||||
sleepTime = maxSleepTime
|
||||
}
|
||||
time.Sleep(sleepTime)
|
||||
retryCounter.Inc()
|
||||
discoveryRetries.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {
|
||||
return DoRequestWithPossibleRetry(hc, req, discoveryRequests, discoveryRetries)
|
||||
}
|
||||
|
||||
var (
|
||||
discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`)
|
||||
discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)
|
||||
|
Loading…
Reference in New Issue
Block a user