VictoriaMetrics/lib/promscrape/scraper.go
Aliaksandr Valialkin 2f14394335
app/vmagent: follow-up for 090cb2c9de
- Add Try* prefix to functions, which return bool result in order to improve readability and reduce the probability of missing check
  for the result returned from these functions.

- Call the adjustSampleValues() only once on input samples. Previously it was called on every attempt to flush data to persistent queue.

- Properly restore the initial state of WriteRequest passed to tryPushWriteRequest() before returning from this function
  after unsuccessful push to persistent queue. Previously a part of WriteRequest samples may be lost in such case.

- Add -remoteWrite.dropSamplesOnOverload command-line flag, which can be used for dropping incoming samples instead
  of returning 429 Too Many Requests error to the client when -remoteWrite.disableOnDiskQueue is set and the remote storage
  cannot keep up with the data ingestion rate.

- Add vmagent_remotewrite_samples_dropped_total metric, which counts the number of dropped samples.

- Add vmagent_remotewrite_push_failures_total metric, which counts the number of unsuccessful attempts to push
  data to persistent queue when -remoteWrite.disableOnDiskQueue is set.

- Remove vmagent_remotewrite_aggregation_metrics_dropped_total and vm_promscrape_push_samples_dropped_total metrics,
  because they are replaced with vmagent_remotewrite_samples_dropped_total metric.

- Update 'Disabling on-disk persistence' docs at docs/vmagent.md

- Update stale comments in the code

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5088
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110
2023-11-25 12:13:39 +02:00

460 lines
17 KiB
Go

