mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-16 00:41:24 +01:00
lib/promscrape: reduce processing time for big number of discovered targets by processing them in parallel
This commit is contained in:
parent
f8baf1a76d
commit
55eb983193
@ -10,6 +10,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
@ -608,14 +609,42 @@ func appendGCEScrapeWork(dst []*ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkCo
|
|||||||
|
|
||||||
func appendScrapeWorkForTargetLabels(dst []*ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []*ScrapeWork {
|
func appendScrapeWorkForTargetLabels(dst []*ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []*ScrapeWork {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
for _, metaLabels := range targetLabels {
|
// Process targetLabels in parallel in order to reduce processing time for big number of targetLabels.
|
||||||
|
type result struct {
|
||||||
|
sw *ScrapeWork
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
resultCh := make(chan result)
|
||||||
|
workCh := make(chan map[string]string)
|
||||||
|
goroutines := cgroup.AvailableCPUs()
|
||||||
|
for i := 0; i < goroutines; i++ {
|
||||||
|
go func() {
|
||||||
|
for metaLabels := range workCh {
|
||||||
target := metaLabels["__address__"]
|
target := metaLabels["__address__"]
|
||||||
var err error
|
sw, err := getScrapeWork(swc, target, nil, metaLabels)
|
||||||
dst, err = appendScrapeWork(dst, swc, target, nil, metaLabels)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("error when parsing `%s` target %q for `job_name` %q: %s; skipping it", sectionName, target, swc.jobName, err)
|
err = fmt.Errorf("skipping target %q for job_name %q in %s because of error: %w", target, swc.jobName, sectionName, err)
|
||||||
|
}
|
||||||
|
resultCh <- result{
|
||||||
|
sw: sw,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
for _, metaLabels := range targetLabels {
|
||||||
|
workCh <- metaLabels
|
||||||
|
}
|
||||||
|
close(workCh)
|
||||||
|
for range targetLabels {
|
||||||
|
r := <-resultCh
|
||||||
|
if r.err != nil {
|
||||||
|
logger.Errorf("%s", r.err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if r.sw != nil {
|
||||||
|
dst = append(dst, r.sw)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
metrics.GetOrCreateHistogram(fmt.Sprintf("vm_promscrape_target_relabel_duration_seconds{type=%q}", sectionName)).UpdateDuration(startTime)
|
metrics.GetOrCreateHistogram(fmt.Sprintf("vm_promscrape_target_relabel_duration_seconds{type=%q}", sectionName)).UpdateDuration(startTime)
|
||||||
return dst
|
return dst
|
||||||
@ -673,18 +702,20 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
|
|||||||
logger.Errorf("`static_configs` target for `job_name` %q cannot be empty; skipping it", swc.jobName)
|
logger.Errorf("`static_configs` target for `job_name` %q cannot be empty; skipping it", swc.jobName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var err error
|
sw, err := getScrapeWork(swc, target, stc.Labels, metaLabels)
|
||||||
dst, err = appendScrapeWork(dst, swc, target, stc.Labels, metaLabels)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Do not return this error, since other targets may be valid
|
// Do not return this error, since other targets may be valid
|
||||||
logger.Errorf("error when parsing `static_configs` target %q for `job_name` %q: %s; skipping it", target, swc.jobName, err)
|
logger.Errorf("error when parsing `static_configs` target %q for `job_name` %q: %s; skipping it", target, swc.jobName, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if sw != nil {
|
||||||
|
dst = append(dst, sw)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) ([]*ScrapeWork, error) {
|
func getScrapeWork(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
|
||||||
labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params)
|
labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params)
|
||||||
var originalLabels []prompbmarshal.Label
|
var originalLabels []prompbmarshal.Label
|
||||||
if !*dropOriginalLabels {
|
if !*dropOriginalLabels {
|
||||||
@ -703,7 +734,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
|
|||||||
if len(labels) == 0 {
|
if len(labels) == 0 {
|
||||||
// Drop target without labels.
|
// Drop target without labels.
|
||||||
droppedTargetsMap.Register(originalLabels)
|
droppedTargetsMap.Register(originalLabels)
|
||||||
return dst, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
// See https://www.robustperception.io/life-of-a-label
|
// See https://www.robustperception.io/life-of-a-label
|
||||||
schemeRelabeled := promrelabel.GetLabelValueByName(labels, "__scheme__")
|
schemeRelabeled := promrelabel.GetLabelValueByName(labels, "__scheme__")
|
||||||
@ -714,12 +745,12 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
|
|||||||
if len(addressRelabeled) == 0 {
|
if len(addressRelabeled) == 0 {
|
||||||
// Drop target without scrape address.
|
// Drop target without scrape address.
|
||||||
droppedTargetsMap.Register(originalLabels)
|
droppedTargetsMap.Register(originalLabels)
|
||||||
return dst, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
if strings.Contains(addressRelabeled, "/") {
|
if strings.Contains(addressRelabeled, "/") {
|
||||||
// Drop target with '/'
|
// Drop target with '/'
|
||||||
droppedTargetsMap.Register(originalLabels)
|
droppedTargetsMap.Register(originalLabels)
|
||||||
return dst, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
addressRelabeled = addMissingPort(schemeRelabeled, addressRelabeled)
|
addressRelabeled = addMissingPort(schemeRelabeled, addressRelabeled)
|
||||||
metricsPathRelabeled := promrelabel.GetLabelValueByName(labels, "__metrics_path__")
|
metricsPathRelabeled := promrelabel.GetLabelValueByName(labels, "__metrics_path__")
|
||||||
@ -737,7 +768,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
|
|||||||
paramsStr := url.Values(paramsRelabeled).Encode()
|
paramsStr := url.Values(paramsRelabeled).Encode()
|
||||||
scrapeURL := fmt.Sprintf("%s://%s%s%s%s", schemeRelabeled, addressRelabeled, metricsPathRelabeled, optionalQuestion, paramsStr)
|
scrapeURL := fmt.Sprintf("%s://%s%s%s%s", schemeRelabeled, addressRelabeled, metricsPathRelabeled, optionalQuestion, paramsStr)
|
||||||
if _, err := url.Parse(scrapeURL); err != nil {
|
if _, err := url.Parse(scrapeURL); err != nil {
|
||||||
return dst, fmt.Errorf("invalid url %q for scheme=%q (%q), target=%q (%q), metrics_path=%q (%q) for `job_name` %q: %w",
|
return nil, fmt.Errorf("invalid url %q for scheme=%q (%q), target=%q (%q), metrics_path=%q (%q) for `job_name` %q: %w",
|
||||||
scrapeURL, swc.scheme, schemeRelabeled, target, addressRelabeled, swc.metricsPath, metricsPathRelabeled, swc.jobName, err)
|
scrapeURL, swc.scheme, schemeRelabeled, target, addressRelabeled, swc.metricsPath, metricsPathRelabeled, swc.jobName, err)
|
||||||
}
|
}
|
||||||
// Set missing "instance" label according to https://www.robustperception.io/life-of-a-label
|
// Set missing "instance" label according to https://www.robustperception.io/life-of-a-label
|
||||||
@ -750,7 +781,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
|
|||||||
}
|
}
|
||||||
// Reduce memory usage by interning all the strings in labels.
|
// Reduce memory usage by interning all the strings in labels.
|
||||||
internLabelStrings(labels)
|
internLabelStrings(labels)
|
||||||
dst = append(dst, &ScrapeWork{
|
sw := &ScrapeWork{
|
||||||
ScrapeURL: scrapeURL,
|
ScrapeURL: scrapeURL,
|
||||||
ScrapeInterval: swc.scrapeInterval,
|
ScrapeInterval: swc.scrapeInterval,
|
||||||
ScrapeTimeout: swc.scrapeTimeout,
|
ScrapeTimeout: swc.scrapeTimeout,
|
||||||
@ -768,8 +799,8 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
|
|||||||
ScrapeAlignInterval: swc.scrapeAlignInterval,
|
ScrapeAlignInterval: swc.scrapeAlignInterval,
|
||||||
|
|
||||||
jobNameOriginal: swc.jobName,
|
jobNameOriginal: swc.jobName,
|
||||||
})
|
}
|
||||||
return dst, nil
|
return sw, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func internLabelStrings(labels []prompbmarshal.Label) {
|
func internLabelStrings(labels []prompbmarshal.Label) {
|
||||||
|
Loading…
Reference in New Issue
Block a user