mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/promscrape: store the full response in stream parsing mode in scrapeWork.lastScrape byte slice
This allows sending staleness marks and properly calculate scrape_series_added metric in stream parsing mode at the cost of the increased memory usage, since now the potentially big response is kept in the lastScrape byte slice per each scrapeWork. In practice the memory usage increase shouldn't be big, since the response size is usually much smaller than the parsed metrics from this response after the relabeling, which usually adds a big pile of target-specific labels per each metric.
This commit is contained in:
parent
f6d33596ff
commit
9866dd95c1
@ -302,7 +302,7 @@ You can read more about relabeling in the following articles:
|
||||
* If the scrape target is removed from the list of targets, then stale markers are sent for all the metrics scraped from this target.
|
||||
* Stale markers are sent for all the scraped metrics on graceful shutdown of `vmagent`.
|
||||
|
||||
Prometheus staleness markers aren't sent to `-remoteWrite.url` in [stream parsing mode](#stream-parsing-mode) or if `-promscrape.noStaleMarkers` command-line is set.
|
||||
Prometheus staleness markers aren't sent to `-remoteWrite.url` if `-promscrape.noStaleMarkers` command-line flag is set.
|
||||
|
||||
|
||||
## Stream parsing mode
|
||||
|
@ -9,7 +9,9 @@ sort: 15
|
||||
* FEATURE: vmagent: expose `-promscrape.config` contents at `/config` page as Prometheus does. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1695).
|
||||
* FEATURE: vmagent: add `show original labels` button per each scrape target displayed at `http://vmagent;8429/targets` page. This should improve debuggability for service discovery and relabeling issues similar to [this one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1664). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1698).
|
||||
* FEATURE: vmagent: shard targets among cluster nodes after the relabeling is applied. This should guarantee that targets with the same set of labels go to the same `vmagent` node in the cluster. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687).
|
||||
* FEATURE: vmagent: add `-promscrape.minResponseSizeForStreamParse` command-line flag, which can be used for instructing `vmagent` to automatically switch to [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) for scrape targets with big responses. This should reduce memory usage when `vmagent` scrapes targets with non-uniform response sizes (this is the case in Kubernetes monitoring).
|
||||
* FEATURE: vmagent: atomatically switch to [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) if the response from the given target exceeds the command-line flag value `-promscrape.minResponseSizeForStreamParse`. This should reduce memory usage when `vmagent` scrapes targets with non-uniform response sizes (this is the case in Kubernetes monitoring).
|
||||
* FEATURE: vmagent: send Prometheus-like staleness marks in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously staleness marks wern't sent in stream parsing mode. See [these docs](https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers) for details.
|
||||
* FEATURE: vmagent: properly calculate `scrape_series_added` metric for targets in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously it was set to 0 in stream parsing mode. See [more details about this metric](https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series).
|
||||
* FEATURE: vmagent: return error if `sample_limit` or `series_limit` options are set when [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) is enabled, since these limits cannot be applied in stream parsing mode.
|
||||
* FEATURE: add trigonometric functions, which are going to be added in [Prometheus 2.31](https://github.com/prometheus/prometheus/pull/9239): [acosh](https://docs.victoriametrics.com/MetricsQL.html#acosh), [asinh](https://docs.victoriametrics.com/MetricsQL.html#asinh), [atan](https://docs.victoriametrics.com/MetricsQL.html#atan), [atanh](https://docs.victoriametrics.com/MetricsQL.html#atanh), [cosh](https://docs.victoriametrics.com/MetricsQL.html#cosh), [deg](https://docs.victoriametrics.com/MetricsQL.html#deg), [rad](https://docs.victoriametrics.com/MetricsQL.html#rad), [sinh](https://docs.victoriametrics.com/MetricsQL.html#sinh), [tan](https://docs.victoriametrics.com/MetricsQL.html#tan), [tanh](https://docs.victoriametrics.com/MetricsQL.html#tanh). Also add `atan2` binary operator. See [this pull request](https://github.com/prometheus/prometheus/pull/9248).
|
||||
* FEATURE: consistently return the same set of time series from [limitk](https://docs.victoriametrics.com/MetricsQL.html#limitk) function. This improves the usability of periodically refreshed graphs.
|
||||
|
@ -306,7 +306,7 @@ You can read more about relabeling in the following articles:
|
||||
* If the scrape target is removed from the list of targets, then stale markers are sent for all the metrics scraped from this target.
|
||||
* Stale markers are sent for all the scraped metrics on graceful shutdown of `vmagent`.
|
||||
|
||||
Prometheus staleness markers aren't sent to `-remoteWrite.url` in [stream parsing mode](#stream-parsing-mode) or if `-promscrape.noStaleMarkers` command-line is set.
|
||||
Prometheus staleness markers aren't sent to `-remoteWrite.url` if `-promscrape.noStaleMarkers` command-line flag is set.
|
||||
|
||||
|
||||
## Stream parsing mode
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"math/bits"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -117,8 +118,9 @@ type ScrapeWork struct {
|
||||
jobNameOriginal string
|
||||
}
|
||||
|
||||
func (sw *ScrapeWork) canUseStreamParse() bool {
|
||||
// Stream parsing mode cannot be used if SampleLimit or SeriesLimit is set.
|
||||
func (sw *ScrapeWork) canSwitchToStreamParseMode() bool {
|
||||
// Deny switching to stream parse mode if `sample_limit` or `series_limit` options are set,
|
||||
// since these limits cannot be applied in stream parsing mode.
|
||||
return sw.SampleLimit <= 0 && sw.SeriesLimit <= 0
|
||||
}
|
||||
|
||||
@ -297,8 +299,15 @@ var (
|
||||
pushDataDuration = metrics.NewHistogram("vm_promscrape_push_data_duration_seconds")
|
||||
)
|
||||
|
||||
func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool {
|
||||
if minResponseSizeForStreamParse.N <= 0 {
|
||||
return false
|
||||
}
|
||||
return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.N
|
||||
}
|
||||
|
||||
func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error {
|
||||
if (sw.Config.canUseStreamParse() && sw.prevBodyLen >= minResponseSizeForStreamParse.N) || *streamParse || sw.Config.StreamParse {
|
||||
if sw.mustSwitchToStreamParseMode(sw.prevBodyLen) || *streamParse || sw.Config.StreamParse {
|
||||
// Read data from scrape targets in streaming manner.
|
||||
// This case is optimized for targets exposing more than ten thousand of metrics per target.
|
||||
return sw.scrapeStream(scrapeTimestamp, realTimestamp)
|
||||
@ -364,7 +373,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||
sw.prevLabelsLen = len(wc.labels)
|
||||
sw.prevBodyLen = len(body.B)
|
||||
wc.reset()
|
||||
if len(body.B) < minResponseSizeForStreamParse.N {
|
||||
canReturnToPool := !sw.mustSwitchToStreamParseMode(len(body.B))
|
||||
if canReturnToPool {
|
||||
// Return wc to the pool if the parsed response size was smaller than -promscrape.minResponseSizeForStreamParse
|
||||
// This should reduce memory usage when scraping targets with big responses.
|
||||
writeRequestCtxPool.Put(wc)
|
||||
@ -372,11 +382,11 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||
// body must be released only after wc is released, since wc refers to body.
|
||||
if !areIdenticalSeries {
|
||||
sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
|
||||
}
|
||||
if len(body.B) < minResponseSizeForStreamParse.N {
|
||||
// Save body to sw.lastScrape and return it to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
|
||||
// This should reduce memory usage when scraping targets which return big responses.
|
||||
sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
|
||||
}
|
||||
if canReturnToPool {
|
||||
// Return wc to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
|
||||
// This should reduce memory usage when scraping targets which return big responses.
|
||||
leveledbytebufferpool.Put(body)
|
||||
}
|
||||
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
|
||||
@ -389,18 +399,33 @@ func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
|
||||
pushDataDuration.UpdateDuration(startTime)
|
||||
}
|
||||
|
||||
type streamBodyReader struct {
|
||||
sr *streamReader
|
||||
body []byte
|
||||
}
|
||||
|
||||
func (sbr *streamBodyReader) Read(b []byte) (int, error) {
|
||||
n, err := sbr.sr.Read(b)
|
||||
sbr.body = append(sbr.body, b[:n]...)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
||||
samplesScraped := 0
|
||||
samplesPostRelabeling := 0
|
||||
responseSize := int64(0)
|
||||
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
|
||||
// Do not pool sbr in order to reduce memory usage when scraping big responses.
|
||||
sbr := &streamBodyReader{
|
||||
body: make([]byte, 0, len(sw.lastScrape)),
|
||||
}
|
||||
|
||||
sr, err := sw.GetStreamReader()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot read data: %s", err)
|
||||
} else {
|
||||
var mu sync.Mutex
|
||||
err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
|
||||
sbr.sr = sr
|
||||
err = parser.ParseStream(sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
samplesScraped += len(rows)
|
||||
@ -421,15 +446,17 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
||||
wc.resetNoRows()
|
||||
return nil
|
||||
}, sw.logError)
|
||||
responseSize = sr.bytesRead
|
||||
sr.MustClose()
|
||||
}
|
||||
bodyString := bytesutil.ToUnsafeString(sbr.body)
|
||||
lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
|
||||
areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString)
|
||||
|
||||
scrapedSamples.Update(float64(samplesScraped))
|
||||
endTimestamp := time.Now().UnixNano() / 1e6
|
||||
duration := float64(endTimestamp-realTimestamp) / 1e3
|
||||
scrapeDuration.Update(duration)
|
||||
scrapeResponseSize.Update(float64(responseSize))
|
||||
scrapeResponseSize.Update(float64(len(bodyString)))
|
||||
up := 1
|
||||
if err != nil {
|
||||
if samplesScraped == 0 {
|
||||
@ -437,19 +464,29 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
||||
}
|
||||
scrapesFailed.Inc()
|
||||
}
|
||||
seriesAdded := 0
|
||||
if !areIdenticalSeries {
|
||||
// The returned value for seriesAdded may be bigger than the real number of added series
|
||||
// if some series were removed during relabeling.
|
||||
// This is a trade-off between performance and accuracy.
|
||||
seriesAdded = sw.getSeriesAdded(bodyString)
|
||||
}
|
||||
sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
|
||||
sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
|
||||
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
|
||||
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
|
||||
// scrape_series_added isn't calculated in streaming mode,
|
||||
// since it may need unlimited amounts of memory when scraping targets with millions of exposed metrics.
|
||||
sw.addAutoTimeseries(wc, "scrape_series_added", 0, scrapeTimestamp)
|
||||
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
|
||||
sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), scrapeTimestamp)
|
||||
sw.pushData(&wc.writeRequest)
|
||||
sw.prevLabelsLen = len(wc.labels)
|
||||
sw.prevBodyLen = int(responseSize)
|
||||
sw.prevBodyLen = len(bodyString)
|
||||
wc.reset()
|
||||
writeRequestCtxPool.Put(wc)
|
||||
if !areIdenticalSeries {
|
||||
sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
|
||||
sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
|
||||
}
|
||||
runtime.KeepAlive(sbr) // this is needed in order to prevent from GC'ing data pointed by bodyString
|
||||
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
|
||||
// Do not track active series in streaming mode, since this may need too big amounts of memory
|
||||
// when the target exports too big number of metrics.
|
||||
|
Loading…
Reference in New Issue
Block a user