2020-02-23 12:35:47 +01:00
package promscrape
import (
2020-04-23 22:40:50 +02:00
"bytes"
2020-02-23 12:35:47 +01:00
"flag"
2020-05-03 11:41:13 +02:00
"fmt"
2020-02-23 12:35:47 +01:00
"sync"
2020-11-04 19:29:18 +01:00
"sync/atomic"
2020-02-23 12:35:47 +01:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2020-05-05 08:27:38 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2020-12-03 18:47:40 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
2021-06-25 11:10:20 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
2021-06-25 12:20:18 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dockerswarm"
2021-06-25 11:10:20 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/eureka"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
2021-06-22 12:33:37 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
2021-06-25 11:10:20 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/metrics"
)
var (
2020-04-23 22:40:50 +02:00
configCheckInterval = flag . Duration ( "promscrape.configCheckInterval" , 0 , "Interval for checking for changes in '-promscrape.config' file. " +
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes" )
2021-03-15 20:59:25 +01:00
suppressDuplicateScrapeTargetErrors = flag . Bool ( "promscrape.suppressDuplicateScrapeTargetErrors" , false , "Whether to suppress 'duplicate scrape target' errors; " +
2021-04-20 19:16:17 +02:00
"see https://docs.victoriametrics.com/vmagent.html#troubleshooting for details" )
2021-06-25 11:10:20 +02:00
promscrapeConfigFile = flag . String ( "promscrape.config" , "" , "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. " +
"See https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter for details" )
fileSDCheckInterval = flag . Duration ( "promscrape.fileSDCheckInterval" , 30 * time . Second , "Interval for checking for changes in 'file_sd_config'. " +
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config for details" )
2020-02-23 12:35:47 +01:00
)
2020-05-21 14:22:01 +02:00
// CheckConfig checks -promscrape.config for errors and unsupported options.
func CheckConfig ( ) error {
if * promscrapeConfigFile == "" {
return fmt . Errorf ( "missing -promscrape.config option" )
}
_ , _ , err := loadConfig ( * promscrapeConfigFile )
return err
}
2020-02-23 12:35:47 +01:00
// Init initializes Prometheus scraper with config from the `-promscrape.config`.
//
// Scraped data is passed to pushData.
2021-08-05 08:46:19 +02:00
func Init ( pushData func ( wr * prompbmarshal . WriteRequest ) ) {
2020-03-03 19:08:08 +01:00
globalStopCh = make ( chan struct { } )
2020-02-23 12:35:47 +01:00
scraperWG . Add ( 1 )
go func ( ) {
defer scraperWG . Done ( )
2020-03-03 19:08:08 +01:00
runScraper ( * promscrapeConfigFile , pushData , globalStopCh )
2020-02-23 12:35:47 +01:00
} ( )
}
// Stop stops Prometheus scraper.
func Stop ( ) {
2020-03-03 19:08:08 +01:00
close ( globalStopCh )
2020-02-23 12:35:47 +01:00
scraperWG . Wait ( )
}
var (
2020-03-03 19:08:08 +01:00
globalStopCh chan struct { }
scraperWG sync . WaitGroup
2020-11-04 19:29:18 +01:00
// PendingScrapeConfigs - zero value means, that
// all scrapeConfigs are inited and ready for work.
PendingScrapeConfigs int32
2020-02-23 12:35:47 +01:00
)
2021-08-05 08:46:19 +02:00
func runScraper ( configFile string , pushData func ( wr * prompbmarshal . WriteRequest ) , globalStopCh <- chan struct { } ) {
2020-02-23 12:35:47 +01:00
if configFile == "" {
// Nothing to scrape.
return
}
2021-05-21 15:34:03 +02:00
// Register SIGHUP handler for config reload before loadConfig.
// This guarantees that the config will be re-read if the signal arrives just after loadConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil . NewSighupChan ( )
2020-02-23 12:35:47 +01:00
logger . Infof ( "reading Prometheus configs from %q" , configFile )
2020-04-23 22:40:50 +02:00
cfg , data , err := loadConfig ( configFile )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Fatalf ( "cannot read %q: %s" , configFile , err )
}
2021-04-05 21:02:09 +02:00
cfg . mustStart ( )
2020-02-23 12:35:47 +01:00
2020-05-03 11:41:13 +02:00
scs := newScrapeConfigs ( pushData )
2020-12-08 16:50:03 +01:00
scs . add ( "consul_sd_configs" , * consul . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getConsulSDScrapeWork ( swsPrev ) } )
2021-06-25 11:10:20 +02:00
scs . add ( "digitalocean_sd_configs" , * digitalocean . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getDigitalOceanDScrapeWork ( swsPrev ) } )
scs . add ( "dns_sd_configs" , * dns . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getDNSSDScrapeWork ( swsPrev ) } )
scs . add ( "docker_sd_configs" , * docker . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getDockerSDScrapeWork ( swsPrev ) } )
2021-06-25 12:20:18 +02:00
scs . add ( "dockerswarm_sd_configs" , * dockerswarm . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getDockerSwarmSDScrapeWork ( swsPrev ) } )
2021-06-25 11:10:20 +02:00
scs . add ( "ec2_sd_configs" , * ec2 . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getEC2SDScrapeWork ( swsPrev ) } )
scs . add ( "eureka_sd_configs" , * eureka . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getEurekaSDScrapeWork ( swsPrev ) } )
scs . add ( "file_sd_configs" , * fileSDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getFileSDScrapeWork ( swsPrev ) } )
scs . add ( "gce_sd_configs" , * gce . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getGCESDScrapeWork ( swsPrev ) } )
2021-06-22 12:33:37 +02:00
scs . add ( "http_sd_configs" , * http . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getHTTPDScrapeWork ( swsPrev ) } )
2021-06-25 11:10:20 +02:00
scs . add ( "kubernetes_sd_configs" , * kubernetes . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getKubernetesSDScrapeWork ( swsPrev ) } )
scs . add ( "openstack_sd_configs" , * openstack . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getOpenStackSDScrapeWork ( swsPrev ) } )
scs . add ( "static_configs" , 0 , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getStaticScrapeWork ( ) } )
2020-05-03 11:41:13 +02:00
2020-04-23 22:40:50 +02:00
var tickerCh <- chan time . Time
if * configCheckInterval > 0 {
ticker := time . NewTicker ( * configCheckInterval )
tickerCh = ticker . C
defer ticker . Stop ( )
}
2020-05-03 11:41:13 +02:00
for {
scs . updateConfig ( cfg )
2020-02-23 12:35:47 +01:00
waitForChans :
select {
case <- sighupCh :
logger . Infof ( "SIGHUP received; reloading Prometheus configs from %q" , configFile )
2020-04-23 22:40:50 +02:00
cfgNew , dataNew , err := loadConfig ( configFile )
if err != nil {
logger . Errorf ( "cannot read %q on SIGHUP: %s; continuing with the previous config" , configFile , err )
goto waitForChans
}
if bytes . Equal ( data , dataNew ) {
logger . Infof ( "nothing changed in %q" , configFile )
goto waitForChans
}
2021-03-01 13:13:56 +01:00
cfg . mustStop ( )
2021-04-05 21:02:09 +02:00
cfgNew . mustStart ( )
2020-04-23 22:40:50 +02:00
cfg = cfgNew
data = dataNew
case <- tickerCh :
cfgNew , dataNew , err := loadConfig ( configFile )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Errorf ( "cannot read %q: %s; continuing with the previous config" , configFile , err )
goto waitForChans
}
2020-04-23 22:40:50 +02:00
if bytes . Equal ( data , dataNew ) {
// Nothing changed since the previous loadConfig
goto waitForChans
}
2021-03-01 13:13:56 +01:00
cfg . mustStop ( )
2021-04-05 21:02:09 +02:00
cfgNew . mustStart ( )
2020-02-23 12:35:47 +01:00
cfg = cfgNew
2020-04-23 22:40:50 +02:00
data = dataNew
2020-02-23 12:35:47 +01:00
case <- globalStopCh :
2021-03-01 13:13:56 +01:00
cfg . mustStop ( )
2020-05-03 11:41:13 +02:00
logger . Infof ( "stopping Prometheus scrapers" )
startTime := time . Now ( )
scs . stop ( )
logger . Infof ( "stopped Prometheus scrapers in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
return
2020-04-23 22:40:50 +02:00
}
2020-05-03 11:41:13 +02:00
logger . Infof ( "found changes in %q; applying these changes" , configFile )
2020-02-23 12:35:47 +01:00
configReloads . Inc ( )
}
}
var configReloads = metrics . NewCounter ( ` vm_promscrape_config_reloads_total ` )
2020-05-03 11:41:13 +02:00
type scrapeConfigs struct {
2021-08-05 08:46:19 +02:00
pushData func ( wr * prompbmarshal . WriteRequest )
2020-05-03 11:41:13 +02:00
wg sync . WaitGroup
stopCh chan struct { }
scfgs [ ] * scrapeConfig
2020-02-23 12:35:47 +01:00
}
2021-08-05 08:46:19 +02:00
func newScrapeConfigs ( pushData func ( wr * prompbmarshal . WriteRequest ) ) * scrapeConfigs {
2020-05-03 11:41:13 +02:00
return & scrapeConfigs {
pushData : pushData ,
stopCh : make ( chan struct { } ) ,
2020-04-13 20:02:27 +02:00
}
2020-05-03 11:41:13 +02:00
}
2020-04-13 20:02:27 +02:00
2020-12-08 16:50:03 +01:00
func ( scs * scrapeConfigs ) add ( name string , checkInterval time . Duration , getScrapeWork func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork ) {
2020-11-04 19:29:18 +01:00
atomic . AddInt32 ( & PendingScrapeConfigs , 1 )
2020-05-03 11:41:13 +02:00
scfg := & scrapeConfig {
name : name ,
pushData : scs . pushData ,
getScrapeWork : getScrapeWork ,
checkInterval : checkInterval ,
cfgCh : make ( chan * Config , 1 ) ,
stopCh : scs . stopCh ,
2021-02-02 15:13:59 +01:00
discoveryDuration : metrics . GetOrCreateHistogram ( fmt . Sprintf ( "vm_promscrape_service_discovery_duration_seconds{type=%q}" , name ) ) ,
2020-04-13 20:02:27 +02:00
}
2020-05-03 11:41:13 +02:00
scs . wg . Add ( 1 )
go func ( ) {
defer scs . wg . Done ( )
scfg . run ( )
} ( )
scs . scfgs = append ( scs . scfgs , scfg )
2020-04-13 20:02:27 +02:00
}
2020-05-03 11:41:13 +02:00
func ( scs * scrapeConfigs ) updateConfig ( cfg * Config ) {
for _ , scfg := range scs . scfgs {
scfg . cfgCh <- cfg
2020-04-27 18:25:45 +02:00
}
2020-05-03 11:41:13 +02:00
}
2020-04-27 18:25:45 +02:00
2020-05-03 11:41:13 +02:00
func ( scs * scrapeConfigs ) stop ( ) {
close ( scs . stopCh )
scs . wg . Wait ( )
scs . scfgs = nil
2020-04-27 18:25:45 +02:00
}
2020-05-03 11:41:13 +02:00
type scrapeConfig struct {
name string
2021-08-05 08:46:19 +02:00
pushData func ( wr * prompbmarshal . WriteRequest )
2020-12-08 16:50:03 +01:00
getScrapeWork func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork
2020-05-03 11:41:13 +02:00
checkInterval time . Duration
cfgCh chan * Config
stopCh <- chan struct { }
2021-02-02 15:13:59 +01:00
discoveryDuration * metrics . Histogram
2020-05-03 11:41:13 +02:00
}
2020-04-27 18:25:45 +02:00
2020-05-03 11:41:13 +02:00
func ( scfg * scrapeConfig ) run ( ) {
sg := newScraperGroup ( scfg . name , scfg . pushData )
defer sg . stop ( )
2020-04-24 16:50:21 +02:00
2020-05-03 11:41:13 +02:00
var tickerCh <- chan time . Time
if scfg . checkInterval > 0 {
ticker := time . NewTicker ( scfg . checkInterval )
defer ticker . Stop ( )
tickerCh = ticker . C
2020-04-24 16:50:21 +02:00
}
2020-05-03 11:41:13 +02:00
cfg := <- scfg . cfgCh
2020-12-08 16:50:03 +01:00
var swsPrev [ ] * ScrapeWork
2020-11-04 19:29:18 +01:00
updateScrapeWork := func ( cfg * Config ) {
2021-04-05 12:53:26 +02:00
startTime := time . Now ( )
sws := scfg . getScrapeWork ( cfg , swsPrev )
sg . update ( sws )
swsPrev = sws
scfg . discoveryDuration . UpdateDuration ( startTime )
2020-11-04 19:29:18 +01:00
}
updateScrapeWork ( cfg )
atomic . AddInt32 ( & PendingScrapeConfigs , - 1 )
for {
2020-04-24 16:50:21 +02:00
2020-02-23 12:35:47 +01:00
select {
2020-05-03 11:41:13 +02:00
case <- scfg . stopCh :
return
case cfg = <- scfg . cfgCh :
case <- tickerCh :
2020-02-23 12:35:47 +01:00
}
2020-11-04 19:29:18 +01:00
updateScrapeWork ( cfg )
2020-02-23 12:35:47 +01:00
}
}
2020-05-03 11:41:13 +02:00
type scraperGroup struct {
2020-12-08 00:54:13 +01:00
name string
wg sync . WaitGroup
mLock sync . Mutex
m map [ string ] * scraper
2021-08-05 08:46:19 +02:00
pushData func ( wr * prompbmarshal . WriteRequest )
2020-12-08 00:54:13 +01:00
2020-12-08 10:57:52 +01:00
changesCount * metrics . Counter
activeScrapers * metrics . Counter
scrapersStarted * metrics . Counter
scrapersStopped * metrics . Counter
2020-05-03 11:41:13 +02:00
}
2020-02-23 12:35:47 +01:00
2021-08-05 08:46:19 +02:00
func newScraperGroup ( name string , pushData func ( wr * prompbmarshal . WriteRequest ) ) * scraperGroup {
2020-05-03 11:41:13 +02:00
sg := & scraperGroup {
2020-12-08 00:54:13 +01:00
name : name ,
m : make ( map [ string ] * scraper ) ,
pushData : pushData ,
2020-12-08 10:57:52 +01:00
changesCount : metrics . NewCounter ( fmt . Sprintf ( ` vm_promscrape_config_changes_total { type=%q} ` , name ) ) ,
activeScrapers : metrics . NewCounter ( fmt . Sprintf ( ` vm_promscrape_active_scrapers { type=%q} ` , name ) ) ,
scrapersStarted : metrics . NewCounter ( fmt . Sprintf ( ` vm_promscrape_scrapers_started_total { type=%q} ` , name ) ) ,
scrapersStopped : metrics . NewCounter ( fmt . Sprintf ( ` vm_promscrape_scrapers_stopped_total { type=%q} ` , name ) ) ,
2020-02-23 12:35:47 +01:00
}
2020-07-13 20:52:03 +02:00
metrics . NewGauge ( fmt . Sprintf ( ` vm_promscrape_targets { type=%q, status="up"} ` , name ) , func ( ) float64 {
return float64 ( tsmGlobal . StatusByGroup ( sg . name , true ) )
} )
metrics . NewGauge ( fmt . Sprintf ( ` vm_promscrape_targets { type=%q, status="down"} ` , name ) , func ( ) float64 {
return float64 ( tsmGlobal . StatusByGroup ( sg . name , false ) )
2020-05-03 11:41:13 +02:00
} )
return sg
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
func ( sg * scraperGroup ) stop ( ) {
sg . mLock . Lock ( )
for _ , sc := range sg . m {
close ( sc . stopCh )
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
sg . m = nil
sg . mLock . Unlock ( )
sg . wg . Wait ( )
2020-02-23 12:35:47 +01:00
}
2021-04-05 12:53:26 +02:00
func ( sg * scraperGroup ) update ( sws [ ] * ScrapeWork ) {
2020-05-03 11:41:13 +02:00
sg . mLock . Lock ( )
defer sg . mLock . Unlock ( )
additionsCount := 0
deletionsCount := 0
2020-10-08 17:50:22 +02:00
swsMap := make ( map [ string ] [ ] prompbmarshal . Label , len ( sws ) )
2020-12-08 16:50:03 +01:00
for _ , sw := range sws {
2020-05-03 11:41:13 +02:00
key := sw . key ( )
2020-10-08 17:50:22 +02:00
originalLabels := swsMap [ key ]
if originalLabels != nil {
if ! * suppressDuplicateScrapeTargetErrors {
logger . Errorf ( "skipping duplicate scrape target with identical labels; endpoint=%s, labels=%s; " +
"make sure service discovery and relabeling is set up properly; " +
2021-04-20 19:16:17 +02:00
"see also https://docs.victoriametrics.com/vmagent.html#troubleshooting; " +
2020-10-08 17:50:22 +02:00
"original labels for target1: %s; original labels for target2: %s" ,
sw . ScrapeURL , sw . LabelsString ( ) , promLabelsString ( originalLabels ) , promLabelsString ( sw . OriginalLabels ) )
}
2020-10-20 20:44:59 +02:00
droppedTargetsMap . Register ( sw . OriginalLabels )
2020-05-03 11:41:13 +02:00
continue
2020-02-23 12:35:47 +01:00
}
2020-10-08 17:50:22 +02:00
swsMap [ key ] = sw . OriginalLabels
2020-05-03 11:41:13 +02:00
if sg . m [ key ] != nil {
// The scraper for the given key already exists.
continue
}
// Start a scraper for the missing key.
2020-07-13 20:52:03 +02:00
sc := newScraper ( sw , sg . name , sg . pushData )
2020-12-08 00:54:13 +01:00
sg . activeScrapers . Inc ( )
2020-12-08 10:57:52 +01:00
sg . scrapersStarted . Inc ( )
2020-05-03 11:41:13 +02:00
sg . wg . Add ( 1 )
2020-12-15 19:56:16 +01:00
tsmGlobal . Register ( sw )
go func ( sw * ScrapeWork ) {
2020-05-03 11:41:13 +02:00
defer sg . wg . Done ( )
sc . sw . run ( sc . stopCh )
tsmGlobal . Unregister ( sw )
2020-12-08 00:54:13 +01:00
sg . activeScrapers . Dec ( )
2020-12-08 10:57:52 +01:00
sg . scrapersStopped . Inc ( )
2020-12-15 19:56:16 +01:00
} ( sw )
2020-05-03 11:41:13 +02:00
sg . m [ key ] = sc
additionsCount ++
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
// Stop deleted scrapers, which are missing in sws.
for key , sc := range sg . m {
2020-11-04 10:08:30 +01:00
if _ , ok := swsMap [ key ] ; ! ok {
2020-05-03 11:41:13 +02:00
close ( sc . stopCh )
delete ( sg . m , key )
deletionsCount ++
}
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
if additionsCount > 0 || deletionsCount > 0 {
sg . changesCount . Add ( additionsCount + deletionsCount )
logger . Infof ( "%s: added targets: %d, removed targets: %d; total targets: %d" , sg . name , additionsCount , deletionsCount , len ( sg . m ) )
2020-02-23 12:35:47 +01:00
}
}
2020-05-03 11:41:13 +02:00
type scraper struct {
sw scrapeWork
stopCh chan struct { }
}
2021-08-05 08:46:19 +02:00
func newScraper ( sw * ScrapeWork , group string , pushData func ( wr * prompbmarshal . WriteRequest ) ) * scraper {
2020-05-03 11:41:13 +02:00
sc := & scraper {
stopCh : make ( chan struct { } ) ,
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
c := newClient ( sw )
2020-12-17 13:30:33 +01:00
sc . sw . Config = sw
2020-07-13 20:52:03 +02:00
sc . sw . ScrapeGroup = group
2020-05-03 11:41:13 +02:00
sc . sw . ReadData = c . ReadData
2020-11-01 22:12:13 +01:00
sc . sw . GetStreamReader = c . GetStreamReader
2020-05-03 11:41:13 +02:00
sc . sw . PushData = pushData
return sc
2020-02-23 12:35:47 +01:00
}