diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index fc0a9870b7..3f7e013a53 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -301,7 +301,7 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { gw.mu.Lock() if uw.needUpdateScrapeWorks { uw.needUpdateScrapeWorks = false - uw.updateScrapeWorksLocked(uw.aws) + uw.updateScrapeWorksLocked(uw.objectsByKey, uw.aws) } gw.mu.Unlock() sleepTime = time.Since(startTime) @@ -436,7 +436,7 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() { if len(uw.awsPending) == 0 { return } - uw.updateScrapeWorksLocked(uw.awsPending) + uw.updateScrapeWorksLocked(uw.objectsByKey, uw.awsPending) for aw := range uw.awsPending { uw.aws[aw] = struct{}{} } @@ -446,7 +446,10 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() { metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-awsPendingLen) } -func (uw *urlWatcher) updateScrapeWorksLocked(awsMap map[*apiWatcher]struct{}) { +func (uw *urlWatcher) updateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) { + if len(objectsByKey) == 0 || len(awsMap) == 0 { + return + } aws := make([]*apiWatcher, 0, len(awsMap)) for aw := range awsMap { aws = append(aws, aw) @@ -461,7 +464,7 @@ func (uw *urlWatcher) updateScrapeWorksLocked(awsMap map[*apiWatcher]struct{}) { var swosByKeyLock sync.Mutex var wg sync.WaitGroup limiterCh := make(chan struct{}, cgroup.AvailableCPUs()) - for key, o := range uw.objectsByKey { + for key, o := range objectsByKey { labels := o.getTargetLabels(uw.gw) wg.Add(1) limiterCh <- struct{}{} @@ -484,6 +487,14 @@ func (uw *urlWatcher) updateScrapeWorksLocked(awsMap map[*apiWatcher]struct{}) { } } +func (uw *urlWatcher) removeScrapeWorksLocked(keys []string) { + for _, key := range keys { + for aw := range uw.aws { + aw.removeScrapeWorks(uw, key) + } + } +} + func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) { if _, ok := uw.awsPending[aw]; ok { delete(uw.awsPending, aw) @@ -523,34 +534,46 @@ func (uw *urlWatcher) reloadObjects() string { } uw.gw.mu.Lock() - var updated, removed, added int - for key := range uw.objectsByKey { + objectsAdded := make(map[string]object) + objectsUpdated := make(map[string]object) + var objectsRemoved []string + for key, oPrev := range uw.objectsByKey { o, ok := objectsByKey[key] if ok { - uw.updateObjectLocked(key, o) - updated++ + if !reflect.DeepEqual(oPrev, o) { + objectsUpdated[key] = o + } + // Overwrite oPrev with o even if these objects are equal. + // This should free up memory associated with oPrev. + uw.objectsByKey[key] = o } else { - uw.removeObjectLocked(key) - removed++ + objectsRemoved = append(objectsRemoved, key) + delete(uw.objectsByKey, key) } } for key, o := range objectsByKey { if _, ok := uw.objectsByKey[key]; !ok { - uw.updateObjectLocked(key, o) - added++ + objectsAdded[key] = o + uw.objectsByKey[key] = o } } + uw.removeScrapeWorksLocked(objectsRemoved) + uw.updateScrapeWorksLocked(objectsUpdated, uw.aws) + uw.updateScrapeWorksLocked(objectsAdded, uw.aws) uw.needUpdateScrapeWorks = false + if len(objectsRemoved) > 0 || len(objectsUpdated) > 0 || len(objectsAdded) > 0 { + uw.maybeUpdateEndpointsScrapeWorksLocked() + } uw.gw.mu.Unlock() - uw.objectsUpdated.Add(updated) - uw.objectsRemoved.Add(removed) - uw.objectsAdded.Add(added) - uw.objectsCount.Add(added - removed) + uw.objectsUpdated.Add(len(objectsUpdated)) + uw.objectsRemoved.Add(len(objectsRemoved)) + uw.objectsAdded.Add(len(objectsAdded)) + uw.objectsCount.Add(len(objectsAdded) - len(objectsRemoved)) uw.resourceVersion = metadata.ResourceVersion logger.Infof("reloaded %d objects from %q in %.3fs; updated=%d, removed=%d, added=%d, resourceVersion=%q", - len(objectsByKey), requestURL, time.Since(startTime).Seconds(), updated, removed, added, uw.resourceVersion) + len(objectsByKey), requestURL, time.Since(startTime).Seconds(), len(objectsUpdated), len(objectsRemoved), len(objectsAdded), uw.resourceVersion) return uw.resourceVersion } @@ -631,12 +654,6 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { } key := o.key() uw.gw.mu.Lock() - if _, ok := uw.objectsByKey[key]; !ok { - uw.objectsCount.Inc() - uw.objectsAdded.Inc() - } else { - uw.objectsUpdated.Inc() - } uw.updateObjectLocked(key, o) uw.gw.mu.Unlock() case "DELETED": @@ -646,10 +663,6 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { } key := o.key() uw.gw.mu.Lock() - if _, ok := uw.objectsByKey[key]; ok { - uw.objectsCount.Dec() - uw.objectsRemoved.Inc() - } uw.removeObjectLocked(key) uw.gw.mu.Unlock() case "BOOKMARK": @@ -679,11 +692,19 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { func (uw *urlWatcher) updateObjectLocked(key string, o object) { oPrev, ok := uw.objectsByKey[key] - if ok && reflect.DeepEqual(oPrev, o) { - // Nothing to do, since the new object is equal to the previous one. - return - } + // Overwrite oPrev with o even if these objects are equal. + // This should free up memory associated with oPrev. uw.objectsByKey[key] = o + if !ok { + uw.objectsCount.Inc() + uw.objectsAdded.Inc() + } else { + if reflect.DeepEqual(oPrev, o) { + // Nothing to do, since the new object is equal to the previous one. + return + } + uw.objectsUpdated.Inc() + } if len(uw.aws) > 0 { labels := o.getTargetLabels(uw.gw) for aw := range uw.aws { @@ -694,6 +715,11 @@ func (uw *urlWatcher) updateObjectLocked(key string, o object) { } func (uw *urlWatcher) removeObjectLocked(key string) { + if _, ok := uw.objectsByKey[key]; !ok { + return + } + uw.objectsCount.Dec() + uw.objectsRemoved.Inc() delete(uw.objectsByKey, key) for aw := range uw.aws { aw.removeScrapeWorks(uw, key) @@ -702,7 +728,7 @@ func (uw *urlWatcher) removeObjectLocked(key string) { } func (uw *urlWatcher) maybeUpdateEndpointsScrapeWorksLocked() { - if uw.role != "service" && uw.role != "pod" { + if uw.role != "pod" && uw.role != "service" { // Nothing to update return }