diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3b3fb45ff6..fa1b398ca3 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f **Update note 2:** The `vm_concurrent_addrows_current` and `vm_concurrent_addrows_capacity` metrics [exported](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) by `vmstorage` are replaced with `vm_concurrent_insert_current` and `vm_concurrent_insert_capacity` metrics in order to be consistent with the corresponding metrics exported by `vminsert`. Please update queries in dashboards and alerting rules with new metric names if old metric names are used there. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for aggregation of incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) by time and by labels. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when scraping a big number of targets without the need to enable [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Prometheus-compatible target discovery for [HashiCorp Nomad](https://www.nomadproject.io/) services via [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367). Thanks to @mr-karan for [the implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3549). 
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): automatically pre-fetch `metric_relabel_configs` and the target labels when clicking on the `debug metrics relabeling` link at the `http://vmagent:8429/targets` page at the particular target. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabel-debug). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to explore metrics exported by a particular `job` / `instance`. See [these docs](https://docs.victoriametrics.com/#metrics-explorer) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3386). diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index e18855e8bd..84d378f379 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -3,7 +3,6 @@ package promscrape import ( "flag" "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "io" "math" "math/bits" @@ -11,8 +10,10 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -416,6 +417,24 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error body := leveledbytebufferpool.Get(sw.prevBodyLen) var err error body.B, err = sw.ReadData(body.B[:0]) + releaseBody, err := sw.processScrapedData(scrapeTimestamp, realTimestamp, body, err) + if releaseBody { + leveledbytebufferpool.Put(body) + } + return err +} + +var concurrencyLimitCh = make(chan struct{}, 2*cgroup.AvailableCPUs()) + +func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, body *bytesutil.ByteBuffer, err error) (bool, error) { + // This function is 
CPU-bound, while it may allocate big amounts of memory. + // That's why it is a good idea to limit the number of concurrent calls to this function + // in order to limit memory usage under high load without sacrificing the performance. + concurrencyLimitCh <- struct{}{} + defer func() { + <-concurrencyLimitCh + }() + endTimestamp := time.Now().UnixNano() / 1e6 duration := float64(endTimestamp-realTimestamp) / 1e3 scrapeDuration.Update(duration) @@ -489,13 +508,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error sw.storeLastScrape(body.B) } sw.finalizeLastScrape() - if !mustSwitchToStreamParse { - // Return body to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse - // This should reduce memory usage when scraping targets which return big responses. - leveledbytebufferpool.Put(body) - } tsmGlobal.Update(sw, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err) - return err + return !mustSwitchToStreamParse, err } func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {