lib/promscrape/discovery/kubernetes: reduce memory usage when Kubernetes service discovery is configured on a big number of scrape jobs

Previously vmagent was creating a separate Kubernetes object cache per each scrape job.
This could result in increased memory usage when monitoring a Kubernetes cluster with big number of objects (pods / nodes / services, etc.)
as seen at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1113

Now it uses a shared map of scrape objects across multiple scrape jobs.
This commit is contained in:
Aliaksandr Valialkin 2021-03-05 17:29:54 +02:00
parent 92ddb8f197
commit 5807ff57f3
2 changed files with 115 additions and 37 deletions

View File

@ -8,6 +8,8 @@
- `histogram_stddev(buckets)` - returns standard deviation for the given buckets. - `histogram_stddev(buckets)` - returns standard deviation for the given buckets.
* FEATURE: vmagent: add ability to replicate scrape targets among `vmagent` instances in the cluster with `-promscrape.cluster.replicationFactor` command-line flag. See [these docs](https://victoriametrics.github.io/vmagent.html#scraping-big-number-of-targets). * FEATURE: vmagent: add ability to replicate scrape targets among `vmagent` instances in the cluster with `-promscrape.cluster.replicationFactor` command-line flag. See [these docs](https://victoriametrics.github.io/vmagent.html#scraping-big-number-of-targets).
* BUGFIX: vmagent: reduce memory usage when Kubernetes service discovery is used in big number of distinct jobs by sharing the cache. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1113
# [v1.55.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.55.1) # [v1.55.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.55.1)

View File

@ -124,9 +124,7 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
if uw.role != role { if uw.role != role {
continue continue
} }
uw.mu.Lock() o = uw.objectsByKey.get(key)
o = uw.objectsByKey[key]
uw.mu.Unlock()
if o != nil { if o != nil {
break break
} }
@ -164,10 +162,7 @@ func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseO
logger.Infof("started watcher for %q", apiURL) logger.Infof("started watcher for %q", apiURL)
uw.watchForUpdates(resourceVersion) uw.watchForUpdates(resourceVersion)
logger.Infof("stopped watcher for %q", apiURL) logger.Infof("stopped watcher for %q", apiURL)
uw.mu.Lock() uw.objectsByKey.decRef()
uw.objectsCount.Add(-len(uw.objectsByKey))
uw.objectsRemoved.Add(len(uw.objectsByKey))
uw.mu.Unlock()
aw.mu.Lock() aw.mu.Lock()
delete(aw.watchersByURL, apiURL) delete(aw.watchersByURL, apiURL)
@ -207,12 +202,12 @@ type urlWatcher struct {
parseObject parseObjectFunc parseObject parseObjectFunc
parseObjectList parseObjectListFunc parseObjectList parseObjectListFunc
// mu protects objectsByKey and swosByKey
mu sync.Mutex
// objectsByKey contains the latest state for objects obtained from apiURL // objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object objectsByKey *objectsMap
swosByKey map[string][]interface{}
// mu protects swosByKey
mu sync.Mutex
swosByKey map[string][]interface{}
// the parent apiWatcher // the parent apiWatcher
aw *apiWatcher aw *apiWatcher
@ -220,10 +215,6 @@ type urlWatcher struct {
watchersCount *metrics.Counter watchersCount *metrics.Counter
watchersCreated *metrics.Counter watchersCreated *metrics.Counter
watchersStopped *metrics.Counter watchersStopped *metrics.Counter
objectsCount *metrics.Counter
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
} }
func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher { func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher {
@ -234,7 +225,7 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject
parseObject: parseObject, parseObject: parseObject,
parseObjectList: parseObjectList, parseObjectList: parseObjectList,
objectsByKey: make(map[string]object), objectsByKey: sharedObjectsGlobal.getByAPIURL(role, apiURL),
swosByKey: make(map[string][]interface{}), swosByKey: make(map[string][]interface{}),
aw: aw, aw: aw,
@ -242,10 +233,6 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject
watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)), watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)),
watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)), watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)),
watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)), watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
} }
} }
@ -273,6 +260,7 @@ func (uw *urlWatcher) reloadObjects() string {
} }
return "" return ""
} }
uw.objectsByKey.reload(objectsByKey)
swosByKey := make(map[string][]interface{}, len(objectsByKey)) swosByKey := make(map[string][]interface{}, len(objectsByKey))
for k, o := range objectsByKey { for k, o := range objectsByKey {
labels := o.getTargetLabels(aw) labels := o.getTargetLabels(aw)
@ -282,10 +270,6 @@ func (uw *urlWatcher) reloadObjects() string {
} }
} }
uw.mu.Lock() uw.mu.Lock()
uw.objectsAdded.Add(len(objectsByKey))
uw.objectsRemoved.Add(len(uw.objectsByKey))
uw.objectsCount.Add(len(objectsByKey) - len(uw.objectsByKey))
uw.objectsByKey = objectsByKey
uw.swosByKey = swosByKey uw.swosByKey = swosByKey
uw.mu.Unlock() uw.mu.Unlock()
return metadata.ResourceVersion return metadata.ResourceVersion
@ -388,13 +372,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
key := o.key() key := o.key()
switch we.Type { switch we.Type {
case "ADDED", "MODIFIED": case "ADDED", "MODIFIED":
uw.mu.Lock() uw.objectsByKey.update(key, o)
if uw.objectsByKey[key] == nil {
uw.objectsAdded.Inc()
uw.objectsCount.Inc()
}
uw.objectsByKey[key] = o
uw.mu.Unlock()
labels := o.getTargetLabels(aw) labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
uw.mu.Lock() uw.mu.Lock()
@ -405,12 +383,8 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
} }
uw.mu.Unlock() uw.mu.Unlock()
case "DELETED": case "DELETED":
uw.objectsByKey.remove(key)
uw.mu.Lock() uw.mu.Lock()
if uw.objectsByKey[key] != nil {
uw.objectsRemoved.Inc()
uw.objectsCount.Dec()
}
delete(uw.objectsByKey, key)
delete(uw.swosByKey, key) delete(uw.swosByKey, key)
uw.mu.Unlock() uw.mu.Unlock()
default: default:
@ -513,3 +487,105 @@ func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc)
return nil, nil return nil, nil
} }
} }
type objectsMap struct {
mu sync.Mutex
refCount int
m map[string]object
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
objectsCount *metrics.Counter
}
func (om *objectsMap) incRef() {
om.mu.Lock()
om.refCount++
om.mu.Unlock()
}
func (om *objectsMap) decRef() {
om.mu.Lock()
om.refCount--
if om.refCount < 0 {
logger.Panicf("BUG: refCount cannot be smaller than 0; got %d", om.refCount)
}
if om.refCount == 0 {
// Free up memory occupied by om.m
om.objectsRemoved.Add(len(om.m))
om.objectsCount.Add(-len(om.m))
om.m = make(map[string]object)
}
om.mu.Unlock()
}
func (om *objectsMap) reload(m map[string]object) {
om.mu.Lock()
om.objectsAdded.Add(len(m))
om.objectsRemoved.Add(len(om.m))
om.objectsCount.Add(len(m) - len(om.m))
for k := range om.m {
delete(om.m, k)
}
for k, o := range m {
om.m[k] = o
}
om.mu.Unlock()
}
func (om *objectsMap) update(key string, o object) {
om.mu.Lock()
if om.m[key] == nil {
om.objectsAdded.Inc()
om.objectsCount.Inc()
}
om.m[key] = o
om.mu.Unlock()
}
func (om *objectsMap) remove(key string) {
om.mu.Lock()
if om.m[key] != nil {
om.objectsRemoved.Inc()
om.objectsCount.Dec()
delete(om.m, key)
}
om.mu.Unlock()
}
func (om *objectsMap) get(key string) object {
om.mu.Lock()
o, ok := om.m[key]
om.mu.Unlock()
if !ok {
return nil
}
return o
}
type sharedObjects struct {
mu sync.Mutex
oms map[string]*objectsMap
}
func (so *sharedObjects) getByAPIURL(role, apiURL string) *objectsMap {
so.mu.Lock()
om := so.oms[apiURL]
if om == nil {
om = &objectsMap{
m: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
}
so.oms[apiURL] = om
}
so.mu.Unlock()
om.incRef()
return om
}
var sharedObjectsGlobal = &sharedObjects{
oms: make(map[string]*objectsMap),
}