mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 16:30:55 +01:00
1417 lines
48 KiB
Go
1417 lines
48 KiB
Go
package promscrape
|
|
|
|
import (
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"net/url"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dockerswarm"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/eureka"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/cespare/xxhash/v2"
|
|
"gopkg.in/yaml.v2"
|
|
)
|
|
|
|
// Command-line flags controlling -promscrape.config parsing and cluster-mode scraping.
var (
	// strictParse makes config parsing fail on unknown fields.
	strictParse = flag.Bool("promscrape.config.strictParse", true, "Whether to deny unsupported fields in -promscrape.config . Set to false in order to silently skip unsupported fields")

	// dryRun requests a config check only; see IsDryRun.
	dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. "+
		"Returns non-zero exit code on parsing errors and emits these errors to stderr. "+
		"See also -promscrape.config.strictParse command-line flag. "+
		"Pass -loggerLevel=ERROR if you don't need to see info messages in the output.")

	// dropOriginalLabels disables keeping the original (pre-relabeling) target labels.
	dropOriginalLabels = flag.Bool("promscrape.dropOriginalLabels", false, "Whether to drop original labels for scrape targets at /targets and /api/v1/targets pages. "+
		"This may be needed for reducing memory usage when original labels for big number of scrape targets occupy big amounts of memory. "+
		"Note that this reduces debuggability for improper per-target relabeling configs")

	// clusterMembersCount is the total number of scrapers in the cluster; 0 disables cluster mode.
	clusterMembersCount = flag.Int("promscrape.cluster.membersCount", 0, "The number of members in a cluster of scrapers. "+
		"Each member must have an unique -promscrape.cluster.memberNum in the range 0 ... promscrape.cluster.membersCount-1 . "+
		"Each member then scrapes roughly 1/N of all the targets. By default cluster scraping is disabled, i.e. a single scraper scrapes all the targets")

	// clusterMemberNum identifies this scraper inside the cluster.
	// It is a string, since it may be passed as a pod name with a numeric suffix; see mustInitClusterMemberID.
	clusterMemberNum = flag.String("promscrape.cluster.memberNum", "0", "The number of this member in the cluster of scrapers. "+
		"It must be an unique value in the range 0 ... promscrape.cluster.membersCount-1 across scrapers in the cluster. "+
		"Can be specified as pod name of Kubernetes StatefulSet - pod-name-Num, where Num is a numeric part of pod name")

	// clusterReplicationFactor is the number of cluster members scraping every target.
	clusterReplicationFactor = flag.Int("promscrape.cluster.replicationFactor", 1, "The number of members in the cluster, which scrape the same targets. "+
		"If the replication factor is greater than 1, then the deduplication must be enabled at remote storage side. See https://docs.victoriametrics.com/#deduplication")

	// clusterName is an optional name distinguishing this cluster from other clusters scraping the same targets.
	clusterName = flag.String("promscrape.cluster.name", "", "Optional name of the cluster. If multiple vmagent clusters scrape the same targets, "+
		"then each cluster must have unique name in order to properly de-duplicate samples received from these clusters. "+
		"See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2679")
)

