lib/promscrape: add Prometheus-compatible service discovery for Consul aka consul_sd_configs

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/330
This commit is contained in:
Aliaksandr Valialkin 2020-05-04 20:48:02 +03:00
parent 83f0e35b7b
commit 40c3ffb359
14 changed files with 802 additions and 102 deletions

View File

@ -262,6 +262,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
* [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config)
* [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config)
* [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config)
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
In the future other `*_sd_config` types will be supported.

View File

@ -143,10 +143,11 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
* if `zone` arg is missing, then `vmagent` uses the zone for the instance where it runs;
* if `zone` arg equals `"*"`, then `vmagent` discovers all the zones for the given project;
* `zone` may contain arbitrary number of zones, i.e. `zone: [us-east1-a, us-east1-b]`.
* `consul_sd_configs` - for scraping targets registered in Consul.
See [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) for details.
The following service discovery mechanisms will be added to `vmagent` soon:
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)

View File

@ -262,6 +262,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
* [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config)
* [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config)
* [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config)
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
In the future other `*_sd_config` types will be supported.

View File

@ -143,10 +143,11 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
* if `zone` arg is missing, then `vmagent` uses the zone for the instance where it runs;
* if `zone` arg equals `"*"`, then `vmagent` discovers all the zones for the given project;
* `zone` may contain arbitrary number of zones, i.e. `zone: [us-east1-a, us-east1-b]`.
* `consul_sd_configs` - for scraping targets registered in Consul.
See [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) for details.
The following service discovery mechanisms will be added to `vmagent` soon:
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)

