mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
lib/promscrape: limit the concurrency during parsing and relabeling the scraped samples
This should reduce memory usage when scraping big number of targets, since this limits the summary memory usage during concurrent parsing and relabeling by the number of available CPU cores.
This commit is contained in:
parent
3461ae8f13
commit
7fb02f536a
@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||||||
**Update note 2:** The `vm_concurrent_addrows_current` and `vm_concurrent_addrows_capacity` metrics [exported](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) by `vmstorage` are replaced with `vm_concurrent_insert_current` and `vm_concurrent_insert_capacity` metrics in order to be consistent with the corresponding metrics exported by `vminsert`. Please update queries in dahsboards and alerting rules with new metric names if old metric names are used there.
|
**Update note 2:** The `vm_concurrent_addrows_current` and `vm_concurrent_addrows_capacity` metrics [exported](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) by `vmstorage` are replaced with `vm_concurrent_insert_current` and `vm_concurrent_insert_capacity` metrics in order to be consistent with the corresponding metrics exported by `vminsert`. Please update queries in dahsboards and alerting rules with new metric names if old metric names are used there.
|
||||||
|
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for aggregation of incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) by time and by labels. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for aggregation of incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) by time and by labels. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460).
|
||||||
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when scraping big number of targets without the need to enable [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode).
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Prometheus-compatible target discovery for [HashiCorp Nomad](https://www.nomadproject.io/) services via [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367). Thanks to @mr-karan for [the implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3549).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Prometheus-compatible target discovery for [HashiCorp Nomad](https://www.nomadproject.io/) services via [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367). Thanks to @mr-karan for [the implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3549).
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): automatically pre-fetch `metric_relabel_configs` and the target labels when clicking on the `debug metrics relabeling` link at the `http://vmagent:8429/targets` page at the particular target. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabel-debug).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): automatically pre-fetch `metric_relabel_configs` and the target labels when clicking on the `debug metrics relabeling` link at the `http://vmagent:8429/targets` page at the particular target. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabel-debug).
|
||||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to explore metrics exported by a particular `job` / `instance`. See [these docs](https://docs.victoriametrics.com/#metrics-explorer) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3386).
|
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to explore metrics exported by a particular `job` / `instance`. See [these docs](https://docs.victoriametrics.com/#metrics-explorer) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3386).
|
||||||
|
@ -3,7 +3,6 @@ package promscrape
|
|||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"math/bits"
|
"math/bits"
|
||||||
@ -11,8 +10,10 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
@ -416,6 +417,24 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
|||||||
body := leveledbytebufferpool.Get(sw.prevBodyLen)
|
body := leveledbytebufferpool.Get(sw.prevBodyLen)
|
||||||
var err error
|
var err error
|
||||||
body.B, err = sw.ReadData(body.B[:0])
|
body.B, err = sw.ReadData(body.B[:0])
|
||||||
|
releaseBody, err := sw.processScrapedData(scrapeTimestamp, realTimestamp, body, err)
|
||||||
|
if releaseBody {
|
||||||
|
leveledbytebufferpool.Put(body)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var concurrencyLimitCh = make(chan struct{}, 2*cgroup.AvailableCPUs())
|
||||||
|
|
||||||
|
func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, body *bytesutil.ByteBuffer, err error) (bool, error) {
|
||||||
|
// This function is CPU-bound, while it may allocate big amounts of memory.
|
||||||
|
// That's why it is a good idea to limit the number of concurrent calls to this function
|
||||||
|
// in order to limit memory usage under high load without sacrificing the performance.
|
||||||
|
concurrencyLimitCh <- struct{}{}
|
||||||
|
defer func() {
|
||||||
|
<-concurrencyLimitCh
|
||||||
|
}()
|
||||||
|
|
||||||
endTimestamp := time.Now().UnixNano() / 1e6
|
endTimestamp := time.Now().UnixNano() / 1e6
|
||||||
duration := float64(endTimestamp-realTimestamp) / 1e3
|
duration := float64(endTimestamp-realTimestamp) / 1e3
|
||||||
scrapeDuration.Update(duration)
|
scrapeDuration.Update(duration)
|
||||||
@ -489,13 +508,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
|||||||
sw.storeLastScrape(body.B)
|
sw.storeLastScrape(body.B)
|
||||||
}
|
}
|
||||||
sw.finalizeLastScrape()
|
sw.finalizeLastScrape()
|
||||||
if !mustSwitchToStreamParse {
|
|
||||||
// Return body to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
|
|
||||||
// This should reduce memory usage when scraping targets which return big responses.
|
|
||||||
leveledbytebufferpool.Put(body)
|
|
||||||
}
|
|
||||||
tsmGlobal.Update(sw, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
|
tsmGlobal.Update(sw, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
|
||||||
return err
|
return !mustSwitchToStreamParse, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||||
|
Loading…
Reference in New Issue
Block a user