lib/promscrape/discovery/kubernetes: cache ScrapeWork objects as soon as the corresponding k8s objects are changed

This should reduce CPU usage and memory usage when Kubernetes contains tens of thousands of objects
This commit is contained in:
Aliaksandr Valialkin 2021-03-02 16:42:48 +02:00
parent f686174329
commit f9c1fe3852
3 changed files with 83 additions and 111 deletions

View File

@ -14,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -241,10 +240,23 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
ok := true ok := true
for j := range sc.KubernetesSDConfigs { for j := range sc.KubernetesSDConfigs {
sdc := &sc.KubernetesSDConfigs[j] sdc := &sc.KubernetesSDConfigs[j]
var okLocal bool swos, err := sdc.GetScrapeWorkObjects(cfg.baseDir, func(metaLabels map[string]string) interface{} {
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "kubernetes_sd_config") target := metaLabels["__address__"]
if ok { sw, err := sc.swc.getScrapeWork(target, nil, metaLabels)
ok = okLocal if err != nil {
logger.Errorf("cannot create kubernetes_sd_config target %q for job_name %q: %s", target, sc.swc.jobName, err)
return nil
}
return sw
})
if err != nil {
logger.Errorf("skipping kubernetes_sd_config targets for job_name %q because of error: %s", sc.swc.jobName, err)
ok = false
break
}
for _, swo := range swos {
sw := swo.(*ScrapeWork)
dst = append(dst, sw)
} }
} }
if ok { if ok {
@ -252,7 +264,7 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
} }
swsPrev := swsPrevByJob[sc.swc.jobName] swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 { if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering kubernetes targets for job %q, so preserving the previous targets", sc.swc.jobName) logger.Errorf("there were errors when discovering kubernetes_sd_config targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...) dst = append(dst[:dstLen], swsPrev...)
} }
} }
@ -555,8 +567,6 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
disableKeepAlive: sc.DisableKeepAlive, disableKeepAlive: sc.DisableKeepAlive,
streamParse: sc.StreamParse, streamParse: sc.StreamParse,
scrapeAlignInterval: sc.ScrapeAlignInterval, scrapeAlignInterval: sc.ScrapeAlignInterval,
cache: newScrapeWorkCache(),
} }
return swc, nil return swc, nil
} }
@ -580,64 +590,6 @@ type scrapeWorkConfig struct {
disableKeepAlive bool disableKeepAlive bool
streamParse bool streamParse bool
scrapeAlignInterval time.Duration scrapeAlignInterval time.Duration
cache *scrapeWorkCache
}
type scrapeWorkCache struct {
mu sync.Mutex
m map[string]*scrapeWorkEntry
lastCleanupTime uint64
}
type scrapeWorkEntry struct {
sw *ScrapeWork
lastAccessTime uint64
}
func newScrapeWorkCache() *scrapeWorkCache {
return &scrapeWorkCache{
m: make(map[string]*scrapeWorkEntry),
}
}
func (swc *scrapeWorkCache) Get(key string) *ScrapeWork {
scrapeWorkCacheRequests.Inc()
currentTime := fasttime.UnixTimestamp()
swc.mu.Lock()
swe := swc.m[key]
if swe != nil {
swe.lastAccessTime = currentTime
}
swc.mu.Unlock()
if swe == nil {
return nil
}
scrapeWorkCacheHits.Inc()
return swe.sw
}
var (
scrapeWorkCacheRequests = metrics.NewCounter(`vm_promscrape_scrapework_cache_requests_total`)
scrapeWorkCacheHits = metrics.NewCounter(`vm_promscrape_scrapework_cache_hits_total`)
)
func (swc *scrapeWorkCache) Set(key string, sw *ScrapeWork) {
currentTime := fasttime.UnixTimestamp()
swc.mu.Lock()
swc.m[key] = &scrapeWorkEntry{
sw: sw,
lastAccessTime: currentTime,
}
if currentTime > swc.lastCleanupTime+10*60 {
for k, swe := range swc.m {
if currentTime > swe.lastAccessTime+2*60 {
delete(swc.m, k)
}
}
swc.lastCleanupTime = currentTime
}
swc.mu.Unlock()
} }
type targetLabelsGetter interface { type targetLabelsGetter interface {
@ -761,26 +713,6 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
return dst return dst
} }
func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
bb := scrapeWorkKeyBufPool.Get()
defer scrapeWorkKeyBufPool.Put(bb)
bb.B = appendScrapeWorkKey(bb.B[:0], target, extraLabels, metaLabels)
keyStrUnsafe := bytesutil.ToUnsafeString(bb.B)
if needSkipScrapeWork(keyStrUnsafe) {
return nil, nil
}
if sw := swc.cache.Get(keyStrUnsafe); sw != nil {
return sw, nil
}
sw, err := swc.getScrapeWorkReal(target, extraLabels, metaLabels)
if err == nil {
swc.cache.Set(string(bb.B), sw)
}
return sw, err
}
var scrapeWorkKeyBufPool bytesutil.ByteBufferPool
func appendScrapeWorkKey(dst []byte, target string, extraLabels, metaLabels map[string]string) []byte { func appendScrapeWorkKey(dst []byte, target string, extraLabels, metaLabels map[string]string) []byte {
dst = append(dst, target...) dst = append(dst, target...)
dst = append(dst, ',') dst = append(dst, ',')
@ -814,7 +746,17 @@ func appendSortedKeyValuePairs(dst []byte, m map[string]string) []byte {
return dst return dst
} }
func (swc *scrapeWorkConfig) getScrapeWorkReal(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) { var scrapeWorkKeyBufPool bytesutil.ByteBufferPool
func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
// Verify whether the scrape work must be skipped.
bb := scrapeWorkKeyBufPool.Get()
defer scrapeWorkKeyBufPool.Put(bb)
bb.B = appendScrapeWorkKey(bb.B[:0], target, extraLabels, metaLabels)
if needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B)) {
return nil, nil
}
labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params) labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params)
var originalLabels []prompbmarshal.Label var originalLabels []prompbmarshal.Label
if !*dropOriginalLabels { if !*dropOriginalLabels {

View File

@ -36,15 +36,15 @@ func (ac *apiConfig) mustStop() {
var configMap = discoveryutils.NewConfigMap() var configMap = discoveryutils.NewConfigMap()
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { func getAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) }) v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir, swcFunc) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
return v.(*apiConfig), nil return v.(*apiConfig), nil
} }
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) {
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse auth config: %w", err) return nil, fmt.Errorf("cannot parse auth config: %w", err)
@ -97,7 +97,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}, },
Timeout: *apiServerTimeout, Timeout: *apiServerTimeout,
} }
aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors) aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors, swcFunc)
cfg := &apiConfig{ cfg := &apiConfig{
aw: aw, aw: aw,
} }
@ -141,6 +141,9 @@ type apiWatcher struct {
// Selectors to apply during watch // Selectors to apply during watch
selectors []Selector selectors []Selector
// Constructor for creating ScrapeWork objects from labels.
swcFunc ScrapeWorkConstructorFunc
// mu protects watchersByURL // mu protects watchersByURL
mu sync.Mutex mu sync.Mutex
@ -157,7 +160,7 @@ func (aw *apiWatcher) mustStop() {
aw.wg.Wait() aw.wg.Wait()
} }
func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector) *apiWatcher { func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector, swcFunc ScrapeWorkConstructorFunc) *apiWatcher {
stopCtx, stopFunc := context.WithCancel(context.Background()) stopCtx, stopFunc := context.WithCancel(context.Background())
return &apiWatcher{ return &apiWatcher{
apiServer: apiServer, apiServer: apiServer,
@ -165,6 +168,7 @@ func newAPIWatcher(client *http.Client, apiServer, authorization string, namespa
client: client, client: client,
namespaces: namespaces, namespaces: namespaces,
selectors: selectors, selectors: selectors,
swcFunc: swcFunc,
watchersByURL: make(map[string]*urlWatcher), watchersByURL: make(map[string]*urlWatcher),
@ -173,23 +177,23 @@ func newAPIWatcher(client *http.Client, apiServer, authorization string, namespa
} }
} }
// getLabelsForRole returns all the sets of labels for the given role. // getScrapeWorkObjectsForRole returns all the ScrapeWork objects for the given role.
func (aw *apiWatcher) getLabelsForRole(role string) []map[string]string { func (aw *apiWatcher) getScrapeWorkObjectsForRole(role string) []interface{} {
aw.startWatchersForRole(role) aw.startWatchersForRole(role)
var ms []map[string]string var swos []interface{}
aw.mu.Lock() aw.mu.Lock()
for _, uw := range aw.watchersByURL { for _, uw := range aw.watchersByURL {
if uw.role != role { if uw.role != role {
continue continue
} }
uw.mu.Lock() uw.mu.Lock()
for _, labels := range uw.labelsByKey { for _, swosLocal := range uw.swosByKey {
ms = append(ms, labels...) swos = append(swos, swosLocal...)
} }
uw.mu.Unlock() uw.mu.Unlock()
} }
aw.mu.Unlock() aw.mu.Unlock()
return ms return swos
} }
// getObjectByRole returns an object with the given (namespace, name) key and the given role. // getObjectByRole returns an object with the given (namespace, name) key and the given role.
@ -288,12 +292,12 @@ type urlWatcher struct {
parseObject parseObjectFunc parseObject parseObjectFunc
parseObjectList parseObjectListFunc parseObjectList parseObjectListFunc
// mu protects objectsByKey and labelsByKey // mu protects objectsByKey and swosByKey
mu sync.Mutex mu sync.Mutex
// objectsByKey contains the latest state for objects obtained from apiURL // objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object objectsByKey map[string]object
labelsByKey map[string][]map[string]string swosByKey map[string][]interface{}
// the parent apiWatcher // the parent apiWatcher
aw *apiWatcher aw *apiWatcher
@ -316,7 +320,7 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject
parseObjectList: parseObjectList, parseObjectList: parseObjectList,
objectsByKey: make(map[string]object), objectsByKey: make(map[string]object),
labelsByKey: make(map[string][]map[string]string), swosByKey: make(map[string][]interface{}),
aw: aw, aw: aw,
@ -354,20 +358,35 @@ func (uw *urlWatcher) reloadObjects() string {
} }
return "" return ""
} }
labelsByKey := make(map[string][]map[string]string, len(objectsByKey)) swosByKey := make(map[string][]interface{}, len(objectsByKey))
for k, o := range objectsByKey { for k, o := range objectsByKey {
labelsByKey[k] = o.getTargetLabels(aw) labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKey[k] = swos
}
} }
uw.mu.Lock() uw.mu.Lock()
uw.objectsRemoved.Add(-len(uw.objectsByKey)) uw.objectsRemoved.Add(-len(uw.objectsByKey))
uw.objectsAdded.Add(len(objectsByKey)) uw.objectsAdded.Add(len(objectsByKey))
uw.objectsCount.Add(len(objectsByKey) - len(uw.objectsByKey)) uw.objectsCount.Add(len(objectsByKey) - len(uw.objectsByKey))
uw.objectsByKey = objectsByKey uw.objectsByKey = objectsByKey
uw.labelsByKey = labelsByKey uw.swosByKey = swosByKey
uw.mu.Unlock() uw.mu.Unlock()
return metadata.ResourceVersion return metadata.ResourceVersion
} }
func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} {
swos := make([]interface{}, 0, len(labelss))
for _, labels := range labelss {
swo := swcFunc(labels)
if swo != nil {
swos = append(swos, swo)
}
}
return swos
}
// watchForUpdates watches for object updates starting from resourceVersion and updates the corresponding objects to the latest state. // watchForUpdates watches for object updates starting from resourceVersion and updates the corresponding objects to the latest state.
// //
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
@ -439,6 +458,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) {
// readObjectUpdateStream reads Kubernetes watch events from r and updates locally cached objects according to the received events. // readObjectUpdateStream reads Kubernetes watch events from r and updates locally cached objects according to the received events.
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
aw := uw.aw
d := json.NewDecoder(r) d := json.NewDecoder(r)
var we WatchEvent var we WatchEvent
for { for {
@ -459,9 +479,14 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
} }
uw.objectsByKey[key] = o uw.objectsByKey[key] = o
uw.mu.Unlock() uw.mu.Unlock()
labels := o.getTargetLabels(uw.aw) labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
uw.mu.Lock() uw.mu.Lock()
uw.labelsByKey[key] = labels if len(swos) > 0 {
uw.swosByKey[key] = swos
} else {
delete(uw.swosByKey, key)
}
uw.mu.Unlock() uw.mu.Unlock()
case "DELETED": case "DELETED":
uw.mu.Lock() uw.mu.Lock()
@ -470,7 +495,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
uw.objectsCount.Dec() uw.objectsCount.Dec()
} }
delete(uw.objectsByKey, key) delete(uw.objectsByKey, key)
delete(uw.labelsByKey, key) delete(uw.swosByKey, key)
uw.mu.Unlock() uw.mu.Unlock()
default: default:
return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role) return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)

