lib/promscrape: reduce memory and CPU usage when Prometheus staleness tracking is enabled for metrics from deleted / disappeared scrape targets

Store the scraped response body instead of storing the parsed and relabeld metrics.
This should reduce memory usage, since the response body takes less memory than the parsed and relabeled metrics.
This is especially true for Kubernetes service discovery, which adds many long labels for all the scraped metrics.

This should also reduce CPU usage, since the marshaling of the parsed
and relabeld metrics has been substituted by response body copying.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526
This commit is contained in:
Aliaksandr Valialkin 2021-08-21 21:16:50 +03:00
parent ff4c7c1a3d
commit 67bc407747
2 changed files with 41 additions and 79 deletions

View File

@ -6,7 +6,7 @@ sort: 15
## tip
* FEATURE: vmagent: reduce memory usage and CPU usage when Prometheus staleness tracking is enabled for metrics exported from the deleted or disappeared scrape targets.
* FEATURE: take into account failed queries in `vm_request_duration_seconds` summary at `/metrics`. Previously only successful queries were taken into account. This could result in skewed summary. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1537).
* FEATURE: vmalert: add `-disableAlertgroupLabel` command-line flag for disabling the label with alert group name. This may be needed for proper deduplication in Alertmanager. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1532).

View File

@ -11,7 +11,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
@ -187,8 +186,9 @@ type scrapeWork struct {
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
prevLabelsLen int
activeSeriesBuf []byte
activeSeries [][]byte
// lastScrape holds the last response from scrape target.
// It is used for generating Prometheus stale markers.
lastScrape []byte
}
func (sw *scrapeWork) run(stopCh <-chan struct{}) {
@ -240,7 +240,7 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) {
select {
case <-stopCh:
t := time.Now().UnixNano() / 1e6
sw.sendStaleMarkers(t, false)
sw.sendStaleMarkersForLastScrape(t, true)
return
case tt := <-ticker.C:
t := tt.UnixNano() / 1e6
@ -284,7 +284,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
}
// Common case: read all the data from scrape target to memory (body) and then process it.
// This case should work more optimally for than stream parse code above for common case when scrape target exposes
// This case should work more optimally than stream parse code for common case when scrape target exposes
// up to a few thousand metrics.
body := leveledbytebufferpool.Get(sw.prevBodyLen)
var err error
@ -295,11 +295,11 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
scrapeResponseSize.Update(float64(len(body.B)))
up := 1
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
bodyString := bytesutil.ToUnsafeString(body.B)
if err != nil {
up = 0
scrapesFailed.Inc()
} else {
bodyString := bytesutil.ToUnsafeString(body.B)
wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
}
srcRows := wc.rows.Rows
@ -323,10 +323,6 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
if up == 0 {
sw.sendStaleMarkers(scrapeTimestamp, true)
}
sw.updateActiveSeries(wc)
sw.pushData(&wc.writeRequest)
sw.prevLabelsLen = len(wc.labels)
wc.reset()
@ -335,20 +331,12 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
sw.prevBodyLen = len(body.B)
leveledbytebufferpool.Put(body)
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
return err
}
func isAutogenSeries(name string) bool {
switch name {
case "up",
"scrape_duration_seconds",
"scrape_samples_scraped",
"scrape_samples_post_metric_relabeling",
"scrape_series_added":
return true
default:
return false
if up == 0 {
bodyString = ""
sw.sendStaleMarkersForLastScrape(scrapeTimestamp, false)
}
sw.updateLastScrape(bodyString)
return err
}
func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
@ -412,14 +400,13 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
// Do not call sw.updateActiveSeries(wc), since wc doesn't contain the full list of scraped metrics.
// Do not track active series in streaming mode, since this may need too big amounts of memory
// when the target exports too big number of metrics.
sw.pushData(&wc.writeRequest)
sw.prevLabelsLen = len(wc.labels)
wc.reset()
writeRequestCtxPool.Put(wc)
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
// Do not track active series in streaming mode, since this may need too big amounts of memory
// when the target exports too big number of metrics.
return err
}
@ -503,69 +490,44 @@ func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) {
}
}
func (sw *scrapeWork) updateActiveSeries(wc *writeRequestCtx) {
func (sw *scrapeWork) updateLastScrape(response string) {
if *noStaleMarkers {
return
}
b := sw.activeSeriesBuf[:0]
as := sw.activeSeries[:0]
for _, ts := range wc.writeRequest.Timeseries {
bLen := len(b)
for _, label := range ts.Labels {
b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Name))
b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Value))
}
as = append(as, b[bLen:])
}
sw.activeSeriesBuf = b
sw.activeSeries = as
sw.lastScrape = append(sw.lastScrape[:0], response...)
}
func (sw *scrapeWork) sendStaleMarkers(timestamp int64, skipAutogenSeries bool) {
series := make([]prompbmarshal.TimeSeries, 0, len(sw.activeSeries))
staleMarkSamples := []prompbmarshal.Sample{
{
Value: decimal.StaleNaN,
Timestamp: timestamp,
},
func (sw *scrapeWork) sendStaleMarkersForLastScrape(timestamp int64, addAutoSeries bool) {
bodyString := bytesutil.ToUnsafeString(sw.lastScrape)
if len(bodyString) == 0 && !addAutoSeries {
return
}
for _, b := range sw.activeSeries {
var labels []prompbmarshal.Label
skipSeries := false
for len(b) > 0 {
tail, name, err := encoding.UnmarshalBytes(b)
if err != nil {
logger.Panicf("BUG: cannot unmarshal label name from activeSeries: %s", err)
}
b = tail
tail, value, err := encoding.UnmarshalBytes(b)
if err != nil {
logger.Panicf("BUG: cannot unmarshal label value from activeSeries: %s", err)
}
b = tail
if skipAutogenSeries && string(name) == "__name__" && isAutogenSeries(bytesutil.ToUnsafeString(value)) {
skipSeries = true
}
labels = append(labels, prompbmarshal.Label{
Name: bytesutil.ToUnsafeString(name),
Value: bytesutil.ToUnsafeString(value),
})
}
if skipSeries {
continue
}
series = append(series, prompbmarshal.TimeSeries{
Labels: labels,
Samples: staleMarkSamples,
})
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
srcRows := wc.rows.Rows
for i := range srcRows {
sw.addRowToTimeseries(wc, &srcRows[i], timestamp, true)
}
if addAutoSeries {
sw.addAutoTimeseries(wc, "up", 0, timestamp)
sw.addAutoTimeseries(wc, "scrape_duration_seconds", 0, timestamp)
sw.addAutoTimeseries(wc, "scrape_samples_scraped", 0, timestamp)
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", 0, timestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", 0, timestamp)
}
series := wc.writeRequest.Timeseries
if len(series) == 0 {
return
}
wr := &prompbmarshal.WriteRequest{
Timeseries: series,
// Substitute all the values with Prometheus stale markers.
for _, tss := range series {
samples := tss.Samples
for i := range samples {
samples[i].Value = decimal.StaleNaN
}
}
sw.pushData(wr)
sw.pushData(&wc.writeRequest)
writeRequestCtxPool.Put(wc)
}
func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int {