app/vmagent/remotewrite: cosmetic updates after f3a51e8b1d

- Compare directory names instead of paths to directory when determining which persistent queues must be deleted
  This is less error-prone solution, since paths to the same directory can differ, which could lead
  to accidental directory removal for the existing -remoteWrite.url

- Log the `removed %d dangling queues` message when at least a single queue has been removed

- Consistently use filepath.Join() for creating paths to persistent queues.
  This is needed for Windows support (see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70 )

- Clarify the description of the change at docs/CHANGELOG.md

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
This commit is contained in:
Aliaksandr Valialkin 2023-03-27 18:33:05 -07:00
parent 6ed6eb0c4c
commit 6f5bbf096a
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
3 changed files with 15 additions and 12 deletions

View File

@ -100,7 +100,7 @@ var allRelabelConfigs atomic.Value
// since it may lead to high memory usage due to big number of buffers. // since it may lead to high memory usage due to big number of buffers.
var maxQueues = cgroup.AvailableCPUs() * 16 var maxQueues = cgroup.AvailableCPUs() * 16
const persistentQueueDir = "persistent-queue" const persistentQueueDirname = "persistent-queue"
// InitSecretFlags must be called after flag.Parse and before any logging. // InitSecretFlags must be called after flag.Parse and before any logging.
func InitSecretFlags() { func InitSecretFlags() {
@ -240,26 +240,28 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
// See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014 // See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
existingQueues := make(map[string]struct{}, len(rwctxs)) existingQueues := make(map[string]struct{}, len(rwctxs))
for _, rwctx := range rwctxs { for _, rwctx := range rwctxs {
existingQueues[rwctx.fq.Dir()] = struct{}{} existingQueues[rwctx.fq.Dirname()] = struct{}{}
} }
queuesDir := filepath.Join(*tmpDataPath, persistentQueueDir) queuesDir := filepath.Join(*tmpDataPath, persistentQueueDirname)
files, err := os.ReadDir(queuesDir) files, err := os.ReadDir(queuesDir)
if err != nil { if err != nil {
logger.Fatalf("cannot read queues dir %q: %s", queuesDir, err) logger.Fatalf("cannot read queues dir %q: %s", queuesDir, err)
} }
removed := 0 removed := 0
for _, f := range files { for _, f := range files {
fullPath := filepath.Join(queuesDir, f.Name()) dirname := f.Name()
if _, ok := existingQueues[fullPath]; !ok { if _, ok := existingQueues[dirname]; !ok {
logger.Infof("removing dangling queue %q", fullPath) logger.Infof("removing dangling queue %q", dirname)
fullPath := filepath.Join(queuesDir, dirname)
fs.MustRemoveAll(fullPath) fs.MustRemoveAll(fullPath)
removed++ removed++
} }
} }
if removed > 0 {
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs)) logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs))
} }
}
return rwctxs return rwctxs
} }
@ -502,7 +504,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
pqURL.RawQuery = "" pqURL.RawQuery = ""
pqURL.Fragment = "" pqURL.Fragment = ""
h := xxhash.Sum64([]byte(pqURL.String())) h := xxhash.Sum64([]byte(pqURL.String()))
queuePath := fmt.Sprintf("%s/%s/%d_%016X", *tmpDataPath, persistentQueueDir, argIdx+1, h) queuePath := filepath.Join(*tmpDataPath, persistentQueueDirname, fmt.Sprintf("%d_%016X", argIdx+1, h))
maxPendingBytes := maxPendingBytesPerURL.GetOptionalArgOrDefault(argIdx, 0) maxPendingBytes := maxPendingBytesPerURL.GetOptionalArgOrDefault(argIdx, 0)
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes) fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes)
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {

View File

@ -25,6 +25,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol) when [sending / receiving data to / from Kafka](https://docs.victoriametrics.com/vmagent.html#kafka-integration). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to `Kafka` located in another datacenter or availability zone. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol) when [sending / receiving data to / from Kafka](https://docs.victoriametrics.com/vmagent.html#kafka-integration). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to `Kafka` located in another datacenter or availability zone. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
@ -32,7 +33,6 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): updated usability and the search process in cardinality explorer. Made this process straightforward for user. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3986). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): updated usability and the search process in cardinality explorer. Made this process straightforward for user. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3986).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): automatically disable progress bar when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3823). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): automatically disable progress bar when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3823).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add `-configCheckInterval` command-line flag, which can be used for automatic re-reading the `-auth.config` file. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3990). * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add `-configCheckInterval` command-line flag, which can be used for automatic re-reading the `-auth.config` file. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3990).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.keepDanglingQueues` command-line flag. It allows to force `vmagent` to keep dangling queues on the disk when `vmagent` is started. By default, persistent queues without matching `remoteWrite.url` will be deleted. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551). * BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error. * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.

View File

@ -2,6 +2,7 @@ package persistentqueue
import ( import (
"fmt" "fmt"
"path/filepath"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -201,6 +202,6 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
} }
// Dir returns the directory for persistent queue. // Dir returns the directory for persistent queue.
func (fq *FastQueue) Dir() string { func (fq *FastQueue) Dirname() string {
return fq.pq.dir return filepath.Base(fq.pq.dir)
} }