diff --git a/app/vmalert/notifier/config_watcher.go b/app/vmalert/notifier/config_watcher.go index 5cba98bd51..a49990151c 100644 --- a/app/vmalert/notifier/config_watcher.go +++ b/app/vmalert/notifier/config_watcher.go @@ -73,35 +73,18 @@ func (cw *configWatcher) reload(path string) error { } // stop existing discovery - close(cw.syncCh) - cw.wg.Wait() + cw.mustStop() // re-start cw with new config cw.syncCh = make(chan struct{}) cw.cfg = cfg - - cw.resetTargets() return cw.start() } -const ( - addRetryBackoff = time.Millisecond * 100 - addRetryCount = 2 -) - func (cw *configWatcher) add(typeK TargetType, interval time.Duration, labelsFn getLabels) error { - var targets []Target - var errors []error - var count int - for { // retry addRetryCount times if first discovery attempts gave no results - targets, errors = targetsFromLabels(labelsFn, cw.cfg, cw.genFn) - for _, err := range errors { - return fmt.Errorf("failed to init notifier for %q: %s", typeK, err) - } - if len(targets) > 0 || count >= addRetryCount { - break - } - time.Sleep(addRetryBackoff) + targets, errors := targetsFromLabels(labelsFn, cw.cfg, cw.genFn) + for _, err := range errors { + return fmt.Errorf("failed to init notifier for %q: %s", typeK, err) } cw.setTargets(typeK, targets) @@ -215,7 +198,10 @@ func (cw *configWatcher) start() error { return nil } -func (cw *configWatcher) resetTargets() { +func (cw *configWatcher) mustStop() { + close(cw.syncCh) + cw.wg.Wait() + cw.targetsMu.Lock() for _, targets := range cw.targets { for _, t := range targets { @@ -224,6 +210,11 @@ func (cw *configWatcher) resetTargets() { } cw.targets = make(map[TargetType][]Target) cw.targetsMu.Unlock() + + for i := range cw.cfg.ConsulSDConfigs { + cw.cfg.ConsulSDConfigs[i].MustStop() + } + cw.cfg = nil } func (cw *configWatcher) setTargets(key TargetType, targets []Target) { diff --git a/app/vmalert/notifier/config_watcher_test.go b/app/vmalert/notifier/config_watcher_test.go index 9107157b4b..4120e75b23 100644 --- a/app/vmalert/notifier/config_watcher_test.go +++ b/app/vmalert/notifier/config_watcher_test.go @@ -9,9 +9,6 @@ import ( "os" "sync" "testing" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul" ) func TestConfigWatcherReload(t *testing.T) { @@ -31,6 +28,7 @@ static_configs: if err != nil { t.Fatalf("failed to start config watcher: %s", err) } + defer cw.mustStop() ns := cw.notifiers() if len(ns) != 2 { t.Fatalf("expected to have 2 notifiers; got %d %#v", len(ns), ns) @@ -78,16 +76,11 @@ consul_sd_configs: - alertmanager `, consulSDServer.URL)) - prevCheckInterval := *consul.SDCheckInterval - defer func() { *consul.SDCheckInterval = prevCheckInterval }() - - *consul.SDCheckInterval = time.Millisecond * 100 - cw, err := newWatcher(consulSDFile.Name(), nil) if err != nil { t.Fatalf("failed to start config watcher: %s", err) } - time.Sleep(*consul.SDCheckInterval * 2) + defer cw.mustStop() if len(cw.notifiers()) != 2 { t.Fatalf("expected to get 2 notifiers; got %d", len(cw.notifiers())) @@ -161,6 +154,7 @@ consul_sd_configs: if err != nil { t.Fatalf("failed to start config watcher: %s", err) } + defer cw.mustStop() const workers = 500 const iterations = 10 diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f1cae3d227..cdbaba37e6 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -25,6 +25,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: reduce memory usage during the first three hours after the upgrade from versions older than v1.73.0. The memory usage spike was related to the need of in-memory caches' re-population after the upgrade because of the fix for [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401). Now cache size limits are reduced in order to occupy less memory during the upgrade. * BUGFIX: fix a bug, which could significantly slow down requests to `/api/v1/labels` and `/api/v1/label//values`. These APIs are used by Grafana for auto-completion of label names and label values. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2200). * BUGFIX: vmalert: add support for `$externalLabels` and `$externalURL` template vars in the same way as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2193). +* BUGFIX: vmalert: make sure notifiers are discovered during initialization if they are configured via `consul_sd_configs`. Previously they could be discovered in 30 seconds (the default value for `-promscrape.consulSDCheckInterval` command-line flag) after the initialization. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2202). * BUGFIX: update default value for `-promscrape.fileSDCheckInterval`, so it matches default duration used by Prometheus for checking for updates in `file_sd_configs`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2187). Thanks to @corporate-gadfly for the fix. diff --git a/lib/promscrape/discovery/consul/watch.go b/lib/promscrape/discovery/consul/watch.go index f5d60c4eab..8162998c9a 100644 --- a/lib/promscrape/discovery/consul/watch.go +++ b/lib/promscrape/discovery/consul/watch.go @@ -42,7 +42,7 @@ type serviceWatcher struct { stopCh chan struct{} } -// newConsulWatcher creates new watcher and start background service discovery for Consul. +// newConsulWatcher creates new watcher and starts background service discovery for Consul. func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, namespace string) *consulWatcher { baseQueryArgs := "?dc=" + url.QueryEscape(datacenter) if sdc.AllowStale { @@ -67,7 +67,10 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, services: make(map[string]*serviceWatcher), stopCh: make(chan struct{}), } - go cw.watchForServicesUpdates() + initCh := make(chan struct{}) + go cw.watchForServicesUpdates(initCh) + // wait for initialization to complete + <-initCh return cw } @@ -78,11 +81,57 @@ func (cw *consulWatcher) mustStop() { // TODO: add ability to cancel blocking requests. } +func (cw *consulWatcher) updateServices(serviceNames []string) { + var initWG sync.WaitGroup + // Start watchers for new services. + cw.servicesLock.Lock() + for _, serviceName := range serviceNames { + if _, ok := cw.services[serviceName]; ok { + // The watcher for serviceName already exists. + continue + } + sw := &serviceWatcher{ + serviceName: serviceName, + stopCh: make(chan struct{}), + } + cw.services[serviceName] = sw + cw.wg.Add(1) + serviceWatchersCreated.Inc() + initWG.Add(1) + go func() { + serviceWatchersCount.Inc() + sw.watchForServiceNodesUpdates(cw, &initWG) + serviceWatchersCount.Dec() + cw.wg.Done() + }() + } + + // Stop watchers for removed services. + newServiceNamesMap := make(map[string]struct{}, len(serviceNames)) + for _, serviceName := range serviceNames { + newServiceNamesMap[serviceName] = struct{}{} + } + for serviceName, sw := range cw.services { + if _, ok := newServiceNamesMap[serviceName]; ok { + continue + } + close(sw.stopCh) + delete(cw.services, serviceName) + serviceWatchersStopped.Inc() + + // Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime + // if it is blocked in Consul API request. + } + cw.servicesLock.Unlock() + + // Wait for initialization to complete. + initWG.Wait() +} + // watchForServicesUpdates watches for new services and updates it in cw. -func (cw *consulWatcher) watchForServicesUpdates() { - checkInterval := getCheckInterval() - ticker := time.NewTicker(checkInterval / 2) - defer ticker.Stop() +// +// watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done. +func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) { index := int64(0) clientAddr := cw.client.Addr() f := func() { @@ -95,51 +144,19 @@ func (cw *consulWatcher) watchForServicesUpdates() { // Nothing changed. return } - - cw.servicesLock.Lock() - // Start watchers for new services. - for _, serviceName := range serviceNames { - if _, ok := cw.services[serviceName]; ok { - // The watcher for serviceName already exists. - continue - } - sw := &serviceWatcher{ - serviceName: serviceName, - stopCh: make(chan struct{}), - } - cw.services[serviceName] = sw - cw.wg.Add(1) - serviceWatchersCreated.Inc() - go func() { - serviceWatchersCount.Inc() - sw.watchForServiceNodesUpdates(cw) - serviceWatchersCount.Dec() - cw.wg.Done() - }() - } - // Stop watchers for removed services. - newServiceNamesMap := make(map[string]struct{}, len(serviceNames)) - for _, serviceName := range serviceNames { - newServiceNamesMap[serviceName] = struct{}{} - } - for serviceName, sw := range cw.services { - if _, ok := newServiceNamesMap[serviceName]; ok { - continue - } - close(sw.stopCh) - delete(cw.services, serviceName) - serviceWatchersStopped.Inc() - - // Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime - // if it is blocked in Consul API request. - } - cw.servicesLock.Unlock() - + cw.updateServices(serviceNames) index = newIndex } logger.Infof("started Consul service watcher for %q", clientAddr) f() + + // send signal that initialization is complete + close(initCh) + + checkInterval := getCheckInterval() + ticker := time.NewTicker(checkInterval / 2) + defer ticker.Stop() for { select { case <-ticker.C: @@ -196,10 +213,9 @@ func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64, } // watchForServiceNodesUpdates watches for Consul serviceNode changes for the given serviceName. -func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher) { - checkInterval := getCheckInterval() - ticker := time.NewTicker(checkInterval / 2) - defer ticker.Stop() +// +// watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done. +func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG *sync.WaitGroup) { clientAddr := cw.client.Addr() index := int64(0) path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs @@ -227,6 +243,12 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher) { } f() + // Notify caller that initialization is complete + initWG.Done() + + checkInterval := getCheckInterval() + ticker := time.NewTicker(checkInterval / 2) + defer ticker.Stop() for { select { case <-ticker.C: