lib/promscrape/discovery/{consul,nomad}: wait until the deleted serviceWatchers are stopped inside updateServices() call

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367
This commit is contained in:
Aliaksandr Valialkin 2023-01-05 21:52:31 -08:00
parent 54410bf51b
commit ec7a3b79ab
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 50 additions and 28 deletions

View File

@ -34,15 +34,15 @@ type consulWatcher struct {
servicesLock sync.Mutex servicesLock sync.Mutex
services map[string]*serviceWatcher services map[string]*serviceWatcher
servicesWG sync.WaitGroup stopCh chan struct{}
wg sync.WaitGroup stoppedCh chan struct{}
stopCh chan struct{}
} }
type serviceWatcher struct { type serviceWatcher struct {
serviceName string serviceName string
serviceNodes []ServiceNode serviceNodes []ServiceNode
stopCh chan struct{} stopCh chan struct{}
stoppedCh chan struct{}
} }
// newConsulWatcher creates new watcher and starts background service discovery for Consul. // newConsulWatcher creates new watcher and starts background service discovery for Consul.
@ -72,12 +72,12 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
watchTags: sdc.Tags, watchTags: sdc.Tags,
services: make(map[string]*serviceWatcher), services: make(map[string]*serviceWatcher),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
} }
initCh := make(chan struct{}) initCh := make(chan struct{})
cw.wg.Add(1)
go func() { go func() {
cw.watchForServicesUpdates(initCh) cw.watchForServicesUpdates(initCh)
cw.wg.Done() close(cw.stoppedCh)
}() }()
// wait for initialization to complete // wait for initialization to complete
<-initCh <-initCh
@ -87,11 +87,12 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
func (cw *consulWatcher) mustStop() { func (cw *consulWatcher) mustStop() {
close(cw.stopCh) close(cw.stopCh)
cw.client.Stop() cw.client.Stop()
cw.wg.Wait() <-cw.stoppedCh
} }
func (cw *consulWatcher) updateServices(serviceNames []string) { func (cw *consulWatcher) updateServices(serviceNames []string) {
var initWG sync.WaitGroup var initWG sync.WaitGroup
// Start watchers for new services. // Start watchers for new services.
cw.servicesLock.Lock() cw.servicesLock.Lock()
for _, serviceName := range serviceNames { for _, serviceName := range serviceNames {
@ -102,16 +103,16 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
sw := &serviceWatcher{ sw := &serviceWatcher{
serviceName: serviceName, serviceName: serviceName,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
} }
cw.services[serviceName] = sw cw.services[serviceName] = sw
cw.servicesWG.Add(1)
serviceWatchersCreated.Inc() serviceWatchersCreated.Inc()
initWG.Add(1) initWG.Add(1)
go func() { go func() {
serviceWatchersCount.Inc() serviceWatchersCount.Inc()
sw.watchForServiceNodesUpdates(cw, &initWG) sw.watchForServiceNodesUpdates(cw, &initWG)
serviceWatchersCount.Dec() serviceWatchersCount.Dec()
cw.servicesWG.Done() close(sw.stoppedCh)
}() }()
} }
@ -120,20 +121,24 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
for _, serviceName := range serviceNames { for _, serviceName := range serviceNames {
newServiceNamesMap[serviceName] = struct{}{} newServiceNamesMap[serviceName] = struct{}{}
} }
var swsStopped []*serviceWatcher
for serviceName, sw := range cw.services { for serviceName, sw := range cw.services {
if _, ok := newServiceNamesMap[serviceName]; ok { if _, ok := newServiceNamesMap[serviceName]; ok {
continue continue
} }
close(sw.stopCh) close(sw.stopCh)
delete(cw.services, serviceName) delete(cw.services, serviceName)
serviceWatchersStopped.Inc() swsStopped = append(swsStopped, sw)
// Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime
// if it is blocked in Consul API request.
} }
cw.servicesLock.Unlock() cw.servicesLock.Unlock()
// Wait for initialization to complete. // Wait until deleted service watchers are stopped.
for _, sw := range swsStopped {
<-sw.stoppedCh
serviceWatchersStopped.Inc()
}
// Wait until added service watchers are initialized.
initWG.Wait() initWG.Wait()
} }
@ -175,12 +180,18 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
case <-cw.stopCh: case <-cw.stopCh:
logger.Infof("stopping Consul service watchers for %q", apiServer) logger.Infof("stopping Consul service watchers for %q", apiServer)
startTime := time.Now() startTime := time.Now()
var swsStopped []*serviceWatcher
cw.servicesLock.Lock() cw.servicesLock.Lock()
for _, sw := range cw.services { for _, sw := range cw.services {
close(sw.stopCh) close(sw.stopCh)
swsStopped = append(swsStopped, sw)
} }
cw.servicesLock.Unlock() cw.servicesLock.Unlock()
cw.servicesWG.Wait()
for _, sw := range swsStopped {
<-sw.stoppedCh
}
logger.Infof("stopped Consul service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds()) logger.Infof("stopped Consul service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
return return
} }

