From 620f05cd2c712f513c0910e52bd78c4d1a379155 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 13 Mar 2021 15:18:47 +0200 Subject: [PATCH] lib/promscrape/discovery: fixes after 133b288681978d82cd3a4c6125824346b9344f9f - Removed a deadlock in addAPIWatcher - Do not create unused ScrapeWork objects - Do not spend CPU resources on creating a copy of the objectsByKey map in addAPIWatcher This work is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1125 --- .../discovery/kubernetes/api_watcher.go | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 26e4076909..2ed5aa8adb 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -132,7 +132,7 @@ func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss [] // getScrapeWorkObjects returns all the ScrapeWork objects for the given aw. 
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { - aw.startWatchersForRole(aw.sdc.Role) + aw.startWatchersForRole(aw.sdc.Role, true) aw.swosByKeyLock.Lock() defer aw.swosByKeyLock.Unlock() @@ -154,7 +154,7 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { return nil } key := namespace + "/" + name - aw.startWatchersForRole(role) + aw.startWatchersForRole(role, false) aw.watchersByURLLock.Lock() defer aw.watchersByURLLock.Unlock() @@ -172,15 +172,15 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { return nil } -func (aw *apiWatcher) startWatchersForRole(role string) { +func (aw *apiWatcher) startWatchersForRole(role string, registerAPIWatcher bool) { paths := getAPIPaths(role, aw.sdc.Namespaces.Names, aw.sdc.Selectors) for _, path := range paths { apiURL := aw.apiServer + path - aw.startWatcherForURL(role, apiURL) + aw.startWatcherForURL(role, apiURL, registerAPIWatcher) } } -func (aw *apiWatcher) startWatcherForURL(role, apiURL string) { +func (aw *apiWatcher) startWatcherForURL(role, apiURL string, registerAPIWatcher bool) { aw.watchersByURLLock.Lock() if aw.watchersByURL[apiURL] != nil { // Watcher for the given path already exists. 
@@ -188,16 +188,21 @@ func (aw *apiWatcher) startWatcherForURL(role, apiURL string) { return } uw := getURLWatcher(role, apiURL, aw.sdc.ProxyURL.URL(), aw.ac) - uw.addAPIWatcher(aw) aw.watchersByURL[apiURL] = uw aw.watchersByURLLock.Unlock() + uw.initOnce() + if registerAPIWatcher { + uw.addAPIWatcher(aw) + } aw.wg.Add(1) go func() { defer aw.wg.Done() <-aw.stopCh + if registerAPIWatcher { + uw.removeAPIWatcher(aw) + } aw.watchersByURLLock.Lock() - uw.removeAPIWatcher(aw) delete(aw.watchersByURL, apiURL) aw.watchersByURLLock.Unlock() }() @@ -232,6 +237,9 @@ type urlWatcher struct { parseObject parseObjectFunc parseObjectList parseObjectListFunc + // once is used for initializing the urlWatcher only once + once sync.Once + // mu protects aws, objectsByKey and resourceVersion mu sync.Mutex @@ -282,24 +290,24 @@ func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)), } - uw.reloadObjects() - go uw.watchForUpdates() return uw } +func (uw *urlWatcher) initOnce() { + uw.once.Do(func() { + uw.reloadObjects() + go uw.watchForUpdates() + }) +} + func (uw *urlWatcher) addAPIWatcher(aw *apiWatcher) { uw.mu.Lock() if _, ok := uw.aws[aw]; ok { logger.Panicf("BUG: aw=%p has been already added", aw) } uw.aws[aw] = struct{}{} - objectsByKey := make(map[string]object) - for key, o := range uw.objectsByKey { - objectsByKey[key] = o - } + aw.reloadScrapeWorks(uw.objectsByKey) uw.mu.Unlock() - - aw.reloadScrapeWorks(objectsByKey) } func (uw *urlWatcher) removeAPIWatcher(aw *apiWatcher) { @@ -427,7 +435,7 @@ func (uw *urlWatcher) watchForUpdates() { } resp, err := uw.doRequest(requestURL) if err != nil { - logger.Errorf("cannot performing request to %q: %s", requestURL, err) + logger.Errorf("cannot 
perform request to %q: %s", requestURL, err) backoffSleep() continue }