2020-02-23 12:35:47 +01:00
package promscrape
import (
2020-04-23 22:40:50 +02:00
"bytes"
2020-02-23 12:35:47 +01:00
"flag"
2020-05-03 11:41:13 +02:00
"fmt"
2020-02-23 12:35:47 +01:00
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
// Command-line flags selecting the scrape config file and the intervals
// at which every supported configuration source is re-checked for changes.
var (
	// promscrapeConfigFile is the path to the Prometheus config file.
	// The default is an empty path.
	promscrapeConfigFile = flag.String("promscrape.config", "",
		"Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
			"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config for details")

	// configCheckInterval is the period between automatic re-reads of the
	// `-promscrape.config` file. The default is zero.
	configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0,
		"Interval for checking for changes in '-promscrape.config' file. "+
			"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes")

	// fileSDCheckInterval is the re-check period for `file_sd_configs` targets.
	fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", 30*time.Second,
		"Interval for checking for changes in 'file_sd_config'. "+
			"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config for details")

	// kubernetesSDCheckInterval is the re-check period for `kubernetes_sd_configs` targets.
	kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second,
		"Interval for checking for changes in Kubernetes API server. "+
			"This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+
			"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details")

	// ec2SDCheckInterval is the re-check period for `ec2_sd_configs` targets.
	ec2SDCheckInterval = flag.Duration("promscrape.ec2SDCheckInterval", time.Minute,
		"Interval for checking for changes in ec2. "+
			"This works only if `ec2_sd_configs` is configured in '-promscrape.config' file. "+
			"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config for details")

	// gceSDCheckInterval is the re-check period for `gce_sd_configs` targets.
	gceSDCheckInterval = flag.Duration("promscrape.gceSDCheckInterval", time.Minute,
		"Interval for checking for changes in gce. "+
			"This works only if `gce_sd_configs` is configured in '-promscrape.config' file. "+
			"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config for details")
)
// Init initializes Prometheus scraper with config from the `-promscrape.config`.
//
// Scraped data is passed to pushData.
func Init ( pushData func ( wr * prompbmarshal . WriteRequest ) ) {
2020-03-03 19:08:08 +01:00
globalStopCh = make ( chan struct { } )
2020-02-23 12:35:47 +01:00
scraperWG . Add ( 1 )
go func ( ) {
defer scraperWG . Done ( )
2020-03-03 19:08:08 +01:00
runScraper ( * promscrapeConfigFile , pushData , globalStopCh )
2020-02-23 12:35:47 +01:00
} ( )
}
// Stop stops Prometheus scraper.
func Stop ( ) {
2020-03-03 19:08:08 +01:00
close ( globalStopCh )
2020-02-23 12:35:47 +01:00
scraperWG . Wait ( )
}
// globalStopCh is created by Init and closed by Stop in order to tell the
// scraper goroutine to finish.
var globalStopCh chan struct{}

