diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 0b50310fe..b8768928f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -21,6 +21,7 @@ * BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` comand-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944 * BUGFIX: vminsert: properly update `vm_rpc_rerouted_rows_processed_total` metric. Previously it wasn't updated. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/955 * BUGFIX: vmagent: properly recover when opening incorrectly stored persistent queue. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/964 +* BUGFIX: vmagent: properly handle scrape errors when stream parsing is enabled with `-promscrape.streamParse` command-line flag or with `stream_parse: true` per-target config option. Previously such errors weren't reported at `/targets` page. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/967 * BUGFIX: assume the previous value is 0 when calculating `increase()` for the first point on the graph if its value doesn't exceed 100 and the delta between two first points equals to 0. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962 diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 58392cd93..e1ea2958f 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -313,38 +313,43 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error } func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { - sr, err := sw.GetStreamReader() - if err != nil { - return fmt.Errorf("cannot read data: %s", err) - } samplesScraped := 0 samplesPostRelabeling := 0 + responseSize := int64(0) wc := writeRequestCtxPool.Get(sw.prevRowsLen) - var mu sync.Mutex - err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error { - mu.Lock() - defer mu.Unlock() - samplesScraped += len(rows) - for i := range rows { - sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) - } - // Push the collected rows to sw before returning from the callback, since they cannot be held - // after returning from the callback - this will result in data race. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 - samplesPostRelabeling += len(wc.writeRequest.Timeseries) - sw.updateSeriesAdded(wc) - startTime := time.Now() - sw.PushData(&wc.writeRequest) - pushDataDuration.UpdateDuration(startTime) - wc.resetNoRows() - return nil - }) + + sr, err := sw.GetStreamReader() + if err != nil { + err = fmt.Errorf("cannot read data: %s", err) + } else { + var mu sync.Mutex + err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error { + mu.Lock() + defer mu.Unlock() + samplesScraped += len(rows) + for i := range rows { + sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) + } + // Push the collected rows to sw before returning from the callback, since they cannot be held + // after returning from the callback - this will result in data race. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 + samplesPostRelabeling += len(wc.writeRequest.Timeseries) + sw.updateSeriesAdded(wc) + startTime := time.Now() + sw.PushData(&wc.writeRequest) + pushDataDuration.UpdateDuration(startTime) + wc.resetNoRows() + return nil + }) + responseSize = sr.bytesRead + sr.MustClose() + } + scrapedSamples.Update(float64(samplesScraped)) endTimestamp := time.Now().UnixNano() / 1e6 duration := float64(endTimestamp-realTimestamp) / 1e3 scrapeDuration.Update(duration) - scrapeResponseSize.Update(float64(sr.bytesRead)) - sr.MustClose() + scrapeResponseSize.Update(float64(responseSize)) up := 1 if err != nil { if samplesScraped == 0 {