lib/promscrape: properly apply series limit

Fix the following issues:

- Series limit wasn't applied when staleness tracking was disabled.
- Series limit didn't prevent staleness markers from being sent for new series exceeding the limit.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660

Thanks to @hagen1778 for the initial attempt to fix the issue
at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3665
This commit is contained in:
Aliaksandr Valialkin 2023-01-17 10:14:46 -08:00
parent e370daecda
commit c33728befb
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
3 changed files with 37 additions and 22 deletions

View File

@ -24,6 +24,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): consistently put the scrape url with scrape target labels to all error logs for failed scrapes. Previously some failed scrapes were logged without this information. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): consistently put the scrape url with scrape target labels to all error logs for failed scrapes. Previously some failed scrapes were logged without this information.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not send stale markers to remote storage for series exceeding the configured [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly apply [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) when [staleness tracking](https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers) is disabled.
* BUGFIX: [Pushgateway import](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format): properly return `200 OK` HTTP response code. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636). * BUGFIX: [Pushgateway import](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format): properly return `200 OK` HTTP response code. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636).
## [v1.86.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.86.1) ## [v1.86.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.86.1)

View File

@ -41,9 +41,10 @@ import (
) )
var ( var (
noStaleMarkers = flag.Bool("promscrape.noStaleMarkers", false, "Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series") noStaleMarkers = flag.Bool("promscrape.noStaleMarkers", false, "Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series")
strictParse = flag.Bool("promscrape.config.strictParse", true, "Whether to deny unsupported fields in -promscrape.config . Set to false in order to silently skip unsupported fields") seriesLimitPerTarget = flag.Int("promscrape.seriesLimitPerTarget", 0, "Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter for more info")
dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. "+ strictParse = flag.Bool("promscrape.config.strictParse", true, "Whether to deny unsupported fields in -promscrape.config . Set to false in order to silently skip unsupported fields")
dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. "+
"Returns non-zero exit code on parsing errors and emits these errors to stderr. "+ "Returns non-zero exit code on parsing errors and emits these errors to stderr. "+
"See also -promscrape.config.strictParse command-line flag. "+ "See also -promscrape.config.strictParse command-line flag. "+
"Pass -loggerLevel=ERROR if you don't need to see info messages in the output.") "Pass -loggerLevel=ERROR if you don't need to see info messages in the output.")
@ -971,6 +972,10 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
if sc.NoStaleMarkers != nil { if sc.NoStaleMarkers != nil {
noStaleTracking = *sc.NoStaleMarkers noStaleTracking = *sc.NoStaleMarkers
} }
seriesLimit := *seriesLimitPerTarget
if sc.SeriesLimit > 0 {
seriesLimit = sc.SeriesLimit
}
swc := &scrapeWorkConfig{ swc := &scrapeWorkConfig{
scrapeInterval: scrapeInterval, scrapeInterval: scrapeInterval,
scrapeIntervalString: scrapeInterval.String(), scrapeIntervalString: scrapeInterval.String(),
@ -995,7 +1000,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
streamParse: sc.StreamParse, streamParse: sc.StreamParse,
scrapeAlignInterval: sc.ScrapeAlignInterval.Duration(), scrapeAlignInterval: sc.ScrapeAlignInterval.Duration(),
scrapeOffset: sc.ScrapeOffset.Duration(), scrapeOffset: sc.ScrapeOffset.Duration(),
seriesLimit: sc.SeriesLimit, seriesLimit: seriesLimit,
noStaleMarkers: noStaleTracking, noStaleMarkers: noStaleTracking,
} }
return swc, nil return swc, nil

View File