View File

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
@ -62,6 +63,7 @@ type ScrapeConfig struct {
StaticConfigs []StaticConfig `yaml:"static_configs"`
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"`
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs"`
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs"`
EC2SDConfigs []ec2.SDConfig `yaml:"ec2_sd_configs"`
GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs"`
RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs"`
@ -156,6 +158,19 @@ func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork {
return dst
}
// getConsulSDScrapeWork collects ScrapeWork for every `consul_sd_configs` entry
// found in the scrape configs of cfg.
//
// The entries are processed in declaration order via appendConsulScrapeWork.
func (cfg *Config) getConsulSDScrapeWork() []ScrapeWork {
	var sws []ScrapeWork
	for scIdx := range cfg.ScrapeConfigs {
		scrapeCfg := &cfg.ScrapeConfigs[scIdx]
		consulCfgs := scrapeCfg.ConsulSDConfigs
		for sdIdx := range consulCfgs {
			sws = appendConsulScrapeWork(sws, &consulCfgs[sdIdx], cfg.baseDir, scrapeCfg.swc)
		}
	}
	return sws
}
// getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getEC2SDScrapeWork() []ScrapeWork {
var dst []ScrapeWork
@ -309,6 +324,15 @@ func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, base
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config")
}
// appendConsulScrapeWork discovers Consul targets described by sdc and appends
// ScrapeWork built from the discovered target labels to dst.
//
// If the discovery fails, the error is logged and dst is returned as is.
func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
	targetLabels, err := consul.GetLabels(sdc, baseDir)
	if err == nil {
		return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config")
	}
	logger.Errorf("error when discovering consul nodes for `job_name` %q: %s; skipping it", swc.jobName, err)
	return dst
}
func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
targetLabels, err := ec2.GetLabels(sdc)
if err != nil {

View File

@ -0,0 +1,28 @@
package consul
import (
"encoding/json"
"fmt"
)
// Agent is Consul agent.
//
// It holds the part of the `/v1/agent/self` response, which is decoded by parseAgent.
//
// See https://www.consul.io/api/agent.html#read-configuration
type Agent struct {
// Config is decoded from the top-level `Config` object of the response.
Config AgentConfig
}
// AgentConfig is Consul agent config.
//
// See https://www.consul.io/api/agent.html#read-configuration
type AgentConfig struct {
// Datacenter is used by getDatacenter as the default datacenter
// when `datacenter` option is missing in `consul_sd_config`.
Datacenter string
}
// parseAgent decodes Consul agent info from the JSON-encoded data.
//
// It returns nil Agent and non-nil error if data cannot be unmarshaled.
func parseAgent(data []byte) (*Agent, error) {
	agent := &Agent{}
	err := json.Unmarshal(data, agent)
	if err != nil {
		return nil, fmt.Errorf("cannot unmarshal agent info from %q: %s", data, err)
	}
	return agent, nil
}

View File

@ -0,0 +1,81 @@
package consul
import (
"reflect"
"testing"
)
// TestParseAgentFailure verifies that parseAgent returns an error and nil Agent
// for inputs, which cannot be decoded into Agent.
func TestParseAgentFailure(t *testing.T) {
	check := func(input string) {
		t.Helper()
		agent, err := parseAgent([]byte(input))
		if err == nil {
			t.Fatalf("expecting non-nil error")
		}
		if agent != nil {
			t.Fatalf("unexpected non-nil Agent: %v", agent)
		}
	}
	for _, input := range []string{``, `[1,23]`} {
		check(input)
	}
}
// TestParseAgentSuccess verifies that parseAgent extracts Config.Datacenter
// from an agent response and ignores the remaining fields of the response.
func TestParseAgentSuccess(t *testing.T) {
data := `
{
"Config": {
"Datacenter": "dc1",
"NodeName": "foobar",
"NodeID": "9d754d17-d864-b1d3-e758-f3fe25a9874f",
"Server": true,
"Revision": "deadbeef",
"Version": "1.0.0"
},
"DebugConfig": {
},
"Coord": {
"Adjustment": 0,
"Error": 1.5,
"Vec": [0,0,0,0,0,0,0,0]
},
"Member": {
"Name": "foobar",
"Addr": "10.1.10.12",
"Port": 8301,
"Tags": {
"bootstrap": "1",
"dc": "dc1",
"id": "40e4a748-2192-161a-0510-9bf59fe950b5",
"port": "8300",
"role": "consul",
"vsn": "1",
"vsn_max": "1",
"vsn_min": "1"
},
"Status": 1,
"ProtocolMin": 1,
"ProtocolMax": 2,
"ProtocolCur": 2,
"DelegateMin": 2,
"DelegateMax": 4,
"DelegateCur": 4
},
"Meta": {
"instance_type": "i2.xlarge",
"os_version": "ubuntu_16.04"
}
}
`
a, err := parseAgent([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Only the fields declared in Agent are expected to be populated.
aExpected := &Agent{
Config: AgentConfig{
Datacenter: "dc1",
},
}
if !reflect.DeepEqual(a, aExpected) {
t.Fatalf("unexpected Agent parsed;\ngot\n%v\nwant\n%v", a, aExpected)
}
}

View File

@ -0,0 +1,135 @@
package consul
import (
"fmt"
"io/ioutil"
"net/url"
"os"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// apiConfig contains config for API server
//
// It is built by newAPIConfig from SDConfig.
type apiConfig struct {
// client is used for querying Consul API.
client *discoveryutils.Client
// tagSeparator is used for joining service tags in `__meta_consul_tags` label.
tagSeparator string
// services is the list of service names to collect. All the services are collected if it is empty.
services []string
// tags is the list of tags a service must have in order to be collected.
tags []string
// datacenter is passed in `dc` query arg to every API request.
datacenter string
// allowStale adds `stale` query arg to every API request if set.
allowStale bool
// nodeMeta entries are passed as `node-meta` query args to every API request.
nodeMeta map[string]string
}
// configMap holds apiConfig objects obtained via getAPIConfig.
var configMap = discoveryutils.NewConfigMap()

// getAPIConfig returns apiConfig for the given sdc and baseDir.
//
// The config is obtained through configMap, which is given newAPIConfig
// as the constructor callback.
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
	build := func() (interface{}, error) {
		return newAPIConfig(sdc, baseDir)
	}
	cached, err := configMap.Get(sdc, build)
	if err != nil {
		return nil, err
	}
	return cached.(*apiConfig), nil
}
// newAPIConfig creates apiConfig from sdc.
//
// baseDir is passed to promauth.NewConfig together with auth and TLS options.
// The following defaults are applied:
//   - `localhost:8500` is used if `server` is empty;
//   - `http` scheme is used if neither `server` nor `scheme` contain the scheme;
//   - `,` is used if `tag_separator` is missing;
//   - the datacenter is requested from Consul agent if `datacenter` is empty.
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
	token, err := getToken(sdc.Token)
	if err != nil {
		return nil, err
	}
	var basicAuth *promauth.BasicAuthConfig
	if sdc.Username != "" {
		// The token is dropped when basic auth is configured.
		token = ""
		basicAuth = &promauth.BasicAuthConfig{
			Username: sdc.Username,
			Password: sdc.Password,
		}
	}
	authCfg, err := promauth.NewConfig(baseDir, basicAuth, token, "", sdc.TLSConfig)
	if err != nil {
		return nil, fmt.Errorf("cannot parse auth config: %s", err)
	}

	serverURL := sdc.Server
	if serverURL == "" {
		serverURL = "localhost:8500"
	}
	if !strings.Contains(serverURL, "://") {
		scheme := "http"
		if sdc.Scheme != "" {
			scheme = sdc.Scheme
		}
		serverURL = scheme + "://" + serverURL
	}
	client, err := discoveryutils.NewClient(serverURL, authCfg)
	if err != nil {
		return nil, fmt.Errorf("cannot create HTTP client for %q: %s", serverURL, err)
	}

	separator := ","
	if sdc.TagSeparator != nil {
		separator = *sdc.TagSeparator
	}
	datacenter, err := getDatacenter(client, sdc.Datacenter)
	if err != nil {
		return nil, err
	}
	return &apiConfig{
		client:       client,
		tagSeparator: separator,
		services:     sdc.Services,
		tags:         sdc.Tags,
		datacenter:   datacenter,
		allowStale:   sdc.AllowStale,
		nodeMeta:     sdc.NodeMeta,
	}, nil
}
// getToken returns Consul token to use for API requests.
//
// The token is searched in the following order:
//   - the passed token if it is non-empty;
//   - contents of the file referred by CONSUL_HTTP_TOKEN_FILE env var;
//   - the value of CONSUL_HTTP_TOKEN env var.
//
// Leading and trailing whitespace is removed from the token read from file,
// since such files usually end with a newline, which must not leak into the auth header.
//
// An error is returned only if the file referred by CONSUL_HTTP_TOKEN_FILE cannot be read.
func getToken(token string) (string, error) {
	if token != "" {
		return token, nil
	}
	if tokenFile := os.Getenv("CONSUL_HTTP_TOKEN_FILE"); tokenFile != "" {
		data, err := ioutil.ReadFile(tokenFile)
		if err != nil {
			return "", fmt.Errorf("cannot read consul token file %q; probably, `token` arg is missing in `consul_sd_config`? error: %s", tokenFile, err)
		}
		return strings.TrimSpace(string(data)), nil
	}
	// Allow empty token - it should work if authorization is disabled in Consul
	return os.Getenv("CONSUL_HTTP_TOKEN"), nil
}
// getDatacenter returns dc if it is non-empty.
//
// Otherwise the datacenter is read from the agent info returned by the given client.
func getDatacenter(client *discoveryutils.Client, dc string) (string, error) {
	if len(dc) > 0 {
		return dc, nil
	}
	// See https://www.consul.io/api/agent.html#read-configuration
	resp, err := client.GetAPIResponse("/v1/agent/self")
	if err != nil {
		return "", fmt.Errorf("cannot query consul agent info: %s", err)
	}
	agent, err := parseAgent(resp)
	if err != nil {
		return "", err
	}
	return agent.Config.Datacenter, nil
}
// getAPIResponse returns response for the given path from Consul API.
//
// The following query args are appended to the path before performing the request:
//   - `dc` with cfg.datacenter;
//   - `stale` if cfg.allowStale is set;
//   - `node-meta=key:value` for every entry in cfg.nodeMeta.
func getAPIResponse(cfg *apiConfig, path string) ([]byte, error) {
	// Continue the existing query string if path already has one.
	sep := "&"
	if !strings.Contains(path, "?") {
		sep = "?"
	}
	path += sep + "dc=" + url.QueryEscape(cfg.datacenter)
	if cfg.allowStale {
		path += "&stale"
	}
	for name, value := range cfg.nodeMeta {
		path += "&node-meta=" + url.QueryEscape(name+":"+value)
	}
	return cfg.client.GetAPIResponse(path)
}

View File

@ -0,0 +1,40 @@
package consul
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
// SDConfig represents service discovery config for Consul.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config
type SDConfig struct {
// Server is the address of Consul API server. `localhost:8500` is used if it is empty.
Server string `yaml:"server"`
// Token is Consul token. CONSUL_HTTP_TOKEN_FILE and CONSUL_HTTP_TOKEN env vars are consulted if it is empty.
Token string `yaml:"token"`
// Datacenter is the datacenter to query. It is obtained from Consul agent if it is empty.
Datacenter string `yaml:"datacenter"`
// Scheme is used when Server doesn't contain the scheme. `http` is used if it is empty.
Scheme string `yaml:"scheme"`
// Username and Password enable basic auth. The token isn't used when Username is set.
Username string `yaml:"username"`
Password string `yaml:"password"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config"`
// Services is the list of service names to discover. All the services are discovered if it is empty.
Services []string `yaml:"services"`
// Tags is the list of tags a service must contain in order to be discovered.
Tags []string `yaml:"tags"`
// NodeMeta entries are passed as `node-meta` query args to Consul API.
NodeMeta map[string]string `yaml:"node_meta"`
// TagSeparator is used for joining tags in `__meta_consul_tags` label. `,` is used if it is missing.
TagSeparator *string `yaml:"tag_separator"`
// AllowStale adds `stale` query arg to Consul API requests.
AllowStale bool `yaml:"allow_stale"`
// RefreshInterval time.Duration `yaml:"refresh_interval"`
// refresh_interval is obtained from `-promscrape.consulSDCheckInterval` command-line option.
}
// GetLabels returns labels for targets discovered in Consul according to sdc.
//
// baseDir is passed to getAPIConfig when the API config for sdc is obtained.
func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) {
	apiCfg, err := getAPIConfig(sdc, baseDir)
	if err != nil {
		return nil, fmt.Errorf("cannot get API config: %s", err)
	}
	labels, err := getServiceNodesLabels(apiCfg)
	if err == nil {
		return labels, nil
	}
	return nil, fmt.Errorf("error when fetching service nodes data from Consul: %s", err)
}

View File

@ -0,0 +1,245 @@
package consul
import (
"encoding/json"
"fmt"
"net/url"
"strconv"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getServiceNodesLabels returns target labels for all the Consul service nodes
// discovered with the given cfg. One label set is returned per service node.
func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) {
	nodes, err := getAllServiceNodes(cfg)
	if err != nil {
		return nil, err
	}
	var labels []map[string]string
	for i := range nodes {
		labels = nodes[i].appendTargetLabels(labels, cfg.tagSeparator)
	}
	return labels, nil
}
// getAllServiceNodes returns nodes for all the Consul services matching cfg.
//
// It lists the services, filters them by cfg.services and cfg.tags, and then
// queries nodes for every remaining service in a separate goroutine.
// The first encountered error is returned if any of the queries fails.
func getAllServiceNodes(cfg *apiConfig) ([]ServiceNode, error) {
	// Obtain a list of services
	// See https://www.consul.io/api/catalog.html#list-services
	data, err := getAPIResponse(cfg, "/v1/catalog/services")
	if err != nil {
		return nil, fmt.Errorf("cannot obtain services: %s", err)
	}
	var tagsByService map[string][]string
	if err := json.Unmarshal(data, &tagsByService); err != nil {
		return nil, fmt.Errorf("cannot parse services response %q: %s", data, err)
	}
	// Map keys are unique, so a plain slice is enough for the selected names.
	var names []string
	for name, tags := range tagsByService {
		if shouldCollectServiceByName(cfg.services, name) && shouldCollectServiceByTags(cfg.tags, tags) {
			names = append(names, name)
		}
	}

	// Query all the selected services in parallel
	type result struct {
		nodes []ServiceNode
		err   error
	}
	// The channel is sized to fit all the results, so senders never block.
	resultsCh := make(chan result, len(names))
	for _, name := range names {
		go func(name string) {
			nodes, err := getServiceNodes(cfg, name)
			resultsCh <- result{
				nodes: nodes,
				err:   err,
			}
		}(name)
	}
	var (
		allNodes []ServiceNode
		firstErr error
	)
	for range names {
		r := <-resultsCh
		if firstErr == nil && r.err != nil {
			firstErr = r.err
		}
		allNodes = append(allNodes, r.nodes...)
	}
	if firstErr != nil {
		return nil, firstErr
	}
	return allNodes, nil
}
// shouldCollectServiceByName returns true if the service must be collected
// according to filterServices.
//
// Every service is collected when filterServices is empty.
func shouldCollectServiceByName(filterServices []string, service string) bool {
	matched := len(filterServices) == 0
	for i := 0; !matched && i < len(filterServices); i++ {
		matched = filterServices[i] == service
	}
	return matched
}
// shouldCollectServiceByTags returns true if tags contain every tag from filterTags.
//
// Every service is collected when filterTags is empty.
func shouldCollectServiceByTags(filterTags, tags []string) bool {
	if len(filterTags) == 0 {
		return true
	}
	// Index the service tags once for membership checks.
	present := make(map[string]struct{}, len(tags))
	for _, tag := range tags {
		present[tag] = struct{}{}
	}
	for _, wanted := range filterTags {
		if _, ok := present[wanted]; !ok {
			return false
		}
	}
	return true
}
// getServiceNodes returns nodes for the given serviceName from Consul health API.
//
// Every tag from cfg.tags is passed as a `tag` query arg.
// The rest of query args is added by getAPIResponse.
func getServiceNodes(cfg *apiConfig, serviceName string) ([]ServiceNode, error) {
	// See https://www.consul.io/api/health.html#list-nodes-for-service
	//
	// Escape serviceName, since it may contain chars with special meaning in urls such as `?`, `#` or `%`.
	// Otherwise a part of the service name could be treated as query string or fragment.
	path := "/v1/health/service/" + url.PathEscape(serviceName)
	tagsArgs := make([]string, 0, len(cfg.tags))
	for _, tag := range cfg.tags {
		tagsArgs = append(tagsArgs, "tag="+url.QueryEscape(tag))
	}
	if len(tagsArgs) > 0 {
		path += "?" + strings.Join(tagsArgs, "&")
	}
	data, err := getAPIResponse(cfg, path)
	if err != nil {
		return nil, fmt.Errorf("cannot obtain instances for serviceName=%q: %s", serviceName, err)
	}
	return parseServiceNodes(data)
}
// ServiceNode is Consul service node.
//
// It is a single entry of the response parsed by parseServiceNodes.
//
// See https://www.consul.io/api/health.html#list-nodes-for-service
type ServiceNode struct {
// Service is the service instance registered on the Node.
Service Service
// Node is the node the service instance is registered on.
Node Node
// Checks are aggregated into `__meta_consul_health` label.
Checks []Check
}
// Service is Consul service.
//
// See https://www.consul.io/api/health.html#list-nodes-for-service
type Service struct {
// ID is exposed in `__meta_consul_service_id` label.
ID string
// Service is exposed in `__meta_consul_service` label.
Service string
// Address is used in `__address__` label. Node address is used instead if it is empty.
Address string
// Port is used in `__address__` and `__meta_consul_service_port` labels.
Port int
// Tags are joined into `__meta_consul_tags` label.
Tags []string
// Meta entries are exposed in `__meta_consul_service_metadata_*` labels.
Meta map[string]string
}
// Node is Consul node.
//
// See https://www.consul.io/api/health.html#list-nodes-for-service
type Node struct {
// Address is exposed in `__meta_consul_address` label.
// It is also used in `__address__` label when the service address is empty.
Address string
// Datacenter is exposed in `__meta_consul_dc` label.
Datacenter string
// Node is exposed in `__meta_consul_node` label.
Node string
// Meta entries are exposed in `__meta_consul_metadata_*` labels.
Meta map[string]string
// TaggedAddresses entries are exposed in `__meta_consul_tagged_address_*` labels.
TaggedAddresses map[string]string
}
// Check is Consul check.
//
// See https://www.consul.io/api/health.html#list-nodes-for-service
type Check struct {
// CheckID is used by aggregatedStatus for detecting maintenance checks.
CheckID string
// Status is expected to be one of `passing`, `warning` or `critical`.
// aggregatedStatus returns empty string on any other value.
Status string
}
// parseServiceNodes decodes the list of Consul service nodes from the JSON-encoded data.
//
// It returns nil slice and non-nil error if data cannot be unmarshaled.
func parseServiceNodes(data []byte) ([]ServiceNode, error) {
	var nodes []ServiceNode
	err := json.Unmarshal(data, &nodes)
	if err == nil {
		return nodes, nil
	}
	return nil, fmt.Errorf("cannot unmarshal ServiceNodes from %q: %s", data, err)
}
// appendTargetLabels appends the label set for sn to ms and returns the result.
//
// tagSeparator is used for joining service tags in `__meta_consul_tags` label.
func (sn *ServiceNode) appendTargetLabels(ms []map[string]string, tagSeparator string) []map[string]string {
	// Prefer the service address and fall back to the node address.
	host := sn.Service.Address
	if host == "" {
		host = sn.Node.Address
	}
	labels := map[string]string{
		"__address__":                   discoveryutils.JoinHostPort(host, sn.Service.Port),
		"__meta_consul_address":         sn.Node.Address,
		"__meta_consul_dc":              sn.Node.Datacenter,
		"__meta_consul_health":          aggregatedStatus(sn.Checks),
		"__meta_consul_node":            sn.Node.Node,
		"__meta_consul_service":         sn.Service.Service,
		"__meta_consul_service_address": sn.Service.Address,
		"__meta_consul_service_id":      sn.Service.ID,
		"__meta_consul_service_port":    strconv.Itoa(sn.Service.Port),
		// We surround the separated list with the separator as well. This way regular expressions
		// in relabeling rules don't have to consider tag positions.
		"__meta_consul_tags": tagSeparator + strings.Join(sn.Service.Tags, tagSeparator) + tagSeparator,
	}
	// addPrefixed copies kv entries into labels under sanitized, prefixed names.
	addPrefixed := func(prefix string, kv map[string]string) {
		for name, value := range kv {
			labels[prefix+discoveryutils.SanitizeLabelName(name)] = value
		}
	}
	addPrefixed("__meta_consul_metadata_", sn.Node.Meta)
	addPrefixed("__meta_consul_service_metadata_", sn.Service.Meta)
	addPrefixed("__meta_consul_tagged_address_", sn.Node.TaggedAddresses)
	return append(ms, labels)
}
// aggregatedStatus returns the aggregated health status for the given checks.
//
// The most severe status wins: `maintenance`, then `critical`, then `warning`, then `passing`.
// `passing` is returned for an empty list of checks.
// Empty string is returned if a non-maintenance check has unknown status.
func aggregatedStatus(checks []Check) string {
	// The logic mirrors HealthChecks.AggregatedStatus in Consul
	var warning, critical, maintenance bool
	for i := range checks {
		c := &checks[i]
		if c.CheckID == "_node_maintenance" || strings.HasPrefix(c.CheckID, "_service_maintenance:") {
			// The status of maintenance checks isn't inspected.
			maintenance = true
			continue
		}
		switch c.Status {
		case "passing":
			// Passing checks do not change the outcome.
		case "warning":
			warning = true
		case "critical":
			critical = true
		default:
			return ""
		}
	}
	if maintenance {
		return "maintenance"
	}
	if critical {
		return "critical"
	}
	if warning {
		return "warning"
	}
	return "passing"
}

View File

@ -0,0 +1,135 @@
package consul
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// TestParseServiceNodesFailure verifies that parseServiceNodes returns an error
// and nil result for inputs, which cannot be decoded into a list of ServiceNode.
func TestParseServiceNodesFailure(t *testing.T) {
	check := func(input string) {
		t.Helper()
		nodes, err := parseServiceNodes([]byte(input))
		if err == nil {
			t.Fatalf("expecting non-nil error")
		}
		if nodes != nil {
			t.Fatalf("unexpected non-nil ServiceNodes: %v", nodes)
		}
	}
	for _, input := range []string{
		``,
		`[1,23]`,
		`{"items":[{"metadata":1}]}`,
	} {
		check(input)
	}
}
// TestParseServiceNodesSuccess verifies that parseServiceNodes decodes a single-node
// health response and that ServiceNode.appendTargetLabels generates the expected labels for it.
func TestParseServiceNodesSuccess(t *testing.T) {
data := `
[
{
"Node": {
"ID": "40e4a748-2192-161a-0510-9bf59fe950b5",
"Node": "foobar",
"Address": "10.1.10.12",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "10.1.10.12",
"wan": "10.1.10.12"
},
"Meta": {
"instance_type": "t2.medium"
}
},
"Service": {
"ID": "redis",
"Service": "redis",
"Tags": ["primary"],
"Address": "10.1.10.12",
"TaggedAddresses": {
"lan": {
"address": "10.1.10.12",
"port": 8000
},
"wan": {
"address": "198.18.1.2",
"port": 80
}
},
"Meta": {
"redis_version": "4.0"
},
"Port": 8000,
"Weights": {
"Passing": 10,
"Warning": 1
},
"Namespace": "default"
},
"Checks": [
{
"Node": "foobar",
"CheckID": "service:redis",
"Name": "Service 'redis' check",
"Status": "passing",
"Notes": "",
"Output": "",
"ServiceID": "redis",
"ServiceName": "redis",
"ServiceTags": ["primary"],
"Namespace": "default"
},
{
"Node": "foobar",
"CheckID": "serfHealth",
"Name": "Serf Health Status",
"Status": "passing",
"Notes": "",
"Output": "",
"ServiceID": "",
"ServiceName": "",
"ServiceTags": [],
"Namespace": "default"
}
]
}
]
`
sns, err := parseServiceNodes([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(sns) != 1 {
t.Fatalf("unexpected length of ServiceNodes; got %d; want %d", len(sns), 1)
}
sn := sns[0]
// Check sn.appendTargetLabels()
tagSeparator := ","
labelss := sn.appendTargetLabels(nil, tagSeparator)
// Labels are converted to sorted form, so they can be compared with reflect.DeepEqual.
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "10.1.10.12:8000",
"__meta_consul_address": "10.1.10.12",
"__meta_consul_dc": "dc1",
"__meta_consul_health": "passing",
"__meta_consul_metadata_instance_type": "t2.medium",
"__meta_consul_node": "foobar",
"__meta_consul_service": "redis",
"__meta_consul_service_address": "10.1.10.12",
"__meta_consul_service_id": "redis",
"__meta_consul_service_metadata_redis_version": "4.0",
"__meta_consul_service_port": "8000",
"__meta_consul_tagged_address_lan": "10.1.10.12",
"__meta_consul_tagged_address_wan": "10.1.10.12",
"__meta_consul_tags": ",primary,",
}),
}
if !reflect.DeepEqual(sortedLabelss, expectedLabelss) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss)
}
}

