mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
38188e1d6b
This should prevent possible 'memory leaks', where a pointer to a single ScrapeWork item stored in the slice
could prevent the release of memory occupied by all the ScrapeWork items stored in the slice when they
are no longer used.
See the related commit e205975716
and the related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825
365 lines
13 KiB
Go
365 lines
13 KiB
Go
package promscrape
|
|
|
|
import (
|
|
"bytes"
|
|
"flag"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// Command-line flags for the Prometheus-compatible scraper.
var (
	// Re-check interval for the '-promscrape.config' file itself; zero disables periodic checks.
	configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0, "Interval for checking for changes in '-promscrape.config' file. "+
		"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes")

	// Re-check intervals for the individual service discovery mechanisms.
	fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", 30*time.Second, "Interval for checking for changes in 'file_sd_config'. "+
		"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config for details")
	kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+
		"This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+
		"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details")
	openstackSDCheckInterval = flag.Duration("promscrape.openstackSDCheckInterval", 30*time.Second, "Interval for checking for changes in openstack API server. "+
		"This works only if `openstack_sd_configs` is configured in '-promscrape.config' file. "+
		"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config for details")
	eurekaSDCheckInterval = flag.Duration("promscrape.eurekaSDCheckInterval", 30*time.Second, "Interval for checking for changes in eureka. "+
		"This works only if `eureka_sd_configs` is configured in '-promscrape.config' file. "+
		"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config for details")
	dnsSDCheckInterval = flag.Duration("promscrape.dnsSDCheckInterval", 30*time.Second, "Interval for checking for changes in dns. "+
		"This works only if `dns_sd_configs` is configured in '-promscrape.config' file. "+
		"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config for details")
	ec2SDCheckInterval = flag.Duration("promscrape.ec2SDCheckInterval", time.Minute, "Interval for checking for changes in ec2. "+
		"This works only if `ec2_sd_configs` is configured in '-promscrape.config' file. "+
		"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config for details")
	gceSDCheckInterval = flag.Duration("promscrape.gceSDCheckInterval", time.Minute, "Interval for checking for changes in gce. "+
		"This works only if `gce_sd_configs` is configured in '-promscrape.config' file. "+
		"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config for details")
	dockerswarmSDCheckInterval = flag.Duration("promscrape.dockerswarmSDCheckInterval", 30*time.Second, "Interval for checking for changes in dockerswarm. "+
		"This works only if `dockerswarm_sd_configs` is configured in '-promscrape.config' file. "+
		"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config for details")

	// Path to the scrape config; an empty value means there is nothing to scrape.
	promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
		"See https://victoriametrics.github.io/#how-to-scrape-prometheus-exporters-such-as-node-exporter for details")

	// Silences the error logged for every duplicate scrape target.
	suppressDuplicateScrapeTargetErrors = flag.Bool("promscrape.suppressDuplicateScrapeTargetErrors", false, "Whether to suppress `duplicate scrape target` errors; "+
		"see https://victoriametrics.github.io/vmagent.html#troubleshooting for details")
)
|
|
|
|
// CheckConfig checks -promscrape.config for errors and unsupported options.
|
|
func CheckConfig() error {
|
|
if *promscrapeConfigFile == "" {
|
|
return fmt.Errorf("missing -promscrape.config option")
|
|
}
|
|
_, _, err := loadConfig(*promscrapeConfigFile)
|
|
return err
|
|
}
|
|
|
|
// Init initializes Prometheus scraper with config from the `-promscrape.config`.
|
|
//
|
|
// Scraped data is passed to pushData.
|
|
func Init(pushData func(wr *prompbmarshal.WriteRequest)) {
|
|
globalStopCh = make(chan struct{})
|
|
scraperWG.Add(1)
|
|
go func() {
|
|
defer scraperWG.Done()
|
|
runScraper(*promscrapeConfigFile, pushData, globalStopCh)
|
|
}()
|
|
}
|
|
|
|
// Stop stops Prometheus scraper.
|
|
func Stop() {
|
|
close(globalStopCh)
|
|
scraperWG.Wait()
|
|
}
|
|
|
|
var (
	// globalStopCh is created by Init and closed by Stop.
	globalStopCh chan struct{}

	// scraperWG tracks the goroutine started by Init.
	scraperWG sync.WaitGroup

	// PendingScrapeConfigs is the number of registered scrape configs, which
	// haven't applied their first config yet. It must be accessed atomically.
	//
	// Zero value means that all scrapeConfigs are initialized and ready for work.
	PendingScrapeConfigs int32
)
|
|
|
|
func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
|
|
if configFile == "" {
|
|
// Nothing to scrape.
|
|
return
|
|
}
|
|
|
|
logger.Infof("reading Prometheus configs from %q", configFile)
|
|
cfg, data, err := loadConfig(configFile)
|
|
if err != nil {
|
|
logger.Fatalf("cannot read %q: %s", configFile, err)
|
|
}
|
|
|
|
scs := newScrapeConfigs(pushData)
|
|
scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
|
|
scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
|
|
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
|
|
scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
|
|
scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
|
|
scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) })
|
|
scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
|
|
scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
|
|
scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
|
|
scs.add("dockerswarm_sd_configs", *dockerswarmSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSwarmSDScrapeWork(swsPrev) })
|
|
|
|
sighupCh := procutil.NewSighupChan()
|
|
|
|
var tickerCh <-chan time.Time
|
|
if *configCheckInterval > 0 {
|
|
ticker := time.NewTicker(*configCheckInterval)
|
|
tickerCh = ticker.C
|
|
defer ticker.Stop()
|
|
}
|
|
for {
|
|
scs.updateConfig(cfg)
|
|
waitForChans:
|
|
select {
|
|
case <-sighupCh:
|
|
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
|
|
cfgNew, dataNew, err := loadConfig(configFile)
|
|
if err != nil {
|
|
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
|
|
goto waitForChans
|
|
}
|
|
if bytes.Equal(data, dataNew) {
|
|
logger.Infof("nothing changed in %q", configFile)
|
|
goto waitForChans
|
|
}
|
|
cfg = cfgNew
|
|
data = dataNew
|
|
case <-tickerCh:
|
|
cfgNew, dataNew, err := loadConfig(configFile)
|
|
if err != nil {
|
|
logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
|
|
goto waitForChans
|
|
}
|
|
if bytes.Equal(data, dataNew) {
|
|
// Nothing changed since the previous loadConfig
|
|
goto waitForChans
|
|
}
|
|
cfg = cfgNew
|
|
data = dataNew
|
|
case <-globalStopCh:
|
|
logger.Infof("stopping Prometheus scrapers")
|
|
startTime := time.Now()
|
|
scs.stop()
|
|
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
|
|
return
|
|
}
|
|
logger.Infof("found changes in %q; applying these changes", configFile)
|
|
configReloads.Inc()
|
|
}
|
|
}
|
|
|
|
// configReloads counts the config reloads, which resulted in applying changed config contents.
var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`)
|
|
|
|
type scrapeConfigs struct {
|
|
pushData func(wr *prompbmarshal.WriteRequest)
|
|
wg sync.WaitGroup
|
|
stopCh chan struct{}
|
|
scfgs []*scrapeConfig
|
|
}
|
|
|
|
func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConfigs {
|
|
return &scrapeConfigs{
|
|
pushData: pushData,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork) {
|
|
atomic.AddInt32(&PendingScrapeConfigs, 1)
|
|
scfg := &scrapeConfig{
|
|
name: name,
|
|
pushData: scs.pushData,
|
|
getScrapeWork: getScrapeWork,
|
|
checkInterval: checkInterval,
|
|
cfgCh: make(chan *Config, 1),
|
|
stopCh: scs.stopCh,
|
|
}
|
|
scs.wg.Add(1)
|
|
go func() {
|
|
defer scs.wg.Done()
|
|
scfg.run()
|
|
}()
|
|
scs.scfgs = append(scs.scfgs, scfg)
|
|
}
|
|
|
|
func (scs *scrapeConfigs) updateConfig(cfg *Config) {
|
|
for _, scfg := range scs.scfgs {
|
|
scfg.cfgCh <- cfg
|
|
}
|
|
}
|
|
|
|
func (scs *scrapeConfigs) stop() {
|
|
close(scs.stopCh)
|
|
scs.wg.Wait()
|
|
scs.scfgs = nil
|
|
}
|
|
|
|
type scrapeConfig struct {
|
|
name string
|
|
pushData func(wr *prompbmarshal.WriteRequest)
|
|
getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork
|
|
checkInterval time.Duration
|
|
cfgCh chan *Config
|
|
stopCh <-chan struct{}
|
|
}
|
|
|
|
func (scfg *scrapeConfig) run() {
|
|
sg := newScraperGroup(scfg.name, scfg.pushData)
|
|
defer sg.stop()
|
|
|
|
var tickerCh <-chan time.Time
|
|
if scfg.checkInterval > 0 {
|
|
ticker := time.NewTicker(scfg.checkInterval)
|
|
defer ticker.Stop()
|
|
tickerCh = ticker.C
|
|
}
|
|
|
|
cfg := <-scfg.cfgCh
|
|
var swsPrev []*ScrapeWork
|
|
updateScrapeWork := func(cfg *Config) {
|
|
sws := scfg.getScrapeWork(cfg, swsPrev)
|
|
sg.update(sws)
|
|
swsPrev = sws
|
|
}
|
|
updateScrapeWork(cfg)
|
|
atomic.AddInt32(&PendingScrapeConfigs, -1)
|
|
|
|
for {
|
|
|
|
select {
|
|
case <-scfg.stopCh:
|
|
return
|
|
case cfg = <-scfg.cfgCh:
|
|
case <-tickerCh:
|
|
}
|
|
updateScrapeWork(cfg)
|
|
}
|
|
}
|
|
|
|
type scraperGroup struct {
|
|
name string
|
|
wg sync.WaitGroup
|
|
mLock sync.Mutex
|
|
m map[string]*scraper
|
|
pushData func(wr *prompbmarshal.WriteRequest)
|
|
|
|
changesCount *metrics.Counter
|
|
activeScrapers *metrics.Counter
|
|
scrapersStarted *metrics.Counter
|
|
scrapersStopped *metrics.Counter
|
|
}
|
|
|
|
func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup {
|
|
sg := &scraperGroup{
|
|
name: name,
|
|
m: make(map[string]*scraper),
|
|
pushData: pushData,
|
|
|
|
changesCount: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_config_changes_total{type=%q}`, name)),
|
|
activeScrapers: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_active_scrapers{type=%q}`, name)),
|
|
scrapersStarted: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_started_total{type=%q}`, name)),
|
|
scrapersStopped: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_stopped_total{type=%q}`, name)),
|
|
}
|
|
metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="up"}`, name), func() float64 {
|
|
return float64(tsmGlobal.StatusByGroup(sg.name, true))
|
|
})
|
|
metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="down"}`, name), func() float64 {
|
|
return float64(tsmGlobal.StatusByGroup(sg.name, false))
|
|
})
|
|
return sg
|
|
}
|
|
|
|
func (sg *scraperGroup) stop() {
|
|
sg.mLock.Lock()
|
|
for _, sc := range sg.m {
|
|
close(sc.stopCh)
|
|
}
|
|
sg.m = nil
|
|
sg.mLock.Unlock()
|
|
sg.wg.Wait()
|
|
}
|
|
|
|
func (sg *scraperGroup) update(sws []*ScrapeWork) {
|
|
sg.mLock.Lock()
|
|
defer sg.mLock.Unlock()
|
|
|
|
additionsCount := 0
|
|
deletionsCount := 0
|
|
swsMap := make(map[string][]prompbmarshal.Label, len(sws))
|
|
for _, sw := range sws {
|
|
key := sw.key()
|
|
originalLabels := swsMap[key]
|
|
if originalLabels != nil {
|
|
if !*suppressDuplicateScrapeTargetErrors {
|
|
logger.Errorf("skipping duplicate scrape target with identical labels; endpoint=%s, labels=%s; "+
|
|
"make sure service discovery and relabeling is set up properly; "+
|
|
"see also https://victoriametrics.github.io/vmagent.html#troubleshooting; "+
|
|
"original labels for target1: %s; original labels for target2: %s",
|
|
sw.ScrapeURL, sw.LabelsString(), promLabelsString(originalLabels), promLabelsString(sw.OriginalLabels))
|
|
}
|
|
droppedTargetsMap.Register(sw.OriginalLabels)
|
|
continue
|
|
}
|
|
swsMap[key] = sw.OriginalLabels
|
|
if sg.m[key] != nil {
|
|
// The scraper for the given key already exists.
|
|
continue
|
|
}
|
|
|
|
// Start a scraper for the missing key.
|
|
sc := newScraper(sw, sg.name, sg.pushData)
|
|
sg.activeScrapers.Inc()
|
|
sg.scrapersStarted.Inc()
|
|
sg.wg.Add(1)
|
|
go func() {
|
|
defer sg.wg.Done()
|
|
sc.sw.run(sc.stopCh)
|
|
tsmGlobal.Unregister(sw)
|
|
sg.activeScrapers.Dec()
|
|
sg.scrapersStopped.Inc()
|
|
}()
|
|
tsmGlobal.Register(sw)
|
|
sg.m[key] = sc
|
|
additionsCount++
|
|
}
|
|
|
|
// Stop deleted scrapers, which are missing in sws.
|
|
for key, sc := range sg.m {
|
|
if _, ok := swsMap[key]; !ok {
|
|
close(sc.stopCh)
|
|
delete(sg.m, key)
|
|
deletionsCount++
|
|
}
|
|
}
|
|
|
|
if additionsCount > 0 || deletionsCount > 0 {
|
|
sg.changesCount.Add(additionsCount + deletionsCount)
|
|
logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m))
|
|
}
|
|
}
|
|
|
|
type scraper struct {
|
|
sw scrapeWork
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper {
|
|
sc := &scraper{
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
c := newClient(sw)
|
|
sc.sw.Config = *sw
|
|
sc.sw.ScrapeGroup = group
|
|
sc.sw.ReadData = c.ReadData
|
|
sc.sw.GetStreamReader = c.GetStreamReader
|
|
sc.sw.PushData = pushData
|
|
return sc
|
|
}
|