From 7fb02f536a2327cfc146e56600f09ad3e436038c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 6 Jan 2023 22:59:15 -0800 Subject: [PATCH] lib/promscrape: limit the concurrency during parsing and relabeling the scraped samples This should reduce memory usage when scraping big number of targets, since this limits the summary memory usage during concurrent parsing and relabeling by the number of available CPU cores. --- docs/CHANGELOG.md | 1 + lib/promscrape/scrapework.go | 28 +++++++++++++++++++++------- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3b3fb45ff6..fa1b398ca3 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f **Update note 2:** The `vm_concurrent_addrows_current` and `vm_concurrent_addrows_capacity` metrics [exported](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) by `vmstorage` are replaced with `vm_concurrent_insert_current` and `vm_concurrent_insert_capacity` metrics in order to be consistent with the corresponding metrics exported by `vminsert`. Please update queries in dahsboards and alerting rules with new metric names if old metric names are used there. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for aggregation of incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) by time and by labels. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when scraping big number of targets without the need to enable [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Prometheus-compatible target discovery for [HashiCorp Nomad](https://www.nomadproject.io/) services via [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367). Thanks to @mr-karan for [the implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3549). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): automatically pre-fetch `metric_relabel_configs` and the target labels when clicking on the `debug metrics relabeling` link at the `http://vmagent:8429/targets` page at the particular target. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabel-debug). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to explore metrics exported by a particular `job` / `instance`. See [these docs](https://docs.victoriametrics.com/#metrics-explorer) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3386). diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index e18855e8bd..84d378f379 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -3,7 +3,6 @@ package promscrape import ( "flag" "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "io" "math" "math/bits" @@ -11,8 +10,10 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -416,6 +417,24 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error body := leveledbytebufferpool.Get(sw.prevBodyLen) var err error body.B, err = sw.ReadData(body.B[:0]) + releaseBody, err := sw.processScrapedData(scrapeTimestamp, realTimestamp, body, err) + if releaseBody { + leveledbytebufferpool.Put(body) + } + return err +} + +var concurrencyLimitCh = make(chan struct{}, 2*cgroup.AvailableCPUs()) + +func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, body *bytesutil.ByteBuffer, err error) (bool, error) { + // This function is CPU-bound, while it may allocate big amounts of memory. + // That's why it is a good idea to limit the number of concurrent calls to this function + // in order to limit memory usage under high load without sacrificing the performance. + concurrencyLimitCh <- struct{}{} + defer func() { + <-concurrencyLimitCh + }() + endTimestamp := time.Now().UnixNano() / 1e6 duration := float64(endTimestamp-realTimestamp) / 1e3 scrapeDuration.Update(duration) @@ -489,13 +508,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error sw.storeLastScrape(body.B) } sw.finalizeLastScrape() - if !mustSwitchToStreamParse { - // Return body to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse - // This should reduce memory usage when scraping targets which return big responses. - leveledbytebufferpool.Put(body) - } tsmGlobal.Update(sw, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err) - return err + return !mustSwitchToStreamParse, err } func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {