diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index 6c461c0dbb..1e18f8e1a0 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -2,6 +2,7 @@ package remotewrite import ( "flag" + "fmt" "strings" "sync" @@ -16,35 +17,60 @@ var ( "Pass multiple -remoteWrite.label flags in order to add multiple flags to metrics before sending them to remote storage") relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabel_config entries. These entries are applied to all the metrics "+ "before sending them to -remoteWrite.url. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config for details") + relabelConfigPaths = flagutil.NewArray("remoteWrite.urlRelabelConfig", "Optional path to relabel config for the corresponding -remoteWrite.url") ) var labelsGlobal []prompbmarshal.Label -var prcsGlobal []promrelabel.ParsedRelabelConfig -// initRelabelGlobal must be called after parsing command-line flags. -func initRelabelGlobal() { +// CheckRelabelConfigs checks -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig. +func CheckRelabelConfigs() error { + _, err := loadRelabelConfigs() + return err +} + +func loadRelabelConfigs() (*relabelConfigs, error) { + var rcs relabelConfigs + if *relabelConfigPathGlobal != "" { + global, err := promrelabel.LoadRelabelConfigs(*relabelConfigPathGlobal) + if err != nil { + return nil, fmt.Errorf("cannot load -remoteWrite.relabelConfig=%q: %s", *relabelConfigPathGlobal, err) + } + rcs.global = global + } + if len(*relabelConfigPaths) > len(*remoteWriteURLs) { + return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", + len(*relabelConfigPaths), len(*remoteWriteURLs)) + } + rcs.perURL = make([][]promrelabel.ParsedRelabelConfig, len(*remoteWriteURLs)) + for i, path := range *relabelConfigPaths { + prc, err := promrelabel.LoadRelabelConfigs(path) + if err != nil { + return nil, fmt.Errorf("cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %s", path, err) + } + rcs.perURL[i] = prc + } + return &rcs, nil +} + +type relabelConfigs struct { + global []promrelabel.ParsedRelabelConfig + perURL [][]promrelabel.ParsedRelabelConfig +} + +// initLabelsGlobal must be called after parsing command-line flags. +func initLabelsGlobal() { // Init labelsGlobal labelsGlobal = nil for _, s := range *unparsedLabelsGlobal { n := strings.IndexByte(s, '=') if n < 0 { - logger.Panicf("FATAL: missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q", s) + logger.Fatalf("missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q", s) } labelsGlobal = append(labelsGlobal, prompbmarshal.Label{ Name: s[:n], Value: s[n+1:], }) } - - // Init prcsGlobal - prcsGlobal = nil - if len(*relabelConfigPathGlobal) > 0 { - var err error - prcsGlobal, err = promrelabel.LoadRelabelConfigs(*relabelConfigPathGlobal) - if err != nil { - logger.Panicf("FATAL: cannot load relabel configs from -remoteWrite.relabelConfig=%q: %s", *relabelConfigPathGlobal, err) - } - } } func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label, prcs []promrelabel.ParsedRelabelConfig) []prompbmarshal.TimeSeries { diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 9f747a7a17..114239a6f8 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -3,6 +3,7 @@ package remotewrite import ( "flag" "fmt" + "sync" "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -10,8 +11,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/metrics" xxhash "github.com/cespare/xxhash/v2" ) @@ -20,9 +21,8 @@ var ( remoteWriteURLs = flagutil.NewArray("remoteWrite.url", "Remote storage URL to write data to. It must support Prometheus remote_write API. "+ "It is recommended using VictoriaMetrics as remote storage. Example url: http://:8428/api/v1/write . "+ "Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems") - relabelConfigPaths = flagutil.NewArray("remoteWrite.urlRelabelConfig", "Optional path to relabel config for the corresponding -remoteWrite.url") - tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored") - queues = flag.Int("remoteWrite.queues", 1, "The number of concurrent queues to each -remoteWrite.url. Set more queues if a single queue "+ + tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored") + queues = flag.Int("remoteWrite.queues", 1, "The number of concurrent queues to each -remoteWrite.url. Set more queues if a single queue "+ "isn't enough for sending high volume of collected data to remote storage") showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+ "It is hidden by default, since it can contain sensistive auth info") @@ -32,23 +32,11 @@ var ( "Disk usage is unlimited if the value is set to 0") ) -// CheckRelabelConfigs checks -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig. -func CheckRelabelConfigs() error { - if *relabelConfigPathGlobal != "" { - if _, err := promrelabel.LoadRelabelConfigs(*relabelConfigPathGlobal); err != nil { - return fmt.Errorf("cannot load -remoteWrite.relabelConfig=%q: %s", *relabelConfigPathGlobal, err) - } - } - for _, path := range *relabelConfigPaths { - if _, err := promrelabel.LoadRelabelConfigs(path); err != nil { - return fmt.Errorf("cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %s", path, err) - } - } - return nil -} - var rwctxs []*remoteWriteCtx +// Contains the current relabelConfigs. +var allRelabelConfigs atomic.Value + // Init initializes remotewrite. // // It must be called after flag.Parse(). @@ -56,14 +44,19 @@ var rwctxs []*remoteWriteCtx // Stop must be called for graceful shutdown. func Init() { if len(*remoteWriteURLs) == 0 { - logger.Panicf("FATAL: at least one `-remoteWrite.url` must be set") + logger.Fatalf("at least one `-remoteWrite.url` must be set") } if !*showRemoteWriteURL { // remoteWrite.url can contain authentication codes, so hide it at `/metrics` output. httpserver.RegisterSecretFlag("remoteWrite.url") } - initRelabelGlobal() + initLabelsGlobal() + rcs, err := loadRelabelConfigs() + if err != nil { + logger.Fatalf("cannot load relabel configs: %s", err) + } + allRelabelConfigs.Store(rcs) maxInmemoryBlocks := memory.Allowed() / len(*remoteWriteURLs) / maxRowsPerBlock / 100 if maxInmemoryBlocks > 200 { @@ -76,23 +69,47 @@ func Init() { maxInmemoryBlocks = 2 } for i, remoteWriteURL := range *remoteWriteURLs { - relabelConfigPath := "" - if i < len(*relabelConfigPaths) { - relabelConfigPath = (*relabelConfigPaths)[i] - } urlLabelValue := fmt.Sprintf("secret-url-%d", i+1) if *showRemoteWriteURL { urlLabelValue = remoteWriteURL } - rwctx := newRemoteWriteCtx(i, remoteWriteURL, relabelConfigPath, maxInmemoryBlocks, urlLabelValue) + rwctx := newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, urlLabelValue) rwctxs = append(rwctxs, rwctx) } + + // Start config reloader. + sighupCh := procutil.NewSighupChan() + configReloaderWG.Add(1) + go func() { + defer configReloaderWG.Done() + for { + select { + case <-sighupCh: + case <-stopCh: + return + } + logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig") + rcs, err := loadRelabelConfigs() + if err != nil { + logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err) + continue + } + allRelabelConfigs.Store(rcs) + logger.Infof("Successfully reloaded relabel configs") + } + }() } +var stopCh = make(chan struct{}) +var configReloaderWG sync.WaitGroup + // Stop stops remotewrite. // // It is expected that nobody calls Push during and after the call to this func. func Stop() { + close(stopCh) + configReloaderWG.Wait() + for _, rwctx := range rwctxs { rwctx.MustStop() } @@ -104,6 +121,8 @@ func Stop() { // Note that wr may be modified by Push due to relabeling. func Push(wr *prompbmarshal.WriteRequest) { var rctx *relabelCtx + rcs := allRelabelConfigs.Load().(*relabelConfigs) + prcsGlobal := rcs.global if len(prcsGlobal) > 0 || len(labelsGlobal) > 0 { rctx = getRelabelCtx() } @@ -137,9 +156,9 @@ func Push(wr *prompbmarshal.WriteRequest) { var globalRelabelMetricsDropped = metrics.NewCounter("vmagent_remotewrite_global_relabel_metrics_dropped_total") type remoteWriteCtx struct { + idx int fq *persistentqueue.FastQueue c *client - prcs []promrelabel.ParsedRelabelConfig pss []*pendingSeries pssNextIdx uint64 @@ -148,7 +167,7 @@ type remoteWriteCtx struct { relabelMetricsDropped *metrics.Counter } -func newRemoteWriteCtx(argIdx int, remoteWriteURL, relabelConfigPath string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx { +func newRemoteWriteCtx(argIdx int, remoteWriteURL string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx { h := xxhash.Sum64([]byte(remoteWriteURL)) path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h) fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, *maxPendingBytesPerURL) @@ -159,23 +178,15 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL, relabelConfigPath string, max return float64(fq.GetInmemoryQueueLen()) }) c := newClient(argIdx, remoteWriteURL, urlLabelValue, fq, *queues) - var prcs []promrelabel.ParsedRelabelConfig - if len(relabelConfigPath) > 0 { - var err error - prcs, err = promrelabel.LoadRelabelConfigs(relabelConfigPath) - if err != nil { - logger.Panicf("FATAL: cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %s", relabelConfigPath, err) - } - } pss := make([]*pendingSeries, *queues) for i := range pss { pss[i] = newPendingSeries(fq.MustWriteBlock) } return &remoteWriteCtx{ - fq: fq, - c: c, - prcs: prcs, - pss: pss, + idx: argIdx, + fq: fq, + c: c, + pss: pss, relabelMetricsDropped: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, path, urlLabelValue)), } @@ -185,10 +196,10 @@ func (rwctx *remoteWriteCtx) MustStop() { for _, ps := range rwctx.pss { ps.MustStop() } + rwctx.idx = 0 rwctx.pss = nil rwctx.fq.MustClose() rwctx.fq = nil - rwctx.prcs = nil rwctx.c.MustStop() rwctx.c = nil @@ -197,7 +208,9 @@ func (rwctx *remoteWriteCtx) MustStop() { func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { var rctx *relabelCtx - if len(rwctx.prcs) > 0 { + rcs := allRelabelConfigs.Load().(*relabelConfigs) + prcs := rcs.perURL[rwctx.idx] + if len(prcs) > 0 { // Make a copy of tss before applying relabeling in order to prevent // from affecting time series for other remoteWrite.url configs. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 for details. @@ -205,7 +218,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { tss = rwctx.tss rctx = getRelabelCtx() tssLen := len(tss) - tss = rctx.applyRelabeling(tss, nil, rwctx.prcs) + tss = rctx.applyRelabeling(tss, nil, prcs) rwctx.relabelMetricsDropped.Add(tssLen - len(tss)) } pss := rwctx.pss