View File

@ -33,15 +33,15 @@ type nomadWatcher struct {
servicesLock sync.Mutex servicesLock sync.Mutex
services map[string]*serviceWatcher services map[string]*serviceWatcher
servicesWG sync.WaitGroup stopCh chan struct{}
wg sync.WaitGroup stoppedCh chan struct{}
stopCh chan struct{}
} }
type serviceWatcher struct { type serviceWatcher struct {
serviceName string serviceName string
services []Service services []Service
stopCh chan struct{} stopCh chan struct{}
stoppedCh chan struct{}
} }
// newNomadWatcher creates new watcher and starts background service discovery for Nomad. // newNomadWatcher creates new watcher and starts background service discovery for Nomad.
@ -62,12 +62,12 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n
watchTags: sdc.Tags, watchTags: sdc.Tags,
services: make(map[string]*serviceWatcher), services: make(map[string]*serviceWatcher),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
} }
initCh := make(chan struct{}) initCh := make(chan struct{})
cw.wg.Add(1)
go func() { go func() {
cw.watchForServicesUpdates(initCh) cw.watchForServicesUpdates(initCh)
cw.wg.Done() close(cw.stoppedCh)
}() }()
// wait for initialization to complete // wait for initialization to complete
<-initCh <-initCh
@ -77,11 +77,12 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n
func (cw *nomadWatcher) mustStop() { func (cw *nomadWatcher) mustStop() {
close(cw.stopCh) close(cw.stopCh)
cw.client.Stop() cw.client.Stop()
cw.wg.Wait() <-cw.stoppedCh
} }
func (cw *nomadWatcher) updateServices(serviceNames []string) { func (cw *nomadWatcher) updateServices(serviceNames []string) {
var initWG sync.WaitGroup var initWG sync.WaitGroup
// Start watchers for new services. // Start watchers for new services.
cw.servicesLock.Lock() cw.servicesLock.Lock()
for _, serviceName := range serviceNames { for _, serviceName := range serviceNames {
@ -92,16 +93,16 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
sw := &serviceWatcher{ sw := &serviceWatcher{
serviceName: serviceName, serviceName: serviceName,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
} }
cw.services[serviceName] = sw cw.services[serviceName] = sw
cw.servicesWG.Add(1)
serviceWatchersCreated.Inc() serviceWatchersCreated.Inc()
initWG.Add(1) initWG.Add(1)
go func() { go func() {
serviceWatchersCount.Inc() serviceWatchersCount.Inc()
sw.watchForServiceAddressUpdates(cw, &initWG) sw.watchForServiceAddressUpdates(cw, &initWG)
serviceWatchersCount.Dec() serviceWatchersCount.Dec()
cw.servicesWG.Done() close(sw.stoppedCh)
}() }()
} }
@ -110,20 +111,24 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
for _, serviceName := range serviceNames { for _, serviceName := range serviceNames {
newServiceNamesMap[serviceName] = struct{}{} newServiceNamesMap[serviceName] = struct{}{}
} }
var swsStopped []*serviceWatcher
for serviceName, sw := range cw.services { for serviceName, sw := range cw.services {
if _, ok := newServiceNamesMap[serviceName]; ok { if _, ok := newServiceNamesMap[serviceName]; ok {
continue continue
} }
close(sw.stopCh) close(sw.stopCh)
delete(cw.services, serviceName) delete(cw.services, serviceName)
serviceWatchersStopped.Inc() swsStopped = append(swsStopped, sw)
// Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime
// if it is blocked in Nomad API request.
} }
cw.servicesLock.Unlock() cw.servicesLock.Unlock()
// Wait for initialization to complete. // Wait until deleted service watchers are stopped.
for _, sw := range swsStopped {
<-sw.stoppedCh
serviceWatchersStopped.Inc()
}
// Wait until added service watchers are initialized.
initWG.Wait() initWG.Wait()
} }
@ -165,12 +170,18 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
case <-cw.stopCh: case <-cw.stopCh:
logger.Infof("stopping Nomad service watchers for %q", apiServer) logger.Infof("stopping Nomad service watchers for %q", apiServer)
startTime := time.Now() startTime := time.Now()
var swsStopped []*serviceWatcher
cw.servicesLock.Lock() cw.servicesLock.Lock()
for _, sw := range cw.services { for _, sw := range cw.services {
close(sw.stopCh) close(sw.stopCh)
swsStopped = append(swsStopped, sw)
} }
cw.servicesLock.Unlock() cw.servicesLock.Unlock()
cw.servicesWG.Wait()
for _, sw := range swsStopped {
<-sw.stoppedCh
}
logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds()) logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
return return
} }