lib/promscrape: send stale markers for the previously scraped metrics on failed scrapes like Prometheus does

This commit is contained in:
Aliaksandr Valialkin 2021-08-18 21:58:40 +03:00
parent 738741ab0d
commit 8ee575dee9
3 changed files with 32 additions and 2 deletions

View File

@ -12,6 +12,7 @@ sort: 15
* FEATURE: vmselect: add `-search.noStaleMarkers` command-line flag for stale markers handling in queries.
* BUGFIX: vmagent: stop scrapers for deleted targets before starting scrapers for added targets. This should prevent from possible time series overlap when old targets are substituted by new targets (for example, during new deployment in Kubernetes). The overlap could lead to incorrect query results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1509).
* BUGFIX: vmagent: send Prometheus stale markers for the previously scraped metrics on failed scrapes like Prometheus does. See [this article](https://www.robustperception.io/staleness-and-promql).
* BUGFIX: upgrade base Docker image from Alpine 3.14.0 to Alpine 3.14.1 . This fixes potential security issues - see [Alpine 3.14.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.14.1-released.html).

View File

@ -239,7 +239,7 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) {
timestamp += scrapeInterval.Milliseconds() timestamp += scrapeInterval.Milliseconds()
select { select {
case <-stopCh: case <-stopCh:
sw.sendStaleMarkers() sw.sendStaleMarkers(false)
return return
case tt := <-ticker.C: case tt := <-ticker.C:
t := tt.UnixNano() / 1e6 t := tt.UnixNano() / 1e6
@ -322,6 +322,9 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
if up == 0 {
sw.sendStaleMarkers(true)
}
sw.updateActiveSeries(wc) sw.updateActiveSeries(wc)
sw.pushData(&wc.writeRequest) sw.pushData(&wc.writeRequest)
sw.prevLabelsLen = len(wc.labels) sw.prevLabelsLen = len(wc.labels)
@ -334,6 +337,19 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
return err return err
} }
// isAutogenSeries reports whether name is one of the series that the scraper
// generates automatically for every scrape (rather than a metric exposed by
// the target itself). These series are skipped when sending stale markers for
// a failed scrape, since the scraper keeps producing them with real values.
func isAutogenSeries(name string) bool {
	return name == "up" ||
		name == "scrape_duration_seconds" ||
		name == "scrape_samples_scraped" ||
		name == "scrape_samples_post_metric_relabeling" ||
		name == "scrape_series_added"
}
func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) { func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
startTime := time.Now() startTime := time.Now()
sw.PushData(wr) sw.PushData(wr)
@ -504,7 +520,7 @@ func (sw *scrapeWork) updateActiveSeries(wc *writeRequestCtx) {
sw.activeSeries = as sw.activeSeries = as
} }
func (sw *scrapeWork) sendStaleMarkers() { func (sw *scrapeWork) sendStaleMarkers(skipAutogenSeries bool) {
series := make([]prompbmarshal.TimeSeries, 0, len(sw.activeSeries)) series := make([]prompbmarshal.TimeSeries, 0, len(sw.activeSeries))
staleMarkSamples := []prompbmarshal.Sample{ staleMarkSamples := []prompbmarshal.Sample{
{ {
@ -514,6 +530,7 @@ func (sw *scrapeWork) sendStaleMarkers() {
} }
for _, b := range sw.activeSeries { for _, b := range sw.activeSeries {
var labels []prompbmarshal.Label var labels []prompbmarshal.Label
skipSeries := false
for len(b) > 0 { for len(b) > 0 {
tail, name, err := encoding.UnmarshalBytes(b) tail, name, err := encoding.UnmarshalBytes(b)
if err != nil { if err != nil {
@ -525,16 +542,25 @@ func (sw *scrapeWork) sendStaleMarkers() {
logger.Panicf("BUG: cannot unmarshal label value from activeSeries: %s", err) logger.Panicf("BUG: cannot unmarshal label value from activeSeries: %s", err)
} }
b = tail b = tail
if skipAutogenSeries && string(name) == "__name__" && isAutogenSeries(bytesutil.ToUnsafeString(value)) {
skipSeries = true
}
labels = append(labels, prompbmarshal.Label{ labels = append(labels, prompbmarshal.Label{
Name: bytesutil.ToUnsafeString(name), Name: bytesutil.ToUnsafeString(name),
Value: bytesutil.ToUnsafeString(value), Value: bytesutil.ToUnsafeString(value),
}) })
} }
if skipSeries {
continue
}
series = append(series, prompbmarshal.TimeSeries{ series = append(series, prompbmarshal.TimeSeries{
Labels: labels, Labels: labels,
Samples: staleMarkSamples, Samples: staleMarkSamples,
}) })
} }
if len(series) == 0 {
return
}
wr := &prompbmarshal.WriteRequest{ wr := &prompbmarshal.WriteRequest{
Timeseries: series, Timeseries: series,
} }

View File

@ -128,6 +128,9 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
if pushDataCalls == 0 { if pushDataCalls == 0 {
t.Fatalf("missing pushData calls") t.Fatalf("missing pushData calls")
} }
if len(timeseriesExpected) != 0 {
t.Fatalf("%d series weren't pushed", len(timeseriesExpected))
}
} }
f(``, &ScrapeWork{}, ` f(``, &ScrapeWork{}, `