mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-04 16:51:11 +01:00
lib/promscrape: properly handle scrape errors when stream parsing is enabled
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/967
This commit is contained in:
parent
11fa458e39
commit
b730fc2667
@ -21,6 +21,7 @@
|
|||||||
* BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` comand-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944
|
* BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` comand-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944
|
||||||
* BUGFIX: vminsert: properly update `vm_rpc_rerouted_rows_processed_total` metric. Previously it wasn't updated. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/955
|
* BUGFIX: vminsert: properly update `vm_rpc_rerouted_rows_processed_total` metric. Previously it wasn't updated. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/955
|
||||||
* BUGFIX: vmagent: properly recover when opening incorrectly stored persistent queue. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/964
|
* BUGFIX: vmagent: properly recover when opening incorrectly stored persistent queue. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/964
|
||||||
|
* BUGFIX: vmagent: properly handle scrape errors when stream parsing is enabled with `-promscrape.streamParse` command-line flag or with `stream_parse: true` per-target config option. Previously such errors weren't reported at `/targets` page. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/967
|
||||||
* BUGFIX: assume the previous value is 0 when calculating `increase()` for the first point on the graph if its value doesn't exceed 100 and the delta between two first points equals to 0. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962
|
* BUGFIX: assume the previous value is 0 when calculating `increase()` for the first point on the graph if its value doesn't exceed 100 and the delta between two first points equals to 0. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962
|
||||||
|
|
||||||
|
|
||||||
|
@ -313,38 +313,43 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
||||||
sr, err := sw.GetStreamReader()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot read data: %s", err)
|
|
||||||
}
|
|
||||||
samplesScraped := 0
|
samplesScraped := 0
|
||||||
samplesPostRelabeling := 0
|
samplesPostRelabeling := 0
|
||||||
|
responseSize := int64(0)
|
||||||
wc := writeRequestCtxPool.Get(sw.prevRowsLen)
|
wc := writeRequestCtxPool.Get(sw.prevRowsLen)
|
||||||
var mu sync.Mutex
|
|
||||||
err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
|
sr, err := sw.GetStreamReader()
|
||||||
mu.Lock()
|
if err != nil {
|
||||||
defer mu.Unlock()
|
err = fmt.Errorf("cannot read data: %s", err)
|
||||||
samplesScraped += len(rows)
|
} else {
|
||||||
for i := range rows {
|
var mu sync.Mutex
|
||||||
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
|
err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
|
||||||
}
|
mu.Lock()
|
||||||
// Push the collected rows to sw before returning from the callback, since they cannot be held
|
defer mu.Unlock()
|
||||||
// after returning from the callback - this will result in data race.
|
samplesScraped += len(rows)
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
|
for i := range rows {
|
||||||
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
|
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
|
||||||
sw.updateSeriesAdded(wc)
|
}
|
||||||
startTime := time.Now()
|
// Push the collected rows to sw before returning from the callback, since they cannot be held
|
||||||
sw.PushData(&wc.writeRequest)
|
// after returning from the callback - this will result in data race.
|
||||||
pushDataDuration.UpdateDuration(startTime)
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
|
||||||
wc.resetNoRows()
|
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
|
||||||
return nil
|
sw.updateSeriesAdded(wc)
|
||||||
})
|
startTime := time.Now()
|
||||||
|
sw.PushData(&wc.writeRequest)
|
||||||
|
pushDataDuration.UpdateDuration(startTime)
|
||||||
|
wc.resetNoRows()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
responseSize = sr.bytesRead
|
||||||
|
sr.MustClose()
|
||||||
|
}
|
||||||
|
|
||||||
scrapedSamples.Update(float64(samplesScraped))
|
scrapedSamples.Update(float64(samplesScraped))
|
||||||
endTimestamp := time.Now().UnixNano() / 1e6
|
endTimestamp := time.Now().UnixNano() / 1e6
|
||||||
duration := float64(endTimestamp-realTimestamp) / 1e3
|
duration := float64(endTimestamp-realTimestamp) / 1e3
|
||||||
scrapeDuration.Update(duration)
|
scrapeDuration.Update(duration)
|
||||||
scrapeResponseSize.Update(float64(sr.bytesRead))
|
scrapeResponseSize.Update(float64(responseSize))
|
||||||
sr.MustClose()
|
|
||||||
up := 1
|
up := 1
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if samplesScraped == 0 {
|
if samplesScraped == 0 {
|
||||||
|
Loading…
Reference in New Issue
Block a user