View File

@ -1,25 +1,17 @@
package kubernetes
import (
"crypto/tls"
"fmt"
"net"
"os"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/fasthttp"
)
// apiConfig contains config for API server
type apiConfig struct {
client *fasthttp.HostClient
server string
hostPort string
authConfig *promauth.Config
client *discoveryutils.Client
namespaces []string
selectors []Selector
}
@ -39,67 +31,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
if err != nil {
return nil, fmt.Errorf("cannot parse auth config: %s", err)
}
hcv, err := newHostClient(sdc.APIServer, ac)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %s", sdc.APIServer, err)
}
cfg := &apiConfig{
client: hcv.hc,
server: hcv.apiServer,
hostPort: hcv.hostPort,
authConfig: hcv.ac,
namespaces: sdc.Namespaces.Names,
selectors: sdc.Selectors,
}
return cfg, nil
}
func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) {
query := joinSelectors(role, cfg.namespaces, cfg.selectors)
if len(query) > 0 {
path += "?" + query
}
requestURL := cfg.server + path
var u fasthttp.URI
u.Update(requestURL)
var req fasthttp.Request
req.SetRequestURIBytes(u.RequestURI())
req.SetHost(cfg.hostPort)
req.Header.Set("Accept-Encoding", "gzip")
if cfg.authConfig != nil && cfg.authConfig.Authorization != "" {
req.Header.Set("Authorization", cfg.authConfig.Authorization)
}
var resp fasthttp.Response
// There is no need in calling DoTimeout, since the timeout is already set in hc.ReadTimeout above.
if err := cfg.client.Do(&req, &resp); err != nil {
return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err)
}
var data []byte
if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" {
dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body())
if err != nil {
return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err)
}
data = dst
} else {
data = append(data[:0], resp.Body()...)
}
statusCode := resp.StatusCode()
if statusCode != fasthttp.StatusOK {
return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
requestURL, statusCode, fasthttp.StatusOK, data)
}
return data, nil
}
type hcValue struct {
hc *fasthttp.HostClient
ac *promauth.Config
apiServer string
hostPort string
}
func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) {
apiServer := sdc.APIServer
if len(apiServer) == 0 {
// Assume we run at k8s pod.
// Discover apiServer and auth config according to k8s docs.
@ -124,36 +56,22 @@ func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) {
}
ac = acNew
}
var u fasthttp.URI
u.Update(apiServer)
hostPort := string(u.Host())
isTLS := string(u.Scheme()) == "https"
var tlsCfg *tls.Config
if isTLS && ac != nil {
tlsCfg = ac.NewTLSConfig()
client, err := discoveryutils.NewClient(apiServer, ac)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %s", apiServer, err)
}
if !strings.Contains(hostPort, ":") {
port := "80"
if isTLS {
port = "443"
}
hostPort = net.JoinHostPort(hostPort, port)
cfg := &apiConfig{
client: client,
namespaces: sdc.Namespaces.Names,
selectors: sdc.Selectors,
}
hc := &fasthttp.HostClient{
Addr: hostPort,
Name: "vm_promscrape/discovery",
DialDualStack: netutil.TCP6Enabled(),
IsTLS: isTLS,
TLSConfig: tlsCfg,
ReadTimeout: time.Minute,
WriteTimeout: 10 * time.Second,
MaxResponseBodySize: 300 * 1024 * 1024,
}
return &hcValue{
hc: hc,
ac: ac,
apiServer: apiServer,
hostPort: hostPort,
}, nil
return cfg, nil
}
// getAPIResponse returns response for the given path from the API server configured in cfg.
//
// The query built by joinSelectors for the given role is appended to the path if it is non-empty.
func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) {
	if query := joinSelectors(role, cfg.namespaces, cfg.selectors); query != "" {
		path = path + "?" + query
	}
	return cfg.client.GetAPIResponse(path)
}

