mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
lib/promscrape: limit number of sent stale series at once (#3686)
Stale series are sent when there is a difference between current and previous scrapes. Those series which disappeared in the current scrape are marked as stale and sent to the remote storage. Sending stale series requires memory allocation and in case when too many series disappear in the same it could result in noticeable memory spike. For example, re-deploy of a big fleet of service can result into excessive memory usage for vmagent, because all the series with old pod name will be marked as stale and sent to the remote write storage. This change limits the number of stale series which can be sent at once, so memory usage remains steady. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668 https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675 Signed-off-by: hagen1778 <roman@victoriametrics.com> Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
95d4db0506
commit
8e2a8a6ae2
@ -767,6 +767,11 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
|
||||
|
||||
var sendStaleSeriesConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
|
||||
|
||||
// maxStaleSeriesAtOnce defines the max number of stale series
|
||||
// to process and send at once. It prevents from excessive memory usage
|
||||
// when big number of metrics become stale at the same time.
|
||||
const maxStaleSeriesAtOnce = 1e3
|
||||
|
||||
func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp int64, addAutoSeries bool) {
|
||||
// This function is CPU-bound, while it may allocate big amounts of memory.
|
||||
// That's why it is a good idea to limit the number of concurrent calls to this function
|
||||
@ -790,35 +795,46 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
|
||||
}()
|
||||
if bodyString != "" {
|
||||
wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
|
||||
srcRows := wc.rows.Rows
|
||||
for i := range srcRows {
|
||||
}
|
||||
|
||||
srcRows := wc.rows.Rows
|
||||
for from := 0; from < len(srcRows); from += maxStaleSeriesAtOnce {
|
||||
to := from + maxStaleSeriesAtOnce
|
||||
if to > len(srcRows) {
|
||||
to = len(srcRows)
|
||||
}
|
||||
|
||||
for i := range srcRows[from:to] {
|
||||
sw.addRowToTimeseries(wc, &srcRows[i], timestamp, true)
|
||||
}
|
||||
}
|
||||
if addAutoSeries {
|
||||
am := &autoMetrics{}
|
||||
sw.addAutoMetrics(am, wc, timestamp)
|
||||
}
|
||||
|
||||
// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
|
||||
if sw.seriesLimitExceeded {
|
||||
sw.applySeriesLimit(wc)
|
||||
}
|
||||
|
||||
series := wc.writeRequest.Timeseries
|
||||
if len(series) == 0 {
|
||||
return
|
||||
}
|
||||
// Substitute all the values with Prometheus stale markers.
|
||||
for _, tss := range series {
|
||||
samples := tss.Samples
|
||||
for i := range samples {
|
||||
samples[i].Value = decimal.StaleNaN
|
||||
// add auto series at the last iteration
|
||||
if addAutoSeries && to == len(srcRows) {
|
||||
am := &autoMetrics{}
|
||||
sw.addAutoMetrics(am, wc, timestamp)
|
||||
}
|
||||
staleSamplesCreated.Add(len(samples))
|
||||
|
||||
// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
|
||||
if sw.seriesLimitExceeded {
|
||||
sw.applySeriesLimit(wc)
|
||||
}
|
||||
|
||||
series := wc.writeRequest.Timeseries
|
||||
if len(series) == 0 {
|
||||
continue
|
||||
}
|
||||
// Substitute all the values with Prometheus stale markers.
|
||||
for _, tss := range series {
|
||||
samples := tss.Samples
|
||||
for i := range samples {
|
||||
samples[i].Value = decimal.StaleNaN
|
||||
}
|
||||
staleSamplesCreated.Add(len(samples))
|
||||
}
|
||||
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
|
||||
wc.reset()
|
||||
}
|
||||
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
|
||||
}
|
||||
|
||||
var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`)
|
||||
|
@ -686,6 +686,44 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) {
|
||||
`metric{a="e",foo="bar"} 0 123`)
|
||||
}
|
||||
|
||||
func TestSendStaleSeries(t *testing.T) {
|
||||
var sw scrapeWork
|
||||
sw.Config = &ScrapeWork{
|
||||
NoStaleMarkers: false,
|
||||
}
|
||||
|
||||
var timeseriesExpectedN int
|
||||
sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
t.Helper()
|
||||
if len(wr.Timeseries) != timeseriesExpectedN {
|
||||
t.Fatalf("expected to get %d stale series; got %d", timeseriesExpectedN, len(wr.Timeseries))
|
||||
}
|
||||
}
|
||||
|
||||
generateScrape := func(n int) string {
|
||||
w := strings.Builder{}
|
||||
for i := 0; i < n; i++ {
|
||||
w.WriteString(fmt.Sprintf("foo_%d 1\n", i))
|
||||
}
|
||||
return w.String()
|
||||
}
|
||||
|
||||
timeseriesExpectedN = 0
|
||||
sw.sendStaleSeries("", "", 0, false)
|
||||
|
||||
timeseriesExpectedN = 0
|
||||
sw.sendStaleSeries(generateScrape(10), generateScrape(10), 0, false)
|
||||
|
||||
timeseriesExpectedN = 10
|
||||
sw.sendStaleSeries(generateScrape(10), "", 0, false)
|
||||
|
||||
timeseriesExpectedN = 5
|
||||
sw.sendStaleSeries(generateScrape(10), generateScrape(5), 0, false)
|
||||
|
||||
timeseriesExpectedN = maxStaleSeriesAtOnce
|
||||
sw.sendStaleSeries(generateScrape(maxStaleSeriesAtOnce*2), "", 0, false)
|
||||
}
|
||||
|
||||
func parsePromRow(data string) *parser.Row {
|
||||
var rows parser.Rows
|
||||
errLogger := func(s string) {
|
||||
|
Loading…
Reference in New Issue
Block a user