From cc72f9428d6e514fd4cc34365450e8e3d6d4ede2 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Wed, 29 Sep 2021 00:52:07 +0300 Subject: [PATCH] changes vmagent api (#1656) * changes vmagent api adds auth.Token to promremotewrite InsertHandlerReader changes remoteWrite client constructor, allows to use multiple remoteWriteUrl schemes, like kafka:// changes url path concatenation for tenant remoteWrite Update app/vmagent/remotewrite/client.go Co-authored-by: Aliaksandr Valialkin * Update app/vmagent/remotewrite/remotewrite.go * Apply suggestions from code review Co-authored-by: Aliaksandr Valialkin --- .../promremotewrite/request_handler.go | 4 +- app/vmagent/remotewrite/client.go | 15 ++++--- app/vmagent/remotewrite/remotewrite.go | 39 +++++++++++++------ 3 files changed, 40 insertions(+), 18 deletions(-) diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index 527baab4e2..29af224ed8 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -37,10 +37,10 @@ func InsertHandler(at *auth.Token, req *http.Request) error { } // InsertHandlerForReader processes metrics from given reader -func InsertHandlerForReader(r io.Reader) error { +func InsertHandlerForReader(at *auth.Token, r io.Reader) error { return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(r, func(tss []prompb.TimeSeries) error { - return insertRows(nil, tss, nil) + return insertRows(at, tss, nil) }) }) } diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 667f5adbfc..171cb896ec 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -67,7 +67,8 @@ type client struct { fq *persistentqueue.FastQueue hc *http.Client - authCfg *promauth.Config + sendBlock func(block []byte) bool + authCfg *promauth.Config rl rateLimiter @@ -84,7 +85,7 @@ type client struct { stopCh chan struct{} } -func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client { +func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client { authCfg, err := getAuthConfig(argIdx) if err != nil { logger.Panicf("FATAL: cannot initialize auth config: %s", err) @@ -121,6 +122,11 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu }, stopCh: make(chan struct{}), } + c.sendBlock = c.sendBlockHTTP + return c +} + +func (c *client) init(argIdx, concurrency int, sanitizedURL string) { if bytesPerSec := rateLimit.GetOptionalArgOrDefault(argIdx, 0); bytesPerSec > 0 { logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL) c.rl.perSecondLimit = int64(bytesPerSec) @@ -143,7 +149,6 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu }() } logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL) - return c } func (c *client) MustStop() { @@ -237,9 +242,9 @@ func (c *client) runWorker() { } } -// sendBlock returns false only if c.stopCh is closed. +// sendBlockHTTP returns false only if c.stopCh is closed. // Otherwise it tries sending the block to remote storage indefinitely. -func (c *client) sendBlock(block []byte) bool { +func (c *client) sendBlockHTTP(block []byte) bool { c.rl.register(len(block), c.stopCh) retryDuration := time.Second retriesCount := 0 diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index c4ea41c4eb..434fe02284 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -3,6 +3,7 @@ package remotewrite import ( "flag" "fmt" + "net/url" "strconv" "sync" "sync/atomic" @@ -181,17 +182,21 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { maxInmemoryBlocks = 2 } rwctxs := make([]*remoteWriteCtx, len(urls)) - for i, remoteWriteURL := range urls { + for i, remoteWriteURLRaw := range urls { + remoteWriteURL, err := url.Parse(remoteWriteURLRaw) + if err != nil { + logger.Fatalf("invalid -remoteWrite.url=%q: %s", remoteWriteURL, err) + } sanitizedURL := fmt.Sprintf("%d:secret-url", i+1) if at != nil { // Construct full remote_write url for the given tenant according to https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format - remoteWriteURL = fmt.Sprintf("%s/insert/%d:%d/prometheus/api/v1/write", remoteWriteURL, at.AccountID, at.ProjectID) + remoteWriteURL.Path = fmt.Sprintf("%s/insert/%d:%d/prometheus/api/v1/write", remoteWriteURL.Path, at.AccountID, at.ProjectID) sanitizedURL = fmt.Sprintf("%s:%d:%d", sanitizedURL, at.AccountID, at.ProjectID) } if *showRemoteWriteURL { sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL) } - rwctxs[i] = newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL) + rwctxs[i] = newRemoteWriteCtx(i, at, remoteWriteURL, maxInmemoryBlocks, sanitizedURL) } return rwctxs } @@ -403,17 +408,29 @@ type remoteWriteCtx struct { relabelMetricsDropped *metrics.Counter } -func newRemoteWriteCtx(argIdx int, remoteWriteURL string, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx { - h := xxhash.Sum64([]byte(remoteWriteURL)) - path := fmt.Sprintf("%s/persistent-queue/%d_%016X", *tmpDataPath, argIdx+1, h) - fq := persistentqueue.MustOpenFastQueue(path, sanitizedURL, maxInmemoryBlocks, maxPendingBytesPerURL.N) - _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, path, sanitizedURL), func() float64 { +func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx { + // strip query params, otherwise changing params resets pq + pqURL := *remoteWriteURL + pqURL.RawQuery = "" + pqURL.Fragment = "" + h := xxhash.Sum64([]byte(pqURL.String())) + queuePath := fmt.Sprintf("%s/persistent-queue/%d_%016X", *tmpDataPath, argIdx+1, h) + fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytesPerURL.N) + _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { return float64(fq.GetPendingBytes()) }) - _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, path, sanitizedURL), func() float64 { + _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 { return float64(fq.GetInmemoryQueueLen()) }) - c := newClient(argIdx, remoteWriteURL, sanitizedURL, fq, *queues) + var c *client + switch remoteWriteURL.Scheme { + case "http", "https": + c = newHTTPClient(argIdx, remoteWriteURL.String(), sanitizedURL, fq, *queues) + default: + logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL) + } + c.init(argIdx, *queues, sanitizedURL) + sf := significantFigures.GetOptionalArgOrDefault(argIdx, 0) rd := roundDigits.GetOptionalArgOrDefault(argIdx, 100) pssLen := *queues @@ -432,7 +449,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL string, maxInmemoryBlocks int, c: c, pss: pss, - relabelMetricsDropped: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, path, sanitizedURL)), + relabelMetricsDropped: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), } }