package promscrape
import (
"context"
"flag"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consulagent"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dockerswarm"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/eureka"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kuma"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/nomad"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
)
// Command-line flags controlling how -promscrape.config and file-based service discovery are (re)loaded.
var (
	// configCheckInterval enables periodic re-reading of -promscrape.config when it is positive.
	configCheckInterval = flag.Duration(
		"promscrape.configCheckInterval",
		0,
		"Interval for checking for changes in -promscrape.config file. "+
			"By default, the checking is disabled. See how to reload -promscrape.config file at https://docs.victoriametrics.com/vmagent.html#configuration-update",
	)

	// suppressDuplicateScrapeTargetErrors silences the error logged for targets with identical labels.
	suppressDuplicateScrapeTargetErrors = flag.Bool(
		"promscrape.suppressDuplicateScrapeTargetErrors",
		false,
		"Whether to suppress 'duplicate scrape target' errors; "+
			"see https://docs.victoriametrics.com/vmagent.html#troubleshooting for details",
	)

	// promscrapeConfigFile is the location of the scrape config; scraping is disabled when it is empty.
	promscrapeConfigFile = flag.String(
		"promscrape.config",
		"",
		"Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
			"The path can point to local file and to http url. "+
			"See https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter for details",
	)

	// fileSDCheckInterval is the refresh interval used for the "file_sd_configs" discovery type.
	fileSDCheckInterval = flag.Duration(
		"promscrape.fileSDCheckInterval",
		time.Minute,
		"Interval for checking for changes in 'file_sd_config'. "+
			"See https://docs.victoriametrics.com/sd_configs.html#file_sd_configs for details",
	)
)
// CheckConfig checks -promscrape.config for errors and unsupported options.
//
// Nil is returned when the -promscrape.config flag is empty; otherwise the result
// is whatever error loadConfig reports for the configured path.
func CheckConfig() error {
	var err error
	if path := *promscrapeConfigFile; path != "" {
		// Only the error matters here; the loaded config is discarded.
		_, err = loadConfig(path)
	}
	return err
}
// Init starts the Prometheus scraper configured via `-promscrape.config`.
//
// The scraper runs in a background goroutine until Stop is called.
// Scraped data is passed to pushData.
func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) {
	mustInitClusterMemberID()
	globalStopChan = make(chan struct{})

	runInBackground := func() {
		defer scraperWG.Done()
		runScraper(*promscrapeConfigFile, pushData, globalStopChan)
	}

	// Register the goroutine in scraperWG before starting it, so Stop can wait for it.
	scraperWG.Add(1)
	go runInBackground()
}
// Stop stops Prometheus scraper.
//
// It closes the stop channel created by Init and then blocks until
// the goroutine registered in scraperWG has finished.
func Stop() {
	stopCh := globalStopChan
	close(stopCh)
	scraperWG.Wait()
}
// Package-level scraper state.
var (
	// globalStopChan is created by Init and closed by Stop.
	globalStopChan chan struct{}

	// scraperWG tracks the background goroutine started by Init.
	scraperWG sync.WaitGroup

	// PendingScrapeConfigs is the number of scrapeConfigs, which haven't finished
	// their first update yet. Zero value means, that all scrapeConfigs
	// are inited and ready for work.
	PendingScrapeConfigs int32

	// configData contains -promscrape.config data in marshaled form.
	// It stays nil until runScraper stores the first loaded config.
	configData atomic.Pointer[[]byte]
)
// WriteConfigData writes -promscrape.config contents to w.
//
// Nothing is written if no config has been stored in configData yet.
// Write errors are ignored.
func WriteConfigData(w io.Writer) {
	if data := configData.Load(); data != nil {
		_, _ = w.Write(*data)
	}
}
// runScraper loads scrape configs from configFile, starts a scrapeConfig worker
// per supported service discovery type and keeps the workers supplied with
// the most recent config until globalStopCh is closed.
//
// The config is re-read on SIGHUP and, when -promscrape.configCheckInterval is positive,
// on every tick of that interval. A failed re-read keeps the previous config in use.
// A failure of the initial read is reported via logger.Fatalf.
//
// The function returns immediately if configFile is empty.
// Scraped data is passed to pushData.
func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
if configFile == "" {
// Nothing to scrape.
return
}
metrics.RegisterSet(configMetricsSet)
// Register SIGHUP handler for config reload before loadConfig.
// This guarantees that the config will be re-read if the signal arrives just after loadConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil.NewSighupChan()
logger.Infof("reading scrape configs from %q", configFile)
cfg, err := loadConfig(configFile)
if err != nil {
logger.Fatalf("cannot read %q: %s", configFile, err)
}
// Publish the marshaled config, so it can be served by WriteConfigData.
marshaledData := cfg.marshal()
configData.Store(&marshaledData)
cfg.mustStart()
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
// Start a worker per service discovery type. Every worker re-runs its discovery
// with the given check interval in addition to reacting to config updates.
scs := newScrapeConfigs(pushData, globalStopCh)
scs.add("azure_sd_configs", *azure.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getAzureSDScrapeWork(swsPrev) })
scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
scs.add("consulagent_sd_configs", *consulagent.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulAgentSDScrapeWork(swsPrev) })
scs.add("digitalocean_sd_configs", *digitalocean.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDigitalOceanDScrapeWork(swsPrev) })
scs.add("dns_sd_configs", *dns.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
scs.add("docker_sd_configs", *docker.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSDScrapeWork(swsPrev) })
scs.add("dockerswarm_sd_configs", *dockerswarm.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSwarmSDScrapeWork(swsPrev) })
scs.add("ec2_sd_configs", *ec2.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
scs.add("eureka_sd_configs", *eureka.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) })
scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
scs.add("gce_sd_configs", *gce.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
scs.add("http_sd_configs", *http.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getHTTPDScrapeWork(swsPrev) })
scs.add("kubernetes_sd_configs", *kubernetes.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
scs.add("kuma_sd_configs", *kuma.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKumaSDScrapeWork(swsPrev) })
scs.add("nomad_sd_configs", *nomad.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getNomadSDScrapeWork(swsPrev) })
scs.add("openstack_sd_configs", *openstack.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })
// static_configs are registered with zero check interval, so they are refreshed only on config updates.
scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
// tickerCh stays nil when the periodic config check is disabled,
// so the corresponding select case below never fires.
var tickerCh <-chan time.Time
if *configCheckInterval > 0 {
ticker := time.NewTicker(*configCheckInterval)
tickerCh = ticker.C
defer ticker.Stop()
}
for {
// Every loop iteration hands the current cfg to all the workers.
// Jumping to waitForChans skips this step when the config couldn't be loaded
// or when cfgNew.mustRestart(cfg) reports that no restart is needed.
scs.updateConfig(cfg)
waitForChans:
select {
case <-sighupCh:
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
cfgNew, err := loadConfig(configFile)
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
goto waitForChans
}
configSuccess.Set(1)
if !cfgNew.mustRestart(cfg) {
logger.Infof("nothing changed in %q", configFile)
goto waitForChans
}
cfg = cfgNew
marshaledData = cfg.marshal()
configData.Store(&marshaledData)
configReloads.Inc()
configTimestamp.Set(fasttime.UnixTimestamp())
case <-tickerCh:
// Same flow as for SIGHUP, except that an unchanged config isn't logged.
cfgNew, err := loadConfig(configFile)
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
goto waitForChans
}
configSuccess.Set(1)
if !cfgNew.mustRestart(cfg) {
goto waitForChans
}
cfg = cfgNew
marshaledData = cfg.marshal()
configData.Store(&marshaledData)
configReloads.Inc()
configTimestamp.Set(fasttime.UnixTimestamp())
case <-globalStopCh:
// Shutdown: stop the current config first, then wait for all the workers to finish.
cfg.mustStop()
logger.Infof("stopping Prometheus scrapers")
startTime := time.Now()
scs.stop()
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
return
}
}
}
// Metrics describing -promscrape.config (re)loading.
// The set is registered by runScraper, i.e. only when a config file is specified.
var (
	configMetricsSet = metrics.NewSet()

	// configReloads counts successfully applied config reloads.
	configReloads = configMetricsSet.NewCounter(`vm_promscrape_config_reloads_total`)

	// configReloadErrors counts failed attempts to re-read the config.
	configReloadErrors = configMetricsSet.NewCounter(`vm_promscrape_config_reloads_errors_total`)

	// configSuccess is set to 1 after a successful config load and to 0 after a failed one.
	// NOTE(review): it is created via NewCounter, while runScraper assigns absolute values to it with Set.
	configSuccess = configMetricsSet.NewCounter(`vm_promscrape_config_last_reload_successful`)

	// configTimestamp is set to the unix timestamp at which a config was applied.
	// NOTE(review): it is created via NewCounter, while runScraper assigns absolute values to it with Set.
	configTimestamp = configMetricsSet.NewCounter(`vm_promscrape_config_last_reload_success_timestamp_seconds`)
)
// scrapeConfigs is a collection of scrapeConfig workers sharing
// a single data callback and a single stop channel.
type scrapeConfigs struct {
	// pushData is handed to every scrapeConfig created via add.
	pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)

	// wg tracks the goroutines started by add.
	wg sync.WaitGroup

	// stopCh is closed by stop; every scrapeConfig created via add listens on it.
	stopCh chan struct{}

	// globalStopCh is passed to scrapeConfig.run.
	globalStopCh <-chan struct{}

	// scfgs holds the workers, which receive config updates from updateConfig.
	scfgs []*scrapeConfig
}
// newScrapeConfigs returns an empty scrapeConfigs, which passes scraped data to pushData.
//
// Workers must be registered via add.
func newScrapeConfigs(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
	scs := &scrapeConfigs{}
	scs.pushData = pushData
	scs.stopCh = make(chan struct{})
	scs.globalStopCh = globalStopCh
	return scs
}
// add registers a worker for the service discovery type with the given name and starts it
// in a separate goroutine.
//
// getScrapeWork is called by the worker for obtaining scrape targets from the current config.
// A positive checkInterval makes the worker refresh the targets periodically.
//
// PendingScrapeConfigs is incremented here; the worker decrements it after its first update.
func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork) {
	atomic.AddInt32(&PendingScrapeConfigs, 1)

	// cfgCh has capacity for one pending config.
	cfgCh := make(chan *Config, 1)
	durationMetricName := fmt.Sprintf("vm_promscrape_service_discovery_duration_seconds{type=%q}", name)
	worker := &scrapeConfig{
		name:              name,
		pushData:          scs.pushData,
		getScrapeWork:     getScrapeWork,
		checkInterval:     checkInterval,
		cfgCh:             cfgCh,
		stopCh:            scs.stopCh,
		discoveryDuration: metrics.GetOrCreateHistogram(durationMetricName),
	}
	scs.scfgs = append(scs.scfgs, worker)

	scs.wg.Add(1)
	go func() {
		defer scs.wg.Done()
		worker.run(scs.globalStopCh)
	}()
}
// updateConfig sends cfg to every registered worker.
//
// The send blocks while the worker's cfgCh buffer is full.
func (scs *scrapeConfigs) updateConfig(cfg *Config) {
	for i := range scs.scfgs {
		scs.scfgs[i].cfgCh <- cfg
	}
}
// stop signals all the workers to exit, waits until their goroutines
// are finished and then drops the references to them.
func (scs *scrapeConfigs) stop() {
	// Closing the shared stopCh notifies every worker at once.
	close(scs.stopCh)
	scs.wg.Wait()

	scs.scfgs = nil
}
// scrapeConfig is a worker, which maintains scrapers for a single service discovery type.
type scrapeConfig struct {
	// name is the service discovery type, e.g. "file_sd_configs".
	name string

	// pushData is passed to the scraperGroup created in run.
	pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)

	// getScrapeWork returns scrape targets for the given cfg.
	// swsPrev contains the result of the previous call.
	getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork

	// checkInterval is the interval between periodic target refreshes.
	// Periodic refreshes are disabled if it isn't positive.
	checkInterval time.Duration

	// cfgCh delivers new configs to run.
	cfgCh chan *Config

	// stopCh is closed when the worker must exit.
	stopCh <-chan struct{}

	// discoveryDuration tracks the duration of target refreshes.
	discoveryDuration *metrics.Histogram
}
// run maintains scrapers for scfg until scfg.stopCh is closed.
//
// It blocks until the first config arrives via scfg.cfgCh, applies it, and then re-applies
// the most recent config on every new config and on every tick of scfg.checkInterval
// (if the interval is positive).
//
// All the scrapers are stopped before returning from run.
func (scfg *scrapeConfig) run(globalStopCh <-chan struct{}) {
	sg := newScraperGroup(scfg.name, scfg.pushData, globalStopCh)
	defer sg.stop()

	// refreshCh remains nil if periodic refreshes are disabled,
	// so the corresponding select case below never fires.
	var refreshCh <-chan time.Time
	if scfg.checkInterval > 0 {
		t := time.NewTicker(scfg.checkInterval)
		defer t.Stop()
		refreshCh = t.C
	}

	// prevWork is the result of the previous getScrapeWork call.
	var prevWork []*ScrapeWork
	refresh := func(c *Config) {
		begin := time.Now()
		work := scfg.getScrapeWork(c, prevWork)
		sg.update(work)
		prevWork = work
		if sg.scrapersStarted.Get() > 0 {
			// update duration only if at least one scraper has started
			// otherwise this SD is considered as inactive
			scfg.discoveryDuration.UpdateDuration(begin)
		}
	}

	// The worker is considered initialized after applying the first config.
	cfg := <-scfg.cfgCh
	refresh(cfg)
	atomic.AddInt32(&PendingScrapeConfigs, -1)

	for {
		select {
		case <-scfg.stopCh:
			return
		case cfg = <-scfg.cfgCh:
			// Switch to the new config.
		case <-refreshCh:
			// Periodic refresh with the current config.
		}
		refresh(cfg)
	}
}
// scraperGroup is a set of running scrapers belonging to a single service discovery type.
type scraperGroup struct {
	// name is the service discovery type this group belongs to.
	name string

	// wg tracks the goroutines of running scrapers.
	wg sync.WaitGroup

	// mLock protects m.
	mLock sync.Mutex

	// m maps ScrapeWork key to the scraper running for it.
	m map[string]*scraper

	// pushData is passed to every scraper created by the group.
	pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)

	// changesCount counts added plus removed targets.
	changesCount *metrics.Counter

	// activeScrapers is incremented when a scraper starts and decremented when it finishes.
	activeScrapers *metrics.Counter

	// scrapersStarted counts started scrapers.
	scrapersStarted *metrics.Counter

	// scrapersStopped counts finished scrapers.
	scrapersStopped *metrics.Counter

	// globalStopCh is passed to every running scraper.
	globalStopCh <-chan struct{}
}
// newScraperGroup creates an empty scraperGroup for the service discovery type with the given name.
//
// It registers per-group metrics labeled with type=name, including
// vm_promscrape_targets gauges, which read their values from tsmGlobal.
func newScraperGroup(name string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
	sg := &scraperGroup{
		name:         name,
		m:            make(map[string]*scraper),
		pushData:     pushData,
		globalStopCh: globalStopCh,
	}

	sg.changesCount = metrics.NewCounter(fmt.Sprintf(`vm_promscrape_config_changes_total{type=%q}`, name))
	sg.activeScrapers = metrics.NewCounter(fmt.Sprintf(`vm_promscrape_active_scrapers{type=%q}`, name))
	sg.scrapersStarted = metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_started_total{type=%q}`, name))
	sg.scrapersStopped = metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_stopped_total{type=%q}`, name))

	targetsUpName := fmt.Sprintf(`vm_promscrape_targets{type=%q, status="up"}`, name)
	metrics.NewGauge(targetsUpName, func() float64 {
		n := tsmGlobal.StatusByGroup(sg.name, true)
		return float64(n)
	})

	targetsDownName := fmt.Sprintf(`vm_promscrape_targets{type=%q, status="down"}`, name)
	metrics.NewGauge(targetsDownName, func() float64 {
		n := tsmGlobal.StatusByGroup(sg.name, false)
		return float64(n)
	})

	return sg
}
// stop cancels all the scrapers in the group and waits until their goroutines are finished.
//
// sg.m is set to nil, so update mustn't be called after stop.
func (sg *scraperGroup) stop() {
	sg.mLock.Lock()
	for key := range sg.m {
		sg.m[key].cancel()
	}
	sg.m = nil
	sg.mLock.Unlock()

	// Wait outside the lock.
	sg.wg.Wait()
}
// update brings the set of running scrapers in line with sws:
//
//   - scrapers for targets missing in sws are stopped;
//   - scrapers are started for targets from sws, which have no running scraper yet;
//   - scrapers for the remaining targets are left untouched.
//
// Targets are identified by ScrapeWork.key(). Targets from sws with already seen key are skipped
// and registered in droppedTargetsMap.
//
// sg.mLock is held for the whole call, including the wait for stopped scrapers.
func (sg *scraperGroup) update(sws []*ScrapeWork) {
sg.mLock.Lock()
defer sg.mLock.Unlock()
additionsCount := 0
deletionsCount := 0
// swsMap maps the key of every unique target from sws to its original labels.
// It is used for detecting duplicates and for finding scrapers, which must be stopped.
swsMap := make(map[string]*promutils.Labels, len(sws))
var swsToStart []*ScrapeWork
for _, sw := range sws {
key := sw.key()
originalLabels, ok := swsMap[key]
if ok {
if !*suppressDuplicateScrapeTargetErrors {
logger.Errorf("skipping duplicate scrape target with identical labels; endpoint=%s, labels=%s; "+
"make sure service discovery and relabeling is set up properly; "+
"see also https://docs.victoriametrics.com/vmagent.html#troubleshooting; "+
"original labels for target1: %s; original labels for target2: %s",
sw.ScrapeURL, sw.Labels.String(), originalLabels.String(), sw.OriginalLabels.String())
}
droppedTargetsMap.Register(sw.OriginalLabels, sw.RelabelConfigs)
continue
}
swsMap[key] = sw.OriginalLabels
if sg.m[key] != nil {
// The scraper for the given key already exists.
continue
}
swsToStart = append(swsToStart, sw)
}
// Stop deleted scrapers before starting new scrapers in order to prevent
// series overlap when old scrape target is substituted by new scrape target.
var stoppedChs []<-chan struct{}
for key, sc := range sg.m {
if _, ok := swsMap[key]; !ok {
sc.cancel()
stoppedChs = append(stoppedChs, sc.stoppedCh)
delete(sg.m, key)
deletionsCount++
}
}
// Wait until all the deleted scrapers are stopped before starting new scrapers.
for _, ch := range stoppedChs {
<-ch
}
// Start new scrapers only after the deleted scrapers are stopped.
for _, sw := range swsToStart {
sc, err := newScraper(sw, sg.name, sg.pushData)
if err != nil {
// The target isn't added to sg.m, so starting it is attempted again on the next update call.
logger.Errorf("skipping scraper for url=%s, job=%s because of error: %s", sw.ScrapeURL, sg.name, err)
continue
}
sg.activeScrapers.Inc()
sg.scrapersStarted.Inc()
sg.wg.Add(1)
tsmGlobal.Register(&sc.sw)
// sc is declared inside the loop body, so the goroutine captures its own scraper.
// The sw argument of the goroutine isn't used inside it.
go func(sw *ScrapeWork) {
defer func() {
// stoppedCh is closed last, after the metrics are updated and sg.wg is released.
sg.wg.Done()
close(sc.stoppedCh)
}()
sc.sw.run(sc.ctx.Done(), sg.globalStopCh)
tsmGlobal.Unregister(&sc.sw)
sg.activeScrapers.Dec()
sg.scrapersStopped.Inc()
}(sw)
key := sw.key()
sg.m[key] = sc
additionsCount++
}
if additionsCount > 0 || deletionsCount > 0 {
sg.changesCount.Add(additionsCount + deletionsCount)
logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m))
}
}
// scraper is a single scrape target together with the means for stopping it.
type scraper struct {
	// sw is the work item executed by the scraper's goroutine.
	sw scrapeWork

	// ctx is canceled via cancel when the scraper must be stopped.
	ctx context.Context

	// cancel cancels ctx.
	cancel context.CancelFunc

	// stoppedCh is unblocked when the given scraper is stopped.
	stoppedCh chan struct{}
}
// newScraper creates a scraper for the given sw, which belongs to the given group.
//
// The returned scraper isn't running yet; the caller is responsible for starting sc.sw.run
// and for calling sc.cancel when the scraper must be stopped.
// Scraped data is passed to pushData.
//
// An error is returned if the client for sw cannot be created.
// The context created for the scraper is canceled in this case.
func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) (*scraper, error) {
	ctx, cancel := context.WithCancel(context.Background())
	c, err := newClient(ctx, sw)
	if err != nil {
		// The scraper isn't returned to the caller on this path, so nobody else can call cancel.
		// Cancel the context here in order to release everything bound to it.
		cancel()
		return nil, err
	}

	sc := &scraper{
		ctx:       ctx,
		cancel:    cancel,
		stoppedCh: make(chan struct{}),
	}
	sc.sw.Config = sw
	sc.sw.ScrapeGroup = group
	sc.sw.ReadData = c.ReadData
	sc.sw.GetStreamReader = c.GetStreamReader
	sc.sw.PushData = pushData
	return sc, nil
}