add support to scrape multi-tenant metrics (#2950)

* add support to scrape multi-tenant metrics

Co-authored-by: 赵福玉 <zhaofuyu@zhaofuyudeMac-mini.local>
Authored by Fury on 2022-08-08 19:10:18 +08:00; committed by Aliaksandr Valialkin
parent 69d62d5736
commit 59fdb4cb72
7 changed files with 38 additions and 20 deletions
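
This change threads an optional tenant token through the scrape-and-push path: a scrape target may carry a `__tenant_id__` label (typically set via relabeling), the value is parsed into an *auth.Token while the ScrapeWork is built, and the token travels with every pushData call down to remotewrite.Push. A nil token keeps the previous single-tenant behavior (`-remoteWrite.url`); a non-nil token routes the samples to the multi-tenant endpoint (`-remoteWrite.multitenantURL`).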

@@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
 	ctx.WriteRequest.Timeseries = tssDst
 	ctx.Labels = labels
 	ctx.Samples = samples
-	remotewrite.Push(&ctx.WriteRequest)
+	remotewrite.Push(nil, &ctx.WriteRequest)
 	rowsInserted.Add(len(rows))
 	rowsPerInsert.Update(float64(len(rows)))
 	return nil

@@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
 	ctx.WriteRequest.Timeseries = tssDst
 	ctx.Labels = labels
 	ctx.Samples = samples
-	remotewrite.Push(&ctx.WriteRequest)
+	remotewrite.Push(nil, &ctx.WriteRequest)
 	rowsInserted.Add(len(rows))
 	rowsPerInsert.Update(float64(len(rows)))
 	return nil

@@ -65,7 +65,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
 	ctx.WriteRequest.Timeseries = tssDst
 	ctx.Labels = labels
 	ctx.Samples = samples
-	remotewrite.Push(&ctx.WriteRequest)
+	remotewrite.Push(nil, &ctx.WriteRequest)
 	rowsInserted.Add(len(rows))
 	rowsPerInsert.Update(float64(len(rows)))
 	return nil

@@ -235,8 +235,8 @@ func Stop() {
 // Push sends wr to remote storage systems set via `-remoteWrite.url`.
 //
 // Note that wr may be modified by Push due to relabeling and rounding.
-func Push(wr *prompbmarshal.WriteRequest) {
-	PushWithAuthToken(nil, wr)
+func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
+	PushWithAuthToken(at, wr)
 }
 
 // PushWithAuthToken sends wr to remote storage systems set via `-remoteWrite.multitenantURL`.
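
A minimal sketch of the two call styles after this signature change. The import paths and the "42:0" accountID:projectID tenant format are assumptions based on this repository's lib/auth package, not part of this commit:

    import (
        "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
        "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
        "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
    )

    func pushExamples(wr *prompbmarshal.WriteRequest) error {
        // Single-tenant push: a nil token sends the data to -remoteWrite.url.
        remotewrite.Push(nil, wr)

        // Multi-tenant push: a parsed tenant token routes the data to
        // -remoteWrite.multitenantURL.
        at, err := auth.NewToken("42:0") // accountID:projectID
        if err != nil {
            return err
        }
        remotewrite.Push(at, wr)
        return nil
    }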

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"net/url"
 	"path/filepath"
 	"sort"
@@ -1227,6 +1228,17 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
 	if metricsPathRelabeled == "" {
 		metricsPathRelabeled = "/metrics"
 	}
+	var at *auth.Token
+	tenantIdRelabeled := promrelabel.GetLabelValueByName(labels, "__tenant_id__")
+	if tenantIdRelabeled != "" {
+		newToken, err := auth.NewToken(tenantIdRelabeled)
+		if err != nil {
+			return nil, fmt.Errorf("invalid tenant id: %s for job=%s, err: %w", tenantIdRelabeled, swc.jobName, err)
+		}
+		at = newToken
+	}
 	if !strings.HasPrefix(metricsPathRelabeled, "/") {
 		metricsPathRelabeled = "/" + metricsPathRelabeled
 	}
@@ -1308,6 +1320,7 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
 		ScrapeAlignInterval: swc.scrapeAlignInterval,
 		ScrapeOffset:        swc.scrapeOffset,
 		SeriesLimit:         seriesLimit,
+		AuthToken:           at,
 		jobNameOriginal:     swc.jobName,
 	}
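
The `__tenant_id__` value is expected in the usual VictoriaMetrics tenant notation, i.e. "accountID" or "accountID:projectID". A hedged sketch of what auth.NewToken yields (the AccountID/ProjectID field names come from lib/auth; the concrete ids are illustrative):

    import (
        "fmt"

        "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
    )

    func tenantTokenExample() error {
        at, err := auth.NewToken("1042:7")
        if err != nil {
            // Non-numeric ids fail here; getScrapeWork surfaces this as
            // "invalid tenant id: ... for job=...".
            return err
        }
        fmt.Printf("account=%d project=%d\n", at.AccountID, at.ProjectID) // account=1042 project=7
        return nil
    }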

@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"flag"
 	"fmt"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"io"
 	"sync"
 	"sync/atomic"
@@ -53,7 +54,7 @@ func CheckConfig() error {
 // Init initializes Prometheus scraper with config from the `-promscrape.config`.
 //
 // Scraped data is passed to pushData.
-func Init(pushData func(wr *prompbmarshal.WriteRequest)) {
+func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) {
 	mustInitClusterMemberID()
 	globalStopChan = make(chan struct{})
 	scraperWG.Add(1)
@@ -91,7 +92,7 @@ func WriteConfigData(w io.Writer) {
 	_, _ = w.Write(*b)
 }
 
-func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
+func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
 	if configFile == "" {
 		// Nothing to scrape.
 		return
@@ -185,14 +186,14 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
 var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`)
 
 type scrapeConfigs struct {
-	pushData     func(wr *prompbmarshal.WriteRequest)
+	pushData     func(at *auth.Token, wr *prompbmarshal.WriteRequest)
 	wg           sync.WaitGroup
 	stopCh       chan struct{}
 	globalStopCh <-chan struct{}
 	scfgs        []*scrapeConfig
 }
 
-func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
+func newScrapeConfigs(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
 	return &scrapeConfigs{
 		pushData: pushData,
 		stopCh:   make(chan struct{}),
@@ -234,7 +235,7 @@ func (scs *scrapeConfigs) stop() {
 type scrapeConfig struct {
 	name          string
-	pushData      func(wr *prompbmarshal.WriteRequest)
+	pushData      func(at *auth.Token, wr *prompbmarshal.WriteRequest)
 	getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork
 	checkInterval time.Duration
 	cfgCh         chan *Config
@@ -287,7 +288,7 @@ type scraperGroup struct {
 	wg       sync.WaitGroup
 	mLock    sync.Mutex
 	m        map[string]*scraper
-	pushData func(wr *prompbmarshal.WriteRequest)
+	pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)
 
 	changesCount   *metrics.Counter
 	activeScrapers *metrics.Counter
@@ -297,7 +298,7 @@ type scraperGroup struct {
 	globalStopCh <-chan struct{}
 }
 
-func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
+func newScraperGroup(name string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
 	sg := &scraperGroup{
 		name: name,
 		m:    make(map[string]*scraper),
@@ -413,7 +414,7 @@ type scraper struct {
 	stoppedCh chan struct{}
 }
 
-func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper {
+func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper {
 	sc := &scraper{
 		stopCh:    make(chan struct{}),
 		stoppedCh: make(chan struct{}),
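
Every pushData callback now receives the token alongside the write request. A sketch of how a caller could wire promscrape.Init after this change (hedged: the actual wiring in the apps is not part of this diff):

    import (
        "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
        "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
        "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
        "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
    )

    func startScraper() {
        // The token passed here is the one resolved from the target's
        // __tenant_id__ label at config time (nil if the label is unset).
        promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
            remotewrite.Push(at, wr)
        })
    }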

@@ -3,6 +3,7 @@ package promscrape
 import (
 	"flag"
 	"fmt"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"io/ioutil"
 	"math"
 	"math/bits"
@@ -119,6 +120,9 @@ type ScrapeWork struct {
 	// Optional limit on the number of unique series the scrape target can expose.
 	SeriesLimit int
 
+	// Auth token for the tenant, resolved from the `__tenant_id__` label.
+	AuthToken *auth.Token
+
 	// The original 'job_name'
 	jobNameOriginal string
 }
@@ -188,7 +192,7 @@ type scrapeWork struct {
 	GetStreamReader func() (*streamReader, error)
 
 	// PushData is called for pushing collected data.
-	PushData func(wr *prompbmarshal.WriteRequest)
+	PushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)
 
 	// ScrapeGroup is name of ScrapeGroup that
 	// scrapeWork belongs to
@@ -487,7 +491,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 		// See https://github.com/VictoriaMetrics/operator/issues/497
 		sw.addAutoTimeseries(wc, "scrape_samples_limit", float64(sw.Config.SampleLimit), scrapeTimestamp)
 	}
-	sw.pushData(&wc.writeRequest)
+	sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 	sw.prevLabelsLen = len(wc.labels)
 	sw.prevBodyLen = len(bodyString)
 	wc.reset()
@@ -514,9 +518,9 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	return err
 }
 
-func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
+func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {
 	startTime := time.Now()
-	sw.PushData(wr)
+	sw.PushData(at, wr)
 	pushDataDuration.UpdateDuration(startTime)
 }
@@ -568,7 +572,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 			return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
 				"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
 		}
-		sw.pushData(&wc.writeRequest)
+		sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 		wc.resetNoRows()
 		return nil
 	}, sw.logError)
@@ -603,7 +607,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), scrapeTimestamp)
-	sw.pushData(&wc.writeRequest)
+	sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 	sw.prevLabelsLen = len(wc.labels)
 	sw.prevBodyLen = sbr.bodyLen
 	wc.reset()
@@ -770,7 +774,7 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
 		}
 		staleSamplesCreated.Add(len(samples))
 	}
-	sw.pushData(&wc.writeRequest)
+	sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 }
 
 var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`)
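
Note that every push site in scrapeWork now forwards sw.Config.AuthToken: the regular scrape path, the stream-parsing path, and sendStaleSeries. This keeps automatically generated series (the scrape_* auto-metrics and staleness markers) in the same tenant as the scraped samples.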