2020-02-23 12:35:47 +01:00
package promscrape
import (
2020-04-23 22:40:50 +02:00
"bytes"
2020-02-23 12:35:47 +01:00
"flag"
2020-05-03 11:41:13 +02:00
"fmt"
2022-08-08 13:10:18 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2021-10-12 15:23:42 +02:00
"io"
2020-02-23 12:35:47 +01:00
"sync"
2020-11-04 19:29:18 +01:00
"sync/atomic"
2020-02-23 12:35:47 +01:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2020-05-05 08:27:38 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2022-07-13 22:43:18 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
2020-12-03 18:47:40 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
2021-06-25 11:10:20 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
2021-06-25 12:20:18 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dockerswarm"
2021-06-25 11:10:20 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/eureka"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
2021-06-22 12:33:37 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
2021-06-25 11:10:20 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
2022-08-04 19:44:16 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/metrics"
)
// Command-line flags controlling where the scrape config is read from
// and how often it (and file-based service discovery) is re-checked.
var (
	// promscrapeConfigFile is the location of the Prometheus-compatible config.
	promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
		"The path can point to local file and to http url. "+
		"See https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter for details")

	// configCheckInterval enables periodic re-reading of the config when positive.
	configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0, "Interval for checking for changes in '-promscrape.config' file. "+
		"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes")

	// fileSDCheckInterval is the refresh period used for 'file_sd_configs'.
	fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", 5*time.Minute, "Interval for checking for changes in 'file_sd_config'. "+
		"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config for details")

	// suppressDuplicateScrapeTargetErrors silences the error logged for duplicate targets.
	suppressDuplicateScrapeTargetErrors = flag.Bool("promscrape.suppressDuplicateScrapeTargetErrors", false, "Whether to suppress 'duplicate scrape target' errors; "+
		"see https://docs.victoriametrics.com/vmagent.html#troubleshooting for details")
)
2020-05-21 14:22:01 +02:00
// CheckConfig checks -promscrape.config for errors and unsupported options.
func CheckConfig ( ) error {
if * promscrapeConfigFile == "" {
return fmt . Errorf ( "missing -promscrape.config option" )
}
2021-11-05 13:41:14 +01:00
_ , _ , err := loadConfig ( * promscrapeConfigFile )
2020-05-21 14:22:01 +02:00
return err
}
2020-02-23 12:35:47 +01:00
// Init initializes Prometheus scraper with config from the `-promscrape.config`.
//
// Scraped data is passed to pushData.
2022-08-08 13:10:18 +02:00
func Init ( pushData func ( at * auth . Token , wr * prompbmarshal . WriteRequest ) ) {
2022-04-12 11:36:17 +02:00
mustInitClusterMemberID ( )
2022-01-07 00:17:55 +01:00
globalStopChan = make ( chan struct { } )
2020-02-23 12:35:47 +01:00
scraperWG . Add ( 1 )
go func ( ) {
defer scraperWG . Done ( )
2022-01-07 00:17:55 +01:00
runScraper ( * promscrapeConfigFile , pushData , globalStopChan )
2020-02-23 12:35:47 +01:00
} ( )
}
// Stop stops Prometheus scraper.
func Stop ( ) {
2022-01-07 00:17:55 +01:00
close ( globalStopChan )
2020-02-23 12:35:47 +01:00
scraperWG . Wait ( )
}
// Package-level scraper state.
var (
	// PendingScrapeConfigs is the number of scrapeConfigs that have not finished
	// their initial update yet. It is incremented in scrapeConfigs.add and decremented
	// in scrapeConfig.run; zero means all scrapeConfigs are initialized and ready for work.
	PendingScrapeConfigs int32

	// configData holds a pointer to the marshaled -promscrape.config contents.
	configData atomic.Value

	// globalStopChan is created by Init and closed by Stop.
	globalStopChan chan struct{}

	// scraperWG tracks the goroutine started by Init.
	scraperWG sync.WaitGroup
)
2021-10-12 15:23:42 +02:00
// WriteConfigData writes -promscrape.config contents to w
func WriteConfigData ( w io . Writer ) {
v := configData . Load ( )
if v == nil {
// Nothing to write to w
return
}
b := v . ( * [ ] byte )
2021-10-13 13:57:30 +02:00
_ , _ = w . Write ( * b )
2021-10-12 15:23:42 +02:00
}
2022-08-08 13:10:18 +02:00
func runScraper ( configFile string , pushData func ( at * auth . Token , wr * prompbmarshal . WriteRequest ) , globalStopCh <- chan struct { } ) {
2020-02-23 12:35:47 +01:00
if configFile == "" {
// Nothing to scrape.
return
}
2021-05-21 15:34:03 +02:00
// Register SIGHUP handler for config reload before loadConfig.
// This guarantees that the config will be re-read if the signal arrives just after loadConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil . NewSighupChan ( )
2020-02-23 12:35:47 +01:00
logger . Infof ( "reading Prometheus configs from %q" , configFile )
2021-11-05 13:41:14 +01:00
cfg , data , err := loadConfig ( configFile )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Fatalf ( "cannot read %q: %s" , configFile , err )
}
2021-11-05 13:41:14 +01:00
marshaledData := cfg . marshal ( )
configData . Store ( & marshaledData )
2021-04-05 21:02:09 +02:00
cfg . mustStart ( )
2020-02-23 12:35:47 +01:00
2022-01-07 00:17:55 +01:00
scs := newScrapeConfigs ( pushData , globalStopCh )
2022-07-13 22:43:18 +02:00
scs . add ( "azure_sd_configs" , * azure . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getAzureSDScrapeWork ( swsPrev ) } )
2020-12-08 16:50:03 +01:00
scs . add ( "consul_sd_configs" , * consul . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getConsulSDScrapeWork ( swsPrev ) } )
2021-06-25 11:10:20 +02:00
scs . add ( "digitalocean_sd_configs" , * digitalocean . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getDigitalOceanDScrapeWork ( swsPrev ) } )
scs . add ( "dns_sd_configs" , * dns . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getDNSSDScrapeWork ( swsPrev ) } )
scs . add ( "docker_sd_configs" , * docker . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getDockerSDScrapeWork ( swsPrev ) } )
2021-06-25 12:20:18 +02:00
scs . add ( "dockerswarm_sd_configs" , * dockerswarm . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getDockerSwarmSDScrapeWork ( swsPrev ) } )
2021-06-25 11:10:20 +02:00
scs . add ( "ec2_sd_configs" , * ec2 . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getEC2SDScrapeWork ( swsPrev ) } )
scs . add ( "eureka_sd_configs" , * eureka . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getEurekaSDScrapeWork ( swsPrev ) } )
scs . add ( "file_sd_configs" , * fileSDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getFileSDScrapeWork ( swsPrev ) } )
scs . add ( "gce_sd_configs" , * gce . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getGCESDScrapeWork ( swsPrev ) } )
2021-06-22 12:33:37 +02:00
scs . add ( "http_sd_configs" , * http . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getHTTPDScrapeWork ( swsPrev ) } )
2021-06-25 11:10:20 +02:00
scs . add ( "kubernetes_sd_configs" , * kubernetes . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getKubernetesSDScrapeWork ( swsPrev ) } )
scs . add ( "openstack_sd_configs" , * openstack . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getOpenStackSDScrapeWork ( swsPrev ) } )
2022-08-04 19:44:16 +02:00
scs . add ( "yandexcloud_sd_configs" , * yandexcloud . SDCheckInterval , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getYandexCloudSDScrapeWork ( swsPrev ) } )
2021-06-25 11:10:20 +02:00
scs . add ( "static_configs" , 0 , func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork { return cfg . getStaticScrapeWork ( ) } )
2020-05-03 11:41:13 +02:00
2020-04-23 22:40:50 +02:00
var tickerCh <- chan time . Time
if * configCheckInterval > 0 {
ticker := time . NewTicker ( * configCheckInterval )
tickerCh = ticker . C
defer ticker . Stop ( )
}
2020-05-03 11:41:13 +02:00
for {
scs . updateConfig ( cfg )
2020-02-23 12:35:47 +01:00
waitForChans :
select {
case <- sighupCh :
logger . Infof ( "SIGHUP received; reloading Prometheus configs from %q" , configFile )
2021-11-05 13:41:14 +01:00
cfgNew , dataNew , err := loadConfig ( configFile )
2020-04-23 22:40:50 +02:00
if err != nil {
logger . Errorf ( "cannot read %q on SIGHUP: %s; continuing with the previous config" , configFile , err )
goto waitForChans
}
if bytes . Equal ( data , dataNew ) {
logger . Infof ( "nothing changed in %q" , configFile )
goto waitForChans
}
2022-04-16 19:28:46 +02:00
cfgNew . mustRestart ( cfg )
2020-04-23 22:40:50 +02:00
cfg = cfgNew
data = dataNew
2021-11-05 13:41:14 +01:00
marshaledData = cfgNew . marshal ( )
configData . Store ( & marshaledData )
2020-04-23 22:40:50 +02:00
case <- tickerCh :
2021-11-05 13:41:14 +01:00
cfgNew , dataNew , err := loadConfig ( configFile )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Errorf ( "cannot read %q: %s; continuing with the previous config" , configFile , err )
goto waitForChans
}
2020-04-23 22:40:50 +02:00
if bytes . Equal ( data , dataNew ) {
// Nothing changed since the previous loadConfig
goto waitForChans
}
2022-04-16 19:28:46 +02:00
cfgNew . mustRestart ( cfg )
2020-02-23 12:35:47 +01:00
cfg = cfgNew
2020-04-23 22:40:50 +02:00
data = dataNew
2022-06-30 12:33:01 +02:00
marshaledData = cfgNew . marshal ( )
2021-11-05 13:41:14 +01:00
configData . Store ( & marshaledData )
2020-02-23 12:35:47 +01:00
case <- globalStopCh :
2021-03-01 13:13:56 +01:00
cfg . mustStop ( )
2020-05-03 11:41:13 +02:00
logger . Infof ( "stopping Prometheus scrapers" )
startTime := time . Now ( )
scs . stop ( )
logger . Infof ( "stopped Prometheus scrapers in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
return
2020-04-23 22:40:50 +02:00
}
2020-05-03 11:41:13 +02:00
logger . Infof ( "found changes in %q; applying these changes" , configFile )
2020-02-23 12:35:47 +01:00
configReloads . Inc ( )
}
}
// configReloads is incremented every time a changed -promscrape.config is applied.
//
// The metric name must not contain surrounding whitespace, since it has to be
// a valid metric identifier.
var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`)
2020-05-03 11:41:13 +02:00
type scrapeConfigs struct {
2022-08-08 13:10:18 +02:00
pushData func ( at * auth . Token , wr * prompbmarshal . WriteRequest )
2022-01-07 00:17:55 +01:00
wg sync . WaitGroup
stopCh chan struct { }
globalStopCh <- chan struct { }
scfgs [ ] * scrapeConfig
2020-02-23 12:35:47 +01:00
}
2022-08-08 13:10:18 +02:00
func newScrapeConfigs ( pushData func ( at * auth . Token , wr * prompbmarshal . WriteRequest ) , globalStopCh <- chan struct { } ) * scrapeConfigs {
2020-05-03 11:41:13 +02:00
return & scrapeConfigs {
2022-01-07 00:17:55 +01:00
pushData : pushData ,
stopCh : make ( chan struct { } ) ,
globalStopCh : globalStopCh ,
2020-04-13 20:02:27 +02:00
}
2020-05-03 11:41:13 +02:00
}
2020-04-13 20:02:27 +02:00
2020-12-08 16:50:03 +01:00
func ( scs * scrapeConfigs ) add ( name string , checkInterval time . Duration , getScrapeWork func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork ) {
2020-11-04 19:29:18 +01:00
atomic . AddInt32 ( & PendingScrapeConfigs , 1 )
2020-05-03 11:41:13 +02:00
scfg := & scrapeConfig {
name : name ,
pushData : scs . pushData ,
getScrapeWork : getScrapeWork ,
checkInterval : checkInterval ,
cfgCh : make ( chan * Config , 1 ) ,
stopCh : scs . stopCh ,
2021-02-02 15:13:59 +01:00
discoveryDuration : metrics . GetOrCreateHistogram ( fmt . Sprintf ( "vm_promscrape_service_discovery_duration_seconds{type=%q}" , name ) ) ,
2020-04-13 20:02:27 +02:00
}
2020-05-03 11:41:13 +02:00
scs . wg . Add ( 1 )
go func ( ) {
defer scs . wg . Done ( )
2022-01-07 00:17:55 +01:00
scfg . run ( scs . globalStopCh )
2020-05-03 11:41:13 +02:00
} ( )
scs . scfgs = append ( scs . scfgs , scfg )
2020-04-13 20:02:27 +02:00
}
2020-05-03 11:41:13 +02:00
func ( scs * scrapeConfigs ) updateConfig ( cfg * Config ) {
for _ , scfg := range scs . scfgs {
scfg . cfgCh <- cfg
2020-04-27 18:25:45 +02:00
}
2020-05-03 11:41:13 +02:00
}
2020-04-27 18:25:45 +02:00
2020-05-03 11:41:13 +02:00
func ( scs * scrapeConfigs ) stop ( ) {
close ( scs . stopCh )
scs . wg . Wait ( )
scs . scfgs = nil
2020-04-27 18:25:45 +02:00
}
2020-05-03 11:41:13 +02:00
type scrapeConfig struct {
name string
2022-08-08 13:10:18 +02:00
pushData func ( at * auth . Token , wr * prompbmarshal . WriteRequest )
2020-12-08 16:50:03 +01:00
getScrapeWork func ( cfg * Config , swsPrev [ ] * ScrapeWork ) [ ] * ScrapeWork
2020-05-03 11:41:13 +02:00
checkInterval time . Duration
cfgCh chan * Config
stopCh <- chan struct { }
2021-02-02 15:13:59 +01:00
discoveryDuration * metrics . Histogram
2020-05-03 11:41:13 +02:00
}
2020-04-27 18:25:45 +02:00
2022-01-07 00:17:55 +01:00
func ( scfg * scrapeConfig ) run ( globalStopCh <- chan struct { } ) {
sg := newScraperGroup ( scfg . name , scfg . pushData , globalStopCh )
2020-05-03 11:41:13 +02:00
defer sg . stop ( )
2020-04-24 16:50:21 +02:00
2020-05-03 11:41:13 +02:00
var tickerCh <- chan time . Time
if scfg . checkInterval > 0 {
ticker := time . NewTicker ( scfg . checkInterval )
defer ticker . Stop ( )
tickerCh = ticker . C
2020-04-24 16:50:21 +02:00
}
2020-05-03 11:41:13 +02:00
cfg := <- scfg . cfgCh
2020-12-08 16:50:03 +01:00
var swsPrev [ ] * ScrapeWork
2020-11-04 19:29:18 +01:00
updateScrapeWork := func ( cfg * Config ) {
2021-04-05 12:53:26 +02:00
startTime := time . Now ( )
sws := scfg . getScrapeWork ( cfg , swsPrev )
sg . update ( sws )
swsPrev = sws
2022-06-07 14:46:44 +02:00
if sg . scrapersStarted . Get ( ) > 0 {
// update duration only if at least one scraper has started
// otherwise this SD is considered as inactive
scfg . discoveryDuration . UpdateDuration ( startTime )
}
2020-11-04 19:29:18 +01:00
}
updateScrapeWork ( cfg )
atomic . AddInt32 ( & PendingScrapeConfigs , - 1 )
for {
2020-04-24 16:50:21 +02:00
2020-02-23 12:35:47 +01:00
select {
2020-05-03 11:41:13 +02:00
case <- scfg . stopCh :
return
case cfg = <- scfg . cfgCh :
case <- tickerCh :
2020-02-23 12:35:47 +01:00
}
2020-11-04 19:29:18 +01:00
updateScrapeWork ( cfg )
2020-02-23 12:35:47 +01:00
}
}
2020-05-03 11:41:13 +02:00
type scraperGroup struct {
2020-12-08 00:54:13 +01:00
name string
wg sync . WaitGroup
mLock sync . Mutex
m map [ string ] * scraper
2022-08-08 13:10:18 +02:00
pushData func ( at * auth . Token , wr * prompbmarshal . WriteRequest )
2020-12-08 00:54:13 +01:00
2020-12-08 10:57:52 +01:00
changesCount * metrics . Counter
activeScrapers * metrics . Counter
scrapersStarted * metrics . Counter
scrapersStopped * metrics . Counter
2022-01-07 00:17:55 +01:00
globalStopCh <- chan struct { }
2020-05-03 11:41:13 +02:00
}
2020-02-23 12:35:47 +01:00
2022-08-08 13:10:18 +02:00
func newScraperGroup ( name string , pushData func ( at * auth . Token , wr * prompbmarshal . WriteRequest ) , globalStopCh <- chan struct { } ) * scraperGroup {
2020-05-03 11:41:13 +02:00
sg := & scraperGroup {
2020-12-08 00:54:13 +01:00
name : name ,
m : make ( map [ string ] * scraper ) ,
pushData : pushData ,
2020-12-08 10:57:52 +01:00
changesCount : metrics . NewCounter ( fmt . Sprintf ( ` vm_promscrape_config_changes_total { type=%q} ` , name ) ) ,
activeScrapers : metrics . NewCounter ( fmt . Sprintf ( ` vm_promscrape_active_scrapers { type=%q} ` , name ) ) ,
scrapersStarted : metrics . NewCounter ( fmt . Sprintf ( ` vm_promscrape_scrapers_started_total { type=%q} ` , name ) ) ,
scrapersStopped : metrics . NewCounter ( fmt . Sprintf ( ` vm_promscrape_scrapers_stopped_total { type=%q} ` , name ) ) ,
2022-01-07 00:17:55 +01:00
globalStopCh : globalStopCh ,
2020-02-23 12:35:47 +01:00
}
2020-07-13 20:52:03 +02:00
metrics . NewGauge ( fmt . Sprintf ( ` vm_promscrape_targets { type=%q, status="up"} ` , name ) , func ( ) float64 {
return float64 ( tsmGlobal . StatusByGroup ( sg . name , true ) )
} )
metrics . NewGauge ( fmt . Sprintf ( ` vm_promscrape_targets { type=%q, status="down"} ` , name ) , func ( ) float64 {
return float64 ( tsmGlobal . StatusByGroup ( sg . name , false ) )
2020-05-03 11:41:13 +02:00
} )
return sg
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
func ( sg * scraperGroup ) stop ( ) {
sg . mLock . Lock ( )
for _ , sc := range sg . m {
close ( sc . stopCh )
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
sg . m = nil
sg . mLock . Unlock ( )
sg . wg . Wait ( )
2020-02-23 12:35:47 +01:00
}
2021-04-05 12:53:26 +02:00
func ( sg * scraperGroup ) update ( sws [ ] * ScrapeWork ) {
2020-05-03 11:41:13 +02:00
sg . mLock . Lock ( )
defer sg . mLock . Unlock ( )
additionsCount := 0
deletionsCount := 0
2020-10-08 17:50:22 +02:00
swsMap := make ( map [ string ] [ ] prompbmarshal . Label , len ( sws ) )
2021-08-16 23:52:42 +02:00
var swsToStart [ ] * ScrapeWork
2020-12-08 16:50:03 +01:00
for _ , sw := range sws {
2020-05-03 11:41:13 +02:00
key := sw . key ( )
2021-11-30 00:12:24 +01:00
originalLabels , ok := swsMap [ key ]
if ok {
2020-10-08 17:50:22 +02:00
if ! * suppressDuplicateScrapeTargetErrors {
logger . Errorf ( "skipping duplicate scrape target with identical labels; endpoint=%s, labels=%s; " +
"make sure service discovery and relabeling is set up properly; " +
2021-04-20 19:16:17 +02:00
"see also https://docs.victoriametrics.com/vmagent.html#troubleshooting; " +
2020-10-08 17:50:22 +02:00
"original labels for target1: %s; original labels for target2: %s" ,
sw . ScrapeURL , sw . LabelsString ( ) , promLabelsString ( originalLabels ) , promLabelsString ( sw . OriginalLabels ) )
}
2020-10-20 20:44:59 +02:00
droppedTargetsMap . Register ( sw . OriginalLabels )
2020-05-03 11:41:13 +02:00
continue
2020-02-23 12:35:47 +01:00
}
2020-10-08 17:50:22 +02:00
swsMap [ key ] = sw . OriginalLabels
2020-05-03 11:41:13 +02:00
if sg . m [ key ] != nil {
// The scraper for the given key already exists.
continue
}
2021-08-16 23:52:42 +02:00
swsToStart = append ( swsToStart , sw )
}
2020-05-03 11:41:13 +02:00
2021-08-16 23:52:42 +02:00
// Stop deleted scrapers before starting new scrapers in order to prevent
// series overlap when old scrape target is substituted by new scrape target.
var stoppedChs [ ] <- chan struct { }
for key , sc := range sg . m {
if _ , ok := swsMap [ key ] ; ! ok {
close ( sc . stopCh )
stoppedChs = append ( stoppedChs , sc . stoppedCh )
delete ( sg . m , key )
deletionsCount ++
}
}
// Wait until all the deleted scrapers are stopped before starting new scrapers.
for _ , ch := range stoppedChs {
<- ch
}
// Start new scrapers only after the deleted scrapers are stopped.
for _ , sw := range swsToStart {
2020-07-13 20:52:03 +02:00
sc := newScraper ( sw , sg . name , sg . pushData )
2020-12-08 00:54:13 +01:00
sg . activeScrapers . Inc ( )
2020-12-08 10:57:52 +01:00
sg . scrapersStarted . Inc ( )
2020-05-03 11:41:13 +02:00
sg . wg . Add ( 1 )
2022-02-03 17:57:36 +01:00
tsmGlobal . Register ( & sc . sw )
2020-12-15 19:56:16 +01:00
go func ( sw * ScrapeWork ) {
2021-08-16 23:52:42 +02:00
defer func ( ) {
sg . wg . Done ( )
close ( sc . stoppedCh )
} ( )
2022-01-07 00:17:55 +01:00
sc . sw . run ( sc . stopCh , sg . globalStopCh )
2022-02-03 17:57:36 +01:00
tsmGlobal . Unregister ( & sc . sw )
2020-12-08 00:54:13 +01:00
sg . activeScrapers . Dec ( )
2020-12-08 10:57:52 +01:00
sg . scrapersStopped . Inc ( )
2020-12-15 19:56:16 +01:00
} ( sw )
2021-08-16 23:52:42 +02:00
key := sw . key ( )
2020-05-03 11:41:13 +02:00
sg . m [ key ] = sc
additionsCount ++
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
if additionsCount > 0 || deletionsCount > 0 {
sg . changesCount . Add ( additionsCount + deletionsCount )
logger . Infof ( "%s: added targets: %d, removed targets: %d; total targets: %d" , sg . name , additionsCount , deletionsCount , len ( sg . m ) )
2020-02-23 12:35:47 +01:00
}
}
2020-05-03 11:41:13 +02:00
type scraper struct {
2021-08-16 23:52:42 +02:00
sw scrapeWork
// stopCh is unblocked when the given scraper must be stopped.
2020-05-03 11:41:13 +02:00
stopCh chan struct { }
2021-08-16 23:52:42 +02:00
// stoppedCh is unblocked when the given scraper is stopped.
stoppedCh chan struct { }
2020-05-03 11:41:13 +02:00
}
2022-08-08 13:10:18 +02:00
func newScraper ( sw * ScrapeWork , group string , pushData func ( at * auth . Token , wr * prompbmarshal . WriteRequest ) ) * scraper {
2020-05-03 11:41:13 +02:00
sc := & scraper {
2021-08-16 23:52:42 +02:00
stopCh : make ( chan struct { } ) ,
stoppedCh : make ( chan struct { } ) ,
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
c := newClient ( sw )
2020-12-17 13:30:33 +01:00
sc . sw . Config = sw
2020-07-13 20:52:03 +02:00
sc . sw . ScrapeGroup = group
2020-05-03 11:41:13 +02:00
sc . sw . ReadData = c . ReadData
2020-11-01 22:12:13 +01:00
sc . sw . GetStreamReader = c . GetStreamReader
2020-05-03 11:41:13 +02:00
sc . sw . PushData = pushData
return sc
2020-02-23 12:35:47 +01:00
}