diff --git a/lib/promauth/config.go b/lib/promauth/config.go index 3536ec73d7..11f83f9cbe 100644 --- a/lib/promauth/config.go +++ b/lib/promauth/config.go @@ -1,6 +1,7 @@ package promauth import ( + "bytes" "crypto/tls" "crypto/x509" "encoding/base64" @@ -40,6 +41,27 @@ type Config struct { TLSInsecureSkipVerify bool } +// String returns human-(un)readable representation for cfg. +func (ac *Config) String() string { + return fmt.Sprintf("Authorization=%s, TLSRootCA=%s, TLSCertificate=%s, TLSServerName=%s, TLSInsecureSkipVerify=%v", + ac.Authorization, ac.tlsRootCAString(), ac.tlsCertificateString(), ac.TLSServerName, ac.TLSInsecureSkipVerify) +} + +func (ac *Config) tlsRootCAString() string { + if ac.TLSRootCA == nil { + return "" + } + data := ac.TLSRootCA.Subjects() + return string(bytes.Join(data, []byte("\n"))) +} + +func (ac *Config) tlsCertificateString() string { + if ac.TLSCertificate == nil { + return "" + } + return string(bytes.Join(ac.TLSCertificate.Certificate, []byte("\n"))) +} + // NewTLSConfig returns new TLS config for the given ac. func (ac *Config) NewTLSConfig() *tls.Config { tlsCfg := &tls.Config{ diff --git a/lib/promrelabel/relabel.go b/lib/promrelabel/relabel.go index 02b1327439..474f46361e 100644 --- a/lib/promrelabel/relabel.go +++ b/lib/promrelabel/relabel.go @@ -1,6 +1,7 @@ package promrelabel import ( + "fmt" "regexp" "strconv" "strings" @@ -24,6 +25,12 @@ type ParsedRelabelConfig struct { Action string } +// String returns human-readable representation for prc. +func (prc *ParsedRelabelConfig) String() string { + return fmt.Sprintf("SourceLabels=%s, Separator=%s, TargetLabel=%s, Regex=%s, Modulus=%d, Replacement=%s, Action=%s", + prc.SourceLabels, prc.Separator, prc.TargetLabel, prc.Regex.String(), prc.Modulus, prc.Replacement, prc.Action) +} + // ApplyRelabelConfigs applies prcs to labels starting from the labelsOffset. // // If isFinalize is set, then FinalizeLabels is called on the labels[labelsOffset:]. diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 0b18d388f1..1b335103a4 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -143,38 +143,6 @@ func unmarshalMaybeStrict(data []byte, dst interface{}) error { return err } -func (cfg *Config) kubernetesSDConfigsCount() int { - n := 0 - for i := range cfg.ScrapeConfigs { - n += len(cfg.ScrapeConfigs[i].KubernetesSDConfigs) - } - return n -} - -func (cfg *Config) ec2SDConfigsCount() int { - n := 0 - for i := range cfg.ScrapeConfigs { - n += len(cfg.ScrapeConfigs[i].EC2SDConfigs) - } - return n -} - -func (cfg *Config) gceSDConfigsCount() int { - n := 0 - for i := range cfg.ScrapeConfigs { - n += len(cfg.ScrapeConfigs[i].GCESDConfigs) - } - return n -} - -func (cfg *Config) fileSDConfigsCount() int { - n := 0 - for i := range cfg.ScrapeConfigs { - n += len(cfg.ScrapeConfigs[i].FileSDConfigs) - } - return n -} - // getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg. func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork { var dst []ScrapeWork @@ -215,16 +183,16 @@ func (cfg *Config) getGCESDScrapeWork() []ScrapeWork { } // getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg. -func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork { +func (cfg *Config) getFileSDScrapeWork(swsPrev []ScrapeWork) []ScrapeWork { // Create a map for the previous scrape work. - swPrev := make(map[string][]ScrapeWork) - for i := range prev { - sw := &prev[i] + swsMapPrev := make(map[string][]ScrapeWork) + for i := range swsPrev { + sw := &swsPrev[i] filepath := promrelabel.GetLabelValueByName(sw.Labels, "__vm_filepath") if len(filepath) == 0 { logger.Panicf("BUG: missing `__vm_filepath` label") } else { - swPrev[filepath] = append(swPrev[filepath], *sw) + swsMapPrev[filepath] = append(swsMapPrev[filepath], *sw) } } var dst []ScrapeWork @@ -232,7 +200,7 @@ func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork { sc := &cfg.ScrapeConfigs[i] for j := range sc.FileSDConfigs { sdc := &sc.FileSDConfigs[j] - dst = sdc.appendScrapeWork(dst, swPrev, cfg.baseDir, sc.swc) + dst = sdc.appendScrapeWork(dst, swsMapPrev, cfg.baseDir, sc.swc) } } return dst @@ -377,7 +345,7 @@ func appendScrapeWorkForTargetLabels(dst []ScrapeWork, swc *scrapeWorkConfig, ta return dst } -func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swPrev map[string][]ScrapeWork, baseDir string, swc *scrapeWorkConfig) []ScrapeWork { +func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swsMapPrev map[string][]ScrapeWork, baseDir string, swc *scrapeWorkConfig) []ScrapeWork { for _, file := range sdc.Files { pathPattern := getFilepath(baseDir, file) paths := []string{pathPattern} @@ -394,7 +362,7 @@ func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swPrev map[string][] stcs, err := loadStaticConfigs(path) if err != nil { // Do not return this error, since other paths may contain valid scrape configs. - if sws := swPrev[path]; sws != nil { + if sws := swsMapPrev[path]; sws != nil { // Re-use the previous valid scrape work for this path. logger.Errorf("keeping the previously loaded `static_configs` from %q because of error when re-loading the file: %s", path, err) dst = append(dst, sws...) @@ -412,7 +380,7 @@ func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swPrev map[string][] } metaLabels := map[string]string{ "__meta_filepath": pathShort, - "__vm_filepath": pathShort, // This label is needed for internal promscrape logic + "__vm_filepath": path, // This label is needed for internal promscrape logic } for i := range stcs { dst = stcs[i].appendScrapeWork(dst, swc, metaLabels) diff --git a/lib/promscrape/config_test.go b/lib/promscrape/config_test.go index aa696f0568..5a4b09f539 100644 --- a/lib/promscrape/config_test.go +++ b/lib/promscrape/config_test.go @@ -46,8 +46,8 @@ func TestLoadConfig(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %s", err) } - if n := cfg.fileSDConfigsCount(); n != 2 { - t.Fatalf("unexpected number of `file_sd_configs`; got %d; want %d; cfg:\n%#v", n, 2, cfg) + if cfg == nil { + t.Fatalf("expecting non-nil config") } // Try loading non-existing file @@ -1169,3 +1169,17 @@ scrape_configs: } var defaultRegexForRelabelConfig = regexp.MustCompile("^(.*)$") + +func equalStaticConfigForScrapeWorks(a, b []ScrapeWork) bool { + if len(a) != len(b) { + return false + } + for i := range a { + keyA := a[i].key() + keyB := b[i].key() + if keyA != keyB { + return false + } + } + return true +} diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index ab45097d1c..1853cac440 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -3,6 +3,7 @@ package promscrape import ( "bytes" "flag" + "fmt" "os" "os/signal" "sync" @@ -60,8 +61,6 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) // Nothing to scrape. return } - sighupCh := make(chan os.Signal, 1) - signal.Notify(sighupCh, syscall.SIGHUP) logger.Infof("reading Prometheus configs from %q", configFile) cfg, data, err := loadConfig(configFile) @@ -69,43 +68,24 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) logger.Fatalf("cannot read %q: %s", configFile, err) } + scs := newScrapeConfigs(pushData) + scs.add("static_configs", 0, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getStaticScrapeWork() }) + scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) }) + scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork() }) + scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork() }) + scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork() }) + + sighupCh := make(chan os.Signal, 1) + signal.Notify(sighupCh, syscall.SIGHUP) + var tickerCh <-chan time.Time if *configCheckInterval > 0 { ticker := time.NewTicker(*configCheckInterval) tickerCh = ticker.C defer ticker.Stop() } - - mustStop := false - for !mustStop { - stopCh := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - runStaticScrapers(cfg, pushData, stopCh) - }() - wg.Add(1) - go func() { - defer wg.Done() - runFileSDScrapers(cfg, pushData, stopCh) - }() - wg.Add(1) - go func() { - defer wg.Done() - runKubernetesSDScrapers(cfg, pushData, stopCh) - }() - wg.Add(1) - go func() { - defer wg.Done() - runEC2SDScrapers(cfg, pushData, stopCh) - }() - wg.Add(1) - go func() { - defer wg.Done() - runGCESDScrapers(cfg, pushData, stopCh) - }() - + for { + scs.updateConfig(cfg) waitForChans: select { case <-sighupCh: @@ -134,281 +114,194 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) cfg = cfgNew data = dataNew case <-globalStopCh: - mustStop = true + logger.Infof("stopping Prometheus scrapers") + startTime := time.Now() + scs.stop() + logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds()) + return } - - if !mustStop { - logger.Infof("found changes in %q; applying these changes", configFile) - } - logger.Infof("stopping Prometheus scrapers") - startTime := time.Now() - close(stopCh) - wg.Wait() - logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds()) + logger.Infof("found changes in %q; applying these changes", configFile) configReloads.Inc() } } var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`) -func runStaticScrapers(cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { - sws := cfg.getStaticScrapeWork() - if len(sws) == 0 { - return - } - logger.Infof("starting %d scrapers for `static_config` targets", len(sws)) - staticTargets.Set(uint64(len(sws))) - runScrapeWorkers(sws, pushData, stopCh) - staticTargets.Set(0) - logger.Infof("stopped all the %d scrapers for `static_config` targets", len(sws)) +type scrapeConfigs struct { + pushData func(wr *prompbmarshal.WriteRequest) + wg sync.WaitGroup + stopCh chan struct{} + scfgs []*scrapeConfig } -var staticTargets = metrics.NewCounter(`vm_promscrape_targets{type="static"}`) - -func runKubernetesSDScrapers(cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { - if cfg.kubernetesSDConfigsCount() == 0 { - return +func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConfigs { + return &scrapeConfigs{ + pushData: pushData, + stopCh: make(chan struct{}), } - sws := cfg.getKubernetesSDScrapeWork() - ticker := time.NewTicker(*kubernetesSDCheckInterval) - defer ticker.Stop() - mustStop := false - for !mustStop { - localStopCh := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(1) - go func(sws []ScrapeWork) { - defer wg.Done() - logger.Infof("starting %d scrapers for `kubernetes_sd_config` targets", len(sws)) - kubernetesSDTargets.Set(uint64(len(sws))) - runScrapeWorkers(sws, pushData, localStopCh) - kubernetesSDTargets.Set(0) - logger.Infof("stopped all the %d scrapers for `kubernetes_sd_config` targets", len(sws)) - }(sws) - waitForChans: +} + +func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork) { + scfg := &scrapeConfig{ + name: name, + pushData: scs.pushData, + getScrapeWork: getScrapeWork, + checkInterval: checkInterval, + cfgCh: make(chan *Config, 1), + stopCh: scs.stopCh, + } + scs.wg.Add(1) + go func() { + defer scs.wg.Done() + scfg.run() + }() + scs.scfgs = append(scs.scfgs, scfg) +} + +func (scs *scrapeConfigs) updateConfig(cfg *Config) { + for _, scfg := range scs.scfgs { + scfg.cfgCh <- cfg + } +} + +func (scs *scrapeConfigs) stop() { + close(scs.stopCh) + scs.wg.Wait() + scs.scfgs = nil +} + +type scrapeConfig struct { + name string + pushData func(wr *prompbmarshal.WriteRequest) + getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork + checkInterval time.Duration + cfgCh chan *Config + stopCh <-chan struct{} +} + +func (scfg *scrapeConfig) run() { + sg := newScraperGroup(scfg.name, scfg.pushData) + defer sg.stop() + + var tickerCh <-chan time.Time + if scfg.checkInterval > 0 { + ticker := time.NewTicker(scfg.checkInterval) + defer ticker.Stop() + tickerCh = ticker.C + } + + cfg := <-scfg.cfgCh + var swsPrev []ScrapeWork + for { + sws := scfg.getScrapeWork(cfg, swsPrev) + sg.update(sws) + swsPrev = sws + select { - case <-ticker.C: - swsNew := cfg.getKubernetesSDScrapeWork() - if equalStaticConfigForScrapeWorks(swsNew, sws) { - // Nothing changed, continue waiting for updated scrape work - goto waitForChans - } - logger.Infof("restarting scrapers for changed `kubernetes_sd_config` targets") - sws = swsNew - case <-stopCh: - mustStop = true - } - - close(localStopCh) - wg.Wait() - kubernetesSDReloads.Inc() - } -} - -var ( - kubernetesSDTargets = metrics.NewCounter(`vm_promscrape_targets{type="kubernetes_sd"}`) - kubernetesSDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="kubernetes_sd"}`) -) - -func runEC2SDScrapers(cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { - if cfg.ec2SDConfigsCount() == 0 { - return - } - sws := cfg.getEC2SDScrapeWork() - ticker := time.NewTicker(*ec2SDCheckInterval) - defer ticker.Stop() - mustStop := false - for !mustStop { - localStopCh := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(1) - go func(sws []ScrapeWork) { - defer wg.Done() - logger.Infof("starting %d scrapers for `ec2_sd_config` targets", len(sws)) - ec2SDTargets.Set(uint64(len(sws))) - runScrapeWorkers(sws, pushData, localStopCh) - ec2SDTargets.Set(0) - logger.Infof("stopped all the %d scrapers for `ec2_sd_config` targets", len(sws)) - }(sws) - waitForChans: - select { - case <-ticker.C: - swsNew := cfg.getEC2SDScrapeWork() - if equalStaticConfigForScrapeWorks(swsNew, sws) { - // Nothing changed, continue waiting for updated scrape work - goto waitForChans - } - logger.Infof("restarting scrapers for changed `ec2_sd_config` targets") - sws = swsNew - case <-stopCh: - mustStop = true - } - - close(localStopCh) - wg.Wait() - ec2SDReloads.Inc() - } -} - -var ( - ec2SDTargets = metrics.NewCounter(`vm_promscrape_targets{type="ec2_sd"}`) - ec2SDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="ec2_sd"}`) -) - -func runGCESDScrapers(cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { - if cfg.gceSDConfigsCount() == 0 { - return - } - sws := cfg.getGCESDScrapeWork() - ticker := time.NewTicker(*gceSDCheckInterval) - defer ticker.Stop() - mustStop := false - for !mustStop { - localStopCh := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(1) - go func(sws []ScrapeWork) { - defer wg.Done() - logger.Infof("starting %d scrapers for `gce_sd_config` targets", len(sws)) - gceSDTargets.Set(uint64(len(sws))) - runScrapeWorkers(sws, pushData, localStopCh) - gceSDTargets.Set(0) - logger.Infof("stopped all the %d scrapers for `gce_sd_config` targets", len(sws)) - }(sws) - waitForChans: - select { - case <-ticker.C: - swsNew := cfg.getGCESDScrapeWork() - if equalStaticConfigForScrapeWorks(swsNew, sws) { - // Nothing changed, continue waiting for updated scrape work - goto waitForChans - } - logger.Infof("restarting scrapers for changed `gce_sd_config` targets") - sws = swsNew - case <-stopCh: - mustStop = true - } - - close(localStopCh) - wg.Wait() - gceSDReloads.Inc() - } -} - -var ( - gceSDTargets = metrics.NewCounter(`vm_promscrape_targets{type="gce_sd"}`) - gceSDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="gce_sd"}`) -) - -func runFileSDScrapers(cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { - if cfg.fileSDConfigsCount() == 0 { - return - } - sws := cfg.getFileSDScrapeWork(nil) - ticker := time.NewTicker(*fileSDCheckInterval) - defer ticker.Stop() - mustStop := false - for !mustStop { - localStopCh := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(1) - go func(sws []ScrapeWork) { - defer wg.Done() - logger.Infof("starting %d scrapers for `file_sd_config` targets", len(sws)) - fileSDTargets.Set(uint64(len(sws))) - runScrapeWorkers(sws, pushData, localStopCh) - fileSDTargets.Set(0) - logger.Infof("stopped all the %d scrapers for `file_sd_config` targets", len(sws)) - }(sws) - waitForChans: - select { - case <-ticker.C: - swsNew := cfg.getFileSDScrapeWork(sws) - if equalStaticConfigForScrapeWorks(swsNew, sws) { - // Nothing changed, continue waiting for updated scrape work - goto waitForChans - } - logger.Infof("restarting scrapers for changed `file_sd_config` targets") - sws = swsNew - case <-stopCh: - mustStop = true - } - - close(localStopCh) - wg.Wait() - fileSDReloads.Inc() - } -} - -var ( - fileSDTargets = metrics.NewCounter(`vm_promscrape_targets{type="file_sd"}`) - fileSDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="file_sd"}`) -) - -func equalStaticConfigForScrapeWorks(as, bs []ScrapeWork) bool { - if len(as) != len(bs) { - return false - } - for i := range as { - if !equalStaticConfigForScrapeWork(&as[i], &bs[i]) { - return false + case <-scfg.stopCh: + return + case cfg = <-scfg.cfgCh: + case <-tickerCh: } } - return true } -func equalStaticConfigForScrapeWork(a, b *ScrapeWork) bool { - // `static_config` can change only ScrapeURL and Labels. So compare only them. - if a.ScrapeURL != b.ScrapeURL { - return false - } - if !equalLabels(a.Labels, b.Labels) { - return false - } - return true +type scraperGroup struct { + name string + wg sync.WaitGroup + mLock sync.Mutex + m map[string]*scraper + pushData func(wr *prompbmarshal.WriteRequest) + changesCount *metrics.Counter } -func equalLabels(as, bs []prompbmarshal.Label) bool { - if len(as) != len(bs) { - return false +func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup { + sg := &scraperGroup{ + name: name, + m: make(map[string]*scraper), + pushData: pushData, + changesCount: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_config_changes_total{type=%q}`, name)), } - for i := range as { - if !equalLabel(&as[i], &bs[i]) { - return false - } - } - return true + metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q}`, name), func() float64 { + sg.mLock.Lock() + n := len(sg.m) + sg.mLock.Unlock() + return float64(n) + }) + return sg } -func equalLabel(a, b *prompbmarshal.Label) bool { - if a.Name != b.Name { - return false +func (sg *scraperGroup) stop() { + sg.mLock.Lock() + for _, sc := range sg.m { + close(sc.stopCh) } - if a.Value != b.Value { - return false - } - return true + sg.m = nil + sg.mLock.Unlock() + sg.wg.Wait() } -// runScrapeWorkers runs sws. -// -// This function returns after closing stopCh. -func runScrapeWorkers(sws []ScrapeWork, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { - tsmGlobal.RegisterAll(sws) - var wg sync.WaitGroup +func (sg *scraperGroup) update(sws []ScrapeWork) { + sg.mLock.Lock() + defer sg.mLock.Unlock() + + additionsCount := 0 + deletionsCount := 0 + swsMap := make(map[string]bool, len(sws)) for i := range sws { - cfg := &sws[i] - c := newClient(cfg) - var sw scrapeWork - sw.Config = *cfg - sw.ReadData = c.ReadData - sw.PushData = pushData - wg.Add(1) + sw := &sws[i] + key := sw.key() + if swsMap[key] { + logger.Errorf("skipping duplicate scrape target with identical labels; endpoint=%s, labels=%s; make sure service discovery and relabeling is set up properly", + sw.ScrapeURL, sw.LabelsString()) + continue + } + swsMap[key] = true + if sg.m[key] != nil { + // The scraper for the given key already exists. + continue + } + + // Start a scraper for the missing key. + sc := newScraper(sw, sg.pushData) + sg.wg.Add(1) go func() { - defer wg.Done() - sw.run(stopCh) + defer sg.wg.Done() + sc.sw.run(sc.stopCh) + tsmGlobal.Unregister(sw) }() + tsmGlobal.Register(sw) + sg.m[key] = sc + additionsCount++ + } + + // Stop deleted scrapers, which are missing in sws. + for key, sc := range sg.m { + if !swsMap[key] { + close(sc.stopCh) + delete(sg.m, key) + deletionsCount++ + } + } + + if additionsCount > 0 || deletionsCount > 0 { + sg.changesCount.Add(additionsCount + deletionsCount) + logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m)) } - wg.Wait() - tsmGlobal.UnregisterAll(sws) +} + +type scraper struct { + sw scrapeWork + stopCh chan struct{} +} + +func newScraper(sw *ScrapeWork, pushData func(wr *prompbmarshal.WriteRequest)) *scraper { + sc := &scraper{ + stopCh: make(chan struct{}), + } + c := newClient(sw) + sc.sw.Config = *sw + sc.sw.ReadData = c.ReadData + sc.sw.PushData = pushData + return sc } diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 22de48f486..a97b2c01d0 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -68,6 +68,25 @@ type ScrapeWork struct { SampleLimit int } +// key returns unique identifier for the given sw. +// +// it can be used for comparing for equality two ScrapeWork objects. +func (sw *ScrapeWork) key() string { + key := fmt.Sprintf("ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, Labels=%s, "+ + "AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d", + sw.ScrapeURL, sw.ScrapeInterval, sw.ScrapeTimeout, sw.HonorLabels, sw.HonorTimestamps, sw.LabelsString(), + sw.AuthConfig.String(), sw.metricRelabelConfigsString(), sw.SampleLimit) + return key +} + +func (sw *ScrapeWork) metricRelabelConfigsString() string { + var sb strings.Builder + for _, prc := range sw.MetricRelabelConfigs { + fmt.Fprintf(&sb, "%s", prc.String()) + } + return sb.String() +} + // Job returns job for the ScrapeWork func (sw *ScrapeWork) Job() string { return promrelabel.GetLabelValueByName(sw.Labels, "job") diff --git a/lib/promscrape/targetstatus.go b/lib/promscrape/targetstatus.go index eb290f4872..ad0c9c480f 100644 --- a/lib/promscrape/targetstatus.go +++ b/lib/promscrape/targetstatus.go @@ -32,22 +32,17 @@ func (tsm *targetStatusMap) Reset() { tsm.mu.Unlock() } -func (tsm *targetStatusMap) RegisterAll(sws []ScrapeWork) { +func (tsm *targetStatusMap) Register(sw *ScrapeWork) { tsm.mu.Lock() - for i := range sws { - sw := &sws[i] - tsm.m[sw.ID] = targetStatus{ - sw: sw, - } + tsm.m[sw.ID] = targetStatus{ + sw: sw, } tsm.mu.Unlock() } -func (tsm *targetStatusMap) UnregisterAll(sws []ScrapeWork) { +func (tsm *targetStatusMap) Unregister(sw *ScrapeWork) { tsm.mu.Lock() - for i := range sws { - delete(tsm.m, sws[i].ID) - } + delete(tsm.m, sw.ID) tsm.mu.Unlock() } @@ -83,7 +78,6 @@ func (tsm *targetStatusMap) WriteHumanReadable(w io.Writer) { return jss[i].job < jss[j].job }) - targetsByEndpoint := make(map[string]int) for _, js := range jss { sts := js.statuses sort.Slice(sts, func(i, j int) bool { @@ -109,20 +103,9 @@ func (tsm *targetStatusMap) WriteHumanReadable(w io.Writer) { } fmt.Fprintf(w, "\tstate=%s, endpoint=%s, labels=%s, last_scrape=%.3fs ago, scrape_duration=%.3fs, error=%q\n", state, st.sw.ScrapeURL, labelsStr, lastScrape.Seconds(), float64(st.scrapeDuration)/1000, errMsg) - key := fmt.Sprintf("endpoint=%s, labels=%s", st.sw.ScrapeURL, labelsStr) - targetsByEndpoint[key]++ } } fmt.Fprintf(w, "\n") - - // Check whether there are targets with duplicate endpoints and labels. - for key, n := range targetsByEndpoint { - if n <= 1 { - continue - } - fmt.Fprintf(w, "!!! Scrape config error: %d duplicate targets with identical endpoint and labels found:\n", n) - fmt.Fprintf(w, "\t%s\n", key) - } } type jobStatus struct {