// clusterMemberID is the numeric member ID obtained from -promscrape.cluster.memberNum.
//
// It is set by mustInitClusterMemberID.
var clusterMemberID int
|
|
|
|
func mustInitClusterMemberID() {
|
|
s := *clusterMemberNum
|
|
// special case for kubernetes deployment, where pod-name formatted at some-pod-name-1
|
|
// obtain memberNum from last segment
|
|
// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2359
|
|
if idx := strings.LastIndexByte(s, '-'); idx >= 0 {
|
|
s = s[idx+1:]
|
|
}
|
|
n, err := strconv.ParseInt(s, 10, 64)
|
|
if err != nil {
|
|
logger.Fatalf("cannot parse -promscrape.cluster.memberNum=%q: %s", *clusterMemberNum, err)
|
|
}
|
|
clusterMemberID = int(n)
|
|
}
|
|
|
|
// Config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/
|
|
type Config struct {
|
|
Global GlobalConfig `yaml:"global,omitempty"`
|
|
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
|
|
ScrapeConfigFiles []string `yaml:"scrape_config_files,omitempty"`
|
|
|
|
// This is set to the directory from where the config has been loaded.
|
|
baseDir string
|
|
}
|
|
|
|
func (cfg *Config) unmarshal(data []byte, isStrict bool) error {
|
|
data = envtemplate.Replace(data)
|
|
var err error
|
|
if isStrict {
|
|
if err = yaml.UnmarshalStrict(data, cfg); err != nil {
|
|
err = fmt.Errorf("%w; pass -promscrape.config.strictParse=false command-line flag for ignoring unknown fields in yaml config", err)
|
|
}
|
|
} else {
|
|
err = yaml.Unmarshal(data, cfg)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (cfg *Config) marshal() []byte {
|
|
data, err := yaml.Marshal(cfg)
|
|
if err != nil {
|
|
logger.Panicf("BUG: cannot marshal Config: %s", err)
|
|
}
|
|
return data
|
|
}
|
|
|
|
func (cfg *Config) mustStart() {
|
|
startTime := time.Now()
|
|
logger.Infof("starting service discovery routines...")
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
sc.mustStart(cfg.baseDir)
|
|
}
|
|
jobNames := cfg.getJobNames()
|
|
tsmGlobal.registerJobNames(jobNames)
|
|
logger.Infof("started service discovery routines in %.3f seconds", time.Since(startTime).Seconds())
|
|
}
|
|
|
|
func (cfg *Config) mustRestart(prevCfg *Config) {
|
|
startTime := time.Now()
|
|
logger.Infof("restarting service discovery routines...")
|
|
|
|
prevScrapeCfgByName := make(map[string]*ScrapeConfig, len(prevCfg.ScrapeConfigs))
|
|
for _, scPrev := range prevCfg.ScrapeConfigs {
|
|
prevScrapeCfgByName[scPrev.JobName] = scPrev
|
|
}
|
|
|
|
// Restart all the scrape jobs on Global config change.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2884
|
|
needGlobalRestart := !areEqualGlobalConfigs(&cfg.Global, &prevCfg.Global)
|
|
|
|
// Loop over the the new jobs, start new ones and restart updated ones.
|
|
var started, stopped, restarted int
|
|
currentJobNames := make(map[string]struct{}, len(cfg.ScrapeConfigs))
|
|
for i, sc := range cfg.ScrapeConfigs {
|
|
currentJobNames[sc.JobName] = struct{}{}
|
|
scPrev := prevScrapeCfgByName[sc.JobName]
|
|
if scPrev == nil {
|
|
// New scrape config has been appeared. Start it.
|
|
sc.mustStart(cfg.baseDir)
|
|
started++
|
|
continue
|
|
}
|
|
if !needGlobalRestart && areEqualScrapeConfigs(scPrev, sc) {
|
|
// The scrape config didn't change, so no need to restart it.
|
|
// Use the reference to the previous job, so it could be stopped properly later.
|
|
cfg.ScrapeConfigs[i] = scPrev
|
|
} else {
|
|
// The scrape config has been changed. Stop the previous scrape config and start new one.
|
|
scPrev.mustStop()
|
|
sc.mustStart(cfg.baseDir)
|
|
restarted++
|
|
}
|
|
}
|
|
// Stop preious jobs which weren't found in the current configuration.
|
|
for _, scPrev := range prevCfg.ScrapeConfigs {
|
|
if _, ok := currentJobNames[scPrev.JobName]; !ok {
|
|
scPrev.mustStop()
|
|
stopped++
|
|
}
|
|
}
|
|
jobNames := cfg.getJobNames()
|
|
tsmGlobal.registerJobNames(jobNames)
|
|
logger.Infof("restarted service discovery routines in %.3f seconds, stopped=%d, started=%d, restarted=%d", time.Since(startTime).Seconds(), stopped, started, restarted)
|
|
}
|
|
|
|
func areEqualGlobalConfigs(a, b *GlobalConfig) bool {
|
|
sa := a.marshalJSON()
|
|
sb := b.marshalJSON()
|
|
return string(sa) == string(sb)
|
|
}
|
|
|
|
func areEqualScrapeConfigs(a, b *ScrapeConfig) bool {
|
|
sa := a.marshalJSON()
|
|
sb := b.marshalJSON()
|
|
return string(sa) == string(sb)
|
|
}
|
|
|
|
func (sc *ScrapeConfig) unmarshalJSON(data []byte) error {
|
|
return json.Unmarshal(data, sc)
|
|
}
|
|
|
|
func (sc *ScrapeConfig) marshalJSON() []byte {
|
|
data, err := json.Marshal(sc)
|
|
if err != nil {
|
|
logger.Panicf("BUG: cannot marshal ScrapeConfig: %s", err)
|
|
}
|
|
return data
|
|
}
|
|
|
|
func (gc *GlobalConfig) marshalJSON() []byte {
|
|
data, err := json.Marshal(gc)
|
|
if err != nil {
|
|
logger.Panicf("BUG: cannot marshal GlobalConfig: %s", err)
|
|
}
|
|
return data
|
|
}
|
|
|
|
func (cfg *Config) mustStop() {
|
|
startTime := time.Now()
|
|
logger.Infof("stopping service discovery routines...")
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
sc.mustStop()
|
|
}
|
|
logger.Infof("stopped service discovery routines in %.3f seconds", time.Since(startTime).Seconds())
|
|
}
|
|
|
|
// getJobNames returns all the scrape job names from the cfg.
|
|
func (cfg *Config) getJobNames() []string {
|
|
a := make([]string, 0, len(cfg.ScrapeConfigs))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
a = append(a, sc.JobName)
|
|
}
|
|
return a
|
|
}
|
|
|
|
// GlobalConfig represents essential parts for `global` section of Prometheus config.
|
|
//
|
|
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/
|
|
type GlobalConfig struct {
|
|
ScrapeInterval *promutils.Duration `yaml:"scrape_interval,omitempty"`
|
|
ScrapeTimeout *promutils.Duration `yaml:"scrape_timeout,omitempty"`
|
|
ExternalLabels map[string]string `yaml:"external_labels,omitempty"`
|
|
}
|
|
|
|
// ScrapeConfig represents essential parts for `scrape_config` section of Prometheus config.
|
|
//
|
|
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
|
|
type ScrapeConfig struct {
|
|
JobName string `yaml:"job_name"`
|
|
ScrapeInterval *promutils.Duration `yaml:"scrape_interval,omitempty"`
|
|
ScrapeTimeout *promutils.Duration `yaml:"scrape_timeout,omitempty"`
|
|
MetricsPath string `yaml:"metrics_path,omitempty"`
|
|
HonorLabels bool `yaml:"honor_labels,omitempty"`
|
|
HonorTimestamps *bool `yaml:"honor_timestamps,omitempty"`
|
|
FollowRedirects *bool `yaml:"follow_redirects,omitempty"`
|
|
Scheme string `yaml:"scheme,omitempty"`
|
|
Params map[string][]string `yaml:"params,omitempty"`
|
|
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
|
|
ProxyURL *proxy.URL `yaml:"proxy_url,omitempty"`
|
|
RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs,omitempty"`
|
|
MetricRelabelConfigs []promrelabel.RelabelConfig `yaml:"metric_relabel_configs,omitempty"`
|
|
SampleLimit int `yaml:"sample_limit,omitempty"`
|
|
|
|
AzureSDConfigs []azure.SDConfig `yaml:"azure_sd_configs,omitempty"`
|
|
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs,omitempty"`
|
|
DigitaloceanSDConfigs []digitalocean.SDConfig `yaml:"digitalocean_sd_configs,omitempty"`
|
|
DNSSDConfigs []dns.SDConfig `yaml:"dns_sd_configs,omitempty"`
|
|
DockerSDConfigs []docker.SDConfig `yaml:"docker_sd_configs,omitempty"`
|
|
DockerSwarmSDConfigs []dockerswarm.SDConfig `yaml:"dockerswarm_sd_configs,omitempty"`
|
|
EC2SDConfigs []ec2.SDConfig `yaml:"ec2_sd_configs,omitempty"`
|
|
EurekaSDConfigs []eureka.SDConfig `yaml:"eureka_sd_configs,omitempty"`
|
|
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs,omitempty"`
|
|
GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs,omitempty"`
|
|
HTTPSDConfigs []http.SDConfig `yaml:"http_sd_configs,omitempty"`
|
|
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs,omitempty"`
|
|
OpenStackSDConfigs []openstack.SDConfig `yaml:"openstack_sd_configs,omitempty"`
|
|
StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"`
|
|
|
|
// These options are supported only by lib/promscrape.
|
|
RelabelDebug bool `yaml:"relabel_debug,omitempty"`
|
|
MetricRelabelDebug bool `yaml:"metric_relabel_debug,omitempty"`
|
|
DisableCompression bool `yaml:"disable_compression,omitempty"`
|
|
DisableKeepAlive bool `yaml:"disable_keepalive,omitempty"`
|
|
StreamParse bool `yaml:"stream_parse,omitempty"`
|
|
ScrapeAlignInterval *promutils.Duration `yaml:"scrape_align_interval,omitempty"`
|
|
ScrapeOffset *promutils.Duration `yaml:"scrape_offset,omitempty"`
|
|
SeriesLimit int `yaml:"series_limit,omitempty"`
|
|
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
|
|
|
|
// This is set in loadConfig
|
|
swc *scrapeWorkConfig
|
|
}
|
|
|
|
func (sc *ScrapeConfig) mustStart(baseDir string) {
|
|
swosFunc := func(metaLabels map[string]string) interface{} {
|
|
target := metaLabels["__address__"]
|
|
sw, err := sc.swc.getScrapeWork(target, nil, metaLabels)
|
|
if err != nil {
|
|
logger.Errorf("cannot create kubernetes_sd_config target %q for job_name %q: %s", target, sc.swc.jobName, err)
|
|
return nil
|
|
}
|
|
return sw
|
|
}
|
|
for i := range sc.KubernetesSDConfigs {
|
|
sc.KubernetesSDConfigs[i].MustStart(baseDir, swosFunc)
|
|
}
|
|
}
|
|
|
|
func (sc *ScrapeConfig) mustStop() {
|
|
for i := range sc.AzureSDConfigs {
|
|
sc.AzureSDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.ConsulSDConfigs {
|
|
sc.ConsulSDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.DigitaloceanSDConfigs {
|
|
sc.DigitaloceanSDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.DNSSDConfigs {
|
|
sc.DNSSDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.DockerSDConfigs {
|
|
sc.DockerSDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.DockerSwarmSDConfigs {
|
|
sc.DockerSwarmSDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.EC2SDConfigs {
|
|
sc.EC2SDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.EurekaSDConfigs {
|
|
sc.EurekaSDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.GCESDConfigs {
|
|
sc.GCESDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.HTTPSDConfigs {
|
|
sc.HTTPSDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.KubernetesSDConfigs {
|
|
sc.KubernetesSDConfigs[i].MustStop()
|
|
}
|
|
for i := range sc.OpenStackSDConfigs {
|
|
sc.OpenStackSDConfigs[i].MustStop()
|
|
}
|
|
}
|
|
|
|
// FileSDConfig represents file-based service discovery config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config
type FileSDConfig struct {
	// Files is the `files` option.
	Files []string `yaml:"files"`

	// `refresh_interval` is ignored. See `-promscrape.fileSDCheckInterval`
}
|
|
|
|
// StaticConfig represents essential parts for `static_config` section of Prometheus config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config
type StaticConfig struct {
	// Targets is the `targets` option.
	Targets []string `yaml:"targets"`
	// Labels is the `labels` option.
	Labels map[string]string `yaml:"labels,omitempty"`
}
|
|
|
|
func loadStaticConfigs(path string) ([]StaticConfig, error) {
|
|
data, err := fs.ReadFileOrHTTP(path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read `static_configs` from %q: %w", path, err)
|
|
}
|
|
data = envtemplate.Replace(data)
|
|
var stcs []StaticConfig
|
|
if err := yaml.UnmarshalStrict(data, &stcs); err != nil {
|
|
return nil, fmt.Errorf("cannot unmarshal `static_configs` from %q: %w", path, err)
|
|
}
|
|
return stcs, nil
|
|
}
|
|
|
|
// loadConfig loads Prometheus config from the given path.
|
|
func loadConfig(path string) (*Config, []byte, error) {
|
|
data, err := fs.ReadFileOrHTTP(path)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("cannot read Prometheus config from %q: %w", path, err)
|
|
}
|
|
var c Config
|
|
dataNew, err := c.parseData(data, path)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("cannot parse Prometheus config from %q: %w", path, err)
|
|
}
|
|
return &c, dataNew, nil
|
|
}
|
|
|
|
func loadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) ([]*ScrapeConfig, []byte, error) {
|
|
var scrapeConfigs []*ScrapeConfig
|
|
var scsData []byte
|
|
for _, filePath := range scrapeConfigFiles {
|
|
filePath := fs.GetFilepath(baseDir, filePath)
|
|
paths := []string{filePath}
|
|
if strings.Contains(filePath, "*") {
|
|
ps, err := filepath.Glob(filePath)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("invalid pattern %q: %w", filePath, err)
|
|
}
|
|
sort.Strings(ps)
|
|
paths = ps
|
|
}
|
|
for _, path := range paths {
|
|
data, err := fs.ReadFileOrHTTP(path)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("cannot load %q: %w", path, err)
|
|
}
|
|
data = envtemplate.Replace(data)
|
|
var scs []*ScrapeConfig
|
|
if err = yaml.UnmarshalStrict(data, &scs); err != nil {
|
|
return nil, nil, fmt.Errorf("cannot parse %q: %w", path, err)
|
|
}
|
|
scrapeConfigs = append(scrapeConfigs, scs...)
|
|
scsData = append(scsData, '\n')
|
|
scsData = append(scsData, data...)
|
|
}
|
|
}
|
|
return scrapeConfigs, scsData, nil
|
|
}
|
|
|
|
// IsDryRun returns true if -promscrape.config.dryRun command-line flag is set
|
|
func IsDryRun() bool {
|
|
return *dryRun
|
|
}
|
|
|
|
func (cfg *Config) parseData(data []byte, path string) ([]byte, error) {
|
|
if err := cfg.unmarshal(data, *strictParse); err != nil {
|
|
return nil, fmt.Errorf("cannot unmarshal data: %w", err)
|
|
}
|
|
absPath, err := filepath.Abs(path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot obtain abs path for %q: %w", path, err)
|
|
}
|
|
cfg.baseDir = filepath.Dir(absPath)
|
|
|
|
// Load cfg.ScrapeConfigFiles into c.ScrapeConfigs
|
|
scs, scsData, err := loadScrapeConfigFiles(cfg.baseDir, cfg.ScrapeConfigFiles)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot load `scrape_config_files` from %q: %w", path, err)
|
|
}
|
|
cfg.ScrapeConfigFiles = nil
|
|
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scs...)
|
|
dataNew := append(data, scsData...)
|
|
|
|
// Check that all the scrape configs have unique JobName
|
|
m := make(map[string]struct{}, len(cfg.ScrapeConfigs))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
jobName := sc.JobName
|
|
if _, ok := m[jobName]; ok {
|
|
return nil, fmt.Errorf("duplicate `job_name` in `scrape_configs` loaded from %q: %q", path, jobName)
|
|
}
|
|
m[jobName] = struct{}{}
|
|
}
|
|
|
|
// Initialize cfg.ScrapeConfigs
|
|
for i, sc := range cfg.ScrapeConfigs {
|
|
// Make a copy of sc in order to remove references to `data` memory.
|
|
// This should prevent from memory leaks on config reload.
|
|
sc = sc.clone()
|
|
cfg.ScrapeConfigs[i] = sc
|
|
|
|
swc, err := getScrapeWorkConfig(sc, cfg.baseDir, &cfg.Global)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse `scrape_config`: %w", err)
|
|
}
|
|
sc.swc = swc
|
|
}
|
|
return dataNew, nil
|
|
}
|
|
|
|
func (sc *ScrapeConfig) clone() *ScrapeConfig {
|
|
data := sc.marshalJSON()
|
|
var scCopy ScrapeConfig
|
|
if err := scCopy.unmarshalJSON(data); err != nil {
|
|
logger.Panicf("BUG: cannot unmarshal scrape config: %s", err)
|
|
}
|
|
return &scCopy
|
|
}
|
|
|
|
func getSWSByJob(sws []*ScrapeWork) map[string][]*ScrapeWork {
|
|
m := make(map[string][]*ScrapeWork)
|
|
for _, sw := range sws {
|
|
m[sw.jobNameOriginal] = append(m[sw.jobNameOriginal], sw)
|
|
}
|
|
return m
|
|
}
|
|
|
|
// getAzureSDScrapeWork returns `azure_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getAzureSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.AzureSDConfigs {
|
|
sdc := &sc.AzureSDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "azure_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering azure targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getConsulSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.ConsulSDConfigs {
|
|
sdc := &sc.ConsulSDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "consul_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering consul targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getDigitalOceanDScrapeWork returns `digitalocean_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getDigitalOceanDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.DigitaloceanSDConfigs {
|
|
sdc := &sc.DigitaloceanSDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "digitalocean_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering digitalocean targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getDNSSDScrapeWork returns `dns_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getDNSSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.DNSSDConfigs {
|
|
sdc := &sc.DNSSDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "dns_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering dns targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getDockerSDScrapeWork returns `docker_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getDockerSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.DockerSDConfigs {
|
|
sdc := &sc.DockerSDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "docker_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering docker targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getDockerSwarmSDScrapeWork returns `dockerswarm_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getDockerSwarmSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.DockerSwarmSDConfigs {
|
|
sdc := &sc.DockerSwarmSDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "dockerswarm_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering dockerswarm targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getEC2SDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.EC2SDConfigs {
|
|
sdc := &sc.EC2SDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "ec2_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering ec2 targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getEurekaSDScrapeWork returns `eureka_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getEurekaSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.EurekaSDConfigs {
|
|
sdc := &sc.EurekaSDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "eureka_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering eureka targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getFileSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
// Create a map for the previous scrape work.
|
|
swsMapPrev := make(map[string][]*ScrapeWork)
|
|
for _, sw := range prev {
|
|
filepath := promrelabel.GetLabelValueByName(sw.Labels, "__vm_filepath")
|
|
if len(filepath) == 0 {
|
|
logger.Panicf("BUG: missing `__vm_filepath` label")
|
|
} else {
|
|
swsMapPrev[filepath] = append(swsMapPrev[filepath], sw)
|
|
}
|
|
}
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
for j := range sc.FileSDConfigs {
|
|
sdc := &sc.FileSDConfigs[j]
|
|
dst = sdc.appendScrapeWork(dst, swsMapPrev, cfg.baseDir, sc.swc)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getGCESDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.GCESDConfigs {
|
|
sdc := &sc.GCESDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "gce_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering gce targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getHTTPDScrapeWork returns `http_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getHTTPDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.HTTPSDConfigs {
|
|
sdc := &sc.HTTPSDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "http_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering http targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.KubernetesSDConfigs {
|
|
sdc := &sc.KubernetesSDConfigs[j]
|
|
swos, err := sdc.GetScrapeWorkObjects()
|
|
if err != nil {
|
|
logger.Errorf("skipping kubernetes_sd_config targets for job_name %q because of error: %s", sc.swc.jobName, err)
|
|
ok = false
|
|
break
|
|
}
|
|
for _, swo := range swos {
|
|
sw := swo.(*ScrapeWork)
|
|
dst = append(dst, sw)
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering kubernetes_sd_config targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getOpenStackSDScrapeWork returns `openstack_sd_configs` ScrapeWork from cfg.
|
|
func (cfg *Config) getOpenStackSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|
swsPrevByJob := getSWSByJob(prev)
|
|
dst := make([]*ScrapeWork, 0, len(prev))
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
dstLen := len(dst)
|
|
ok := true
|
|
for j := range sc.OpenStackSDConfigs {
|
|
sdc := &sc.OpenStackSDConfigs[j]
|
|
var okLocal bool
|
|
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "openstack_sd_config")
|
|
if ok {
|
|
ok = okLocal
|
|
}
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
swsPrev := swsPrevByJob[sc.swc.jobName]
|
|
if len(swsPrev) > 0 {
|
|
logger.Errorf("there were errors when discovering openstack targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
|
dst = append(dst[:dstLen], swsPrev...)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// getStaticScrapeWork returns `static_configs` ScrapeWork from from cfg.
|
|
func (cfg *Config) getStaticScrapeWork() []*ScrapeWork {
|
|
var dst []*ScrapeWork
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
for j := range sc.StaticConfigs {
|
|
stc := &sc.StaticConfigs[j]
|
|
dst = stc.appendScrapeWork(dst, sc.swc, nil)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConfig) (*scrapeWorkConfig, error) {
|
|
jobName := sc.JobName
|
|
if jobName == "" {
|
|
return nil, fmt.Errorf("missing `job_name` field in `scrape_config`")
|
|
}
|
|
scrapeInterval := sc.ScrapeInterval.Duration()
|
|
if scrapeInterval <= 0 {
|
|
scrapeInterval = globalCfg.ScrapeInterval.Duration()
|
|
if scrapeInterval <= 0 {
|
|
scrapeInterval = defaultScrapeInterval
|
|
}
|
|
}
|
|
scrapeTimeout := sc.ScrapeTimeout.Duration()
|
|
if scrapeTimeout <= 0 {
|
|
scrapeTimeout = globalCfg.ScrapeTimeout.Duration()
|
|
if scrapeTimeout <= 0 {
|
|
scrapeTimeout = defaultScrapeTimeout
|
|
}
|
|
}
|
|
if scrapeTimeout > scrapeInterval {
|
|
// Limit the `scrape_timeout` with `scrape_interval` like Prometheus does.
|
|
// This guarantees that the scraper can miss only a single scrape if the target sometimes responds slowly.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1281#issuecomment-840538907
|
|
scrapeTimeout = scrapeInterval
|
|
}
|
|
honorLabels := sc.HonorLabels
|
|
honorTimestamps := true
|
|
if sc.HonorTimestamps != nil {
|
|
honorTimestamps = *sc.HonorTimestamps
|
|
}
|
|
denyRedirects := false
|
|
if sc.FollowRedirects != nil {
|
|
denyRedirects = !*sc.FollowRedirects
|
|
}
|
|
metricsPath := sc.MetricsPath
|
|
if metricsPath == "" {
|
|
metricsPath = "/metrics"
|
|
}
|
|
scheme := sc.Scheme
|
|
if scheme == "" {
|
|
scheme = "http"
|
|
}
|
|
if scheme != "http" && scheme != "https" {
|
|
return nil, fmt.Errorf("unexpected `scheme` for `job_name` %q: %q; supported values: http or https", jobName, scheme)
|
|
}
|
|
params := sc.Params
|
|
ac, err := sc.HTTPClientConfig.NewConfig(baseDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse auth config for `job_name` %q: %w", jobName, err)
|
|
}
|
|
proxyAC, err := sc.ProxyClientConfig.NewConfig(baseDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse proxy auth config for `job_name` %q: %w", jobName, err)
|
|
}
|
|
relabelConfigs, err := promrelabel.ParseRelabelConfigs(sc.RelabelConfigs, sc.RelabelDebug)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse `relabel_configs` for `job_name` %q: %w", jobName, err)
|
|
}
|
|
metricRelabelConfigs, err := promrelabel.ParseRelabelConfigs(sc.MetricRelabelConfigs, sc.MetricRelabelDebug)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse `metric_relabel_configs` for `job_name` %q: %w", jobName, err)
|
|
}
|
|
if (*streamParse || sc.StreamParse) && sc.SampleLimit > 0 {
|
|
return nil, fmt.Errorf("cannot use stream parsing mode when `sample_limit` is set for `job_name` %q", jobName)
|
|
}
|
|
if (*streamParse || sc.StreamParse) && sc.SeriesLimit > 0 {
|
|
return nil, fmt.Errorf("cannot use stream parsing mode when `series_limit` is set for `job_name` %q", jobName)
|
|
}
|
|
swc := &scrapeWorkConfig{
|
|
scrapeInterval: scrapeInterval,
|
|
scrapeIntervalString: scrapeInterval.String(),
|
|
scrapeTimeout: scrapeTimeout,
|
|
scrapeTimeoutString: scrapeTimeout.String(),
|
|
jobName: jobName,
|
|
metricsPath: metricsPath,
|
|
scheme: scheme,
|
|
params: params,
|
|
proxyURL: sc.ProxyURL,
|
|
proxyAuthConfig: proxyAC,
|
|
authConfig: ac,
|
|
honorLabels: honorLabels,
|
|
honorTimestamps: honorTimestamps,
|
|
denyRedirects: denyRedirects,
|
|
externalLabels: globalCfg.ExternalLabels,
|
|
relabelConfigs: relabelConfigs,
|
|
metricRelabelConfigs: metricRelabelConfigs,
|
|
sampleLimit: sc.SampleLimit,
|
|
disableCompression: sc.DisableCompression,
|
|
disableKeepAlive: sc.DisableKeepAlive,
|
|
streamParse: sc.StreamParse,
|
|
scrapeAlignInterval: sc.ScrapeAlignInterval.Duration(),
|
|
scrapeOffset: sc.ScrapeOffset.Duration(),
|
|
seriesLimit: sc.SeriesLimit,
|
|
}
|
|
return swc, nil
|
|
}
|
|
|
|
type scrapeWorkConfig struct {
|
|
scrapeInterval time.Duration
|
|
scrapeIntervalString string
|
|
scrapeTimeout time.Duration
|
|
scrapeTimeoutString string
|
|
jobName string
|
|
metricsPath string
|
|
scheme string
|
|
params map[string][]string
|
|
proxyURL *proxy.URL
|
|
proxyAuthConfig *promauth.Config
|
|
authConfig *promauth.Config
|
|
honorLabels bool
|
|
honorTimestamps bool
|
|
denyRedirects bool
|
|
externalLabels map[string]string
|
|
relabelConfigs *promrelabel.ParsedConfigs
|
|
metricRelabelConfigs *promrelabel.ParsedConfigs
|
|
sampleLimit int
|
|
disableCompression bool
|
|
disableKeepAlive bool
|
|
streamParse bool
|
|
scrapeAlignInterval time.Duration
|
|
scrapeOffset time.Duration
|
|
seriesLimit int
|
|
}
|
|
|
|
// targetLabelsGetter is implemented by service discovery configs,
// which are able to return labels for the discovered targets.
type targetLabelsGetter interface {
	// GetLabels returns a set of labels per each discovered target.
	//
	// baseDir is the directory passed by the caller of appendSDScrapeWork.
	GetLabels(baseDir string) ([]map[string]string, error)
}
|
|
|
|
func appendSDScrapeWork(dst []*ScrapeWork, sdc targetLabelsGetter, baseDir string, swc *scrapeWorkConfig, discoveryType string) ([]*ScrapeWork, bool) {
|
|
targetLabels, err := sdc.GetLabels(baseDir)
|
|
if err != nil {
|
|
logger.Errorf("skipping %s targets for job_name %q because of error: %s", discoveryType, swc.jobName, err)
|
|
return dst, false
|
|
}
|
|
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, discoveryType), true
|
|
}
|
|
|
|
func appendScrapeWorkForTargetLabels(dst []*ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, discoveryType string) []*ScrapeWork {
|
|
startTime := time.Now()
|
|
// Process targetLabels in parallel in order to reduce processing time for big number of targetLabels.
|
|
type result struct {
|
|
sw *ScrapeWork
|
|
err error
|
|
}
|
|
goroutines := cgroup.AvailableCPUs()
|
|
resultCh := make(chan result, len(targetLabels))
|
|
workCh := make(chan map[string]string, goroutines)
|
|
for i := 0; i < goroutines; i++ {
|
|
go func() {
|
|
for metaLabels := range workCh {
|
|
target := metaLabels["__address__"]
|
|
sw, err := swc.getScrapeWork(target, nil, metaLabels)
|
|
if err != nil {
|
|
err = fmt.Errorf("skipping %s target %q for job_name %q because of error: %w", discoveryType, target, swc.jobName, err)
|
|
}
|
|
resultCh <- result{
|
|
sw: sw,
|
|
err: err,
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
for _, metaLabels := range targetLabels {
|
|
workCh <- metaLabels
|
|
}
|
|
close(workCh)
|
|
for range targetLabels {
|
|
r := <-resultCh
|
|
if r.err != nil {
|
|
logger.Errorf("%s", r.err)
|
|
continue
|
|
}
|
|
if r.sw != nil {
|
|
dst = append(dst, r.sw)
|
|
}
|
|
}
|
|
metrics.GetOrCreateHistogram(fmt.Sprintf("vm_promscrape_target_relabel_duration_seconds{type=%q}", discoveryType)).UpdateDuration(startTime)
|
|
return dst
|
|
}
|
|
|
|
func (sdc *FileSDConfig) appendScrapeWork(dst []*ScrapeWork, swsMapPrev map[string][]*ScrapeWork, baseDir string, swc *scrapeWorkConfig) []*ScrapeWork {
|
|
for _, file := range sdc.Files {
|
|
pathPattern := fs.GetFilepath(baseDir, file)
|
|
paths := []string{pathPattern}
|
|
if strings.Contains(pathPattern, "*") {
|
|
var err error
|
|
paths, err = filepath.Glob(pathPattern)
|
|
if err != nil {
|
|
// Do not return this error, since other files may contain valid scrape configs.
|
|
logger.Errorf("invalid pattern %q in `files` section: %s; skipping it", file, err)
|
|
continue
|
|
}
|
|
}
|
|
for _, path := range paths {
|
|
stcs, err := loadStaticConfigs(path)
|
|
if err != nil {
|
|
// Do not return this error, since other paths may contain valid scrape configs.
|
|
if sws := swsMapPrev[path]; sws != nil {
|
|
// Re-use the previous valid scrape work for this path.
|
|
logger.Errorf("keeping the previously loaded `static_configs` from %q because of error when re-loading the file: %s", path, err)
|
|
dst = append(dst, sws...)
|
|
} else {
|
|
logger.Errorf("skipping loading `static_configs` from %q because of error: %s", path, err)
|
|
}
|
|
continue
|
|
}
|
|
pathShort := path
|
|
if strings.HasPrefix(pathShort, baseDir) {
|
|
pathShort = path[len(baseDir):]
|
|
if len(pathShort) > 0 && pathShort[0] == filepath.Separator {
|
|
pathShort = pathShort[1:]
|
|
}
|
|
}
|
|
metaLabels := map[string]string{
|
|
"__meta_filepath": pathShort,
|
|
"__vm_filepath": path, // This label is needed for internal promscrape logic
|
|
}
|
|
for i := range stcs {
|
|
dst = stcs[i].appendScrapeWork(dst, swc, metaLabels)
|
|
}
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, metaLabels map[string]string) []*ScrapeWork {
|
|
for _, target := range stc.Targets {
|
|
if target == "" {
|
|
// Do not return this error, since other targets may be valid
|
|
logger.Errorf("`static_configs` target for `job_name` %q cannot be empty; skipping it", swc.jobName)
|
|
continue
|
|
}
|
|
sw, err := swc.getScrapeWork(target, stc.Labels, metaLabels)
|
|
if err != nil {
|
|
// Do not return this error, since other targets may be valid
|
|
logger.Errorf("error when parsing `static_configs` target %q for `job_name` %q: %s; skipping it", target, swc.jobName, err)
|
|
continue
|
|
}
|
|
if sw != nil {
|
|
dst = append(dst, sw)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
func appendScrapeWorkKey(dst []byte, labels []prompbmarshal.Label) []byte {
|
|
for _, label := range labels {
|
|
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
|
|
dst = append(dst, label.Name...)
|
|
dst = append(dst, '=')
|
|
dst = append(dst, label.Value...)
|
|
dst = append(dst, ',')
|
|
}
|
|
return dst
|
|
}
|
|
|
|
func needSkipScrapeWork(key string, membersCount, replicasCount, memberNum int) bool {
|
|
if membersCount <= 1 {
|
|
return false
|
|
}
|
|
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
|
idx := int(h % uint64(membersCount))
|
|
if replicasCount < 1 {
|
|
replicasCount = 1
|
|
}
|
|
for i := 0; i < replicasCount; i++ {
|
|
if idx == memberNum {
|
|
return false
|
|
}
|
|
idx++
|
|
if idx >= membersCount {
|
|
idx = 0
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
type labelsContext struct {
|
|
labels []prompbmarshal.Label
|
|
}
|
|
|
|
func getLabelsContext() *labelsContext {
|
|
v := labelsContextPool.Get()
|
|
if v == nil {
|
|
return &labelsContext{}
|
|
}
|
|
return v.(*labelsContext)
|
|
}
|
|
|
|
func putLabelsContext(lctx *labelsContext) {
|
|
labels := lctx.labels
|
|
for i := range labels {
|
|
labels[i].Name = ""
|
|
labels[i].Value = ""
|
|
}
|
|
lctx.labels = lctx.labels[:0]
|
|
labelsContextPool.Put(lctx)
|
|
}
|
|
|
|
var labelsContextPool sync.Pool
|
|
|
|
var scrapeWorkKeyBufPool bytesutil.ByteBufferPool
|
|
|
|
func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
|
|
lctx := getLabelsContext()
|
|
lctx.labels = mergeLabels(lctx.labels[:0], swc, target, extraLabels, metaLabels)
|
|
var originalLabels []prompbmarshal.Label
|
|
if !*dropOriginalLabels {
|
|
originalLabels = append([]prompbmarshal.Label{}, lctx.labels...)
|
|
}
|
|
lctx.labels = swc.relabelConfigs.Apply(lctx.labels, 0, false)
|
|
lctx.labels = promrelabel.RemoveMetaLabels(lctx.labels[:0], lctx.labels)
|
|
// Remove references to already deleted labels, so GC could clean strings for label name and label value past len(labels).
|
|
// This should reduce memory usage when relabeling creates big number of temporary labels with long names and/or values.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825 for details.
|
|
labels := append([]prompbmarshal.Label{}, lctx.labels...)
|
|
putLabelsContext(lctx)
|
|
|
|
// Verify whether the scrape work must be skipped because of `-promscrape.cluster.*` configs.
|
|
// Perform the verification on labels after the relabeling in order to guarantee that targets with the same set of labels
|
|
// go to the same vmagent shard.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687#issuecomment-940629495
|
|
if *clusterMembersCount > 1 {
|
|
bb := scrapeWorkKeyBufPool.Get()
|
|
bb.B = appendScrapeWorkKey(bb.B[:0], labels)
|
|
needSkip := needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B), *clusterMembersCount, *clusterReplicationFactor, clusterMemberID)
|
|
scrapeWorkKeyBufPool.Put(bb)
|
|
if needSkip {
|
|
return nil, nil
|
|
}
|
|
}
|
|
if !*dropOriginalLabels {
|
|
promrelabel.SortLabels(originalLabels)
|
|
// Reduce memory usage by interning all the strings in originalLabels.
|
|
internLabelStrings(originalLabels)
|
|
}
|
|
if len(labels) == 0 {
|
|
// Drop target without labels.
|
|
droppedTargetsMap.Register(originalLabels)
|
|
return nil, nil
|
|
}
|
|
// See https://www.robustperception.io/life-of-a-label
|
|
schemeRelabeled := promrelabel.GetLabelValueByName(labels, "__scheme__")
|
|
if len(schemeRelabeled) == 0 {
|
|
schemeRelabeled = "http"
|
|
}
|
|
addressRelabeled := promrelabel.GetLabelValueByName(labels, "__address__")
|
|
if len(addressRelabeled) == 0 {
|
|
// Drop target without scrape address.
|
|
droppedTargetsMap.Register(originalLabels)
|
|
return nil, nil
|
|
}
|
|
if strings.Contains(addressRelabeled, "/") {
|
|
// Drop target with '/'
|
|
droppedTargetsMap.Register(originalLabels)
|
|
return nil, nil
|
|
}
|
|
addressRelabeled = addMissingPort(addressRelabeled, schemeRelabeled == "https")
|
|
metricsPathRelabeled := promrelabel.GetLabelValueByName(labels, "__metrics_path__")
|
|
if metricsPathRelabeled == "" {
|
|
metricsPathRelabeled = "/metrics"
|
|
}
|
|
if !strings.HasPrefix(metricsPathRelabeled, "/") {
|
|
metricsPathRelabeled = "/" + metricsPathRelabeled
|
|
}
|
|
paramsRelabeled := getParamsFromLabels(labels, swc.params)
|
|
optionalQuestion := "?"
|
|
if len(paramsRelabeled) == 0 || strings.Contains(metricsPathRelabeled, "?") {
|
|
optionalQuestion = ""
|
|
}
|
|
paramsStr := url.Values(paramsRelabeled).Encode()
|
|
scrapeURL := fmt.Sprintf("%s://%s%s%s%s", schemeRelabeled, addressRelabeled, metricsPathRelabeled, optionalQuestion, paramsStr)
|
|
if _, err := url.Parse(scrapeURL); err != nil {
|
|
return nil, fmt.Errorf("invalid url %q for scheme=%q (%q), target=%q (%q), metrics_path=%q (%q) for `job_name` %q: %w",
|
|
scrapeURL, swc.scheme, schemeRelabeled, target, addressRelabeled, swc.metricsPath, metricsPathRelabeled, swc.jobName, err)
|
|
}
|
|
// Set missing "instance" label according to https://www.robustperception.io/life-of-a-label
|
|
if promrelabel.GetLabelByName(labels, "instance") == nil {
|
|
labels = append(labels, prompbmarshal.Label{
|
|
Name: "instance",
|
|
Value: addressRelabeled,
|
|
})
|
|
promrelabel.SortLabels(labels)
|
|
}
|
|
// Read __scrape_interval__ and __scrape_timeout__ from labels.
|
|
scrapeInterval := swc.scrapeInterval
|
|
if s := promrelabel.GetLabelValueByName(labels, "__scrape_interval__"); len(s) > 0 {
|
|
d, err := promutils.ParseDuration(s)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse __scrape_interval__=%q: %w", s, err)
|
|
}
|
|
scrapeInterval = d
|
|
}
|
|
scrapeTimeout := swc.scrapeTimeout
|
|
if s := promrelabel.GetLabelValueByName(labels, "__scrape_timeout__"); len(s) > 0 {
|
|
d, err := promutils.ParseDuration(s)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse __scrape_timeout__=%q: %w", s, err)
|
|
}
|
|
scrapeTimeout = d
|
|
}
|
|
// Read series_limit option from __series_limit__ label.
|
|
// See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter
|
|
seriesLimit := swc.seriesLimit
|
|
if s := promrelabel.GetLabelValueByName(labels, "__series_limit__"); len(s) > 0 {
|
|
n, err := strconv.Atoi(s)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse __series_limit__=%q: %w", s, err)
|
|
}
|
|
seriesLimit = n
|
|
}
|
|
// Read stream_parse option from __stream_parse__ label.
|
|
// See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
|
|
streamParse := swc.streamParse
|
|
if s := promrelabel.GetLabelValueByName(labels, "__stream_parse__"); len(s) > 0 {
|
|
b, err := strconv.ParseBool(s)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse __stream_parse__=%q: %w", s, err)
|
|
}
|
|
streamParse = b
|
|
}
|
|
// Reduce memory usage by interning all the strings in labels.
|
|
internLabelStrings(labels)
|
|
sw := &ScrapeWork{
|
|
ScrapeURL: scrapeURL,
|
|
ScrapeInterval: scrapeInterval,
|
|
ScrapeTimeout: scrapeTimeout,
|
|
HonorLabels: swc.honorLabels,
|
|
HonorTimestamps: swc.honorTimestamps,
|
|
DenyRedirects: swc.denyRedirects,
|
|
OriginalLabels: originalLabels,
|
|
Labels: labels,
|
|
ProxyURL: swc.proxyURL,
|
|
ProxyAuthConfig: swc.proxyAuthConfig,
|
|
AuthConfig: swc.authConfig,
|
|
MetricRelabelConfigs: swc.metricRelabelConfigs,
|
|
SampleLimit: swc.sampleLimit,
|
|
DisableCompression: swc.disableCompression,
|
|
DisableKeepAlive: swc.disableKeepAlive,
|
|
StreamParse: streamParse,
|
|
ScrapeAlignInterval: swc.scrapeAlignInterval,
|
|
ScrapeOffset: swc.scrapeOffset,
|
|
SeriesLimit: seriesLimit,
|
|
|
|
jobNameOriginal: swc.jobName,
|
|
}
|
|
return sw, nil
|
|
}
|
|
|
|
func internLabelStrings(labels []prompbmarshal.Label) {
|
|
for i := range labels {
|
|
label := &labels[i]
|
|
label.Name = internString(label.Name)
|
|
label.Value = internString(label.Value)
|
|
}
|
|
}
|
|
|
|
func internString(s string) string {
|
|
m := internStringsMap.Load().(*sync.Map)
|
|
if v, ok := m.Load(s); ok {
|
|
sp := v.(*string)
|
|
return *sp
|
|
}
|
|
// Make a new copy for s in order to remove references from possible bigger string s refers to.
|
|
sCopy := string(append([]byte{}, s...))
|
|
m.Store(sCopy, &sCopy)
|
|
n := atomic.AddUint64(&internStringsMapLen, 1)
|
|
if n > 100e3 {
|
|
atomic.StoreUint64(&internStringsMapLen, 0)
|
|
internStringsMap.Store(&sync.Map{})
|
|
}
|
|
return sCopy
|
|
}
|
|
|
|
// internStringsMap holds the current *sync.Map with the strings interned by internString.
var internStringsMap atomic.Value

// internStringsMapLen is the number of strings added to the current internStringsMap.
//
// It must be accessed via atomic operations.
var internStringsMapLen uint64
|
|
|
|
func init() {
|
|
internStringsMap.Store(&sync.Map{})
|
|
}
|
|
|
|
func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string {
|
|
// See https://www.robustperception.io/life-of-a-label
|
|
m := make(map[string][]string)
|
|
for i := range labels {
|
|
label := &labels[i]
|
|
if !strings.HasPrefix(label.Name, "__param_") {
|
|
continue
|
|
}
|
|
name := label.Name[len("__param_"):]
|
|
values := []string{label.Value}
|
|
if p := paramsOrig[name]; len(p) > 1 {
|
|
values = append(values, p[1:]...)
|
|
}
|
|
m[name] = values
|
|
}
|
|
return m
|
|
}
|
|
|
|
func mergeLabels(dst []prompbmarshal.Label, swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) []prompbmarshal.Label {
|
|
if len(dst) > 0 {
|
|
logger.Panicf("BUG: len(dst) must be 0; got %d", len(dst))
|
|
}
|
|
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
|
|
for k, v := range swc.externalLabels {
|
|
dst = appendLabel(dst, k, v)
|
|
}
|
|
dst = appendLabel(dst, "job", swc.jobName)
|
|
dst = appendLabel(dst, "__address__", target)
|
|
dst = appendLabel(dst, "__scheme__", swc.scheme)
|
|
dst = appendLabel(dst, "__metrics_path__", swc.metricsPath)
|
|
dst = appendLabel(dst, "__scrape_interval__", swc.scrapeIntervalString)
|
|
dst = appendLabel(dst, "__scrape_timeout__", swc.scrapeTimeoutString)
|
|
for k, args := range swc.params {
|
|
if len(args) == 0 {
|
|
continue
|
|
}
|
|
k = "__param_" + k
|
|
v := args[0]
|
|
dst = appendLabel(dst, k, v)
|
|
}
|
|
for k, v := range extraLabels {
|
|
dst = appendLabel(dst, k, v)
|
|
}
|
|
for k, v := range metaLabels {
|
|
dst = appendLabel(dst, k, v)
|
|
}
|
|
if len(dst) < 2 {
|
|
return dst
|
|
}
|
|
// Remove duplicate labels if any.
|
|
// Stable sorting is needed in order to preserve the order for labels with identical names.
|
|
// This is needed in order to remove labels with duplicate names other than the last one.
|
|
promrelabel.SortLabelsStable(dst)
|
|
prevName := dst[0].Name
|
|
hasDuplicateLabels := false
|
|
for _, label := range dst[1:] {
|
|
if label.Name == prevName {
|
|
hasDuplicateLabels = true
|
|
break
|
|
}
|
|
prevName = label.Name
|
|
}
|
|
if !hasDuplicateLabels {
|
|
return dst
|
|
}
|
|
prevName = dst[0].Name
|
|
tmp := dst[:1]
|
|
for _, label := range dst[1:] {
|
|
if label.Name == prevName {
|
|
tmp[len(tmp)-1] = label
|
|
} else {
|
|
tmp = append(tmp, label)
|
|
prevName = label.Name
|
|
}
|
|
}
|
|
tail := dst[len(tmp):]
|
|
for i := range tail {
|
|
label := &tail[i]
|
|
label.Name = ""
|
|
label.Value = ""
|
|
}
|
|
return tmp
|
|
}
|
|
|
|
func appendLabel(dst []prompbmarshal.Label, name, value string) []prompbmarshal.Label {
|
|
return append(dst, prompbmarshal.Label{
|
|
Name: name,
|
|
Value: value,
|
|
})
|
|
}
|
|
|
|
const (
	// defaultScrapeInterval is used when `scrape_interval` is set neither per job nor globally.
	defaultScrapeInterval = 60 * time.Second

	// defaultScrapeTimeout is used when `scrape_timeout` is set neither per job nor globally.
	defaultScrapeTimeout = 10 * time.Second
)
|