mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/promscrape: reduce latency for k8s GetLabels (#2454)
replaces internStringMap with sync.Map - it greatly reduces lock contention concurently reload scrape work for api watcher - each object labels added by dedicated CPU changes can be tested with following script https://gist.github.com/f41gh7/6f8f8d8719786aff1f18a85c23aebf70
This commit is contained in:
parent
3d0549c982
commit
91e290a8ff
@ -9,6 +9,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@ -1221,24 +1222,27 @@ func internLabelStrings(labels []prompbmarshal.Label) {
|
||||
}
|
||||
|
||||
func internString(s string) string {
|
||||
internStringsMapLock.Lock()
|
||||
defer internStringsMapLock.Unlock()
|
||||
|
||||
if sInterned, ok := internStringsMap[s]; ok {
|
||||
return sInterned
|
||||
if sInterned, ok := internStringsMap.Load(s); ok {
|
||||
return sInterned.(string)
|
||||
}
|
||||
isc := atomic.LoadUint64(&internStringCount)
|
||||
if isc > 100e3 {
|
||||
internStringsMapLock.Lock()
|
||||
internStringsMap = sync.Map{}
|
||||
atomic.AddUint64(&internStringCount, ^(isc - 1))
|
||||
internStringsMapLock.Unlock()
|
||||
}
|
||||
// Make a new copy for s in order to remove references from possible bigger string s refers to.
|
||||
sCopy := string(append([]byte{}, s...))
|
||||
internStringsMap[sCopy] = sCopy
|
||||
if len(internStringsMap) > 100e3 {
|
||||
internStringsMap = make(map[string]string, 100e3)
|
||||
}
|
||||
internStringsMap.Store(sCopy, sCopy)
|
||||
atomic.AddUint64(&internStringCount, 1)
|
||||
return sCopy
|
||||
}
|
||||
|
||||
var (
|
||||
internStringCount = uint64(0)
|
||||
internStringsMapLock sync.Mutex
|
||||
internStringsMap = make(map[string]string, 100e3)
|
||||
internStringsMap = sync.Map{}
|
||||
)
|
||||
|
||||
func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string {
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
@ -522,15 +523,36 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatch
|
||||
for i := range aws {
|
||||
swosByKey[i] = make(map[string][]interface{})
|
||||
}
|
||||
for key, o := range uw.objectsByKey {
|
||||
labels := o.getTargetLabels(uw.gw)
|
||||
for i, aw := range aws {
|
||||
swos := aw.getScrapeWorkObjectsForLabels(labels)
|
||||
if len(swos) > 0 {
|
||||
swosByKey[i][key] = swos
|
||||
}
|
||||
}
|
||||
// update swos concurrently,
|
||||
// it must decrease reload time for high number of records at promscrape file
|
||||
maxConcurrent := cgroup.AvailableCPUs() - 2
|
||||
if maxConcurrent < 1 {
|
||||
maxConcurrent = 1
|
||||
}
|
||||
limit := make(chan struct{}, maxConcurrent)
|
||||
var (
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
for key, o := range uw.objectsByKey {
|
||||
limit <- struct{}{}
|
||||
wg.Add(1)
|
||||
// update swsos for each target at separate CPU
|
||||
go func(key string, o object) {
|
||||
labels := o.getTargetLabels(uw.gw)
|
||||
for i, aw := range aws {
|
||||
swos := aw.getScrapeWorkObjectsForLabels(labels)
|
||||
if len(swos) > 0 {
|
||||
mu.Lock()
|
||||
swosByKey[i][key] = swos
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
<-limit
|
||||
}(key, o)
|
||||
}
|
||||
wg.Wait()
|
||||
for i, aw := range aws {
|
||||
aw.reloadScrapeWorks(uw, swosByKey[i])
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user