View File

@ -37,15 +37,20 @@ type Selector struct {
Field string `yaml:"field"` Field string `yaml:"field"`
} }
// GetLabels returns labels for the given sdc and baseDir. // ScrapeWorkConstructorFunc must construct ScrapeWork object for the given metaLabels.
func (sdc *SDConfig) GetLabels(baseDir string) ([]map[string]string, error) { type ScrapeWorkConstructorFunc func(metaLabels map[string]string) interface{}
cfg, err := getAPIConfig(sdc, baseDir)
// GetScrapeWorkObjects returns ScrapeWork objects for the given sdc and baseDir.
//
// swcFunc is used for constructing such objects.
func (sdc *SDConfig) GetScrapeWorkObjects(baseDir string, swcFunc ScrapeWorkConstructorFunc) ([]interface{}, error) {
cfg, err := getAPIConfig(sdc, baseDir, swcFunc)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot create API config: %w", err) return nil, fmt.Errorf("cannot create API config: %w", err)
} }
switch sdc.Role { switch sdc.Role {
case "node", "pod", "service", "endpoints", "endpointslices", "ingress": case "node", "pod", "service", "endpoints", "endpointslices", "ingress":
return cfg.aw.getLabelsForRole(sdc.Role), nil return cfg.aw.getScrapeWorkObjectsForRole(sdc.Role), nil
default: default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`; skipping it", sdc.Role) return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`; skipping it", sdc.Role)
} }