mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 16:42:27 +01:00
app/vmagent/remotewrite: do not drop persistent queues when -remoteWrite.multitenantURL is set
It is unsafe to drop persistent queues when -remoteWrite.multitenantURL command-line flag is set,
since these queues are created on demand when a new sample for the given tenant is pushed
to the remote storage.
This addresses https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5357
The issue has been appeared in the commit f3a51e8b1d
when implementing https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
This commit is contained in:
parent
28df725a37
commit
a906a7d85c
@ -183,6 +183,7 @@ func Init() {
|
|||||||
if len(*remoteWriteURLs) > 0 {
|
if len(*remoteWriteURLs) > 0 {
|
||||||
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
||||||
}
|
}
|
||||||
|
dropDanglingQueues()
|
||||||
|
|
||||||
// Start config reloader.
|
// Start config reloader.
|
||||||
configReloaderWG.Add(1)
|
configReloaderWG.Add(1)
|
||||||
@ -200,6 +201,42 @@ func Init() {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func dropDanglingQueues() {
|
||||||
|
if *keepDanglingQueues {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(*remoteWriteMultitenantURLs) > 0 {
|
||||||
|
// Do not drop dangling queues for *remoteWriteMultitenantURLs, since it is impossible to determine
|
||||||
|
// unused queues for multitenant urls - they are created on demand when new sample for the given
|
||||||
|
// tenant is pushed to remote storage.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Remove dangling persistent queues, if any.
|
||||||
|
// This is required for the case when the number of queues has been changed or URL have been changed.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
|
||||||
|
//
|
||||||
|
existingQueues := make(map[string]struct{}, len(rwctxsDefault))
|
||||||
|
for _, rwctx := range rwctxsDefault {
|
||||||
|
existingQueues[rwctx.fq.Dirname()] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
queuesDir := filepath.Join(*tmpDataPath, persistentQueueDirname)
|
||||||
|
files := fs.MustReadDir(queuesDir)
|
||||||
|
removed := 0
|
||||||
|
for _, f := range files {
|
||||||
|
dirname := f.Name()
|
||||||
|
if _, ok := existingQueues[dirname]; !ok {
|
||||||
|
logger.Infof("removing dangling queue %q", dirname)
|
||||||
|
fullPath := filepath.Join(queuesDir, dirname)
|
||||||
|
fs.MustRemoveAll(fullPath)
|
||||||
|
removed++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if removed > 0 {
|
||||||
|
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxsDefault))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func reloadRelabelConfigs() {
|
func reloadRelabelConfigs() {
|
||||||
relabelConfigReloads.Inc()
|
relabelConfigReloads.Inc()
|
||||||
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
||||||
@ -273,33 +310,6 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
|
|||||||
}
|
}
|
||||||
rwctxs[i] = newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
|
rwctxs[i] = newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !*keepDanglingQueues {
|
|
||||||
// Remove dangling queues, if any.
|
|
||||||
// This is required for the case when the number of queues has been changed or URL have been changed.
|
|
||||||
// See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
|
|
||||||
existingQueues := make(map[string]struct{}, len(rwctxs))
|
|
||||||
for _, rwctx := range rwctxs {
|
|
||||||
existingQueues[rwctx.fq.Dirname()] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
queuesDir := filepath.Join(*tmpDataPath, persistentQueueDirname)
|
|
||||||
files := fs.MustReadDir(queuesDir)
|
|
||||||
removed := 0
|
|
||||||
for _, f := range files {
|
|
||||||
dirname := f.Name()
|
|
||||||
if _, ok := existingQueues[dirname]; !ok {
|
|
||||||
logger.Infof("removing dangling queue %q", dirname)
|
|
||||||
fullPath := filepath.Join(queuesDir, dirname)
|
|
||||||
fs.MustRemoveAll(fullPath)
|
|
||||||
removed++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if removed > 0 {
|
|
||||||
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return rwctxs
|
return rwctxs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,6 +28,7 @@ The sandbox cluster installation is running under the constant load generated by
|
|||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): prevent from `FATAL: cannot flush metainfo` panic when [`-remoteWrite.multitenantURL`](https://docs.victoriametrics.com/vmagent.html#multitenancy) command-line flag is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5357).
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly decode zstd-encoded data blocks received via [VictoriaMetrics remote_write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). See [this issue comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992).
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly decode zstd-encoded data blocks received via [VictoriaMetrics remote_write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). See [this issue comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992).
|
||||||
|
|
||||||
## [v1.95.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.1)
|
## [v1.95.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.1)
|
||||||
|
Loading…
Reference in New Issue
Block a user