app/vmagent: fix data race when multiple -remoteWrite.urlRelabelConfig options are set

Previously multiple goroutines could access remoteWriteCtx.tss concurrently, which could lead to data race
and improper relabeling. Now each goroutine has its own copy of tss during relabeling.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
This commit is contained in:
Aliaksandr Valialkin 2020-07-10 15:13:26 +03:00
parent 805a90f642
commit 4353ff7ef1
2 changed files with 15 additions and 10 deletions

View File

@ -59,7 +59,6 @@ type relabelConfigs struct {
// initLabelsGlobal must be called after parsing command-line flags. // initLabelsGlobal must be called after parsing command-line flags.
func initLabelsGlobal() { func initLabelsGlobal() {
// Init labelsGlobal
labelsGlobal = nil labelsGlobal = nil
for _, s := range *unparsedLabelsGlobal { for _, s := range *unparsedLabelsGlobal {
n := strings.IndexByte(s, '=') n := strings.IndexByte(s, '=')

View File

@ -128,7 +128,7 @@ func Push(wr *prompbmarshal.WriteRequest) {
} }
tss := wr.Timeseries tss := wr.Timeseries
for len(tss) > 0 { for len(tss) > 0 {
// Process big tss in smaller blocks in order to reduce maxmimum memory usage // Process big tss in smaller blocks in order to reduce the maximum memory usage
tssBlock := tss tssBlock := tss
if len(tssBlock) > maxRowsPerBlock { if len(tssBlock) > maxRowsPerBlock {
tssBlock = tss[:maxRowsPerBlock] tssBlock = tss[:maxRowsPerBlock]
@ -162,8 +162,6 @@ type remoteWriteCtx struct {
pss []*pendingSeries pss []*pendingSeries
pssNextIdx uint64 pssNextIdx uint64
tss []prompbmarshal.TimeSeries
relabelMetricsDropped *metrics.Counter relabelMetricsDropped *metrics.Counter
} }
@ -208,15 +206,17 @@ func (rwctx *remoteWriteCtx) MustStop() {
func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
var rctx *relabelCtx var rctx *relabelCtx
var v *[]prompbmarshal.TimeSeries
rcs := allRelabelConfigs.Load().(*relabelConfigs) rcs := allRelabelConfigs.Load().(*relabelConfigs)
prcs := rcs.perURL[rwctx.idx] prcs := rcs.perURL[rwctx.idx]
if len(prcs) > 0 { if len(prcs) > 0 {
rctx = getRelabelCtx()
// Make a copy of tss before applying relabeling in order to prevent // Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs. // from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 for details. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
rwctx.tss = append(rwctx.tss[:0], tss...) // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
tss = rwctx.tss v = tssRelabelPool.Get().(*[]prompbmarshal.TimeSeries)
rctx = getRelabelCtx() tss = append(*v, tss...)
tssLen := len(tss) tssLen := len(tss)
tss = rctx.applyRelabeling(tss, nil, prcs) tss = rctx.applyRelabeling(tss, nil, prcs)
rwctx.relabelMetricsDropped.Add(tssLen - len(tss)) rwctx.relabelMetricsDropped.Add(tssLen - len(tss))
@ -225,8 +225,14 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss)) idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
pss[idx].Push(tss) pss[idx].Push(tss)
if rctx != nil { if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssRelabelPool.Put(v)
putRelabelCtx(rctx) putRelabelCtx(rctx)
// Zero rwctx.tss in order to free up GC references.
rwctx.tss = prompbmarshal.ResetTimeSeries(rwctx.tss)
} }
} }
var tssRelabelPool = &sync.Pool{
New: func() interface{} {
return []prompbmarshal.TimeSeries{}
},
}