diff --git a/CHANGELOG.md b/CHANGELOG.md index 71967a61a7..a07ab45dbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ * FEATURE: vmagent: add `/ready` HTTP endpoint, which returns 200 OK status code when all the service discovery has been initialized. This may be useful during rolling upgrades. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/875 +* BUGFIX: vmagent: eliminate data race when `-promscrape.streamParse` command-line flag is set. Previously this mode could result in scraped metrics with garbage labels. + See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 for details. * BUGFIX: properly calculate `topk_*` and `bottomk_*` functions from [MetricsQL](https://victoriametrics.github.io/MetricsQL.html) for time series with gaps. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/883 diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 160c3b7f3f..30b0dea597 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -325,18 +325,16 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { samplesScraped += len(rows) for i := range rows { sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) - if len(wc.labels) > 40000 { - // Limit the maximum size of wc.writeRequest. - // This should reduce memory usage when scraping targets with millions of metrics and/or labels. - // For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/ - samplesPostRelabeling += len(wc.writeRequest.Timeseries) - sw.updateSeriesAdded(wc) - startTime := time.Now() - sw.PushData(&wc.writeRequest) - pushDataDuration.UpdateDuration(startTime) - wc.resetNoRows() - } } + // Push the collected rows to sw before returning from the callback, since they cannot be held + // after returning from the callback - this will result in a data race. 
+ // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 + samplesPostRelabeling += len(wc.writeRequest.Timeseries) + sw.updateSeriesAdded(wc) + startTime := time.Now() + sw.PushData(&wc.writeRequest) + pushDataDuration.UpdateDuration(startTime) + wc.resetNoRows() return nil }) scrapedSamples.Update(float64(samplesScraped)) @@ -352,8 +350,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { } scrapesFailed.Inc() } - samplesPostRelabeling += len(wc.writeRequest.Timeseries) - sw.updateSeriesAdded(wc) seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling) sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp) diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go index ecd7b2efa0..fdb38eeeda 100644 --- a/lib/uint64set/uint64set.go +++ b/lib/uint64set/uint64set.go @@ -927,9 +927,13 @@ func (b *bucket16) delFromSmallPool(x uint16) bool { func (b *bucket16) appendTo(dst []uint64, hi uint32, hi16 uint16) []uint64 { hi64 := uint64(hi)<<32 | uint64(hi16)<<16 if b.bits == nil { + // Sort a copy of b.smallPool, since b must be readonly in order to prevent from data races + // when b.appendTo is called from concurrent goroutines. + smallPool := b.smallPool + // Use uint16Sorter instead of sort.Slice here in order to reduce memory allocations. a := uint16SorterPool.Get().(*uint16Sorter) - *a = uint16Sorter(b.smallPool[:b.smallPoolLen]) + *a = uint16Sorter(smallPool[:b.smallPoolLen]) if len(*a) > 1 && !sort.IsSorted(a) { sort.Sort(a) }