diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index 508e8932ac..31177a7fee 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -3,6 +3,7 @@ package remotewrite import ( "flag" "sync" + "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -69,7 +70,7 @@ func (ps *pendingSeries) periodicFlusher() { case <-ps.stopCh: mustStop = true case <-ticker.C: - if fasttime.UnixTimestamp()-ps.wr.lastFlushTime < uint64(flushSeconds) { + if fasttime.UnixTimestamp()-atomic.LoadUint64(&ps.wr.lastFlushTime) < uint64(flushSeconds) { continue } } @@ -80,10 +81,12 @@ func (ps *pendingSeries) periodicFlusher() { } type writeRequest struct { - wr prompbmarshal.WriteRequest - pushBlock func(block []byte) + // Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures. lastFlushTime uint64 + wr prompbmarshal.WriteRequest + pushBlock func(block []byte) + tss []prompbmarshal.TimeSeries labels []prompbmarshal.Label @@ -114,7 +117,7 @@ func (wr *writeRequest) reset() { func (wr *writeRequest) flush() { wr.wr.Timeseries = wr.tss - wr.lastFlushTime = fasttime.UnixTimestamp() + atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) pushWriteRequest(&wr.wr, wr.pushBlock) wr.reset() }