From 4268a310c11be5776f8f8f179f2d6e4f7a3d0a30 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 3 Jul 2024 14:12:41 +0200 Subject: [PATCH] app/vmagent/remotewrite/remotewrite.go: make remoteWriteCtx.TryPush code easier to follow Move the code responsible for relabelCtx clearing into deferred function. This allows making more clear the remoteWriteCtx.TryPush code. This is a follow-up for 879771808b7f7fe6bd3020957fb6835a02424846 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205 While at it, clarify the description of the bugfix at docs/CHANGELOG.md --- app/vmagent/remotewrite/remotewrite.go | 62 +++++++++++++------------- docs/CHANGELOG.md | 2 +- 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index f79d4b42f8..1d7bacc503 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -848,7 +848,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in func (rwctx *remoteWriteCtx) MustStop() { // sas and deduplicator must be stopped before rwctx is closed - // because sas can write pending series to rwctx.pss if there are any + // because they can write pending series to rwctx.pss if there are any sas := rwctx.sas.Swap(nil) sas.MustStop() @@ -875,12 +875,20 @@ func (rwctx *remoteWriteCtx) MustStop() { // TryPush sends tss series to the configured remote write endpoint // -// TryPush can be called concurrently for multiple remoteWriteCtx, -// so it shouldn't modify tss entries. +// TryPush doesn't modify tss, so tss can be passed concurrently to TryPush across distinct rwctx instances. func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool { - // Apply relabeling var rctx *relabelCtx var v *[]prompbmarshal.TimeSeries + defer func() { + if rctx == nil { + return + } + *v = prompbmarshal.ResetTimeSeries(tss) + tssPool.Put(v) + putRelabelCtx(rctx) + }() + + // Apply relabeling rcs := allRelabelConfigs.Load() pcs := rcs.perURL[rwctx.idx] if pcs.Len() > 0 { @@ -916,28 +924,21 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa matchIdxsPool.Put(matchIdxs) } else if rwctx.deduplicator != nil { rwctx.deduplicator.Push(tss) - tss = tss[:0] + return true } - // Try pushing the data to remote storage - ok := rwctx.tryPushInternal(tss) - - // Return back relabeling contexts to the pool - if rctx != nil { - *v = prompbmarshal.ResetTimeSeries(tss) - tssPool.Put(v) - putRelabelCtx(rctx) + // Try pushing tss to remote storage + if rwctx.tryPushInternal(tss) { + return true } - if !ok { - rwctx.pushFailures.Inc() - if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload { - rwctx.rowsDroppedOnPushFailure.Add(len(tss)) - return true - } + // Couldn't push tss to remote storage + rwctx.pushFailures.Inc() + if forceDropSamplesOnFailure || rwctx.dropSamplesOnOverload { + rwctx.rowsDroppedOnPushFailure.Add(len(tss)) + return true } - - return ok + return false } var matchIdxsPool bytesutil.ByteBufferPool @@ -974,6 +975,15 @@ func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSe func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) bool { var rctx *relabelCtx var v *[]prompbmarshal.TimeSeries + defer func() { + if rctx == nil { + return + } + *v = prompbmarshal.ResetTimeSeries(tss) + tssPool.Put(v) + putRelabelCtx(rctx) + }() + if len(labelsGlobal) > 0 { // Make a copy of tss before adding extra labels in order to prevent // from affecting time series for other remoteWrite.url configs. @@ -986,15 +996,7 @@ func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) boo pss := rwctx.pss idx := rwctx.pssNextIdx.Add(1) % uint64(len(pss)) - ok := pss[idx].TryPush(tss) - - if rctx != nil { - *v = prompbmarshal.ResetTimeSeries(tss) - tssPool.Put(v) - putRelabelCtx(rctx) - } - - return ok + return pss[idx].TryPush(tss) } var tssPool = &sync.Pool{ diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index feda00f610..93bc51cc78 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -129,7 +129,7 @@ Released at 2024-06-07 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix calendar display when `UTC+00:00` timezone is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6239). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): remove redundant requests on the `Explore Cardinality` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6240). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix handling of URL params for browser history navigation (back and forward buttons). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6126) and [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5516#issuecomment-1867507232). -* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) when more than one `-remoteWrite.url` command-line flags are passed to `vmagent` together with non-zero `-remoteWrite.streamAggr.dedupInterval` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): skip empty data blocks before sending to the remote write destination. Thanks to @viperstars for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6241). * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): set correct suffix `_prometheus` for aggregation outputs [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus). Before, outputs `total` and `total_prometheus` or `increase` and `increase_prometheus` had the same suffix. * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): prevent from excessive resource usage when stream aggregation config file is empty.