lib/promscrape: properly handle scrape errors when stream parsing is enabled

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/967
This commit is contained in:
Aliaksandr Valialkin 2020-12-15 14:08:06 +02:00
parent 8d1031c29a
commit eddc2bd017
2 changed files with 32 additions and 25 deletions

View File

@ -21,6 +21,8 @@
* BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` command-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944 * BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` command-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944
* BUGFIX: vminsert: properly update `vm_rpc_rerouted_rows_processed_total` metric. Previously it wasn't updated. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/955 * BUGFIX: vminsert: properly update `vm_rpc_rerouted_rows_processed_total` metric. Previously it wasn't updated. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/955
* BUGFIX: vmagent: properly recover when opening incorrectly stored persistent queue. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/964 * BUGFIX: vmagent: properly recover when opening incorrectly stored persistent queue. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/964
* BUGFIX: vmagent: properly handle scrape errors when stream parsing is enabled with `-promscrape.streamParse` command-line flag or with `stream_parse: true` per-target config option. Previously such errors weren't reported at `/targets` page. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/967
* BUGFIX: assume the previous value is 0 when calculating `increase()` for the first point on the graph if its value doesn't exceed 100 and the delta between the first two points equals 0. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962
# [v1.49.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.49.0) # [v1.49.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.49.0)

View File

@ -313,38 +313,43 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
} }
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
sr, err := sw.GetStreamReader()
if err != nil {
return fmt.Errorf("cannot read data: %s", err)
}
samplesScraped := 0 samplesScraped := 0
samplesPostRelabeling := 0 samplesPostRelabeling := 0
responseSize := int64(0)
wc := writeRequestCtxPool.Get(sw.prevRowsLen) wc := writeRequestCtxPool.Get(sw.prevRowsLen)
var mu sync.Mutex
err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error { sr, err := sw.GetStreamReader()
mu.Lock() if err != nil {
defer mu.Unlock() err = fmt.Errorf("cannot read data: %s", err)
samplesScraped += len(rows) } else {
for i := range rows { var mu sync.Mutex
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
} mu.Lock()
// Push the collected rows to sw before returning from the callback, since they cannot be held defer mu.Unlock()
// after returning from the callback - this will result in data race. samplesScraped += len(rows)
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 for i := range rows {
samplesPostRelabeling += len(wc.writeRequest.Timeseries) sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
sw.updateSeriesAdded(wc) }
startTime := time.Now() // Push the collected rows to sw before returning from the callback, since they cannot be held
sw.PushData(&wc.writeRequest) // after returning from the callback - this will result in data race.
pushDataDuration.UpdateDuration(startTime) // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
wc.resetNoRows() samplesPostRelabeling += len(wc.writeRequest.Timeseries)
return nil sw.updateSeriesAdded(wc)
}) startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
wc.resetNoRows()
return nil
})
responseSize = sr.bytesRead
sr.MustClose()
}
scrapedSamples.Update(float64(samplesScraped)) scrapedSamples.Update(float64(samplesScraped))
endTimestamp := time.Now().UnixNano() / 1e6 endTimestamp := time.Now().UnixNano() / 1e6
duration := float64(endTimestamp-realTimestamp) / 1e3 duration := float64(endTimestamp-realTimestamp) / 1e3
scrapeDuration.Update(duration) scrapeDuration.Update(duration)
scrapeResponseSize.Update(float64(sr.bytesRead)) scrapeResponseSize.Update(float64(responseSize))
sr.MustClose()
up := 1 up := 1
if err != nil { if err != nil {
if samplesScraped == 0 { if samplesScraped == 0 {