diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 18ddb39052..16f460c239 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -767,6 +767,11 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int { var sendStaleSeriesConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs()) +// maxStaleSeriesAtOnce defines the max number of stale series +// to process and send at once. It prevents from excessive memory usage +// when big number of metrics become stale at the same time. +const maxStaleSeriesAtOnce = 1e3 + func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp int64, addAutoSeries bool) { // This function is CPU-bound, while it may allocate big amounts of memory. // That's why it is a good idea to limit the number of concurrent calls to this function @@ -790,35 +795,46 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i }() if bodyString != "" { wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError) - srcRows := wc.rows.Rows - for i := range srcRows { + } + + srcRows := wc.rows.Rows + for from := 0; from < len(srcRows); from += maxStaleSeriesAtOnce { + to := from + maxStaleSeriesAtOnce + if to > len(srcRows) { + to = len(srcRows) + } + + for i := range srcRows[from:to] { sw.addRowToTimeseries(wc, &srcRows[i], timestamp, true) } - } - if addAutoSeries { - am := &autoMetrics{} - sw.addAutoMetrics(am, wc, timestamp) - } - // Apply series limit to stale markers in order to prevent sending stale markers for newly created series. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660 - if sw.seriesLimitExceeded { - sw.applySeriesLimit(wc) - } - - series := wc.writeRequest.Timeseries - if len(series) == 0 { - return - } - // Substitute all the values with Prometheus stale markers. - for _, tss := range series { - samples := tss.Samples - for i := range samples { - samples[i].Value = decimal.StaleNaN + // add auto series at the last iteration + if addAutoSeries && to == len(srcRows) { + am := &autoMetrics{} + sw.addAutoMetrics(am, wc, timestamp) } - staleSamplesCreated.Add(len(samples)) + + // Apply series limit to stale markers in order to prevent sending stale markers for newly created series. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660 + if sw.seriesLimitExceeded { + sw.applySeriesLimit(wc) + } + + series := wc.writeRequest.Timeseries + if len(series) == 0 { + continue + } + // Substitute all the values with Prometheus stale markers. + for _, tss := range series { + samples := tss.Samples + for i := range samples { + samples[i].Value = decimal.StaleNaN + } + staleSamplesCreated.Add(len(samples)) + } + sw.pushData(sw.Config.AuthToken, &wc.writeRequest) + wc.reset() } - sw.pushData(sw.Config.AuthToken, &wc.writeRequest) } var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`) diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index b94ba1dd1f..38a3a3462a 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -686,6 +686,44 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) { `metric{a="e",foo="bar"} 0 123`) } +func TestSendStaleSeries(t *testing.T) { + var sw scrapeWork + sw.Config = &ScrapeWork{ + NoStaleMarkers: false, + } + + var timeseriesExpectedN int + sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) { + t.Helper() + if len(wr.Timeseries) != timeseriesExpectedN { + t.Fatalf("expected to get %d stale series; got %d", timeseriesExpectedN, len(wr.Timeseries)) + } + } + + generateScrape := func(n int) string { + w := strings.Builder{} + for i := 0; i < n; i++ { + w.WriteString(fmt.Sprintf("foo_%d 1\n", i)) + } + return w.String() + } + + timeseriesExpectedN = 0 + sw.sendStaleSeries("", "", 0, false) + + timeseriesExpectedN = 0 + sw.sendStaleSeries(generateScrape(10), generateScrape(10), 0, false) + + timeseriesExpectedN = 10 + sw.sendStaleSeries(generateScrape(10), "", 0, false) + + timeseriesExpectedN = 5 + sw.sendStaleSeries(generateScrape(10), generateScrape(5), 0, false) + + timeseriesExpectedN = maxStaleSeriesAtOnce + sw.sendStaleSeries(generateScrape(maxStaleSeriesAtOnce*2), "", 0, false) +} + func parsePromRow(data string) *parser.Row { var rows parser.Rows errLogger := func(s string) {