From d5622b32e2f685e171f41d4b7c22d090db92e470 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 21 Aug 2021 21:16:50 +0300 Subject: [PATCH] lib/promscrape: reduce memory and CPU usage when Prometheus staleness tracking is enabled for metrics from deleted / disappeared scrape targets Store the scraped response body instead of storing the parsed and relabeld metrics. This should reduce memory usage, since the response body takes less memory than the parsed and relabeled metrics. This is especially true for Kubernetes service discovery, which adds many long labels for all the scraped metrics. This should also reduce CPU usage, since the marshaling of the parsed and relabeld metrics has been substituted by response body copying. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 --- docs/CHANGELOG.md | 2 +- lib/promscrape/scrapework.go | 118 ++++++++++++----------------------- 2 files changed, 41 insertions(+), 79 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 18d9f0fd5..79bbbe2a9 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,7 +6,7 @@ sort: 15 ## tip - +* FEATURE: vmagent: reduce memory usage and CPU usage when Prometheus staleness tracking is enabled for metrics exported from the deleted or disappeared scrape targets. * FEATURE: take into account failed queries in `vm_request_duration_seconds` summary at `/metrics`. Previously only successful queries were taken into account. This could result in skewed summary. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1537). * FEATURE: vmalert: add `-disableAlertgroupLabel` command-line flag for disabling the label with alert group name. This may be needed for proper deduplication in Alertmanager. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1532). diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index b000dee90..a62459c5f 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -11,7 +11,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" @@ -187,8 +186,9 @@ type scrapeWork struct { // It is used as a hint in order to reduce memory usage when parsing scrape responses. prevLabelsLen int - activeSeriesBuf []byte - activeSeries [][]byte + // lastScrape holds the last response from scrape target. + // It is used for generating Prometheus stale markers. + lastScrape []byte } func (sw *scrapeWork) run(stopCh <-chan struct{}) { @@ -240,7 +240,7 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) { select { case <-stopCh: t := time.Now().UnixNano() / 1e6 - sw.sendStaleMarkers(t, false) + sw.sendStaleMarkersForLastScrape(t, true) return case tt := <-ticker.C: t := tt.UnixNano() / 1e6 @@ -284,7 +284,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error } // Common case: read all the data from scrape target to memory (body) and then process it. - // This case should work more optimally for than stream parse code above for common case when scrape target exposes + // This case should work more optimally than stream parse code for common case when scrape target exposes // up to a few thousand metrics. body := leveledbytebufferpool.Get(sw.prevBodyLen) var err error @@ -295,11 +295,11 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error scrapeResponseSize.Update(float64(len(body.B))) up := 1 wc := writeRequestCtxPool.Get(sw.prevLabelsLen) + bodyString := bytesutil.ToUnsafeString(body.B) if err != nil { up = 0 scrapesFailed.Inc() } else { - bodyString := bytesutil.ToUnsafeString(body.B) wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError) } srcRows := wc.rows.Rows @@ -323,10 +323,6 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) - if up == 0 { - sw.sendStaleMarkers(scrapeTimestamp, true) - } - sw.updateActiveSeries(wc) sw.pushData(&wc.writeRequest) sw.prevLabelsLen = len(wc.labels) wc.reset() @@ -335,20 +331,12 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error sw.prevBodyLen = len(body.B) leveledbytebufferpool.Put(body) tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err) - return err -} - -func isAutogenSeries(name string) bool { - switch name { - case "up", - "scrape_duration_seconds", - "scrape_samples_scraped", - "scrape_samples_post_metric_relabeling", - "scrape_series_added": - return true - default: - return false + if up == 0 { + bodyString = "" + sw.sendStaleMarkersForLastScrape(scrapeTimestamp, false) } + sw.updateLastScrape(bodyString) + return err } func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) { @@ -412,14 +400,13 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) - // Do not call sw.updateActiveSeries(wc), since wc doesn't contain the full list of scraped metrics. - // Do not track active series in streaming mode, since this may need too big amounts of memory - // when the target exports too big number of metrics. sw.pushData(&wc.writeRequest) sw.prevLabelsLen = len(wc.labels) wc.reset() writeRequestCtxPool.Put(wc) tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err) + // Do not track active series in streaming mode, since this may need too big amounts of memory + // when the target exports too big number of metrics. return err } @@ -503,69 +490,44 @@ func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) { } } -func (sw *scrapeWork) updateActiveSeries(wc *writeRequestCtx) { +func (sw *scrapeWork) updateLastScrape(response string) { if *noStaleMarkers { return } - b := sw.activeSeriesBuf[:0] - as := sw.activeSeries[:0] - for _, ts := range wc.writeRequest.Timeseries { - bLen := len(b) - for _, label := range ts.Labels { - b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Name)) - b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Value)) - } - as = append(as, b[bLen:]) - } - sw.activeSeriesBuf = b - sw.activeSeries = as + sw.lastScrape = append(sw.lastScrape[:0], response...) } -func (sw *scrapeWork) sendStaleMarkers(timestamp int64, skipAutogenSeries bool) { - series := make([]prompbmarshal.TimeSeries, 0, len(sw.activeSeries)) - staleMarkSamples := []prompbmarshal.Sample{ - { - Value: decimal.StaleNaN, - Timestamp: timestamp, - }, +func (sw *scrapeWork) sendStaleMarkersForLastScrape(timestamp int64, addAutoSeries bool) { + bodyString := bytesutil.ToUnsafeString(sw.lastScrape) + if len(bodyString) == 0 && !addAutoSeries { + return } - for _, b := range sw.activeSeries { - var labels []prompbmarshal.Label - skipSeries := false - for len(b) > 0 { - tail, name, err := encoding.UnmarshalBytes(b) - if err != nil { - logger.Panicf("BUG: cannot unmarshal label name from activeSeries: %s", err) - } - b = tail - tail, value, err := encoding.UnmarshalBytes(b) - if err != nil { - logger.Panicf("BUG: cannot unmarshal label value from activeSeries: %s", err) - } - b = tail - if skipAutogenSeries && string(name) == "__name__" && isAutogenSeries(bytesutil.ToUnsafeString(value)) { - skipSeries = true - } - labels = append(labels, prompbmarshal.Label{ - Name: bytesutil.ToUnsafeString(name), - Value: bytesutil.ToUnsafeString(value), - }) - } - if skipSeries { - continue - } - series = append(series, prompbmarshal.TimeSeries{ - Labels: labels, - Samples: staleMarkSamples, - }) + wc := writeRequestCtxPool.Get(sw.prevLabelsLen) + wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError) + srcRows := wc.rows.Rows + for i := range srcRows { + sw.addRowToTimeseries(wc, &srcRows[i], timestamp, true) } + if addAutoSeries { + sw.addAutoTimeseries(wc, "up", 0, timestamp) + sw.addAutoTimeseries(wc, "scrape_duration_seconds", 0, timestamp) + sw.addAutoTimeseries(wc, "scrape_samples_scraped", 0, timestamp) + sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", 0, timestamp) + sw.addAutoTimeseries(wc, "scrape_series_added", 0, timestamp) + } + series := wc.writeRequest.Timeseries if len(series) == 0 { return } - wr := &prompbmarshal.WriteRequest{ - Timeseries: series, + // Substitute all the values with Prometheus stale markers. + for _, tss := range series { + samples := tss.Samples + for i := range samples { + samples[i].Value = decimal.StaleNaN + } } - sw.pushData(wr) + sw.pushData(&wc.writeRequest) + writeRequestCtxPool.Put(wc) } func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int {