mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-14 16:12:15 +01:00
discovery/{consul,nomad}: fix cancelling serviceWatcher in-flight requests (#3658)
* lib/promscrape/discovery/{consul,nomad}: fix background service update watches not canceling requests on serviceWatcher stop Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/promscrape/discovery/{consul,nomad}: fix closing serviseWatcher during scrape job restart Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * wip Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
63653b53d6
commit
40d524edb8
@ -22,6 +22,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||
* BUGFIX: do not slow down concurrently executed queries during assisted merges, since assisted merges already prioritize data ingestion over queries. The probability of assisted merges has been increased starting from [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) because of internal refactoring. This could result in slowed down queries when there is a plenty of free CPU resources. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641) issues.
|
||||
* BUGFIX: reduce the increased CPU usage at `vmselect` to v1.85.3 level when processing heavy queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641).
|
||||
* BUGFIX: [retention filters](https://docs.victoriametrics.com/#retention-filters): fix `FATAL: cannot locate metric name for metricID=...: EOF` panic, which could occur when retention filters are enabled.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly cancel in-flight service discovery requests for [consul_sd_configs](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs) and [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs) when the service list changes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): consistently put the scrape url with scrape target labels to all error logs for failed scrapes. Previously some failed scrapes were logged without this information.
|
||||
|
@ -1,6 +1,7 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
@ -157,7 +158,7 @@ func maxWaitTime() time.Duration {
|
||||
// getBlockingAPIResponse perfoms blocking request to Consul via client and returns response.
|
||||
//
|
||||
// See https://www.consul.io/api-docs/features/blocking .
|
||||
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
|
||||
func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
|
||||
path += "&index=" + strconv.FormatInt(index, 10)
|
||||
path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds()))
|
||||
getMeta := func(resp *http.Response) {
|
||||
@ -182,7 +183,7 @@ func getBlockingAPIResponse(client *discoveryutils.Client, path string, index in
|
||||
}
|
||||
index = newIndex
|
||||
}
|
||||
data, err := client.GetBlockingAPIResponse(path, getMeta)
|
||||
data, err := client.GetBlockingAPIResponseCtx(ctx, path, getMeta)
|
||||
if err != nil {
|
||||
return nil, index, fmt.Errorf("cannot perform blocking Consul API request at %q: %w", path, err)
|
||||
}
|
||||
|
@ -34,15 +34,17 @@ type consulWatcher struct {
|
||||
servicesLock sync.Mutex
|
||||
services map[string]*serviceWatcher
|
||||
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan struct{}
|
||||
}
|
||||
|
||||
type serviceWatcher struct {
|
||||
serviceName string
|
||||
serviceNodes []ServiceNode
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan struct{}
|
||||
|
||||
stoppedCh chan struct{}
|
||||
|
||||
requestCtx context.Context
|
||||
requestCancel context.CancelFunc
|
||||
}
|
||||
|
||||
// newConsulWatcher creates new watcher and starts background service discovery for Consul.
|
||||
@ -71,7 +73,6 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
|
||||
watchServices: sdc.Services,
|
||||
watchTags: sdc.Tags,
|
||||
services: make(map[string]*serviceWatcher),
|
||||
stopCh: make(chan struct{}),
|
||||
stoppedCh: make(chan struct{}),
|
||||
}
|
||||
initCh := make(chan struct{})
|
||||
@ -85,7 +86,6 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
|
||||
}
|
||||
|
||||
func (cw *consulWatcher) mustStop() {
|
||||
close(cw.stopCh)
|
||||
cw.client.Stop()
|
||||
<-cw.stoppedCh
|
||||
}
|
||||
@ -100,10 +100,12 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
|
||||
// The watcher for serviceName already exists.
|
||||
continue
|
||||
}
|
||||
ctx, cancel := context.WithCancel(cw.client.Context())
|
||||
sw := &serviceWatcher{
|
||||
serviceName: serviceName,
|
||||
stopCh: make(chan struct{}),
|
||||
stoppedCh: make(chan struct{}),
|
||||
serviceName: serviceName,
|
||||
stoppedCh: make(chan struct{}),
|
||||
requestCtx: ctx,
|
||||
requestCancel: cancel,
|
||||
}
|
||||
cw.services[serviceName] = sw
|
||||
serviceWatchersCreated.Inc()
|
||||
@ -126,7 +128,7 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
|
||||
if _, ok := newServiceNamesMap[serviceName]; ok {
|
||||
continue
|
||||
}
|
||||
close(sw.stopCh)
|
||||
sw.requestCancel()
|
||||
delete(cw.services, serviceName)
|
||||
swsStopped = append(swsStopped, sw)
|
||||
}
|
||||
@ -173,24 +175,26 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
checkInterval := getCheckInterval()
|
||||
ticker := time.NewTicker(checkInterval / 2)
|
||||
defer ticker.Stop()
|
||||
stopCh := cw.client.Context().Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-cw.stopCh:
|
||||
case <-stopCh:
|
||||
logger.Infof("stopping Consul service watchers for %q", apiServer)
|
||||
startTime := time.Now()
|
||||
var swsStopped []*serviceWatcher
|
||||
|
||||
cw.servicesLock.Lock()
|
||||
for _, sw := range cw.services {
|
||||
close(sw.stopCh)
|
||||
sw.requestCancel()
|
||||
swsStopped = append(swsStopped, sw)
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
|
||||
for _, sw := range swsStopped {
|
||||
<-sw.stoppedCh
|
||||
serviceWatchersStopped.Inc()
|
||||
}
|
||||
logger.Infof("stopped Consul service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
|
||||
return
|
||||
@ -209,7 +213,7 @@ var (
|
||||
// It returns an empty serviceNames list if response contains the same index.
|
||||
func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64, error) {
|
||||
path := "/v1/catalog/services" + cw.serviceNamesQueryArgs
|
||||
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
|
||||
data, newIndex, err := getBlockingAPIResponse(cw.client.Context(), cw.client, path, index)
|
||||
if err != nil {
|
||||
return nil, index, err
|
||||
}
|
||||
@ -242,7 +246,7 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG
|
||||
index := int64(0)
|
||||
path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs
|
||||
f := func() {
|
||||
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
|
||||
data, newIndex, err := getBlockingAPIResponse(sw.requestCtx, cw.client, path, index)
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
|
||||
@ -273,11 +277,12 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG
|
||||
checkInterval := getCheckInterval()
|
||||
ticker := time.NewTicker(checkInterval / 2)
|
||||
defer ticker.Stop()
|
||||
stopCh := sw.requestCtx.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-sw.stopCh:
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
@ -116,7 +117,7 @@ func maxWaitTime() time.Duration {
|
||||
|
||||
// getBlockingAPIResponse perfoms blocking request to Nomad via client and returns response.
|
||||
// See https://developer.hashicorp.com/nomad/api-docs#blocking-queries .
|
||||
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
|
||||
func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
|
||||
path += "&index=" + strconv.FormatInt(index, 10)
|
||||
path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds()))
|
||||
getMeta := func(resp *http.Response) {
|
||||
@ -142,7 +143,7 @@ func getBlockingAPIResponse(client *discoveryutils.Client, path string, index in
|
||||
}
|
||||
index = newIndex
|
||||
}
|
||||
data, err := client.GetBlockingAPIResponse(path, getMeta)
|
||||
data, err := client.GetBlockingAPIResponseCtx(ctx, path, getMeta)
|
||||
if err != nil {
|
||||
return nil, index, fmt.Errorf("cannot perform blocking Nomad API request at %q: %w", path, err)
|
||||
}
|
||||
|
@ -30,15 +30,17 @@ type nomadWatcher struct {
|
||||
servicesLock sync.Mutex
|
||||
services map[string]*serviceWatcher
|
||||
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan struct{}
|
||||
}
|
||||
|
||||
type serviceWatcher struct {
|
||||
serviceName string
|
||||
services []Service
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan struct{}
|
||||
|
||||
stoppedCh chan struct{}
|
||||
|
||||
requestCtx context.Context
|
||||
requestCancel context.CancelFunc
|
||||
}
|
||||
|
||||
// newNomadWatcher creates new watcher and starts background service discovery for Nomad.
|
||||
@ -62,7 +64,6 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, namespace, re
|
||||
client: client,
|
||||
serviceNamesQueryArgs: queryArgs,
|
||||
services: make(map[string]*serviceWatcher),
|
||||
stopCh: make(chan struct{}),
|
||||
stoppedCh: make(chan struct{}),
|
||||
}
|
||||
initCh := make(chan struct{})
|
||||
@ -76,7 +77,6 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, namespace, re
|
||||
}
|
||||
|
||||
func (cw *nomadWatcher) mustStop() {
|
||||
close(cw.stopCh)
|
||||
cw.client.Stop()
|
||||
<-cw.stoppedCh
|
||||
}
|
||||
@ -91,10 +91,12 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
|
||||
// The watcher for serviceName already exists.
|
||||
continue
|
||||
}
|
||||
ctx, cancel := context.WithCancel(cw.client.Context())
|
||||
sw := &serviceWatcher{
|
||||
serviceName: serviceName,
|
||||
stopCh: make(chan struct{}),
|
||||
stoppedCh: make(chan struct{}),
|
||||
serviceName: serviceName,
|
||||
stoppedCh: make(chan struct{}),
|
||||
requestCtx: ctx,
|
||||
requestCancel: cancel,
|
||||
}
|
||||
cw.services[serviceName] = sw
|
||||
serviceWatchersCreated.Inc()
|
||||
@ -117,7 +119,7 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
|
||||
if _, ok := newServiceNamesMap[serviceName]; ok {
|
||||
continue
|
||||
}
|
||||
close(sw.stopCh)
|
||||
sw.requestCancel()
|
||||
delete(cw.services, serviceName)
|
||||
swsStopped = append(swsStopped, sw)
|
||||
}
|
||||
@ -164,24 +166,26 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
checkInterval := getCheckInterval()
|
||||
ticker := time.NewTicker(checkInterval / 2)
|
||||
defer ticker.Stop()
|
||||
stopCh := cw.client.Context().Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-cw.stopCh:
|
||||
case <-stopCh:
|
||||
logger.Infof("stopping Nomad service watchers for %q", apiServer)
|
||||
startTime := time.Now()
|
||||
var swsStopped []*serviceWatcher
|
||||
|
||||
cw.servicesLock.Lock()
|
||||
for _, sw := range cw.services {
|
||||
close(sw.stopCh)
|
||||
sw.requestCancel()
|
||||
swsStopped = append(swsStopped, sw)
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
|
||||
for _, sw := range swsStopped {
|
||||
<-sw.stoppedCh
|
||||
serviceWatchersStopped.Inc()
|
||||
}
|
||||
logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
|
||||
return
|
||||
@ -200,7 +204,7 @@ var (
|
||||
// It returns an empty serviceNames list if response contains the same index.
|
||||
func (cw *nomadWatcher) getBlockingServiceNames(index int64) ([]string, int64, error) {
|
||||
path := "/v1/services" + cw.serviceNamesQueryArgs
|
||||
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
|
||||
data, newIndex, err := getBlockingAPIResponse(cw.client.Context(), cw.client, path, index)
|
||||
if err != nil {
|
||||
return nil, index, err
|
||||
}
|
||||
@ -244,7 +248,7 @@ func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG
|
||||
// TODO: Maybe use a different query arg.
|
||||
path := "/v1/service/" + sw.serviceName + nw.serviceNamesQueryArgs
|
||||
f := func() {
|
||||
data, newIndex, err := getBlockingAPIResponse(nw.client, path, index)
|
||||
data, newIndex, err := getBlockingAPIResponse(sw.requestCtx, nw.client, path, index)
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
|
||||
@ -275,11 +279,12 @@ func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG
|
||||
checkInterval := getCheckInterval()
|
||||
ticker := time.NewTicker(checkInterval / 2)
|
||||
defer ticker.Stop()
|
||||
stopCh := sw.requestCtx.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-sw.stopCh:
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -159,6 +159,11 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Context returns context for the client requests.
|
||||
func (c *Client) Context() context.Context {
|
||||
return c.clientCtx
|
||||
}
|
||||
|
||||
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
|
||||
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest func(request *http.Request)) ([]byte, error) {
|
||||
return c.getAPIResponse(path, modifyRequest)
|
||||
@ -185,16 +190,21 @@ func (c *Client) getAPIResponse(path string, modifyRequest func(request *http.Re
|
||||
defer func() {
|
||||
<-concurrencyLimitCh
|
||||
}()
|
||||
return c.getAPIResponseWithParamsAndClient(c.client, path, modifyRequest, nil)
|
||||
return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.client, path, modifyRequest, nil)
|
||||
}
|
||||
|
||||
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
|
||||
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *http.Response)) ([]byte, error) {
|
||||
return c.getAPIResponseWithParamsAndClient(c.blockingClient, path, nil, inspectResponse)
|
||||
return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.blockingClient, path, nil, inspectResponse)
|
||||
}
|
||||
|
||||
// GetBlockingAPIResponseCtx returns response for given absolute path with blocking client and optional callback for api response,
|
||||
func (c *Client) GetBlockingAPIResponseCtx(ctx context.Context, path string, inspectResponse func(resp *http.Response)) ([]byte, error) {
|
||||
return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, nil, inspectResponse)
|
||||
}
|
||||
|
||||
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response.
|
||||
func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path string, modifyRequest func(req *http.Request), inspectResponse func(resp *http.Response)) ([]byte, error) {
|
||||
func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, client *HTTPClient, path string, modifyRequest func(req *http.Request), inspectResponse func(resp *http.Response)) ([]byte, error) {
|
||||
requestURL := c.apiServer + path
|
||||
u, err := url.Parse(requestURL)
|
||||
if err != nil {
|
||||
@ -202,7 +212,7 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(client.ReadTimeout)
|
||||
ctx, cancel := context.WithDeadline(c.clientCtx, deadline)
|
||||
ctx, cancel := context.WithDeadline(ctx, deadline)
|
||||
defer cancel()
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user