diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 322e9d3682..1ffa377b07 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -100,7 +100,7 @@ var allRelabelConfigs atomic.Value // since it may lead to high memory usage due to big number of buffers. var maxQueues = cgroup.AvailableCPUs() * 16 -const persistentQueueDir = "persistent-queue" +const persistentQueueDirname = "persistent-queue" // InitSecretFlags must be called after flag.Parse and before any logging. func InitSecretFlags() { @@ -240,25 +240,27 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { // See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014 existingQueues := make(map[string]struct{}, len(rwctxs)) for _, rwctx := range rwctxs { - existingQueues[rwctx.fq.Dir()] = struct{}{} + existingQueues[rwctx.fq.Dirname()] = struct{}{} } - queuesDir := filepath.Join(*tmpDataPath, persistentQueueDir) + queuesDir := filepath.Join(*tmpDataPath, persistentQueueDirname) files, err := os.ReadDir(queuesDir) if err != nil { logger.Fatalf("cannot read queues dir %q: %s", queuesDir, err) } removed := 0 for _, f := range files { - fullPath := filepath.Join(queuesDir, f.Name()) - if _, ok := existingQueues[fullPath]; !ok { - logger.Infof("removing dangling queue %q", fullPath) + dirname := f.Name() + if _, ok := existingQueues[dirname]; !ok { + logger.Infof("removing dangling queue %q", dirname) + fullPath := filepath.Join(queuesDir, dirname) fs.MustRemoveAll(fullPath) removed++ } } - - logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs)) + if removed > 0 { + logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs)) + } } return rwctxs @@ -502,7 +504,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI pqURL.RawQuery = "" pqURL.Fragment = "" h := xxhash.Sum64([]byte(pqURL.String())) - queuePath := fmt.Sprintf("%s/%s/%d_%016X", *tmpDataPath, persistentQueueDir, argIdx+1, h) + queuePath := filepath.Join(*tmpDataPath, persistentQueueDirname, fmt.Sprintf("%d_%016X", argIdx+1, h)) maxPendingBytes := maxPendingBytesPerURL.GetOptionalArgOrDefault(argIdx, 0) fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes) _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b029b4db9c..36fcce1f30 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -25,6 +25,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol) when [sending / receiving data to / from Kafka](https://docs.victoriametrics.com/vmagent.html#kafka-integration). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to `Kafka` located in another datacenter or availability zone. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). @@ -32,7 +33,6 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): updated usability and the search process in cardinality explorer. Made this process straightforward for user. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3986). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): automatically disable progress bar when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3823). * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add `-configCheckInterval` command-line flag, which can be used for automatic re-reading the `-auth.config` file. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3990). -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.keepDanglingQueues` command-line flag. It allows to force `vmagent` to keep dangling queues on the disk when `vmagent` is started. By default, persistent queues without matching `remoteWrite.url` will be deleted. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014). * BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551). * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error. diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 9eb8f0ad89..23d4c26250 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -2,6 +2,7 @@ package persistentqueue import ( "fmt" + "path/filepath" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -201,6 +202,6 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { } // Dir returns the directory for persistent queue. -func (fq *FastQueue) Dir() string { - return fq.pq.dir +func (fq *FastQueue) Dirname() string { + return filepath.Base(fq.pq.dir) }