lib/promscrape/discovery/kubernetes: properly update endpoints and endpointslice objects when the related pod or service objects are updated

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240

This is a follow-up for 2341bd48d7
This commit is contained in:
Aliaksandr Valialkin 2022-04-21 12:57:44 +03:00
parent 7ca08945fd
commit fea9d1e6ee
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1

View File

@ -290,6 +290,24 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
if needStart {
uw.reloadObjects()
go uw.watchForUpdates()
if role == "endpoints" || role == "endpointslice" {
// Refresh endpoints and enpointslices targets in background, since they depend on other object types such as pod and service.
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
go func() {
sleepTime := 20 * time.Second
for {
time.Sleep(sleepTime)
startTime := time.Now()
gw.mu.Lock()
if uw.needUpdateScrapeWorks {
uw.needUpdateScrapeWorks = false
uw.updateScrapeWorksLocked(uw.aws)
}
gw.mu.Unlock()
sleepTime = time.Since(startTime)
}
}()
}
}
}
}
@ -369,6 +387,8 @@ type urlWatcher struct {
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
needUpdateScrapeWorks bool
resourceVersion string
objectsCount *metrics.Counter
@ -416,9 +436,19 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
if len(uw.awsPending) == 0 {
return
}
aws := make([]*apiWatcher, 0, len(uw.awsPending))
uw.updateScrapeWorksLocked(uw.awsPending)
for aw := range uw.awsPending {
uw.aws[aw] = struct{}{}
}
awsPendingLen := len(uw.awsPending)
uw.awsPending = make(map[*apiWatcher]struct{})
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(awsPendingLen)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-awsPendingLen)
}
func (uw *urlWatcher) updateScrapeWorksLocked(awsMap map[*apiWatcher]struct{}) {
aws := make([]*apiWatcher, 0, len(awsMap))
for aw := range awsMap {
aws = append(aws, aw)
}
swosByKey := make([]map[string][]interface{}, len(aws))
@ -452,9 +482,6 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
for i, aw := range aws {
aw.reloadScrapeWorks(uw, swosByKey[i])
}
uw.awsPending = make(map[*apiWatcher]struct{})
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(aws))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(aws))
}
func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) {
@ -475,6 +502,7 @@ func (uw *urlWatcher) reloadObjects() string {
return uw.resourceVersion
}
startTime := time.Now()
requestURL := uw.apiURL
resp, err := uw.gw.doRequest(requestURL)
if err != nil {
@ -512,6 +540,7 @@ func (uw *urlWatcher) reloadObjects() string {
added++
}
}
uw.needUpdateScrapeWorks = false
uw.gw.mu.Unlock()
uw.objectsUpdated.Add(updated)
@ -520,8 +549,8 @@ func (uw *urlWatcher) reloadObjects() string {
uw.objectsCount.Add(added - removed)
uw.resourceVersion = metadata.ResourceVersion
logger.Infof("reloaded %d objects from %q; updated=%d, removed=%d, added=%d, resourceVersion=%q",
len(objectsByKey), requestURL, updated, removed, added, uw.resourceVersion)
logger.Infof("reloaded %d objects from %q in %.3fs; updated=%d, removed=%d, added=%d, resourceVersion=%q",
len(objectsByKey), requestURL, time.Since(startTime).Seconds(), updated, removed, added, uw.resourceVersion)
return uw.resourceVersion
}
@ -655,13 +684,13 @@ func (uw *urlWatcher) updateObjectLocked(key string, o object) {
return
}
uw.objectsByKey[key] = o
if len(uw.aws) == 0 {
return
}
labels := o.getTargetLabels(uw.gw)
for aw := range uw.aws {
aw.setScrapeWorks(uw, key, labels)
if len(uw.aws) > 0 {
labels := o.getTargetLabels(uw.gw)
for aw := range uw.aws {
aw.setScrapeWorks(uw, key, labels)
}
}
uw.maybeUpdateEndpointsScrapeWorksLocked()
}
func (uw *urlWatcher) removeObjectLocked(key string) {
@ -669,6 +698,24 @@ func (uw *urlWatcher) removeObjectLocked(key string) {
for aw := range uw.aws {
aw.removeScrapeWorks(uw, key)
}
uw.maybeUpdateEndpointsScrapeWorksLocked()
}
func (uw *urlWatcher) maybeUpdateEndpointsScrapeWorksLocked() {
if uw.role != "service" && uw.role != "pod" {
// Nothing to update
return
}
namespace := uw.namespace
for _, uwx := range uw.gw.m {
if uwx.role != "endpoints" && uwx.role != "endpointslice" {
continue
}
if uwx.namespace != "" && uwx.namespace != namespace {
continue
}
uwx.needUpdateScrapeWorks = true
}
}
// Bookmark is a bookmark message from Kubernetes Watch API.