From 4353ff7ef13a0472a0472b389d8bb352a6770f0c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 10 Jul 2020 15:13:26 +0300 Subject: [PATCH] app/vmagent: fix data race when multiple `-remoteWrite.urlRelabelConfig` options are set Previously multiple goroutines could access remoteWriteCtx.tss concurrently, which could lead to data race and improper relabeling. Now each goroutine has its own copy of tss during relabeling. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599 --- app/vmagent/remotewrite/relabel.go | 1 - app/vmagent/remotewrite/remotewrite.go | 24 +++++++++++++++--------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index 6adede560b..f1d48b3dde 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -59,7 +59,6 @@ type relabelConfigs struct { // initLabelsGlobal must be called after parsing command-line flags. func initLabelsGlobal() { - // Init labelsGlobal labelsGlobal = nil for _, s := range *unparsedLabelsGlobal { n := strings.IndexByte(s, '=') diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 1646b17511..fdd1d021df 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -128,7 +128,7 @@ func Push(wr *prompbmarshal.WriteRequest) { } tss := wr.Timeseries for len(tss) > 0 { - // Process big tss in smaller blocks in order to reduce maxmimum memory usage + // Process big tss in smaller blocks in order to reduce the maximum memory usage tssBlock := tss if len(tssBlock) > maxRowsPerBlock { tssBlock = tss[:maxRowsPerBlock] @@ -162,8 +162,6 @@ type remoteWriteCtx struct { pss []*pendingSeries pssNextIdx uint64 - tss []prompbmarshal.TimeSeries - relabelMetricsDropped *metrics.Counter } @@ -208,15 +206,17 @@ func (rwctx *remoteWriteCtx) MustStop() { func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { var rctx *relabelCtx + var v *[]prompbmarshal.TimeSeries rcs := allRelabelConfigs.Load().(*relabelConfigs) prcs := rcs.perURL[rwctx.idx] if len(prcs) > 0 { + rctx = getRelabelCtx() // Make a copy of tss before applying relabeling in order to prevent // from affecting time series for other remoteWrite.url configs. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 for details. - rwctx.tss = append(rwctx.tss[:0], tss...) - tss = rwctx.tss - rctx = getRelabelCtx() + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 + // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599 + v = tssRelabelPool.Get().(*[]prompbmarshal.TimeSeries) + tss = append(*v, tss...) tssLen := len(tss) tss = rctx.applyRelabeling(tss, nil, prcs) rwctx.relabelMetricsDropped.Add(tssLen - len(tss)) @@ -225,8 +225,14 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss)) pss[idx].Push(tss) if rctx != nil { + *v = prompbmarshal.ResetTimeSeries(tss) + tssRelabelPool.Put(v) putRelabelCtx(rctx) - // Zero rwctx.tss in order to free up GC references. - rwctx.tss = prompbmarshal.ResetTimeSeries(rwctx.tss) } } + +var tssRelabelPool = &sync.Pool{ + New: func() interface{} { + return []prompbmarshal.TimeSeries{} + }, +}