From 429848a67d9588f13c2bad2b76b1dd7adeb810fa Mon Sep 17 00:00:00 2001
From: Nikolay
Date: Wed, 20 Apr 2022 16:09:40 +0300
Subject: [PATCH] lib/promscrape: reduce latency for k8s GetLabels (#2454)

replaces internStringsMap with sync.Map - it greatly reduces lock contention

concurrently reload scrape work for the api watcher - the labels of each
object are computed by a dedicated CPU

the changes can be tested with the following script:
https://gist.github.com/f41gh7/6f8f8d8719786aff1f18a85c23aebf70
---
 lib/promscrape/config.go                        | 24 +++++++-----
 .../discovery/kubernetes/api_watcher.go         | 38 +++++++++++++++----
 2 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go
index 40d2b8f21f..72375c3c6e 100644
--- a/lib/promscrape/config.go
+++ b/lib/promscrape/config.go
@@ -9,6 +9,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -1221,24 +1222,27 @@ func internLabelStrings(labels []prompbmarshal.Label) {
 }
 
 func internString(s string) string {
-	internStringsMapLock.Lock()
-	defer internStringsMapLock.Unlock()
-
-	if sInterned, ok := internStringsMap[s]; ok {
-		return sInterned
+	if sInterned, ok := internStringsMap.Load(s); ok {
+		return sInterned.(string)
+	}
+	isc := atomic.LoadUint64(&internStringCount)
+	if isc > 100e3 {
+		internStringsMapLock.Lock()
+		internStringsMap = sync.Map{}
+		atomic.AddUint64(&internStringCount, ^(isc - 1))
+		internStringsMapLock.Unlock()
 	}
 	// Make a new copy for s in order to remove references from possible bigger string s refers to.
 	sCopy := string(append([]byte{}, s...))
-	internStringsMap[sCopy] = sCopy
-	if len(internStringsMap) > 100e3 {
-		internStringsMap = make(map[string]string, 100e3)
-	}
+	internStringsMap.Store(sCopy, sCopy)
+	atomic.AddUint64(&internStringCount, 1)
 	return sCopy
 }
 
 var (
+	internStringCount    = uint64(0)
 	internStringsMapLock sync.Mutex
-	internStringsMap     = make(map[string]string, 100e3)
+	internStringsMap     = sync.Map{}
 )
 
 func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string {
diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go
index da0801cee6..8c5873394f 100644
--- a/lib/promscrape/discovery/kubernetes/api_watcher.go
+++ b/lib/promscrape/discovery/kubernetes/api_watcher.go
@@ -16,6 +16,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
 	"github.com/VictoriaMetrics/metrics"
@@ -522,15 +523,36 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatch
 	for i := range aws {
 		swosByKey[i] = make(map[string][]interface{})
 	}
-	for key, o := range uw.objectsByKey {
-		labels := o.getTargetLabels(uw.gw)
-		for i, aw := range aws {
-			swos := aw.getScrapeWorkObjectsForLabels(labels)
-			if len(swos) > 0 {
-				swosByKey[i][key] = swos
-			}
-		}
+	// Update swos concurrently.
+	// It reduces the reload time when there is a big number of objects and scrape configs.
+	maxConcurrent := cgroup.AvailableCPUs() - 2
+	if maxConcurrent < 1 {
+		maxConcurrent = 1
 	}
+	limit := make(chan struct{}, maxConcurrent)
+	var (
+		mu sync.Mutex
+		wg sync.WaitGroup
+	)
+	for key, o := range uw.objectsByKey {
+		limit <- struct{}{}
+		wg.Add(1)
+		// Compute swos for each object on a separate goroutine.
+		go func(key string, o object) {
+			labels := o.getTargetLabels(uw.gw)
+			for i, aw := range aws {
+				swos := aw.getScrapeWorkObjectsForLabels(labels)
+				if len(swos) > 0 {
+					mu.Lock()
+					swosByKey[i][key] = swos
+					mu.Unlock()
+				}
+			}
+			wg.Done()
+			<-limit
+		}(key, o)
+	}
+	wg.Wait()
 	for i, aw := range aws {
 		aw.reloadScrapeWorks(uw, swosByKey[i])
 	}
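
The core of the internString() change, reduced to a standalone sketch. This is not the code from lib/promscrape/config.go: the names internCache and intern are illustrative, and the 100e3-entry cap with the mutex-guarded reset from the patch is omitted for brevity. The point it shows is that sync.Map lookups do not serialize on a single mutex, which is where the lock-contention win comes from.

// intern_sketch.go - minimal sketch of string interning via sync.Map
// (illustrative names, not the identifiers from the patch).
package main

import (
	"fmt"
	"sync"
)

var internCache sync.Map // string -> string

// intern returns a canonical copy of s so that duplicate label names and
// values share a single backing string.
func intern(s string) string {
	if v, ok := internCache.Load(s); ok {
		return v.(string)
	}
	// Copy s so the interned value does not keep a reference to a bigger
	// string that s may be a slice of.
	sCopy := string(append([]byte{}, s...))
	internCache.Store(sCopy, sCopy)
	return sCopy
}

func main() {
	labels := []string{"job", "instance", "job", "instance"}
	for _, l := range labels {
		fmt.Println(intern(l))
	}
}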
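
The bounded-concurrency pattern used in reloadScrapeWorksForAPIWatchersLocked(), reduced to a standalone sketch under stated assumptions: runtime.NumCPU() stands in for cgroup.AvailableCPUs(), and processObject() is a hypothetical placeholder for the per-object getTargetLabels()/getScrapeWorkObjectsForLabels() work. The semaphore channel, WaitGroup and mutex mirror the structure of the patch.

// concurrency_sketch.go - bounded fan-out with a semaphore channel,
// a WaitGroup and a mutex-protected result map.
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// processObject is a hypothetical stand-in for the per-object label work.
func processObject(key string) string {
	return "processed:" + key
}

func main() {
	objects := []string{"pod/a", "pod/b", "pod/c", "pod/d"}

	// Leave a couple of CPUs free for other work, but never go below 1.
	maxConcurrent := runtime.NumCPU() - 2
	if maxConcurrent < 1 {
		maxConcurrent = 1
	}

	limit := make(chan struct{}, maxConcurrent) // semaphore
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[string]string)
	)
	for _, key := range objects {
		limit <- struct{}{} // acquire a slot; blocks while maxConcurrent workers run
		wg.Add(1)
		go func(key string) {
			defer func() {
				wg.Done()
				<-limit // release the slot
			}()
			out := processObject(key)
			mu.Lock()
			results[key] = out
			mu.Unlock()
		}(key)
	}
	wg.Wait()
	fmt.Println(len(results), "objects processed")
}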