lib/promscrape: eliminate data race in stream parse mode

Previously `-promscrape.streamParse` mode could result in garbage labels for the scraped metrics because of a data race.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
Aliaksandr Valialkin 2020-11-07 12:44:15 +02:00
parent 6ca5a94359
commit 535fea3d11
3 changed files with 16 additions and 14 deletions
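
The root cause, in a nutshell: the rows handed to the stream-parse callback alias the parser's reusable read buffer, so anything still referencing them after the callback returns can be overwritten by the next read. Below is a minimal, self-contained sketch of that failure mode and of the fix (consume the rows before the callback returns). It is an illustration only, not vmagent's actual code; all names in it are made up.

// Minimal illustration (hypothetical code, not vmagent's): rows passed to a
// streaming-parse callback alias a reused buffer and must not be held after
// the callback returns.
package main

import "fmt"

// row mimics a parsed sample whose fields alias the parser's scratch buffer.
type row struct {
	metric []byte
}

// parseStream parses blocks one at a time into a single reused buffer and
// passes the resulting rows to f. The rows are valid only until f returns.
func parseStream(blocks []string, f func(rows []row)) {
	buf := make([]byte, 0, 64)
	for _, b := range blocks {
		buf = append(buf[:0], b...) // overwrites the bytes previous rows point at
		f([]row{{metric: buf}})
	}
}

func main() {
	// Broken: keep references to the rows and push them after parsing finishes.
	var held []row
	parseStream([]string{"metric_a 1", "metric_b 2"}, func(rows []row) {
		held = append(held, rows...)
	})
	fmt.Printf("held: %s | %s\n", held[0].metric, held[1].metric) // both show the last block - garbage labels

	// Fixed: copy/push the rows before returning from the callback.
	var pushed []string
	parseStream([]string{"metric_a 1", "metric_b 2"}, func(rows []row) {
		for _, r := range rows {
			pushed = append(pushed, string(r.metric)) // copied while still valid
		}
	})
	fmt.Println("pushed:", pushed) // [metric_a 1 metric_b 2]
}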


@@ -10,6 +10,8 @@
 * FEATURE: vmagent: add `/ready` HTTP endpoint, which returns 200 OK status code when all the service discovery has been initialized.
   This may be useful during rolling upgrades. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/875
+* BUGFIX: vmagent: eliminate data race when the `-promscrape.streamParse` command-line flag is set. Previously this mode could result in scraped metrics with garbage labels.
+  See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 for details.
 * BUGFIX: properly calculate `topk_*` and `bottomk_*` functions from [MetricsQL](https://victoriametrics.github.io/MetricsQL.html) for time series with gaps.
   See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/883
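
The `/ready` endpoint mentioned in the CHANGELOG context above lends itself to a readiness gate during rolling upgrades. The sketch below is a hedged example, not part of this commit: it assumes vmagent is listening on its default `-httpListenAddr` of `:8429` and simply polls the endpoint until it returns 200 OK.

package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitReady polls vmagent's /ready endpoint until it reports that all
// service discovery has been initialized (HTTP 200), or until the timeout.
func waitReady(baseURL string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := http.Get(baseURL + "/ready")
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("%s/ready did not return 200 within %s", baseURL, timeout)
}

func main() {
	// http://localhost:8429 assumes the default -httpListenAddr; adjust as needed.
	if err := waitReady("http://localhost:8429", time.Minute); err != nil {
		panic(err)
	}
	fmt.Println("vmagent is ready; safe to proceed with the rollout")
}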


@@ -325,18 +325,16 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 		samplesScraped += len(rows)
 		for i := range rows {
 			sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
-			if len(wc.labels) > 40000 {
-				// Limit the maximum size of wc.writeRequest.
-				// This should reduce memory usage when scraping targets with millions of metrics and/or labels.
-				// For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/
-				samplesPostRelabeling += len(wc.writeRequest.Timeseries)
-				sw.updateSeriesAdded(wc)
-				startTime := time.Now()
-				sw.PushData(&wc.writeRequest)
-				pushDataDuration.UpdateDuration(startTime)
-				wc.resetNoRows()
-			}
 		}
+		// Push the collected rows to sw before returning from the callback, since they cannot be held
+		// after returning from the callback - this will result in data race.
+		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
+		samplesPostRelabeling += len(wc.writeRequest.Timeseries)
+		sw.updateSeriesAdded(wc)
+		startTime := time.Now()
+		sw.PushData(&wc.writeRequest)
+		pushDataDuration.UpdateDuration(startTime)
+		wc.resetNoRows()
 		return nil
 	})
 	scrapedSamples.Update(float64(samplesScraped))
@@ -352,8 +350,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 		}
 		scrapesFailed.Inc()
 	}
-	samplesPostRelabeling += len(wc.writeRequest.Timeseries)
-	sw.updateSeriesAdded(wc)
 	seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling)
 	sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)


@@ -927,9 +927,13 @@ func (b *bucket16) delFromSmallPool(x uint16) bool {
 func (b *bucket16) appendTo(dst []uint64, hi uint32, hi16 uint16) []uint64 {
 	hi64 := uint64(hi)<<32 | uint64(hi16)<<16
 	if b.bits == nil {
+		// Sort a copy of b.smallPool, since b must be readonly in order to prevent from data races
+		// when b.appendTo is called from concurrent goroutines.
+		smallPool := b.smallPool
 		// Use uint16Sorter instead of sort.Slice here in order to reduce memory allocations.
 		a := uint16SorterPool.Get().(*uint16Sorter)
-		*a = uint16Sorter(b.smallPool[:b.smallPoolLen])
+		*a = uint16Sorter(smallPool[:b.smallPoolLen])
 		if len(*a) > 1 && !sort.IsSorted(a) {
 			sort.Sort(a)
 		}
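
The uint64set hunk applies the same "keep shared state read-only" idea: appendTo now sorts a copy of b.smallPool instead of sorting the pool in place, so concurrent appendTo calls no longer mutate the bucket they share. The copy works on the assumption (suggested by the diff itself) that smallPool is a fixed-size array, so plain assignment copies its elements. Below is a self-contained sketch of the pattern with invented names, not the real lib/uint64set code.

package main

import (
	"fmt"
	"sort"
	"sync"
)

// bucket is a simplified stand-in for bucket16: a fixed-size array plus a
// length, read concurrently by many goroutines.
type bucket struct {
	pool    [8]uint16
	poolLen int
}

// appendSortedUnsafe sorts the shared array in place - concurrent callers
// race with each other (and with readers) on b.pool.
func (b *bucket) appendSortedUnsafe(dst []uint16) []uint16 {
	a := b.pool[:b.poolLen]
	sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
	return append(dst, a...)
}

// appendSorted sorts a copy: assigning the array copies it by value, so the
// receiver stays read-only and concurrent calls are safe.
func (b *bucket) appendSorted(dst []uint16) []uint16 {
	pool := b.pool // array assignment copies the elements
	a := pool[:b.poolLen]
	sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
	return append(dst, a...)
}

func main() {
	b := &bucket{pool: [8]uint16{5, 3, 9, 1}, poolLen: 4}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = b.appendSorted(nil) // only reads b, so it stays clean under `go run -race`
		}()
	}
	wg.Wait()
	fmt.Println(b.appendSorted(nil)) // [1 3 5 9]; b.pool itself is left untouched
}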