// scraperWG tracks the scraper goroutine, so Stop can wait for its completion.
var scraperWG sync.WaitGroup
func runScraper ( configFile string , pushData func ( wr * prompbmarshal . WriteRequest ) , globalStopCh <- chan struct { } ) {
if configFile == "" {
// Nothing to scrape.
return
}
logger . Infof ( "reading Prometheus configs from %q" , configFile )
2020-04-23 22:40:50 +02:00
cfg , data , err := loadConfig ( configFile )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Fatalf ( "cannot read %q: %s" , configFile , err )
}
2020-05-03 11:41:13 +02:00
scs := newScrapeConfigs ( pushData )
scs . add ( "static_configs" , 0 , func ( cfg * Config , swsPrev [ ] ScrapeWork ) [ ] ScrapeWork { return cfg . getStaticScrapeWork ( ) } )
scs . add ( "file_sd_configs" , * fileSDCheckInterval , func ( cfg * Config , swsPrev [ ] ScrapeWork ) [ ] ScrapeWork { return cfg . getFileSDScrapeWork ( swsPrev ) } )
scs . add ( "kubernetes_sd_configs" , * kubernetesSDCheckInterval , func ( cfg * Config , swsPrev [ ] ScrapeWork ) [ ] ScrapeWork { return cfg . getKubernetesSDScrapeWork ( ) } )
scs . add ( "ec2_sd_configs" , * ec2SDCheckInterval , func ( cfg * Config , swsPrev [ ] ScrapeWork ) [ ] ScrapeWork { return cfg . getEC2SDScrapeWork ( ) } )
scs . add ( "gce_sd_configs" , * gceSDCheckInterval , func ( cfg * Config , swsPrev [ ] ScrapeWork ) [ ] ScrapeWork { return cfg . getGCESDScrapeWork ( ) } )
sighupCh := make ( chan os . Signal , 1 )
signal . Notify ( sighupCh , syscall . SIGHUP )
2020-04-23 22:40:50 +02:00
var tickerCh <- chan time . Time
if * configCheckInterval > 0 {
ticker := time . NewTicker ( * configCheckInterval )
tickerCh = ticker . C
defer ticker . Stop ( )
}
2020-05-03 11:41:13 +02:00
for {
scs . updateConfig ( cfg )
2020-02-23 12:35:47 +01:00
waitForChans :
select {
case <- sighupCh :
logger . Infof ( "SIGHUP received; reloading Prometheus configs from %q" , configFile )
2020-04-23 22:40:50 +02:00
cfgNew , dataNew , err := loadConfig ( configFile )
if err != nil {
logger . Errorf ( "cannot read %q on SIGHUP: %s; continuing with the previous config" , configFile , err )
goto waitForChans
}
if bytes . Equal ( data , dataNew ) {
logger . Infof ( "nothing changed in %q" , configFile )
goto waitForChans
}
cfg = cfgNew
data = dataNew
case <- tickerCh :
cfgNew , dataNew , err := loadConfig ( configFile )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Errorf ( "cannot read %q: %s; continuing with the previous config" , configFile , err )
goto waitForChans
}
2020-04-23 22:40:50 +02:00
if bytes . Equal ( data , dataNew ) {
// Nothing changed since the previous loadConfig
goto waitForChans
}
2020-02-23 12:35:47 +01:00
cfg = cfgNew
2020-04-23 22:40:50 +02:00
data = dataNew
2020-02-23 12:35:47 +01:00
case <- globalStopCh :
2020-05-03 11:41:13 +02:00
logger . Infof ( "stopping Prometheus scrapers" )
startTime := time . Now ( )
scs . stop ( )
logger . Infof ( "stopped Prometheus scrapers in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
return
2020-04-23 22:40:50 +02:00
}
2020-05-03 11:41:13 +02:00
logger . Infof ( "found changes in %q; applying these changes" , configFile )
2020-02-23 12:35:47 +01:00
configReloads . Inc ( )
}
}
// configReloads counts the config reloads applied by runScraper after the
// contents of the config file have changed.
//
// The metric name is written without surrounding whitespace, since whitespace
// is not allowed in Prometheus metric names.
var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`)
2020-05-03 11:41:13 +02:00
type scrapeConfigs struct {
pushData func ( wr * prompbmarshal . WriteRequest )
wg sync . WaitGroup
stopCh chan struct { }
scfgs [ ] * scrapeConfig
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
func newScrapeConfigs ( pushData func ( wr * prompbmarshal . WriteRequest ) ) * scrapeConfigs {
return & scrapeConfigs {
pushData : pushData ,
stopCh : make ( chan struct { } ) ,
2020-04-13 20:02:27 +02:00
}
2020-05-03 11:41:13 +02:00
}
2020-04-13 20:02:27 +02:00
2020-05-03 11:41:13 +02:00
func ( scs * scrapeConfigs ) add ( name string , checkInterval time . Duration , getScrapeWork func ( cfg * Config , swsPrev [ ] ScrapeWork ) [ ] ScrapeWork ) {
scfg := & scrapeConfig {
name : name ,
pushData : scs . pushData ,
getScrapeWork : getScrapeWork ,
checkInterval : checkInterval ,
cfgCh : make ( chan * Config , 1 ) ,
stopCh : scs . stopCh ,
2020-04-13 20:02:27 +02:00
}
2020-05-03 11:41:13 +02:00
scs . wg . Add ( 1 )
go func ( ) {
defer scs . wg . Done ( )
scfg . run ( )
} ( )
scs . scfgs = append ( scs . scfgs , scfg )
2020-04-13 20:02:27 +02:00
}
2020-05-03 11:41:13 +02:00
func ( scs * scrapeConfigs ) updateConfig ( cfg * Config ) {
for _ , scfg := range scs . scfgs {
scfg . cfgCh <- cfg
2020-04-27 18:25:45 +02:00
}
2020-05-03 11:41:13 +02:00
}
2020-04-27 18:25:45 +02:00
2020-05-03 11:41:13 +02:00
func ( scs * scrapeConfigs ) stop ( ) {
close ( scs . stopCh )
scs . wg . Wait ( )
scs . scfgs = nil
2020-04-27 18:25:45 +02:00
}
2020-05-03 11:41:13 +02:00
// scrapeConfig keeps a group of scrapers in sync with the scrape work
// returned by getScrapeWork for the most recently received config.
type scrapeConfig struct {
	// name is passed to the scraper group created by run.
	name string

	// pushData is passed to the scraper group created by run.
	pushData func(wr *prompbmarshal.WriteRequest)

	// getScrapeWork returns the scrape work for cfg; swsPrev is the result
	// of the previous call (nil on the first call).
	getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork

	// checkInterval is the period between repeated getScrapeWork calls
	// for an unchanged config. Periodic calls are disabled if it isn't positive.
	checkInterval time.Duration

	// cfgCh delivers new configs to run.
	cfgCh chan *Config

	// stopCh is closed in order to make run return.
	stopCh <-chan struct{}
}
2020-04-27 18:25:45 +02:00
2020-05-03 11:41:13 +02:00
func ( scfg * scrapeConfig ) run ( ) {
sg := newScraperGroup ( scfg . name , scfg . pushData )
defer sg . stop ( )
2020-04-24 16:50:21 +02:00
2020-05-03 11:41:13 +02:00
var tickerCh <- chan time . Time
if scfg . checkInterval > 0 {
ticker := time . NewTicker ( scfg . checkInterval )
defer ticker . Stop ( )
tickerCh = ticker . C
2020-04-24 16:50:21 +02:00
}
2020-05-03 11:41:13 +02:00
cfg := <- scfg . cfgCh
var swsPrev [ ] ScrapeWork
for {
sws := scfg . getScrapeWork ( cfg , swsPrev )
sg . update ( sws )
swsPrev = sws
2020-04-24 16:50:21 +02:00
2020-02-23 12:35:47 +01:00
select {
2020-05-03 11:41:13 +02:00
case <- scfg . stopCh :
return
case cfg = <- scfg . cfgCh :
case <- tickerCh :
2020-02-23 12:35:47 +01:00
}
}
}
2020-05-03 11:41:13 +02:00
// scraperGroup is a named set of running scrapers.
type scraperGroup struct {
	// name is used in log messages and as the `type` label of the group metrics.
	name string

	// wg tracks the goroutines of the running scrapers.
	wg sync.WaitGroup

	// mLock protects m.
	mLock sync.Mutex

	// m maps the key of a scrape target to its running scraper.
	m map[string]*scraper

	// pushData is passed to every scraper started by the group.
	pushData func(wr *prompbmarshal.WriteRequest)

	// changesCount counts added plus removed targets across update calls.
	changesCount *metrics.Counter
}
2020-02-23 12:35:47 +01:00
2020-05-03 11:41:13 +02:00
func newScraperGroup ( name string , pushData func ( wr * prompbmarshal . WriteRequest ) ) * scraperGroup {
sg := & scraperGroup {
name : name ,
m : make ( map [ string ] * scraper ) ,
pushData : pushData ,
changesCount : metrics . NewCounter ( fmt . Sprintf ( ` vm_promscrape_config_changes_total { type=%q} ` , name ) ) ,
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
metrics . NewGauge ( fmt . Sprintf ( ` vm_promscrape_targets { type=%q} ` , name ) , func ( ) float64 {
sg . mLock . Lock ( )
n := len ( sg . m )
sg . mLock . Unlock ( )
return float64 ( n )
} )
return sg
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
func ( sg * scraperGroup ) stop ( ) {
sg . mLock . Lock ( )
for _ , sc := range sg . m {
close ( sc . stopCh )
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
sg . m = nil
sg . mLock . Unlock ( )
sg . wg . Wait ( )
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
func ( sg * scraperGroup ) update ( sws [ ] ScrapeWork ) {
sg . mLock . Lock ( )
defer sg . mLock . Unlock ( )
additionsCount := 0
deletionsCount := 0
swsMap := make ( map [ string ] bool , len ( sws ) )
for i := range sws {
sw := & sws [ i ]
key := sw . key ( )
if swsMap [ key ] {
logger . Errorf ( "skipping duplicate scrape target with identical labels; endpoint=%s, labels=%s; make sure service discovery and relabeling is set up properly" ,
sw . ScrapeURL , sw . LabelsString ( ) )
continue
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
swsMap [ key ] = true
if sg . m [ key ] != nil {
// The scraper for the given key already exists.
continue
}
// Start a scraper for the missing key.
sc := newScraper ( sw , sg . pushData )
sg . wg . Add ( 1 )
go func ( ) {
defer sg . wg . Done ( )
sc . sw . run ( sc . stopCh )
tsmGlobal . Unregister ( sw )
} ( )
tsmGlobal . Register ( sw )
sg . m [ key ] = sc
additionsCount ++
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
// Stop deleted scrapers, which are missing in sws.
for key , sc := range sg . m {
if ! swsMap [ key ] {
close ( sc . stopCh )
delete ( sg . m , key )
deletionsCount ++
}
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
if additionsCount > 0 || deletionsCount > 0 {
sg . changesCount . Add ( additionsCount + deletionsCount )
logger . Infof ( "%s: added targets: %d, removed targets: %d; total targets: %d" , sg . name , additionsCount , deletionsCount , len ( sg . m ) )
2020-02-23 12:35:47 +01:00
}
}
2020-05-03 11:41:13 +02:00
// scraper couples the scrape work for a single target with the channel
// used for stopping it.
type scraper struct {
	// sw is the scrape work executed for the target.
	sw scrapeWork

	// stopCh is closed in order to stop sw.
	stopCh chan struct{}
}
func newScraper ( sw * ScrapeWork , pushData func ( wr * prompbmarshal . WriteRequest ) ) * scraper {
sc := & scraper {
stopCh : make ( chan struct { } ) ,
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
c := newClient ( sw )
sc . sw . Config = * sw
sc . sw . ReadData = c . ReadData
sc . sw . PushData = pushData
return sc
2020-02-23 12:35:47 +01:00
}