View File

@ -1,8 +1,16 @@
package discoveryutils
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/fasthttp"
)
var defaultClient = &http.Client{
@ -13,3 +21,81 @@ var defaultClient = &http.Client{
func GetHTTPClient() *http.Client {
return defaultClient
}
// Client is http client, which talks to the given apiServer.
//
// It must be created via NewClient.
type Client struct {
// hc performs requests to hostPort.
hc *fasthttp.HostClient
// ac provides the value for `Authorization` request header if it is set.
ac *promauth.Config
// apiServer is prepended to the path passed to GetAPIResponse.
apiServer string
// hostPort is host:port extracted from apiServer. It is sent in `Host` request header.
hostPort string
}
// NewClient returns new Client for the given apiServer and the given ac.
//
// apiServer is expected to be a url. TLS is enabled when its scheme is `https`.
// Port 443 is used for TLS and port 80 otherwise if the host part doesn't contain `:`.
// The returned error is always nil in the current implementation.
func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
	var uri fasthttp.URI
	uri.Update(apiServer)
	addr := string(uri.Host())
	useTLS := string(uri.Scheme()) == "https"

	var tlsConfig *tls.Config
	if useTLS && ac != nil {
		tlsConfig = ac.NewTLSConfig()
	}
	if !strings.Contains(addr, ":") {
		// The port is missing. Use the default port for the scheme.
		defaultPort := "80"
		if useTLS {
			defaultPort = "443"
		}
		addr = net.JoinHostPort(addr, defaultPort)
	}

	client := &Client{
		hc: &fasthttp.HostClient{
			Addr:                addr,
			Name:                "vm_promscrape/discovery",
			DialDualStack:       netutil.TCP6Enabled(),
			IsTLS:               useTLS,
			TLSConfig:           tlsConfig,
			ReadTimeout:         time.Minute,
			WriteTimeout:        10 * time.Second,
			MaxResponseBodySize: 300 * 1024 * 1024,
		},
		ac:        ac,
		apiServer: apiServer,
		hostPort:  addr,
	}
	return client, nil
}
// GetAPIResponse returns response for the given absolute path.
//
// The request is sent with `Accept-Encoding: gzip` header, so gzip-encoded responses are unpacked.
// An error is returned if the request fails, the response cannot be unpacked
// or the response status code differs from 200.
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
	requestURL := c.apiServer + path
	var uri fasthttp.URI
	uri.Update(requestURL)

	var req fasthttp.Request
	req.SetRequestURIBytes(uri.RequestURI())
	req.SetHost(c.hostPort)
	req.Header.Set("Accept-Encoding", "gzip")
	if c.ac != nil && c.ac.Authorization != "" {
		req.Header.Set("Authorization", c.ac.Authorization)
	}

	var resp fasthttp.Response
	// There is no need in calling DoTimeout, since the timeout is already set in c.hc.ReadTimeout above.
	err := c.hc.Do(&req, &resp)
	if err != nil {
		return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err)
	}

	var data []byte
	if string(resp.Header.Peek("Content-Encoding")) == "gzip" {
		unpacked, err := fasthttp.AppendGunzipBytes(nil, resp.Body())
		if err != nil {
			return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err)
		}
		data = unpacked
	} else {
		// Copy the body out of resp.
		data = append(data[:0], resp.Body()...)
	}
	if statusCode := resp.StatusCode(); statusCode != fasthttp.StatusOK {
		return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
			requestURL, statusCode, fasthttp.StatusOK, data)
	}
	return data, nil
}

View File

@ -23,6 +23,9 @@ var (
kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+
"This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details")
consulSDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+
"This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details")
ec2SDCheckInterval = flag.Duration("promscrape.ec2SDCheckInterval", time.Minute, "Interval for checking for changes in ec2. "+
"This works only if `ec2_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config for details")
@ -72,6 +75,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
scs.add("static_configs", 0, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getStaticScrapeWork() })
scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork() })
scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork() })
scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork() })
scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork() })