From 9c81429299f908e2ee022d0351a1201eb197a9c5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 17 Feb 2021 21:42:45 +0200 Subject: [PATCH] app/vmagent/remotewrite: cleanup after 1d1ba889fe61b2ce55216e616428839261e8d07c Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065 --- app/vmagent/remotewrite/client.go | 31 +++++++++++++++++++------------ docs/CHANGELOG.md | 1 + 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index d13c16e434..d8cbfd3c8a 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -171,30 +171,35 @@ func getTLSConfig(argIdx int) (*tls.Config, error) { func (c *client) runWorker() { var ok bool var block []byte - ch := make(chan struct{}) + ch := make(chan bool, 1) for { block, ok = c.fq.MustReadBlock(block[:0]) if !ok { return } go func() { - if delivered := c.sendBlockOk(block); !delivered { - return - } - ch <- struct{}{} + ch <- c.sendBlock(block) }() select { - case <-ch: - // The block has been sent successfully - continue + case ok := <-ch: + if ok { + // The block has been sent successfully + continue + } + // Return unsent block to the queue. + c.fq.MustWriteBlock(block) + return case <-c.stopCh: // c must be stopped. Wait for a while in the hope the block will be sent. graceDuration := 5 * time.Second select { - case <-ch: - logger.Infof("stop ok") - // The block has been sent successfully. + case ok := <-ch: + if !ok { + // Return unsent block to the queue. + c.fq.MustWriteBlock(block) + } case <-time.After(graceDuration): + // Return unsent block to the queue. c.fq.MustWriteBlock(block) } return @@ -202,7 +207,9 @@ func (c *client) runWorker() { } } -func (c *client) sendBlockOk(block []byte) bool { +// sendBlock returns false only if c.stopCh is closed. +// Otherwise it tries sending the block to remote storage indefinitely. +func (c *client) sendBlock(block []byte) bool { c.rl.register(len(block), c.stopCh) retryDuration := time.Second retriesCount := 0 diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 25130331d1..44e43252f1 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -18,6 +18,7 @@ * BUGFIX: do not spam error logs when discovering Docker Swarm targets without dedicated IP. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1028 . * BUGFIX: properly embed timezone data into VictoriaMetrics apps. This should fix `-loggerTimezone` usage inside Docker containers. * BUGFIX: properly build Docker images for non-amd64 architectures (arm, arm64, ppc64le, 386) on [Docker hub](https://hub.docker.com/u/victoriametrics/). Previously these images were incorrectly based on amd64 base image, so they didn't work. +* BUGFIX: vmagent: return back unsent block to the queue during graceful shutdown. Previously this block could be dropped if remote storage is unavailable during vmagent shutdown. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065 . # [v1.53.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.53.1)