From 8e2a8a6ae2fe3dd02a0ca5d7c431dc9d1addc367 Mon Sep 17 00:00:00 2001
From: Roman Khavronenko
Date: Tue, 24 Jan 2023 06:15:59 +0100
Subject: [PATCH] lib/promscrape: limit number of sent stale series at once
 (#3686)

Stale series are sent when there is a difference between the current and
previous scrapes. Series which disappeared in the current scrape are marked
as stale and sent to the remote storage. Sending stale series requires
memory allocations, and when too many series disappear at the same time it
can result in a noticeable memory spike. For example, a re-deploy of a big
fleet of services can result in excessive memory usage for vmagent, because
all the series with the old pod names will be marked as stale and sent to
the remote write storage. This change limits the number of stale series
which can be sent at once, so memory usage remains steady.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675

Signed-off-by: hagen1778
---
 lib/promscrape/scrapework.go      | 64 +++++++++++++++++++------------
 lib/promscrape/scrapework_test.go | 38 ++++++++++++++++++
 2 files changed, 78 insertions(+), 24 deletions(-)

diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index 18ddb39052..16f460c239 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -767,6 +767,11 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
 
 var sendStaleSeriesConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
 
+// maxStaleSeriesAtOnce defines the max number of stale series
+// to process and send at once. It prevents excessive memory usage
+// when a big number of metrics become stale at the same time.
+const maxStaleSeriesAtOnce = 1e3
+
 func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp int64, addAutoSeries bool) {
 	// This function is CPU-bound, while it may allocate big amounts of memory.
 	// That's why it is a good idea to limit the number of concurrent calls to this function
@@ -790,35 +795,46 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
 	}()
 	if bodyString != "" {
 		wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
-		srcRows := wc.rows.Rows
-		for i := range srcRows {
+	}
+
+	srcRows := wc.rows.Rows
+	for from := 0; from < len(srcRows); from += maxStaleSeriesAtOnce {
+		to := from + maxStaleSeriesAtOnce
+		if to > len(srcRows) {
+			to = len(srcRows)
+		}
+
+		for i := range srcRows[from:to] {
 			sw.addRowToTimeseries(wc, &srcRows[i], timestamp, true)
 		}
-	}
-	if addAutoSeries {
-		am := &autoMetrics{}
-		sw.addAutoMetrics(am, wc, timestamp)
-	}
-	// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
-	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
-	if sw.seriesLimitExceeded {
-		sw.applySeriesLimit(wc)
-	}
-
-	series := wc.writeRequest.Timeseries
-	if len(series) == 0 {
-		return
-	}
-	// Substitute all the values with Prometheus stale markers.
-	for _, tss := range series {
-		samples := tss.Samples
-		for i := range samples {
-			samples[i].Value = decimal.StaleNaN
+		// add auto series at the last iteration
+		if addAutoSeries && to == len(srcRows) {
+			am := &autoMetrics{}
+			sw.addAutoMetrics(am, wc, timestamp)
 		}
-		staleSamplesCreated.Add(len(samples))
+
+		// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
+		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
+		if sw.seriesLimitExceeded {
+			sw.applySeriesLimit(wc)
+		}
+
+		series := wc.writeRequest.Timeseries
+		if len(series) == 0 {
+			continue
+		}
+		// Substitute all the values with Prometheus stale markers.
+		for _, tss := range series {
+			samples := tss.Samples
+			for i := range samples {
+				samples[i].Value = decimal.StaleNaN
+			}
+			staleSamplesCreated.Add(len(samples))
+		}
+		sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
+		wc.reset()
 	}
-	sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 }
 
 var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`)
diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go
index b94ba1dd1f..38a3a3462a 100644
--- a/lib/promscrape/scrapework_test.go
+++ b/lib/promscrape/scrapework_test.go
@@ -686,6 +686,44 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) {
 		`metric{a="e",foo="bar"} 0 123`)
 }
 
+func TestSendStaleSeries(t *testing.T) {
+	var sw scrapeWork
+	sw.Config = &ScrapeWork{
+		NoStaleMarkers: false,
+	}
+
+	var timeseriesExpectedN int
+	sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
+		t.Helper()
+		if len(wr.Timeseries) != timeseriesExpectedN {
+			t.Fatalf("expected to get %d stale series; got %d", timeseriesExpectedN, len(wr.Timeseries))
+		}
+	}
+
+	generateScrape := func(n int) string {
+		w := strings.Builder{}
+		for i := 0; i < n; i++ {
+			w.WriteString(fmt.Sprintf("foo_%d 1\n", i))
+		}
+		return w.String()
+	}
+
+	timeseriesExpectedN = 0
+	sw.sendStaleSeries("", "", 0, false)
+
+	timeseriesExpectedN = 0
+	sw.sendStaleSeries(generateScrape(10), generateScrape(10), 0, false)
+
+	timeseriesExpectedN = 10
+	sw.sendStaleSeries(generateScrape(10), "", 0, false)
+
+	timeseriesExpectedN = 5
+	sw.sendStaleSeries(generateScrape(10), generateScrape(5), 0, false)
+
+	timeseriesExpectedN = maxStaleSeriesAtOnce
+	sw.sendStaleSeries(generateScrape(maxStaleSeriesAtOnce*2), "", 0, false)
+}
+
 func parsePromRow(data string) *parser.Row {
 	var rows parser.Rows
 	errLogger := func(s string) {
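
The core technique in this patch is chunked processing of a large slice:
each sw.pushData call carries at most maxStaleSeriesAtOnce series, and the
write-request buffer is reset between batches so peak memory stays bounded.
Below is a minimal standalone sketch of the same windowing pattern; the
names maxBatchSize, flush and processInBatches are hypothetical
illustrations, not part of the VictoriaMetrics codebase:

	package main

	import "fmt"

	// maxBatchSize caps how many items are handed to flush per call.
	// It plays the same role as maxStaleSeriesAtOnce in the patch
	// above. (hypothetical example value)
	const maxBatchSize = 1000

	// flush stands in for sw.pushData: it sends one bounded batch
	// downstream, after which the batch memory can be reused.
	func flush(batch []int) {
		fmt.Printf("pushing %d items\n", len(batch))
	}

	// processInBatches walks items in fixed-size windows, so the
	// memory used per flush stays bounded regardless of len(items).
	func processInBatches(items []int) {
		for from := 0; from < len(items); from += maxBatchSize {
			to := from + maxBatchSize
			if to > len(items) {
				to = len(items)
			}
			flush(items[from:to])
		}
	}

	func main() {
		items := make([]int, 2500)
		processInBatches(items) // pushes 1000, 1000, then 500 items
	}

As in the patch, end-of-input work (adding the auto series) can be attached
to the last window by checking to == len(items), and per-batch state is
cleared after every push, which is what wc.reset() does above.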