From 71a170d404e580f154765ce3213b64c0645af895 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 23 Jan 2023 21:52:57 -0800 Subject: [PATCH] lib/promscrape: follow-up for 393876e52a5bd25cec0bebc7997ecbf2e39af8da - Document the change in docs/CHANGELOG.md - Reduce memory usage when sending stale markers even more by parsing the response in stream parsing mode - Update the TestSendStaleSeries Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675 --- docs/CHANGELOG.md | 4 +- lib/promscrape/scrapework.go | 86 +++++++++++++++---------------- lib/promscrape/scrapework_test.go | 47 +++++++++-------- 3 files changed, 69 insertions(+), 68 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 02256ea297..5993e681ed 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,8 +15,10 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when sending stale markers for targets, which expose big number of metrics. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675) issues. + * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): propagate all the timeout-related errors from `vmstorage` to `vmselect` when `vmstorage`. Previously some timeout errors weren't returned from `vmselect` to `vmstorage`. Instead, `vmstorage` could log the error and close the connection to `vmselect`, so `vmselect` was logging cryptic errors such as `cannot execute funcName="..." on vmstorage "...": EOF`. -* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add support for time zone selection for older versions of browsers. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3680) +* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add support for time zone selection for older versions of browsers. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3680). ## [v1.86.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.86.2) diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 16f460c239..db495a9a16 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -1,6 +1,7 @@ package promscrape import ( + "bytes" "flag" "fmt" "io" @@ -767,11 +768,6 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int { var sendStaleSeriesConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs()) -// maxStaleSeriesAtOnce defines the max number of stale series -// to process and send at once. It prevents from excessive memory usage -// when big number of metrics become stale at the same time. -const maxStaleSeriesAtOnce = 1e3 - func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp int64, addAutoSeries bool) { // This function is CPU-bound, while it may allocate big amounts of memory. // That's why it is a good idea to limit the number of concurrent calls to this function @@ -794,46 +790,50 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i writeRequestCtxPool.Put(wc) }() if bodyString != "" { - wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError) - } - - srcRows := wc.rows.Rows - for from := 0; from < len(srcRows); from += maxStaleSeriesAtOnce { - to := from + maxStaleSeriesAtOnce - if to > len(srcRows) { - to = len(srcRows) - } - - for i := range srcRows[from:to] { - sw.addRowToTimeseries(wc, &srcRows[i], timestamp, true) - } - - // add auto series at the last iteration - if addAutoSeries && to == len(srcRows) { - am := &autoMetrics{} - sw.addAutoMetrics(am, wc, timestamp) - } - - // Apply series limit to stale markers in order to prevent sending stale markers for newly created series. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660 - if sw.seriesLimitExceeded { - sw.applySeriesLimit(wc) - } - - series := wc.writeRequest.Timeseries - if len(series) == 0 { - continue - } - // Substitute all the values with Prometheus stale markers. - for _, tss := range series { - samples := tss.Samples - for i := range samples { - samples[i].Value = decimal.StaleNaN + // Send stale markers in streaming mode in order to reduce memory usage + // when stale markers for targets exposing big number of metrics must be generated. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668 + // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675 + var mu sync.Mutex + br := bytes.NewBufferString(bodyString) + err := parser.ParseStream(br, timestamp, false, func(rows []parser.Row) error { + mu.Lock() + defer mu.Unlock() + for i := range rows { + sw.addRowToTimeseries(wc, &rows[i], timestamp, true) } - staleSamplesCreated.Add(len(samples)) + // Apply series limit to stale markers in order to prevent sending stale markers for newly created series. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660 + if sw.seriesLimitExceeded { + sw.applySeriesLimit(wc) + } + // Push the collected rows to sw before returning from the callback, since they cannot be held + // after returning from the callback - this will result in data race. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 + setStaleMarkersForRows(wc.writeRequest.Timeseries) + sw.pushData(sw.Config.AuthToken, &wc.writeRequest) + wc.resetNoRows() + return nil + }, sw.logError) + if err != nil { + sw.logError(fmt.Errorf("cannot send stale markers: %s", err).Error()) } - sw.pushData(sw.Config.AuthToken, &wc.writeRequest) - wc.reset() + } + if addAutoSeries { + am := &autoMetrics{} + sw.addAutoMetrics(am, wc, timestamp) + } + setStaleMarkersForRows(wc.writeRequest.Timeseries) + sw.pushData(sw.Config.AuthToken, &wc.writeRequest) +} + +func setStaleMarkersForRows(series []prompbmarshal.TimeSeries) { + for _, tss := range series { + samples := tss.Samples + for i := range samples { + samples[i].Value = decimal.StaleNaN + } + staleSamplesCreated.Add(len(samples)) } } diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index 38a3a3462a..3f5330e608 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" ) @@ -687,19 +688,24 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) { } func TestSendStaleSeries(t *testing.T) { - var sw scrapeWork - sw.Config = &ScrapeWork{ - NoStaleMarkers: false, - } - - var timeseriesExpectedN int - sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) { + f := func(lastScrape, currScrape string, staleMarksExpected int) { t.Helper() - if len(wr.Timeseries) != timeseriesExpectedN { - t.Fatalf("expected to get %d stale series; got %d", timeseriesExpectedN, len(wr.Timeseries)) + var sw scrapeWork + sw.Config = &ScrapeWork{ + NoStaleMarkers: false, + } + common.StartUnmarshalWorkers() + defer common.StopUnmarshalWorkers() + + var staleMarks int + sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) { + staleMarks += len(wr.Timeseries) + } + sw.sendStaleSeries(lastScrape, currScrape, 0, false) + if staleMarks != staleMarksExpected { + t.Fatalf("unexpected number of stale marks; got %d; want %d", staleMarks, staleMarksExpected) } } - generateScrape := func(n int) string { w := strings.Builder{} for i := 0; i < n; i++ { @@ -708,20 +714,13 @@ func TestSendStaleSeries(t *testing.T) { return w.String() } - timeseriesExpectedN = 0 - sw.sendStaleSeries("", "", 0, false) - - timeseriesExpectedN = 0 - sw.sendStaleSeries(generateScrape(10), generateScrape(10), 0, false) - - timeseriesExpectedN = 10 - sw.sendStaleSeries(generateScrape(10), "", 0, false) - - timeseriesExpectedN = 5 - sw.sendStaleSeries(generateScrape(10), generateScrape(5), 0, false) - - timeseriesExpectedN = maxStaleSeriesAtOnce - sw.sendStaleSeries(generateScrape(maxStaleSeriesAtOnce*2), "", 0, false) + f("", "", 0) + f(generateScrape(10), generateScrape(10), 0) + f(generateScrape(10), "", 10) + f("", generateScrape(10), 0) + f(generateScrape(10), generateScrape(3), 7) + f(generateScrape(3), generateScrape(10), 0) + f(generateScrape(20000), generateScrape(10), 19990) } func parsePromRow(data string) *parser.Row {