diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index fd01321506..fc0a9870b7 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -290,6 +290,24 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { if needStart { uw.reloadObjects() go uw.watchForUpdates() + if role == "endpoints" || role == "endpointslice" { + // Refresh endpoints and enpointslices targets in background, since they depend on other object types such as pod and service. + // This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 . + go func() { + sleepTime := 20 * time.Second + for { + time.Sleep(sleepTime) + startTime := time.Now() + gw.mu.Lock() + if uw.needUpdateScrapeWorks { + uw.needUpdateScrapeWorks = false + uw.updateScrapeWorksLocked(uw.aws) + } + gw.mu.Unlock() + sleepTime = time.Since(startTime) + } + }() + } } } } @@ -369,6 +387,8 @@ type urlWatcher struct { // objectsByKey contains the latest state for objects obtained from apiURL objectsByKey map[string]object + needUpdateScrapeWorks bool + resourceVersion string objectsCount *metrics.Counter @@ -416,9 +436,19 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() { if len(uw.awsPending) == 0 { return } - aws := make([]*apiWatcher, 0, len(uw.awsPending)) + uw.updateScrapeWorksLocked(uw.awsPending) for aw := range uw.awsPending { uw.aws[aw] = struct{}{} + } + awsPendingLen := len(uw.awsPending) + uw.awsPending = make(map[*apiWatcher]struct{}) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(awsPendingLen) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-awsPendingLen) +} + +func (uw *urlWatcher) updateScrapeWorksLocked(awsMap map[*apiWatcher]struct{}) { + aws := make([]*apiWatcher, 0, len(awsMap)) + for aw := range awsMap { aws = append(aws, aw) } swosByKey := make([]map[string][]interface{}, len(aws)) @@ -452,9 +482,6 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() { for i, aw := range aws { aw.reloadScrapeWorks(uw, swosByKey[i]) } - uw.awsPending = make(map[*apiWatcher]struct{}) - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(aws)) - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(aws)) } func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) { @@ -475,6 +502,7 @@ func (uw *urlWatcher) reloadObjects() string { return uw.resourceVersion } + startTime := time.Now() requestURL := uw.apiURL resp, err := uw.gw.doRequest(requestURL) if err != nil { @@ -512,6 +540,7 @@ func (uw *urlWatcher) reloadObjects() string { added++ } } + uw.needUpdateScrapeWorks = false uw.gw.mu.Unlock() uw.objectsUpdated.Add(updated) @@ -520,8 +549,8 @@ func (uw *urlWatcher) reloadObjects() string { uw.objectsCount.Add(added - removed) uw.resourceVersion = metadata.ResourceVersion - logger.Infof("reloaded %d objects from %q; updated=%d, removed=%d, added=%d, resourceVersion=%q", - len(objectsByKey), requestURL, updated, removed, added, uw.resourceVersion) + logger.Infof("reloaded %d objects from %q in %.3fs; updated=%d, removed=%d, added=%d, resourceVersion=%q", + len(objectsByKey), requestURL, time.Since(startTime).Seconds(), updated, removed, added, uw.resourceVersion) return uw.resourceVersion } @@ -655,13 +684,13 @@ func (uw *urlWatcher) updateObjectLocked(key string, o object) { return } uw.objectsByKey[key] = o - if len(uw.aws) == 0 { - return - } - labels := o.getTargetLabels(uw.gw) - for aw := range uw.aws { - aw.setScrapeWorks(uw, key, labels) + if len(uw.aws) > 0 { + labels := o.getTargetLabels(uw.gw) + for aw := range uw.aws { + aw.setScrapeWorks(uw, key, labels) + } } + uw.maybeUpdateEndpointsScrapeWorksLocked() } func (uw *urlWatcher) removeObjectLocked(key string) { @@ -669,6 +698,24 @@ func (uw *urlWatcher) removeObjectLocked(key string) { for aw := range uw.aws { aw.removeScrapeWorks(uw, key) } + uw.maybeUpdateEndpointsScrapeWorksLocked() +} + +func (uw *urlWatcher) maybeUpdateEndpointsScrapeWorksLocked() { + if uw.role != "service" && uw.role != "pod" { + // Nothing to update + return + } + namespace := uw.namespace + for _, uwx := range uw.gw.m { + if uwx.role != "endpoints" && uwx.role != "endpointslice" { + continue + } + if uwx.namespace != "" && uwx.namespace != namespace { + continue + } + uwx.needUpdateScrapeWorks = true + } } // Bookmark is a bookmark message from Kubernetes Watch API.