lib/promscrape/discovery/kubernetes: improve the performance of urlWatcher.reloadObjects() on multi-CPU systems

Parallelize the generation of ScrapeWork objects there. Previously they were generated in a single goroutine.
This commit is contained in:
Aliaksandr Valialkin 2022-04-22 13:21:58 +03:00
parent 60f74dab56
commit cc6eae6992
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1

View File

@ -301,7 +301,7 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
gw.mu.Lock()
if uw.needUpdateScrapeWorks {
uw.needUpdateScrapeWorks = false
uw.updateScrapeWorksLocked(uw.aws)
uw.updateScrapeWorksLocked(uw.objectsByKey, uw.aws)
}
gw.mu.Unlock()
sleepTime = time.Since(startTime)
@ -436,7 +436,7 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
if len(uw.awsPending) == 0 {
return
}
uw.updateScrapeWorksLocked(uw.awsPending)
uw.updateScrapeWorksLocked(uw.objectsByKey, uw.awsPending)
for aw := range uw.awsPending {
uw.aws[aw] = struct{}{}
}
@ -446,7 +446,10 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-awsPendingLen)
}
func (uw *urlWatcher) updateScrapeWorksLocked(awsMap map[*apiWatcher]struct{}) {
func (uw *urlWatcher) updateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
if len(objectsByKey) == 0 || len(awsMap) == 0 {
return
}
aws := make([]*apiWatcher, 0, len(awsMap))
for aw := range awsMap {
aws = append(aws, aw)
@ -461,7 +464,7 @@ func (uw *urlWatcher) updateScrapeWorksLocked(awsMap map[*apiWatcher]struct{}) {
var swosByKeyLock sync.Mutex
var wg sync.WaitGroup
limiterCh := make(chan struct{}, cgroup.AvailableCPUs())
for key, o := range uw.objectsByKey {
for key, o := range objectsByKey {
labels := o.getTargetLabels(uw.gw)
wg.Add(1)
limiterCh <- struct{}{}
@ -484,6 +487,14 @@ func (uw *urlWatcher) updateScrapeWorksLocked(awsMap map[*apiWatcher]struct{}) {
}
}
func (uw *urlWatcher) removeScrapeWorksLocked(keys []string) {
for _, key := range keys {
for aw := range uw.aws {
aw.removeScrapeWorks(uw, key)
}
}
}
func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) {
if _, ok := uw.awsPending[aw]; ok {
delete(uw.awsPending, aw)
@ -523,34 +534,46 @@ func (uw *urlWatcher) reloadObjects() string {
}
uw.gw.mu.Lock()
var updated, removed, added int
for key := range uw.objectsByKey {
objectsAdded := make(map[string]object)
objectsUpdated := make(map[string]object)
var objectsRemoved []string
for key, oPrev := range uw.objectsByKey {
o, ok := objectsByKey[key]
if ok {
uw.updateObjectLocked(key, o)
updated++
if !reflect.DeepEqual(oPrev, o) {
objectsUpdated[key] = o
}
// Overwrite oPrev with o even if these objects are equal.
// This should free up memory associated with oPrev.
uw.objectsByKey[key] = o
} else {
uw.removeObjectLocked(key)
removed++
objectsRemoved = append(objectsRemoved, key)
delete(uw.objectsByKey, key)
}
}
for key, o := range objectsByKey {
if _, ok := uw.objectsByKey[key]; !ok {
uw.updateObjectLocked(key, o)
added++
objectsAdded[key] = o
uw.objectsByKey[key] = o
}
}
uw.removeScrapeWorksLocked(objectsRemoved)
uw.updateScrapeWorksLocked(objectsUpdated, uw.aws)
uw.updateScrapeWorksLocked(objectsAdded, uw.aws)
uw.needUpdateScrapeWorks = false
if len(objectsRemoved) > 0 || len(objectsUpdated) > 0 || len(objectsAdded) > 0 {
uw.maybeUpdateEndpointsScrapeWorksLocked()
}
uw.gw.mu.Unlock()
uw.objectsUpdated.Add(updated)
uw.objectsRemoved.Add(removed)
uw.objectsAdded.Add(added)
uw.objectsCount.Add(added - removed)
uw.objectsUpdated.Add(len(objectsUpdated))
uw.objectsRemoved.Add(len(objectsRemoved))
uw.objectsAdded.Add(len(objectsAdded))
uw.objectsCount.Add(len(objectsAdded) - len(objectsRemoved))
uw.resourceVersion = metadata.ResourceVersion
logger.Infof("reloaded %d objects from %q in %.3fs; updated=%d, removed=%d, added=%d, resourceVersion=%q",
len(objectsByKey), requestURL, time.Since(startTime).Seconds(), updated, removed, added, uw.resourceVersion)
len(objectsByKey), requestURL, time.Since(startTime).Seconds(), len(objectsUpdated), len(objectsRemoved), len(objectsAdded), uw.resourceVersion)
return uw.resourceVersion
}
@ -631,12 +654,6 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
}
key := o.key()
uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; !ok {
uw.objectsCount.Inc()
uw.objectsAdded.Inc()
} else {
uw.objectsUpdated.Inc()
}
uw.updateObjectLocked(key, o)
uw.gw.mu.Unlock()
case "DELETED":
@ -646,10 +663,6 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
}
key := o.key()
uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; ok {
uw.objectsCount.Dec()
uw.objectsRemoved.Inc()
}
uw.removeObjectLocked(key)
uw.gw.mu.Unlock()
case "BOOKMARK":
@ -679,11 +692,19 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
func (uw *urlWatcher) updateObjectLocked(key string, o object) {
oPrev, ok := uw.objectsByKey[key]
if ok && reflect.DeepEqual(oPrev, o) {
// Nothing to do, since the new object is equal to the previous one.
return
}
// Overwrite oPrev with o even if these objects are equal.
// This should free up memory associated with oPrev.
uw.objectsByKey[key] = o
if !ok {
uw.objectsCount.Inc()
uw.objectsAdded.Inc()
} else {
if reflect.DeepEqual(oPrev, o) {
// Nothing to do, since the new object is equal to the previous one.
return
}
uw.objectsUpdated.Inc()
}
if len(uw.aws) > 0 {
labels := o.getTargetLabels(uw.gw)
for aw := range uw.aws {
@ -694,6 +715,11 @@ func (uw *urlWatcher) updateObjectLocked(key string, o object) {
}
func (uw *urlWatcher) removeObjectLocked(key string) {
if _, ok := uw.objectsByKey[key]; !ok {
return
}
uw.objectsCount.Dec()
uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key)
for aw := range uw.aws {
aw.removeScrapeWorks(uw, key)
@ -702,7 +728,7 @@ func (uw *urlWatcher) removeObjectLocked(key string) {
}
func (uw *urlWatcher) maybeUpdateEndpointsScrapeWorksLocked() {
if uw.role != "service" && uw.role != "pod" {
if uw.role != "pod" && uw.role != "service" {
// Nothing to update
return
}