lib/promscrape: preserve the previously discovered targets on discovery errors per each job_name

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/582
Aliaksandr Valialkin 2020-06-23 15:35:19 +03:00
parent 6d9c5ad422
commit de7e585ac8
4 changed files with 148 additions and 43 deletions
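In short: previously discovered targets are now grouped by their original job_name, and when a fresh service-discovery round fails for a job, the previous targets for that job are kept instead of being dropped. Below is a minimal, self-contained sketch of that behavior, assuming simplified stand-in types (Target, rediscover, a discover callback) rather than the real ScrapeWork and SD config structures from lib/promscrape.

// Hypothetical illustration only; the real code operates on ScrapeWork values
// and per-SD-type config sections (see the diff to lib/promscrape below).
package main

import (
	"errors"
	"fmt"
)

// Target stands in for ScrapeWork; JobName plays the role of jobNameOriginal.
type Target struct {
	JobName string
	Addr    string
}

// groupByJob mirrors getSWSByJob: index previously discovered targets by job name.
func groupByJob(targets []Target) map[string][]Target {
	m := make(map[string][]Target)
	for _, t := range targets {
		m[t.JobName] = append(m[t.JobName], t)
	}
	return m
}

// rediscover runs discovery for every job. When discovery fails for a job and
// previous targets for that job exist, those targets are preserved instead of
// being silently dropped.
func rediscover(jobs []string, prev []Target, discover func(job string) ([]Target, error)) []Target {
	prevByJob := groupByJob(prev)
	var dst []Target
	for _, job := range jobs {
		targets, err := discover(job)
		if err != nil {
			if prevTargets := prevByJob[job]; len(prevTargets) > 0 {
				fmt.Printf("error discovering targets for job %q: %s; preserving the previous targets\n", job, err)
				dst = append(dst, prevTargets...)
			}
			continue
		}
		dst = append(dst, targets...)
	}
	return dst
}

func main() {
	prev := []Target{{JobName: "kubernetes", Addr: "10.0.0.1:9100"}}
	failing := func(job string) ([]Target, error) {
		return nil, errors.New("apiserver unreachable") // simulate a discovery outage
	}
	// The previously discovered target survives the failed discovery round.
	fmt.Println(rediscover([]string{"kubernetes"}, prev, failing))
}

The actual implementation applies this fallback per SD type (kubernetes, consul, dns, ec2, gce), as the diffs below show.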

View File

@@ -158,77 +158,160 @@ func unmarshalMaybeStrict(data []byte, dst interface{}) error {
 	return err
 }
 
+func getSWSByJob(sws []ScrapeWork) map[string][]ScrapeWork {
+	m := make(map[string][]ScrapeWork)
+	for _, sw := range sws {
+		m[sw.jobNameOriginal] = append(m[sw.jobNameOriginal], sw)
+	}
+	return m
+}
+
 // getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork {
+func (cfg *Config) getKubernetesSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.KubernetesSDConfigs {
 			sdc := &sc.KubernetesSDConfigs[j]
-			dst = appendKubernetesScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendKubernetesScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering kubernetes targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getConsulSDScrapeWork() []ScrapeWork {
+func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.ConsulSDConfigs {
 			sdc := &sc.ConsulSDConfigs[j]
-			dst = appendConsulScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendConsulScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering consul targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getDNSSDScrapeWork returns `dns_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getDNSSDScrapeWork() []ScrapeWork {
+func (cfg *Config) getDNSSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.DNSSDConfigs {
 			sdc := &sc.DNSSDConfigs[j]
-			dst = appendDNSScrapeWork(dst, sdc, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendDNSScrapeWork(dst, sdc, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering dns targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getEC2SDScrapeWork() []ScrapeWork {
+func (cfg *Config) getEC2SDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.EC2SDConfigs {
 			sdc := &sc.EC2SDConfigs[j]
-			dst = appendEC2ScrapeWork(dst, sdc, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendEC2ScrapeWork(dst, sdc, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering ec2 targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getGCESDScrapeWork() []ScrapeWork {
+func (cfg *Config) getGCESDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.GCESDConfigs {
 			sdc := &sc.GCESDConfigs[j]
-			dst = appendGCEScrapeWork(dst, sdc, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendGCEScrapeWork(dst, sdc, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering gce targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getFileSDScrapeWork(swsPrev []ScrapeWork) []ScrapeWork {
+func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
 	// Create a map for the previous scrape work.
 	swsMapPrev := make(map[string][]ScrapeWork)
-	for i := range swsPrev {
-		sw := &swsPrev[i]
+	for i := range prev {
+		sw := &prev[i]
 		filepath := promrelabel.GetLabelValueByName(sw.Labels, "__vm_filepath")
 		if len(filepath) == 0 {
 			logger.Panicf("BUG: missing `__vm_filepath` label")
@@ -341,49 +424,49 @@ type scrapeWorkConfig struct {
 	sampleLimit int
 }
 
-func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
+func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := kubernetes.GetLabels(sdc, baseDir)
 	if err != nil {
 		logger.Errorf("error when discovering kubernetes targets for `job_name` %q: %s; skipping it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config"), true
 }
 
-func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
+func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := consul.GetLabels(sdc, baseDir)
 	if err != nil {
 		logger.Errorf("error when discovering consul targets for `job_name` %q: %s; skipping it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config"), true
 }
 
-func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
+func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := dns.GetLabels(sdc)
 	if err != nil {
 		logger.Errorf("error when discovering dns targets for `job_name` %q: %s; skipping it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dns_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dns_sd_config"), true
 }
 
-func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
+func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := ec2.GetLabels(sdc)
 	if err != nil {
 		logger.Errorf("error when discovering ec2 targets for `job_name` %q: %s; skipping it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config"), true
 }
 
-func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
+func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := gce.GetLabels(sdc)
 	if err != nil {
 		logger.Errorf("error when discovering gce targets for `job_name` %q: %s; skippint it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config"), true
 }
 
 func appendScrapeWorkForTargetLabels(dst []ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []ScrapeWork {
@@ -519,6 +602,8 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex
 		AuthConfig: swc.authConfig,
 		MetricRelabelConfigs: swc.metricRelabelConfigs,
 		SampleLimit: swc.sampleLimit,
+
+		jobNameOriginal: swc.jobName,
 	})
 	return dst, nil
 }

View File

@@ -451,7 +451,8 @@ scrape_configs:
 					Value: "rty",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 		{
 			ScrapeURL: "http://host2:80/abc/de",
@@ -489,7 +490,8 @@ scrape_configs:
 					Value: "rty",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 		{
 			ScrapeURL: "http://localhost:9090/abc/de",
@@ -527,7 +529,8 @@ scrape_configs:
 					Value: "test",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 }
@@ -579,7 +582,8 @@ scrape_configs:
 					Value: "foo",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -628,7 +632,8 @@ scrape_configs:
 					Value: "xxx",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -700,6 +705,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				Authorization: "Bearer xyz",
 			},
+			jobNameOriginal: "foo",
 		},
 		{
 			ScrapeURL: "https://aaa:443/foo/bar?p=x%26y&p=%3D",
@@ -740,6 +746,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				Authorization: "Bearer xyz",
 			},
+			jobNameOriginal: "foo",
 		},
 		{
 			ScrapeURL: "http://1.2.3.4:80/metrics",
@@ -774,6 +781,7 @@ scrape_configs:
 				TLSServerName: "foobar",
 				TLSInsecureSkipVerify: true,
 			},
+			jobNameOriginal: "qwer",
 		},
 	})
 	f(`
@@ -846,7 +854,8 @@ scrape_configs:
 					Value: "http://foo.bar:1234/metrics",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -907,7 +916,8 @@ scrape_configs:
 					Value: "https",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -949,7 +959,8 @@ scrape_configs:
 					Value: "3",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
@@ -997,6 +1008,7 @@ scrape_configs:
 			},
 			AuthConfig: &promauth.Config{},
 			MetricRelabelConfigs: prcs,
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -1037,6 +1049,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				Authorization: "Basic eHl6OnNlY3JldC1wYXNz",
 			},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -1075,6 +1088,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				Authorization: "Bearer secret-pass",
 			},
+			jobNameOriginal: "foo",
 		},
 	})
 	snakeoilCert, err := tls.LoadX509KeyPair("testdata/ssl-cert-snakeoil.pem", "testdata/ssl-cert-snakeoil.key")
@@ -1119,6 +1133,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				TLSCertificate: &snakeoilCert,
 			},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -1179,7 +1194,8 @@ scrape_configs:
 					Value: "qwe",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "aaa",
 		},
 	})
 	f(`
@@ -1233,7 +1249,8 @@ scrape_configs:
 					Value: "snmp",
 				},
 			},
 			AuthConfig: &promauth.Config{},
+			jobNameOriginal: "snmp",
 		},
 	})
 }

View File

@@ -84,11 +84,11 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
 	scs := newScrapeConfigs(pushData)
 	scs.add("static_configs", 0, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getStaticScrapeWork() })
 	scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
-	scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork() })
-	scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork() })
-	scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork() })
-	scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork() })
-	scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork() })
+	scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
+	scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
+	scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
+	scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
+	scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
 
 	sighupCh := procutil.NewSighupChan()
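The hunk above passes the previous round's ScrapeWork list into each SD callback. A rough caller-side sketch of that threading, assuming a simple periodic loop (the names and the 30-second interval are hypothetical, not the actual scrapeConfigs runner):

// Hypothetical sketch of feeding the previous result into each discovery
// round; the real runner lives in lib/promscrape and differs in detail.
package discovery

import "time"

// Target is a simplified stand-in for ScrapeWork.
type Target struct {
	JobName string
	Addr    string
}

// runDiscoveryLoop re-runs getTargets periodically, always passing the result
// of the previous round, mirroring how scraper.go now passes swsPrev to
// cfg.getKubernetesSDScrapeWork(swsPrev) and the other SD helpers.
func runDiscoveryLoop(getTargets func(prev []Target) []Target, apply func([]Target), stopCh <-chan struct{}) {
	var prev []Target
	ticker := time.NewTicker(30 * time.Second) // interval chosen arbitrarily for the sketch
	defer ticker.Stop()
	for {
		cur := getTargets(prev) // prev lets getTargets fall back to old targets on discovery errors
		apply(cur)
		prev = cur
		select {
		case <-ticker.C:
		case <-stopCh:
			return
		}
	}
}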

View File

@@ -66,6 +66,9 @@ type ScrapeWork struct {
 	// The maximum number of metrics to scrape after relabeling.
 	SampleLimit int
+
+	// The original 'job_name'
+	jobNameOriginal string
 }
 
 // key returns unique identifier for the given sw.