mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
lib/promscrape/discovery: fixes after 133b288681
- Removed a deadlock in addAPIWatcher - Do not create unused ScrapeWork objects - Do not spend CPU resources on creating objectByKey map in addAPIWatcher This work is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1125
This commit is contained in:
parent
afa5b58c2d
commit
620f05cd2c
@ -132,7 +132,7 @@ func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []
|
||||
|
||||
// getScrapeWorkObjects returns all the ScrapeWork objects for the given aw.
|
||||
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
|
||||
aw.startWatchersForRole(aw.sdc.Role)
|
||||
aw.startWatchersForRole(aw.sdc.Role, true)
|
||||
aw.swosByKeyLock.Lock()
|
||||
defer aw.swosByKeyLock.Unlock()
|
||||
|
||||
@ -154,7 +154,7 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
|
||||
return nil
|
||||
}
|
||||
key := namespace + "/" + name
|
||||
aw.startWatchersForRole(role)
|
||||
aw.startWatchersForRole(role, false)
|
||||
aw.watchersByURLLock.Lock()
|
||||
defer aw.watchersByURLLock.Unlock()
|
||||
|
||||
@ -172,15 +172,15 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (aw *apiWatcher) startWatchersForRole(role string) {
|
||||
func (aw *apiWatcher) startWatchersForRole(role string, registerAPIWatcher bool) {
|
||||
paths := getAPIPaths(role, aw.sdc.Namespaces.Names, aw.sdc.Selectors)
|
||||
for _, path := range paths {
|
||||
apiURL := aw.apiServer + path
|
||||
aw.startWatcherForURL(role, apiURL)
|
||||
aw.startWatcherForURL(role, apiURL, registerAPIWatcher)
|
||||
}
|
||||
}
|
||||
|
||||
func (aw *apiWatcher) startWatcherForURL(role, apiURL string) {
|
||||
func (aw *apiWatcher) startWatcherForURL(role, apiURL string, registerAPIWatcher bool) {
|
||||
aw.watchersByURLLock.Lock()
|
||||
if aw.watchersByURL[apiURL] != nil {
|
||||
// Watcher for the given path already exists.
|
||||
@ -188,16 +188,21 @@ func (aw *apiWatcher) startWatcherForURL(role, apiURL string) {
|
||||
return
|
||||
}
|
||||
uw := getURLWatcher(role, apiURL, aw.sdc.ProxyURL.URL(), aw.ac)
|
||||
uw.addAPIWatcher(aw)
|
||||
aw.watchersByURL[apiURL] = uw
|
||||
aw.watchersByURLLock.Unlock()
|
||||
uw.initOnce()
|
||||
if registerAPIWatcher {
|
||||
uw.addAPIWatcher(aw)
|
||||
}
|
||||
|
||||
aw.wg.Add(1)
|
||||
go func() {
|
||||
defer aw.wg.Done()
|
||||
<-aw.stopCh
|
||||
if registerAPIWatcher {
|
||||
uw.removeAPIWatcher(aw)
|
||||
}
|
||||
aw.watchersByURLLock.Lock()
|
||||
uw.removeAPIWatcher(aw)
|
||||
delete(aw.watchersByURL, apiURL)
|
||||
aw.watchersByURLLock.Unlock()
|
||||
}()
|
||||
@ -232,6 +237,9 @@ type urlWatcher struct {
|
||||
parseObject parseObjectFunc
|
||||
parseObjectList parseObjectListFunc
|
||||
|
||||
// once is used for initializing the urlWatcher only once
|
||||
once sync.Once
|
||||
|
||||
// mu protects aws, objectsByKey and resourceVersion
|
||||
mu sync.Mutex
|
||||
|
||||
@ -282,24 +290,24 @@ func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config)
|
||||
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
|
||||
objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
|
||||
}
|
||||
uw.reloadObjects()
|
||||
go uw.watchForUpdates()
|
||||
return uw
|
||||
}
|
||||
|
||||
func (uw *urlWatcher) initOnce() {
|
||||
uw.once.Do(func() {
|
||||
uw.reloadObjects()
|
||||
go uw.watchForUpdates()
|
||||
})
|
||||
}
|
||||
|
||||
func (uw *urlWatcher) addAPIWatcher(aw *apiWatcher) {
|
||||
uw.mu.Lock()
|
||||
if _, ok := uw.aws[aw]; ok {
|
||||
logger.Panicf("BUG: aw=%p has been already added", aw)
|
||||
}
|
||||
uw.aws[aw] = struct{}{}
|
||||
objectsByKey := make(map[string]object)
|
||||
for key, o := range uw.objectsByKey {
|
||||
objectsByKey[key] = o
|
||||
}
|
||||
aw.reloadScrapeWorks(uw.objectsByKey)
|
||||
uw.mu.Unlock()
|
||||
|
||||
aw.reloadScrapeWorks(objectsByKey)
|
||||
}
|
||||
|
||||
func (uw *urlWatcher) removeAPIWatcher(aw *apiWatcher) {
|
||||
@ -427,7 +435,7 @@ func (uw *urlWatcher) watchForUpdates() {
|
||||
}
|
||||
resp, err := uw.doRequest(requestURL)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot performing request to %q: %s", requestURL, err)
|
||||
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
|
||||
backoffSleep()
|
||||
continue
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user