2020-02-23 12:35:47 +01:00
package promscrape
import (
2020-04-23 22:40:50 +02:00
"bytes"
2020-02-23 12:35:47 +01:00
"flag"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
var (
2020-04-23 22:40:50 +02:00
configCheckInterval = flag . Duration ( "promscrape.configCheckInterval" , 0 , "Interval for checking for changes in '-promscrape.config' file. " +
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes" )
2020-04-13 20:02:27 +02:00
fileSDCheckInterval = flag . Duration ( "promscrape.fileSDCheckInterval" , 30 * time . Second , "Interval for checking for changes in 'file_sd_config'. " +
2020-02-23 12:35:47 +01:00
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config" )
2020-04-13 20:02:27 +02:00
kubernetesSDCheckInterval = flag . Duration ( "promscrape.kubernetesSDCheckInterval" , 30 * time . Second , "Interval for checking for changes in Kubernetes API server. " +
"This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. " +
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details" )
2020-02-23 12:35:47 +01:00
promscrapeConfigFile = flag . String ( "promscrape.config" , "" , "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. " +
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config for details" )
)
// Init initializes Prometheus scraper with config from the `-promscrape.config`.
//
// Scraped data is passed to pushData.
func Init ( pushData func ( wr * prompbmarshal . WriteRequest ) ) {
2020-03-03 19:08:08 +01:00
globalStopCh = make ( chan struct { } )
2020-02-23 12:35:47 +01:00
scraperWG . Add ( 1 )
go func ( ) {
defer scraperWG . Done ( )
2020-03-03 19:08:08 +01:00
runScraper ( * promscrapeConfigFile , pushData , globalStopCh )
2020-02-23 12:35:47 +01:00
} ( )
}
// Stop stops Prometheus scraper.
func Stop ( ) {
2020-03-03 19:08:08 +01:00
close ( globalStopCh )
2020-02-23 12:35:47 +01:00
scraperWG . Wait ( )
}
var (
2020-03-03 19:08:08 +01:00
globalStopCh chan struct { }
scraperWG sync . WaitGroup
2020-02-23 12:35:47 +01:00
)
func runScraper ( configFile string , pushData func ( wr * prompbmarshal . WriteRequest ) , globalStopCh <- chan struct { } ) {
if configFile == "" {
// Nothing to scrape.
return
}
sighupCh := make ( chan os . Signal , 1 )
signal . Notify ( sighupCh , syscall . SIGHUP )
logger . Infof ( "reading Prometheus configs from %q" , configFile )
2020-04-23 22:40:50 +02:00
cfg , data , err := loadConfig ( configFile )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Fatalf ( "cannot read %q: %s" , configFile , err )
}
2020-04-23 22:40:50 +02:00
var tickerCh <- chan time . Time
if * configCheckInterval > 0 {
ticker := time . NewTicker ( * configCheckInterval )
tickerCh = ticker . C
defer ticker . Stop ( )
}
2020-02-23 12:35:47 +01:00
mustStop := false
for ! mustStop {
stopCh := make ( chan struct { } )
var wg sync . WaitGroup
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
2020-04-15 10:36:16 +02:00
runStaticScrapers ( cfg , pushData , stopCh )
2020-02-23 12:35:47 +01:00
} ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
2020-04-15 10:36:16 +02:00
runFileSDScrapers ( cfg , pushData , stopCh )
2020-02-23 12:35:47 +01:00
} ( )
2020-04-13 20:02:27 +02:00
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
2020-04-15 10:36:16 +02:00
runKubernetesSDScrapers ( cfg , pushData , stopCh )
2020-04-13 20:02:27 +02:00
} ( )
2020-02-23 12:35:47 +01:00
waitForChans :
select {
case <- sighupCh :
logger . Infof ( "SIGHUP received; reloading Prometheus configs from %q" , configFile )
2020-04-23 22:40:50 +02:00
cfgNew , dataNew , err := loadConfig ( configFile )
if err != nil {
logger . Errorf ( "cannot read %q on SIGHUP: %s; continuing with the previous config" , configFile , err )
goto waitForChans
}
if bytes . Equal ( data , dataNew ) {
logger . Infof ( "nothing changed in %q" , configFile )
goto waitForChans
}
cfg = cfgNew
data = dataNew
case <- tickerCh :
cfgNew , dataNew , err := loadConfig ( configFile )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Errorf ( "cannot read %q: %s; continuing with the previous config" , configFile , err )
goto waitForChans
}
2020-04-23 22:40:50 +02:00
if bytes . Equal ( data , dataNew ) {
// Nothing changed since the previous loadConfig
goto waitForChans
}
2020-02-23 12:35:47 +01:00
cfg = cfgNew
2020-04-23 22:40:50 +02:00
data = dataNew
2020-02-23 12:35:47 +01:00
case <- globalStopCh :
mustStop = true
}
2020-04-23 22:40:50 +02:00
if ! mustStop {
logger . Infof ( "found changes in %q; applying these changes" , configFile )
}
2020-02-23 12:35:47 +01:00
logger . Infof ( "stopping Prometheus scrapers" )
startTime := time . Now ( )
close ( stopCh )
wg . Wait ( )
logger . Infof ( "stopped Prometheus scrapers in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
configReloads . Inc ( )
}
}
var configReloads = metrics . NewCounter ( ` vm_promscrape_config_reloads_total ` )
2020-04-15 10:36:16 +02:00
func runStaticScrapers ( cfg * Config , pushData func ( wr * prompbmarshal . WriteRequest ) , stopCh <- chan struct { } ) {
sws := cfg . getStaticScrapeWork ( )
2020-02-23 12:35:47 +01:00
if len ( sws ) == 0 {
return
}
logger . Infof ( "starting %d scrapers for `static_config` targets" , len ( sws ) )
staticTargets . Set ( uint64 ( len ( sws ) ) )
runScrapeWorkers ( sws , pushData , stopCh )
staticTargets . Set ( 0 )
logger . Infof ( "stopped all the %d scrapers for `static_config` targets" , len ( sws ) )
}
var staticTargets = metrics . NewCounter ( ` vm_promscrape_targets { type="static"} ` )
2020-04-15 10:36:16 +02:00
func runKubernetesSDScrapers ( cfg * Config , pushData func ( wr * prompbmarshal . WriteRequest ) , stopCh <- chan struct { } ) {
2020-04-13 20:02:27 +02:00
if cfg . kubernetesSDConfigsCount ( ) == 0 {
return
}
2020-04-15 10:36:16 +02:00
sws := cfg . getKubernetesSDScrapeWork ( )
2020-04-13 20:02:27 +02:00
ticker := time . NewTicker ( * kubernetesSDCheckInterval )
defer ticker . Stop ( )
mustStop := false
for ! mustStop {
localStopCh := make ( chan struct { } )
var wg sync . WaitGroup
wg . Add ( 1 )
go func ( sws [ ] ScrapeWork ) {
defer wg . Done ( )
logger . Infof ( "starting %d scrapers for `kubernetes_sd_config` targets" , len ( sws ) )
kubernetesSDTargets . Set ( uint64 ( len ( sws ) ) )
runScrapeWorkers ( sws , pushData , localStopCh )
kubernetesSDTargets . Set ( 0 )
logger . Infof ( "stopped all the %d scrapers for `kubernetes_sd_config` targets" , len ( sws ) )
} ( sws )
waitForChans :
select {
case <- ticker . C :
swsNew := cfg . getKubernetesSDScrapeWork ( )
if equalStaticConfigForScrapeWorks ( swsNew , sws ) {
// Nothing changed, continue waiting for updated scrape work
goto waitForChans
}
logger . Infof ( "restarting scrapers for changed `kubernetes_sd_config` targets" )
sws = swsNew
case <- stopCh :
mustStop = true
}
close ( localStopCh )
wg . Wait ( )
kubernetesSDReloads . Inc ( )
}
}
var (
kubernetesSDTargets = metrics . NewCounter ( ` vm_promscrape_targets { type="kubernetes_sd"} ` )
kubernetesSDReloads = metrics . NewCounter ( ` vm_promscrape_reloads_total { type="kubernetes_sd"} ` )
)
2020-04-15 10:36:16 +02:00
func runFileSDScrapers ( cfg * Config , pushData func ( wr * prompbmarshal . WriteRequest ) , stopCh <- chan struct { } ) {
2020-02-23 12:35:47 +01:00
if cfg . fileSDConfigsCount ( ) == 0 {
return
}
2020-04-15 10:36:16 +02:00
sws := cfg . getFileSDScrapeWork ( nil )
2020-02-23 12:35:47 +01:00
ticker := time . NewTicker ( * fileSDCheckInterval )
defer ticker . Stop ( )
mustStop := false
for ! mustStop {
localStopCh := make ( chan struct { } )
var wg sync . WaitGroup
wg . Add ( 1 )
go func ( sws [ ] ScrapeWork ) {
defer wg . Done ( )
logger . Infof ( "starting %d scrapers for `file_sd_config` targets" , len ( sws ) )
fileSDTargets . Set ( uint64 ( len ( sws ) ) )
runScrapeWorkers ( sws , pushData , localStopCh )
fileSDTargets . Set ( 0 )
logger . Infof ( "stopped all the %d scrapers for `file_sd_config` targets" , len ( sws ) )
} ( sws )
waitForChans :
select {
case <- ticker . C :
2020-04-13 11:59:05 +02:00
swsNew := cfg . getFileSDScrapeWork ( sws )
2020-02-23 12:35:47 +01:00
if equalStaticConfigForScrapeWorks ( swsNew , sws ) {
// Nothing changed, continue waiting for updated scrape work
goto waitForChans
}
logger . Infof ( "restarting scrapers for changed `file_sd_config` targets" )
sws = swsNew
case <- stopCh :
mustStop = true
}
close ( localStopCh )
wg . Wait ( )
fileSDReloads . Inc ( )
}
}
var (
fileSDTargets = metrics . NewCounter ( ` vm_promscrape_targets { type="file_sd"} ` )
2020-04-13 11:59:05 +02:00
fileSDReloads = metrics . NewCounter ( ` vm_promscrape_reloads_total { type="file_sd"} ` )
2020-02-23 12:35:47 +01:00
)
func equalStaticConfigForScrapeWorks ( as , bs [ ] ScrapeWork ) bool {
if len ( as ) != len ( bs ) {
return false
}
for i := range as {
if ! equalStaticConfigForScrapeWork ( & as [ i ] , & bs [ i ] ) {
return false
}
}
return true
}
func equalStaticConfigForScrapeWork ( a , b * ScrapeWork ) bool {
// `static_config` can change only ScrapeURL and Labels. So compare only them.
if a . ScrapeURL != b . ScrapeURL {
return false
}
if ! equalLabels ( a . Labels , b . Labels ) {
return false
}
return true
}
func equalLabels ( as , bs [ ] prompbmarshal . Label ) bool {
if len ( as ) != len ( bs ) {
return false
}
for i := range as {
if ! equalLabel ( & as [ i ] , & bs [ i ] ) {
return false
}
}
return true
}
func equalLabel ( a , b * prompbmarshal . Label ) bool {
if a . Name != b . Name {
return false
}
if a . Value != b . Value {
return false
}
return true
}
// runScrapeWorkers runs sws.
//
// This function returns after closing stopCh.
func runScrapeWorkers ( sws [ ] ScrapeWork , pushData func ( wr * prompbmarshal . WriteRequest ) , stopCh <- chan struct { } ) {
2020-03-11 15:30:14 +01:00
tsmGlobal . RegisterAll ( sws )
2020-02-23 12:35:47 +01:00
var wg sync . WaitGroup
for i := range sws {
cfg := & sws [ i ]
c := newClient ( cfg )
var sw scrapeWork
sw . Config = * cfg
sw . ReadData = c . ReadData
sw . PushData = pushData
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
sw . run ( stopCh )
} ( )
}
wg . Wait ( )
2020-03-11 15:30:14 +01:00
tsmGlobal . UnregisterAll ( sws )
2020-02-23 12:35:47 +01:00
}