diff --git a/app/vmagent/graphite/request_handler.go b/app/vmagent/graphite/request_handler.go
index fd0d96cd6a..c3ef22d8df 100644
--- a/app/vmagent/graphite/request_handler.go
+++ b/app/vmagent/graphite/request_handler.go
@@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
 	ctx.WriteRequest.Timeseries = tssDst
 	ctx.Labels = labels
 	ctx.Samples = samples
-	remotewrite.Push(&ctx.WriteRequest)
+	remotewrite.Push(nil, &ctx.WriteRequest)
 	rowsInserted.Add(len(rows))
 	rowsPerInsert.Update(float64(len(rows)))
 	return nil
diff --git a/app/vmagent/opentsdb/request_handler.go b/app/vmagent/opentsdb/request_handler.go
index 628676de94..2721912d63 100644
--- a/app/vmagent/opentsdb/request_handler.go
+++ b/app/vmagent/opentsdb/request_handler.go
@@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
 	ctx.WriteRequest.Timeseries = tssDst
 	ctx.Labels = labels
 	ctx.Samples = samples
-	remotewrite.Push(&ctx.WriteRequest)
+	remotewrite.Push(nil, &ctx.WriteRequest)
 	rowsInserted.Add(len(rows))
 	rowsPerInsert.Update(float64(len(rows)))
 	return nil
diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go
index b3026ab861..7d2d409eb2 100644
--- a/app/vmagent/opentsdbhttp/request_handler.go
+++ b/app/vmagent/opentsdbhttp/request_handler.go
@@ -65,7 +65,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
 	ctx.WriteRequest.Timeseries = tssDst
 	ctx.Labels = labels
 	ctx.Samples = samples
-	remotewrite.Push(&ctx.WriteRequest)
+	remotewrite.Push(nil, &ctx.WriteRequest)
 	rowsInserted.Add(len(rows))
 	rowsPerInsert.Update(float64(len(rows)))
 	return nil
diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index ff77572df4..b8b9b9aaf5 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -235,8 +235,8 @@ func Stop() {
 // Push sends wr to remote storage systems set via `-remoteWrite.url`.
 //
 // Note that wr may be modified by Push due to relabeling and rounding.
-func Push(wr *prompbmarshal.WriteRequest) {
-	PushWithAuthToken(nil, wr)
+func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
+	PushWithAuthToken(at, wr)
 }
 
 // PushWithAuthToken sends wr to remote storage systems set via `-remoteWrite.multitenantURL`.
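
For callers, the visible change is only the extra leading argument. Below is a minimal sketch of both call styles, assuming `auth.NewToken` keeps its `accountID[:projectID]` string format and that the remotewrite flags have been parsed before `Init`; it is illustrative, not part of the patch:

```go
package main

import (
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

func main() {
	// Assumes -remoteWrite.url / -remoteWrite.multitenantURL flags
	// have already been parsed; Init panics otherwise.
	remotewrite.Init()
	defer remotewrite.Stop()

	var wr prompbmarshal.WriteRequest

	// Single-tenant callers pass nil; data goes to -remoteWrite.url.
	remotewrite.Push(nil, &wr)

	// Multitenant callers supply a token; data is routed to the matching
	// tenant endpoint under -remoteWrite.multitenantURL.
	at, err := auth.NewToken("42:0") // accountID=42, projectID=0 (illustrative)
	if err != nil {
		log.Fatalf("cannot parse tenant id: %s", err)
	}
	remotewrite.Push(at, &wr)
}
```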
diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go
index 5ec00710c9..e2c5061bb0 100644
--- a/lib/promscrape/config.go
+++ b/lib/promscrape/config.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"net/url"
 	"path/filepath"
 	"sort"
@@ -1227,6 +1228,17 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
 	if metricsPathRelabeled == "" {
 		metricsPathRelabeled = "/metrics"
 	}
+
+	var at *auth.Token
+	tenantIDRelabeled := promrelabel.GetLabelValueByName(labels, "__tenant_id__")
+	if tenantIDRelabeled != "" {
+		newToken, err := auth.NewToken(tenantIDRelabeled)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse tenant id %q for job=%s: %w", tenantIDRelabeled, swc.jobName, err)
+		}
+		at = newToken
+	}
+
 	if !strings.HasPrefix(metricsPathRelabeled, "/") {
 		metricsPathRelabeled = "/" + metricsPathRelabeled
 	}
@@ -1308,6 +1320,7 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
 		ScrapeAlignInterval: swc.scrapeAlignInterval,
 		ScrapeOffset:        swc.scrapeOffset,
 		SeriesLimit:         seriesLimit,
+		AuthToken:           at,
 
 		jobNameOriginal: swc.jobName,
 	}
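
The `__tenant_id__` label is the control point here: if relabeling leaves it non-empty, its value is parsed into an `auth.Token` that travels with every push for that target. A hedged sketch of how a user might set the label and what the resulting parse yields; the YAML fragment and the tenant string `42:0` are hypothetical, not from the patch:

```go
package main

import (
	"fmt"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
)

// A hypothetical -promscrape.config fragment that pins one job to a tenant by
// setting the special __tenant_id__ label during target relabeling:
const scrapeConfigFragment = `
scrape_configs:
  - job_name: tenant-42
    static_configs:
      - targets: ["app-1:8080"]
    relabel_configs:
      - target_label: __tenant_id__
        replacement: "42:0"
`

func main() {
	// getScrapeWork performs the equivalent of this parse for each target
	// whose relabeled __tenant_id__ label is non-empty.
	at, err := auth.NewToken("42:0")
	if err != nil {
		log.Fatalf("cannot parse tenant id: %s", err)
	}
	fmt.Printf("AccountID=%d, ProjectID=%d\n", at.AccountID, at.ProjectID)
}
```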
diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go
index eb60d63a04..a4cdeffbc0 100644
--- a/lib/promscrape/scraper.go
+++ b/lib/promscrape/scraper.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"flag"
 	"fmt"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"io"
 	"sync"
 	"sync/atomic"
@@ -53,7 +54,7 @@ func CheckConfig() error {
 // Init initializes Prometheus scraper with config from the `-promscrape.config`.
 //
 // Scraped data is passed to pushData.
-func Init(pushData func(wr *prompbmarshal.WriteRequest)) {
+func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) {
 	mustInitClusterMemberID()
 	globalStopChan = make(chan struct{})
 	scraperWG.Add(1)
@@ -91,7 +92,7 @@ func WriteConfigData(w io.Writer) {
 	_, _ = w.Write(*b)
 }
 
-func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
+func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
 	if configFile == "" {
 		// Nothing to scrape.
 		return
@@ -185,14 +186,14 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
 var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`)
 
 type scrapeConfigs struct {
-	pushData     func(wr *prompbmarshal.WriteRequest)
+	pushData     func(at *auth.Token, wr *prompbmarshal.WriteRequest)
 	wg           sync.WaitGroup
 	stopCh       chan struct{}
 	globalStopCh <-chan struct{}
 	scfgs        []*scrapeConfig
 }
 
-func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
+func newScrapeConfigs(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
 	return &scrapeConfigs{
 		pushData: pushData,
 		stopCh:   make(chan struct{}),
@@ -234,7 +235,7 @@ func (scs *scrapeConfigs) stop() {
 
 type scrapeConfig struct {
 	name          string
-	pushData      func(wr *prompbmarshal.WriteRequest)
+	pushData      func(at *auth.Token, wr *prompbmarshal.WriteRequest)
 	getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork
 	checkInterval time.Duration
 	cfgCh         chan *Config
@@ -287,7 +288,7 @@ type scraperGroup struct {
 	wg       sync.WaitGroup
 	mLock    sync.Mutex
 	m        map[string]*scraper
-	pushData func(wr *prompbmarshal.WriteRequest)
+	pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)
 
 	changesCount   *metrics.Counter
 	activeScrapers *metrics.Counter
@@ -297,7 +298,7 @@ type scraperGroup struct {
 	globalStopCh <-chan struct{}
 }
 
-func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
+func newScraperGroup(name string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
 	sg := &scraperGroup{
 		name: name,
 		m:    make(map[string]*scraper),
@@ -413,7 +414,7 @@ type scraper struct {
 	stoppedCh chan struct{}
 }
 
-func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper {
+func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper {
 	sc := &scraper{
 		stopCh:    make(chan struct{}),
 		stoppedCh: make(chan struct{}),
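
All of this plumbing exists so that the callback handed to `promscrape.Init` receives the per-target token. A minimal sketch of the consumer side, roughly mirroring how vmagent wires scraping to remote write (flag parsing, error handling, and shutdown signaling omitted; this is an assumption-laden illustration, not vmagent's actual main):

```go
package main

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
)

func main() {
	remotewrite.Init()

	// at is nil for targets without a __tenant_id__ label, so single-tenant
	// setups keep flowing through -remoteWrite.url unchanged.
	promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
		remotewrite.Push(at, wr)
	})

	// ... serve until shutdown is requested, then:
	promscrape.Stop()
	remotewrite.Stop()
}
```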
diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index 529746f05e..eb411a30c6 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -3,6 +3,7 @@ package promscrape
 import (
 	"flag"
 	"fmt"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"io/ioutil"
 	"math"
 	"math/bits"
@@ -119,6 +120,9 @@ type ScrapeWork struct {
 	// Optional limit on the number of unique series the scrape target can expose.
 	SeriesLimit int
 
+	// AuthToken is the optional tenant token for pushing the scraped data.
+	AuthToken *auth.Token
+
 	// The original 'job_name'
 	jobNameOriginal string
 }
@@ -188,7 +192,7 @@ type scrapeWork struct {
 	GetStreamReader func() (*streamReader, error)
 
 	// PushData is called for pushing collected data.
-	PushData func(wr *prompbmarshal.WriteRequest)
+	PushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)
 
 	// ScrapeGroup is name of ScrapeGroup that
 	// scrapeWork belongs to
@@ -487,7 +491,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 		// See https://github.com/VictoriaMetrics/operator/issues/497
 		sw.addAutoTimeseries(wc, "scrape_samples_limit", float64(sw.Config.SampleLimit), scrapeTimestamp)
 	}
-	sw.pushData(&wc.writeRequest)
+	sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 	sw.prevLabelsLen = len(wc.labels)
 	sw.prevBodyLen = len(bodyString)
 	wc.reset()
@@ -514,9 +518,9 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	return err
 }
 
-func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
+func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {
 	startTime := time.Now()
-	sw.PushData(wr)
+	sw.PushData(at, wr)
 	pushDataDuration.UpdateDuration(startTime)
 }
 
@@ -568,7 +572,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 			return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
 				"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
 		}
-		sw.pushData(&wc.writeRequest)
+		sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 		wc.resetNoRows()
 		return nil
 	}, sw.logError)
@@ -603,7 +607,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), scrapeTimestamp)
-	sw.pushData(&wc.writeRequest)
+	sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 	sw.prevLabelsLen = len(wc.labels)
 	sw.prevBodyLen = sbr.bodyLen
 	wc.reset()
@@ -770,7 +774,7 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
 		}
 		staleSamplesCreated.Add(len(samples))
 	}
-	sw.pushData(&wc.writeRequest)
+	sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 }
 
 var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`)
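
Note that every push site in scrapework.go (regular scrapes, stream scrapes, the auto-generated `scrape_*` series, and stale markers) now passes `sw.Config.AuthToken`, so a callback can rely on consistent tenant attribution for all samples of a target. A hypothetical callback sketch illustrating that contract; `perTenantCounter` is not part of the patch, and a real pushData callback is invoked concurrently from many scrapers and would need locking:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

// perTenantCounter returns a pushData-compatible callback that tallies samples
// per tenant; a nil token is accounted to the "default" tenant.
func perTenantCounter(counts map[string]int) func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
	return func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
		tenant := "default"
		if at != nil {
			tenant = fmt.Sprintf("%d:%d", at.AccountID, at.ProjectID)
		}
		for _, ts := range wr.Timeseries {
			counts[tenant] += len(ts.Samples)
		}
	}
}

func main() {
	counts := make(map[string]int)
	push := perTenantCounter(counts)

	at, _ := auth.NewToken("42:0")
	push(at, &prompbmarshal.WriteRequest{
		Timeseries: []prompbmarshal.TimeSeries{
			{Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: 0}}},
		},
	})
	push(nil, &prompbmarshal.WriteRequest{}) // e.g. a target without __tenant_id__

	fmt.Println(counts) // map[42:0:1 default:0]
}
```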