From 74c00a87623718f9c1a8ecaeab3a6a0febc2db2f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 14 Sep 2022 13:14:04 +0300 Subject: [PATCH] lib/promscrape: read response body into memory in stream parsing mode before parsing it This reduces scrape duration for targets returning big responses. The response body was already read into memory in stream parsing mode before this change, so this commit shouldn't increase memory usage. --- docs/CHANGELOG.md | 1 + lib/promscrape/scrapework.go | 82 +++++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 7cf32d73d..6ea658710 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -23,6 +23,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `vm-native-step-interval` command line flag for `vm-native` mode. New option allows splitting the import process into chunks by time interval. This helps migrating data sets with high churn rate and provides better control over the process. See [feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2733). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add `top queries` tab, which shows various stats for recently executed queries. See [these docs](https://docs.victoriametrics.com/#top-queries) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2707). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `debug` mode to the alerting rule settings for printing additional information into logs during evaluation. See `debug` param in [alerting rule config](https://docs.victoriametrics.com/vmalert.html#alerting-rules). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): minimize the time needed for reading large responses from scrape targets in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). This should reduce scrape durations for such targets as [kube-state-metrics](https://github.com/kubernetes/kube-state-metrics) running in a big Kubernetes cluster. * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate `rate_over_sum(m[d])` as `sum_over_time(m[d])/d`. Previously the `sum_over_time(m[d])` could be improperly divided by smaller than `d` time range. See [rate_over_sum() docs](https://docs.victoriametrics.com/MetricsQL.html#rate_over_sum) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3045). * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): properly calculate query results at `vmselect`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3067). The issue has been introduced in [v1.81.0](https://docs.victoriametrics.com/CHANGELOG.html#v1810). diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index ae3dc4d4c..d08d5d04d 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -516,19 +516,35 @@ func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) { } type streamBodyReader struct { - sr *streamReader - body []byte - bodyLen int - captureBody bool + body []byte + bodyLen int + readOffset int +} + +func (sbr *streamBodyReader) Init(sr *streamReader) error { + sbr.body = nil + sbr.bodyLen = 0 + sbr.readOffset = 0 + // Read the whole response body in memory before parsing it in stream mode. + // This minimizes the time needed for reading response body from scrape target. + startTime := fasttime.UnixTimestamp() + body, err := io.ReadAll(sr) + if err != nil { + d := fasttime.UnixTimestamp() - startTime + return fmt.Errorf("cannot read stream body in %d seconds: %w", d, err) + } + sbr.body = body + sbr.bodyLen = len(body) + return nil } func (sbr *streamBodyReader) Read(b []byte) (int, error) { - n, err := sbr.sr.Read(b) - sbr.bodyLen += n - if sbr.captureBody { - sbr.body = append(sbr.body, b[:n]...) + if sbr.readOffset >= len(sbr.body) { + return 0, io.EOF } - return n, err + n := copy(b, sbr.body[sbr.readOffset:]) + sbr.readOffset += n + return n, nil } func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { @@ -536,37 +552,37 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { samplesPostRelabeling := 0 wc := writeRequestCtxPool.Get(sw.prevLabelsLen) // Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses. - sbr := &streamBodyReader{ - captureBody: !*noStaleMarkers, - } + var sbr streamBodyReader sr, err := sw.GetStreamReader() if err != nil { err = fmt.Errorf("cannot read data: %s", err) } else { var mu sync.Mutex - sbr.sr = sr - err = parser.ParseStream(sbr, scrapeTimestamp, false, func(rows []parser.Row) error { - mu.Lock() - defer mu.Unlock() - samplesScraped += len(rows) - for i := range rows { - sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) - } - // Push the collected rows to sw before returning from the callback, since they cannot be held - // after returning from the callback - this will result in data race. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 - samplesPostRelabeling += len(wc.writeRequest.Timeseries) - if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit { + err = sbr.Init(sr) + if err != nil { + err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error { + mu.Lock() + defer mu.Unlock() + samplesScraped += len(rows) + for i := range rows { + sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) + } + // Push the collected rows to sw before returning from the callback, since they cannot be held + // after returning from the callback - this will result in data race. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 + samplesPostRelabeling += len(wc.writeRequest.Timeseries) + if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit { + wc.resetNoRows() + scrapesSkippedBySampleLimit.Inc() + return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ + "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) + } + sw.pushData(sw.Config.AuthToken, &wc.writeRequest) wc.resetNoRows() - scrapesSkippedBySampleLimit.Inc() - return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ - "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) - } - sw.pushData(sw.Config.AuthToken, &wc.writeRequest) - wc.resetNoRows() - return nil - }, sw.logError) + return nil + }, sw.logError) + } sr.MustClose() } lastScrape := sw.loadLastScrape()