diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 0f9c47abd..3fa07a66f 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -1518,6 +1518,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Supports array of values separated by comma or specified via multiple flags. -remoteWrite.relabelConfig string Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling + -remoteWrite.keepDanglingQueues + Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on. -remoteWrite.roundDigits array Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics Supports array of values separated by comma or specified via multiple flags. diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index b27141769..322e9d368 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -4,17 +4,22 @@ import ( "flag" "fmt" "net/url" + "os" + "path/filepath" "strconv" "sync" "sync/atomic" "time" + "github.com/cespare/xxhash/v2" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" @@ -24,7 +29,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" - "github.com/cespare/xxhash/v2" ) var ( @@ -36,6 +40,8 @@ var ( "Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url") tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+ "See also -remoteWrite.maxDiskUsagePerURL") + keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+ + "Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.") queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+ "isn't enough for sending high volume of collected data to remote storage. Default value is 2 * numberOfAvailableCPUs") showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+ @@ -94,6 +100,8 @@ var allRelabelConfigs atomic.Value // since it may lead to high memory usage due to big number of buffers. var maxQueues = cgroup.AvailableCPUs() * 16 +const persistentQueueDir = "persistent-queue" + // InitSecretFlags must be called after flag.Parse and before any logging. func InitSecretFlags() { if !*showRemoteWriteURL { @@ -225,6 +233,34 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { } rwctxs[i] = newRemoteWriteCtx(i, at, remoteWriteURL, maxInmemoryBlocks, sanitizedURL) } + + if !*keepDanglingQueues { + // Remove dangling queues, if any. + // This is required for the case when the number of queues has been changed or URL have been changed. + // See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014 + existingQueues := make(map[string]struct{}, len(rwctxs)) + for _, rwctx := range rwctxs { + existingQueues[rwctx.fq.Dir()] = struct{}{} + } + + queuesDir := filepath.Join(*tmpDataPath, persistentQueueDir) + files, err := os.ReadDir(queuesDir) + if err != nil { + logger.Fatalf("cannot read queues dir %q: %s", queuesDir, err) + } + removed := 0 + for _, f := range files { + fullPath := filepath.Join(queuesDir, f.Name()) + if _, ok := existingQueues[fullPath]; !ok { + logger.Infof("removing dangling queue %q", fullPath) + fs.MustRemoveAll(fullPath) + removed++ + } + } + + logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs)) + } + return rwctxs } @@ -466,7 +502,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI pqURL.RawQuery = "" pqURL.Fragment = "" h := xxhash.Sum64([]byte(pqURL.String())) - queuePath := fmt.Sprintf("%s/persistent-queue/%d_%016X", *tmpDataPath, argIdx+1, h) + queuePath := fmt.Sprintf("%s/%s/%d_%016X", *tmpDataPath, persistentQueueDir, argIdx+1, h) maxPendingBytes := maxPendingBytesPerURL.GetOptionalArgOrDefault(argIdx, 0) fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes) _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ea7a55488..b029b4db9 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -32,6 +32,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): updated usability and the search process in cardinality explorer. Made this process straightforward for user. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3986). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): automatically disable progress bar when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3823). * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add `-configCheckInterval` command-line flag, which can be used for automatic re-reading the `-auth.config` file. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3990). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.keepDanglingQueues` command-line flag. It allows to force `vmagent` to keep dangling queues on the disk when `vmagent` is started. By default, persistent queues without matching `remoteWrite.url` will be deleted. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014). * BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551). * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error. diff --git a/docs/vmagent.md b/docs/vmagent.md index cebfaac04..d9789fe7b 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -1522,6 +1522,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Supports array of values separated by comma or specified via multiple flags. -remoteWrite.relabelConfig string Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling + -remoteWrite.keepDanglingQueues + Keep persistent queues contents in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on. -remoteWrite.roundDigits array Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics Supports array of values separated by comma or specified via multiple flags. diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 1b85b2235..9eb8f0ad8 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -199,3 +199,8 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { fq.cond.Wait() } } + +// Dir returns the directory for persistent queue. +func (fq *FastQueue) Dir() string { + return fq.pq.dir +}