@ -37,7 +37,6 @@ var (
"See also -promscrape.suppressScrapeErrorsDelay") "See also -promscrape.suppressScrapeErrorsDelay")
suppressScrapeErrorsDelay = flag.Duration("promscrape.suppressScrapeErrorsDelay", 0, "The delay for suppressing repeated scrape errors logging per each scrape targets. "+ suppressScrapeErrorsDelay = flag.Duration("promscrape.suppressScrapeErrorsDelay", 0, "The delay for suppressing repeated scrape errors logging per each scrape targets. "+
"This may be used for reducing the number of log lines related to scrape errors. See also -promscrape.suppressScrapeErrors") "This may be used for reducing the number of log lines related to scrape errors. See also -promscrape.suppressScrapeErrors")
seriesLimitPerTarget = flag.Int("promscrape.seriesLimitPerTarget", 0, "Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter for more info")
minResponseSizeForStreamParse = flagutil.NewBytes("promscrape.minResponseSizeForStreamParse", 1e6, "The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode") minResponseSizeForStreamParse = flagutil.NewBytes("promscrape.minResponseSizeForStreamParse", 1e6, "The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode")
) )
@ -451,7 +450,7 @@ func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, b
wc := writeRequestCtxPool.Get(sw.prevLabelsLen) wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
lastScrape := sw.loadLastScrape() lastScrape := sw.loadLastScrape()
bodyString := bytesutil.ToUnsafeString(body.B) bodyString := bytesutil.ToUnsafeString(body.B)
areIdenticalSeries := sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString) areIdenticalSeries := sw.areIdenticalSeries(lastScrape, bodyString)
if err != nil { if err != nil {
up = 0 up = 0
scrapesFailed.Inc() scrapesFailed.Inc()
@ -485,9 +484,6 @@ func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, b
samplesDropped := 0 samplesDropped := 0
if sw.seriesLimitExceeded || !areIdenticalSeries { if sw.seriesLimitExceeded || !areIdenticalSeries {
samplesDropped = sw.applySeriesLimit(wc) samplesDropped = sw.applySeriesLimit(wc)
if samplesDropped > 0 {
sw.seriesLimitExceeded = true
}
} }
am := &autoMetrics{ am := &autoMetrics{
up: up, up: up,
@ -577,7 +573,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
err = sbr.Init(sr) err = sbr.Init(sr)
if err == nil { if err == nil {
bodyString = bytesutil.ToUnsafeString(sbr.body) bodyString = bytesutil.ToUnsafeString(sbr.body)
areIdenticalSeries = sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString) areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString)
err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error { err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
@ -594,9 +590,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
} }
if sw.seriesLimitExceeded || !areIdenticalSeries { if sw.seriesLimitExceeded || !areIdenticalSeries {
samplesDropped += sw.applySeriesLimit(wc) samplesDropped += sw.applySeriesLimit(wc)
if samplesDropped > 0 && !sw.seriesLimitExceeded {
sw.seriesLimitExceeded = true
}
} }
// Push the collected rows to sw before returning from the callback, since they cannot be held // Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race. // after returning from the callback - this will result in data race.
@ -655,6 +648,15 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
return err return err
} }
// areIdenticalSeries reports whether prevData and currData should be treated
// as containing the same set of series.
//
// The comparison is delegated to parser.AreIdenticalSeriesFast. It is skipped,
// and true is returned, only when stale markers are disabled and no series limit
// is configured, since nothing consumes the result in that case.
func (sw *scrapeWork) areIdenticalSeries(prevData, currData string) bool {
	// Series changes must be tracked when stale markers are enabled or when a series limit is set.
	// The series limit part is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
	comparisonNeeded := !sw.Config.NoStaleMarkers || sw.Config.SeriesLimit > 0
	if comparisonNeeded {
		return parser.AreIdenticalSeriesFast(prevData, currData)
	}
	// Do not spend CPU time on tracking the changes in series.
	return true
}
// leveledWriteRequestCtxPool allows reducing memory usage when writeRequestCtx // leveledWriteRequestCtxPool allows reducing memory usage when writeRequestCtx
// structs contain mixed number of labels. // structs contain mixed number of labels.
// //
@ -738,17 +740,13 @@ func (sw *scrapeWork) getSeriesAdded(lastScrape, currScrape string) int {
} }
func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int { func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
seriesLimit := *seriesLimitPerTarget if sw.Config.SeriesLimit <= 0 {
if sw.Config.SeriesLimit > 0 {
seriesLimit = sw.Config.SeriesLimit
}
if sw.seriesLimiter == nil && seriesLimit > 0 {
sw.seriesLimiter = bloomfilter.NewLimiter(seriesLimit, 24*time.Hour)
}
sl := sw.seriesLimiter
if sl == nil {
return 0 return 0
} }
if sw.seriesLimiter == nil {
sw.seriesLimiter = bloomfilter.NewLimiter(sw.Config.SeriesLimit, 24*time.Hour)
}
sl := sw.seriesLimiter
dstSeries := wc.writeRequest.Timeseries[:0] dstSeries := wc.writeRequest.Timeseries[:0]
samplesDropped := 0 samplesDropped := 0
for _, ts := range wc.writeRequest.Timeseries { for _, ts := range wc.writeRequest.Timeseries {
@ -761,6 +759,9 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
} }
prompbmarshal.ResetTimeSeries(wc.writeRequest.Timeseries[len(dstSeries):]) prompbmarshal.ResetTimeSeries(wc.writeRequest.Timeseries[len(dstSeries):])
wc.writeRequest.Timeseries = dstSeries wc.writeRequest.Timeseries = dstSeries
if samplesDropped > 0 && !sw.seriesLimitExceeded {
sw.seriesLimitExceeded = true
}
return samplesDropped return samplesDropped
} }
@ -784,6 +785,13 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
am := &autoMetrics{} am := &autoMetrics{}
sw.addAutoMetrics(am, wc, timestamp) sw.addAutoMetrics(am, wc, timestamp)
} }
// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
if sw.seriesLimitExceeded {
sw.applySeriesLimit(wc)
}
series := wc.writeRequest.Timeseries series := wc.writeRequest.Timeseries
if len(series) == 0 { if len(series) == 0 {
return return