app/vmagent/remotewrite: cleanup after 1d1ba889fe

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065
This commit is contained in:
Aliaksandr Valialkin 2021-02-17 21:42:45 +02:00
parent 40973eda1c
commit 9c81429299
2 changed files with 20 additions and 12 deletions

View File

@ -171,30 +171,35 @@ func getTLSConfig(argIdx int) (*tls.Config, error) {
func (c *client) runWorker() { func (c *client) runWorker() {
var ok bool var ok bool
var block []byte var block []byte
ch := make(chan struct{}) ch := make(chan bool, 1)
for { for {
block, ok = c.fq.MustReadBlock(block[:0]) block, ok = c.fq.MustReadBlock(block[:0])
if !ok { if !ok {
return return
} }
go func() { go func() {
if delivered := c.sendBlockOk(block); !delivered { ch <- c.sendBlock(block)
return
}
ch <- struct{}{}
}() }()
select { select {
case <-ch: case ok := <-ch:
if ok {
// The block has been sent successfully // The block has been sent successfully
continue continue
}
// Return unsent block to the queue.
c.fq.MustWriteBlock(block)
return
case <-c.stopCh: case <-c.stopCh:
// c must be stopped. Wait for a while in the hope the block will be sent. // c must be stopped. Wait for a while in the hope the block will be sent.
graceDuration := 5 * time.Second graceDuration := 5 * time.Second
select { select {
case <-ch: case ok := <-ch:
logger.Infof("stop ok") if !ok {
// The block has been sent successfully. // Return unsent block to the queue.
c.fq.MustWriteBlock(block)
}
case <-time.After(graceDuration): case <-time.After(graceDuration):
// Return unsent block to the queue.
c.fq.MustWriteBlock(block) c.fq.MustWriteBlock(block)
} }
return return
@ -202,7 +207,9 @@ func (c *client) runWorker() {
} }
} }
func (c *client) sendBlockOk(block []byte) bool { // sendBlock returns false only if c.stopCh is closed.
// Otherwise it tries sending the block to remote storage indefinitely.
func (c *client) sendBlock(block []byte) bool {
c.rl.register(len(block), c.stopCh) c.rl.register(len(block), c.stopCh)
retryDuration := time.Second retryDuration := time.Second
retriesCount := 0 retriesCount := 0

View File

@ -18,6 +18,7 @@
* BUGFIX: do not spam error logs when discovering Docker Swarm targets without dedicated IP. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1028 . * BUGFIX: do not spam error logs when discovering Docker Swarm targets without dedicated IP. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1028 .
* BUGFIX: properly embed timezone data into VictoriaMetrics apps. This should fix `-loggerTimezone` usage inside Docker containers. * BUGFIX: properly embed timezone data into VictoriaMetrics apps. This should fix `-loggerTimezone` usage inside Docker containers.
* BUGFIX: properly build Docker images for non-amd64 architectures (arm, arm64, ppc64le, 386) on [Docker hub](https://hub.docker.com/u/victoriametrics/). Previously these images were incorrectly based on amd64 base image, so they didn't work. * BUGFIX: properly build Docker images for non-amd64 architectures (arm, arm64, ppc64le, 386) on [Docker hub](https://hub.docker.com/u/victoriametrics/). Previously these images were incorrectly based on amd64 base image, so they didn't work.
* BUGFIX: vmagent: return back unsent block to the queue during graceful shutdown. Previously this block could be dropped if remote storage is unavailable during vmagent shutdown. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065 .
# [v1.53.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.53.1) # [v1.53.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.53.1)