mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
lib/promscrape: add Prometheus-compatible service discovery for Consul aka consul_sd_configs
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/330
This commit is contained in:
parent
28e0e8fd88
commit
89aa6dbf56
@ -143,10 +143,11 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
|
||||
* if `zone` arg is missing, then `vmagent` uses the zone for the instance where it runs;
|
||||
* if `zone` arg equals to `"*"`, then `vmagent` discovers all the zones for the given project;
|
||||
* `zone` may contain arbitrary number of zones, i.e. `zone: [us-east1-a, us-east1-b]`.
|
||||
* `consul_sd_configs` - for scraping targets registered in Consul.
|
||||
See [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) for details.
|
||||
|
||||
The following service discovery mechanisms will be added to `vmagent` soon:
|
||||
|
||||
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
|
||||
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)
|
||||
|
||||
|
||||
|
@ -262,6 +262,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
|
||||
* [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config)
|
||||
* [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config)
|
||||
* [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config)
|
||||
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
|
||||
|
||||
In the future other `*_sd_config` types will be supported.
|
||||
|
||||
|
@ -143,10 +143,11 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
|
||||
* if `zone` arg is missing, then `vmagent` uses the zone for the instance where it runs;
|
||||
* if `zone` arg equals to `"*"`, then `vmagent` discovers all the zones for the given project;
|
||||
* `zone` may contain arbitrary number of zones, i.e. `zone: [us-east1-a, us-east1-b]`.
|
||||
* `consul_sd_configs` - for scraping targets registered in Consul.
|
||||
See [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) for details.
|
||||
|
||||
The following service discovery mechanisms will be added to `vmagent` soon:
|
||||
|
||||
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
|
||||
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)
|
||||
|
||||
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
|
||||
@ -62,6 +63,7 @@ type ScrapeConfig struct {
|
||||
StaticConfigs []StaticConfig `yaml:"static_configs"`
|
||||
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"`
|
||||
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs"`
|
||||
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs"`
|
||||
EC2SDConfigs []ec2.SDConfig `yaml:"ec2_sd_configs"`
|
||||
GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs"`
|
||||
RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs"`
|
||||
@ -156,6 +158,19 @@ func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork {
|
||||
return dst
|
||||
}
|
||||
|
||||
// getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg.
|
||||
func (cfg *Config) getConsulSDScrapeWork() []ScrapeWork {
|
||||
var dst []ScrapeWork
|
||||
for i := range cfg.ScrapeConfigs {
|
||||
sc := &cfg.ScrapeConfigs[i]
|
||||
for j := range sc.ConsulSDConfigs {
|
||||
sdc := &sc.ConsulSDConfigs[j]
|
||||
dst = appendConsulScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
|
||||
}
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg.
|
||||
func (cfg *Config) getEC2SDScrapeWork() []ScrapeWork {
|
||||
var dst []ScrapeWork
|
||||
@ -309,6 +324,15 @@ func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, base
|
||||
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config")
|
||||
}
|
||||
|
||||
func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
|
||||
targetLabels, err := consul.GetLabels(sdc, baseDir)
|
||||
if err != nil {
|
||||
logger.Errorf("error when discovering consul nodes for `job_name` %q: %s; skipping it", swc.jobName, err)
|
||||
return dst
|
||||
}
|
||||
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config")
|
||||
}
|
||||
|
||||
func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
|
||||
targetLabels, err := ec2.GetLabels(sdc)
|
||||
if err != nil {
|
||||
|
28
lib/promscrape/discovery/consul/agent.go
Normal file
28
lib/promscrape/discovery/consul/agent.go
Normal file
@ -0,0 +1,28 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Agent is Consul agent.
|
||||
//
|
||||
// See https://www.consul.io/api/agent.html#read-configuration
|
||||
type Agent struct {
|
||||
Config AgentConfig
|
||||
}
|
||||
|
||||
// AgentConfig is Consul agent config.
|
||||
//
|
||||
// See https://www.consul.io/api/agent.html#read-configuration
|
||||
type AgentConfig struct {
|
||||
Datacenter string
|
||||
}
|
||||
|
||||
func parseAgent(data []byte) (*Agent, error) {
|
||||
var a Agent
|
||||
if err := json.Unmarshal(data, &a); err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal agent info from %q: %s", data, err)
|
||||
}
|
||||
return &a, nil
|
||||
}
|
81
lib/promscrape/discovery/consul/agent_test.go
Normal file
81
lib/promscrape/discovery/consul/agent_test.go
Normal file
@ -0,0 +1,81 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseAgentFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
a, err := parseAgent([]byte(s))
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
if a != nil {
|
||||
t.Fatalf("unexpected non-nil Agent: %v", a)
|
||||
}
|
||||
}
|
||||
f(``)
|
||||
f(`[1,23]`)
|
||||
}
|
||||
|
||||
func TestParseAgentSuccess(t *testing.T) {
|
||||
data := `
|
||||
{
|
||||
"Config": {
|
||||
"Datacenter": "dc1",
|
||||
"NodeName": "foobar",
|
||||
"NodeID": "9d754d17-d864-b1d3-e758-f3fe25a9874f",
|
||||
"Server": true,
|
||||
"Revision": "deadbeef",
|
||||
"Version": "1.0.0"
|
||||
},
|
||||
"DebugConfig": {
|
||||
},
|
||||
"Coord": {
|
||||
"Adjustment": 0,
|
||||
"Error": 1.5,
|
||||
"Vec": [0,0,0,0,0,0,0,0]
|
||||
},
|
||||
"Member": {
|
||||
"Name": "foobar",
|
||||
"Addr": "10.1.10.12",
|
||||
"Port": 8301,
|
||||
"Tags": {
|
||||
"bootstrap": "1",
|
||||
"dc": "dc1",
|
||||
"id": "40e4a748-2192-161a-0510-9bf59fe950b5",
|
||||
"port": "8300",
|
||||
"role": "consul",
|
||||
"vsn": "1",
|
||||
"vsn_max": "1",
|
||||
"vsn_min": "1"
|
||||
},
|
||||
"Status": 1,
|
||||
"ProtocolMin": 1,
|
||||
"ProtocolMax": 2,
|
||||
"ProtocolCur": 2,
|
||||
"DelegateMin": 2,
|
||||
"DelegateMax": 4,
|
||||
"DelegateCur": 4
|
||||
},
|
||||
"Meta": {
|
||||
"instance_type": "i2.xlarge",
|
||||
"os_version": "ubuntu_16.04"
|
||||
}
|
||||
}
|
||||
`
|
||||
a, err := parseAgent([]byte(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
aExpected := &Agent{
|
||||
Config: AgentConfig{
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(a, aExpected) {
|
||||
t.Fatalf("unexpected Agent parsed;\ngot\n%v\nwant\n%v", a, aExpected)
|
||||
}
|
||||
}
|
135
lib/promscrape/discovery/consul/api.go
Normal file
135
lib/promscrape/discovery/consul/api.go
Normal file
@ -0,0 +1,135 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
// apiConfig contains config for API server
|
||||
type apiConfig struct {
|
||||
client *discoveryutils.Client
|
||||
tagSeparator string
|
||||
services []string
|
||||
tags []string
|
||||
datacenter string
|
||||
allowStale bool
|
||||
nodeMeta map[string]string
|
||||
}
|
||||
|
||||
var configMap = discoveryutils.NewConfigMap()
|
||||
|
||||
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v.(*apiConfig), nil
|
||||
}
|
||||
|
||||
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
token, err := getToken(sdc.Token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ba *promauth.BasicAuthConfig
|
||||
if len(sdc.Username) > 0 {
|
||||
ba = &promauth.BasicAuthConfig{
|
||||
Username: sdc.Username,
|
||||
Password: sdc.Password,
|
||||
}
|
||||
token = ""
|
||||
}
|
||||
ac, err := promauth.NewConfig(baseDir, ba, token, "", sdc.TLSConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse auth config: %s", err)
|
||||
}
|
||||
apiServer := sdc.Server
|
||||
if apiServer == "" {
|
||||
apiServer = "localhost:8500"
|
||||
}
|
||||
if !strings.Contains(apiServer, "://") {
|
||||
scheme := sdc.Scheme
|
||||
if scheme == "" {
|
||||
scheme = "http"
|
||||
}
|
||||
apiServer = scheme + "://" + apiServer
|
||||
}
|
||||
client, err := discoveryutils.NewClient(apiServer, ac)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %s", apiServer, err)
|
||||
}
|
||||
tagSeparator := ","
|
||||
if sdc.TagSeparator != nil {
|
||||
tagSeparator = *sdc.TagSeparator
|
||||
}
|
||||
dc, err := getDatacenter(client, sdc.Datacenter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg := &apiConfig{
|
||||
client: client,
|
||||
|
||||
tagSeparator: tagSeparator,
|
||||
services: sdc.Services,
|
||||
tags: sdc.Tags,
|
||||
datacenter: dc,
|
||||
allowStale: sdc.AllowStale,
|
||||
nodeMeta: sdc.NodeMeta,
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func getToken(token string) (string, error) {
|
||||
if token != "" {
|
||||
return token, nil
|
||||
}
|
||||
if tokenFile := os.Getenv("CONSUL_HTTP_TOKEN_FILE"); tokenFile != "" {
|
||||
data, err := ioutil.ReadFile(tokenFile)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cannot read consul token file %q; probably, `token` arg is missing in `consul_sd_config`? error: %s", tokenFile, err)
|
||||
}
|
||||
return string(data), nil
|
||||
}
|
||||
token = os.Getenv("CONSUL_HTTP_TOKEN")
|
||||
// Allow empty token - it shouls work if authorization is disabled in Consul
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func getDatacenter(client *discoveryutils.Client, dc string) (string, error) {
|
||||
if dc != "" {
|
||||
return dc, nil
|
||||
}
|
||||
// See https://www.consul.io/api/agent.html#read-configuration
|
||||
data, err := client.GetAPIResponse("/v1/agent/self")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cannot query consul agent info: %s", err)
|
||||
}
|
||||
a, err := parseAgent(data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return a.Config.Datacenter, nil
|
||||
}
|
||||
|
||||
func getAPIResponse(cfg *apiConfig, path string) ([]byte, error) {
|
||||
separator := "?"
|
||||
if strings.Contains(path, "?") {
|
||||
separator = "&"
|
||||
}
|
||||
path += fmt.Sprintf("%sdc=%s", separator, url.QueryEscape(cfg.datacenter))
|
||||
if cfg.allowStale {
|
||||
path += "&stale"
|
||||
}
|
||||
if len(cfg.nodeMeta) > 0 {
|
||||
for k, v := range cfg.nodeMeta {
|
||||
path += fmt.Sprintf("&node-meta=%s", url.QueryEscape(k+":"+v))
|
||||
}
|
||||
}
|
||||
return cfg.client.GetAPIResponse(path)
|
||||
}
|
40
lib/promscrape/discovery/consul/consul.go
Normal file
40
lib/promscrape/discovery/consul/consul.go
Normal file
@ -0,0 +1,40 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
)
|
||||
|
||||
// SDConfig represents service discovery config for Consul.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config
|
||||
type SDConfig struct {
|
||||
Server string `yaml:"server"`
|
||||
Token string `yaml:"token"`
|
||||
Datacenter string `yaml:"datacenter"`
|
||||
Scheme string `yaml:"scheme"`
|
||||
Username string `yaml:"username"`
|
||||
Password string `yaml:"password"`
|
||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config"`
|
||||
Services []string `yaml:"services"`
|
||||
Tags []string `yaml:"tags"`
|
||||
NodeMeta map[string]string `yaml:"node_meta"`
|
||||
TagSeparator *string `yaml:"tag_separator"`
|
||||
AllowStale bool `yaml:"allow_stale"`
|
||||
// RefreshInterval time.Duration `yaml:"refresh_interval"`
|
||||
// refresh_interval is obtained from `-promscrape.consulSDCheckInterval` command-line option.
|
||||
}
|
||||
|
||||
// GetLabels returns Consul labels according to sdc.
|
||||
func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) {
|
||||
cfg, err := getAPIConfig(sdc, baseDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get API config: %s", err)
|
||||
}
|
||||
ms, err := getServiceNodesLabels(cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when fetching service nodes data from Consul: %s", err)
|
||||
}
|
||||
return ms, nil
|
||||
}
|
245
lib/promscrape/discovery/consul/service_node.go
Normal file
245
lib/promscrape/discovery/consul/service_node.go
Normal file
@ -0,0 +1,245 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
// getServiceNodesLabels returns labels for Consul service nodes obtained from the given cfg
|
||||
func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) {
|
||||
sns, err := getAllServiceNodes(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ms []map[string]string
|
||||
for _, sn := range sns {
|
||||
ms = sn.appendTargetLabels(ms, cfg.tagSeparator)
|
||||
}
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
func getAllServiceNodes(cfg *apiConfig) ([]ServiceNode, error) {
|
||||
// Obtain a list of services
|
||||
// See https://www.consul.io/api/catalog.html#list-services
|
||||
data, err := getAPIResponse(cfg, "/v1/catalog/services")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain services: %s", err)
|
||||
}
|
||||
var m map[string][]string
|
||||
if err := json.Unmarshal(data, &m); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse services response %q: %s", data, err)
|
||||
}
|
||||
serviceNames := make(map[string]bool)
|
||||
for serviceName, tags := range m {
|
||||
if !shouldCollectServiceByName(cfg.services, serviceName) {
|
||||
continue
|
||||
}
|
||||
if !shouldCollectServiceByTags(cfg.tags, tags) {
|
||||
continue
|
||||
}
|
||||
serviceNames[serviceName] = true
|
||||
}
|
||||
|
||||
// Query all the serviceNames in parallel
|
||||
type response struct {
|
||||
sns []ServiceNode
|
||||
err error
|
||||
}
|
||||
responsesCh := make(chan response, len(serviceNames))
|
||||
for serviceName := range serviceNames {
|
||||
go func(serviceName string) {
|
||||
sns, err := getServiceNodes(cfg, serviceName)
|
||||
responsesCh <- response{
|
||||
sns: sns,
|
||||
err: err,
|
||||
}
|
||||
}(serviceName)
|
||||
}
|
||||
var sns []ServiceNode
|
||||
err = nil
|
||||
for i := 0; i < len(serviceNames); i++ {
|
||||
resp := <-responsesCh
|
||||
if resp.err != nil && err == nil {
|
||||
err = resp.err
|
||||
}
|
||||
sns = append(sns, resp.sns...)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sns, nil
|
||||
}
|
||||
|
||||
func shouldCollectServiceByName(filterServices []string, service string) bool {
|
||||
if len(filterServices) == 0 {
|
||||
return true
|
||||
}
|
||||
for _, filterService := range filterServices {
|
||||
if filterService == service {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func shouldCollectServiceByTags(filterTags, tags []string) bool {
|
||||
if len(filterTags) == 0 {
|
||||
return true
|
||||
}
|
||||
for _, filterTag := range filterTags {
|
||||
hasTag := false
|
||||
for _, tag := range tags {
|
||||
if tag == filterTag {
|
||||
hasTag = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasTag {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func getServiceNodes(cfg *apiConfig, serviceName string) ([]ServiceNode, error) {
|
||||
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
||||
path := fmt.Sprintf("/v1/health/service/%s", serviceName)
|
||||
var tagsArgs []string
|
||||
for _, tag := range cfg.tags {
|
||||
tagsArgs = append(tagsArgs, fmt.Sprintf("tag=%s", url.QueryEscape(tag)))
|
||||
}
|
||||
if len(tagsArgs) > 0 {
|
||||
path += "?" + strings.Join(tagsArgs, "&")
|
||||
}
|
||||
data, err := getAPIResponse(cfg, path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain instances for serviceName=%q: %s", serviceName, err)
|
||||
}
|
||||
return parseServiceNodes(data)
|
||||
}
|
||||
|
||||
// ServiceNode is Consul service node.
|
||||
//
|
||||
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
||||
type ServiceNode struct {
|
||||
Service Service
|
||||
Node Node
|
||||
Checks []Check
|
||||
}
|
||||
|
||||
// Service is Consul service.
|
||||
//
|
||||
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
||||
type Service struct {
|
||||
ID string
|
||||
Service string
|
||||
Address string
|
||||
Port int
|
||||
Tags []string
|
||||
Meta map[string]string
|
||||
}
|
||||
|
||||
// Node is Consul node.
|
||||
//
|
||||
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
||||
type Node struct {
|
||||
Address string
|
||||
Datacenter string
|
||||
Node string
|
||||
Meta map[string]string
|
||||
TaggedAddresses map[string]string
|
||||
}
|
||||
|
||||
// Check is Consul check.
|
||||
//
|
||||
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
||||
type Check struct {
|
||||
CheckID string
|
||||
Status string
|
||||
}
|
||||
|
||||
func parseServiceNodes(data []byte) ([]ServiceNode, error) {
|
||||
var sns []ServiceNode
|
||||
if err := json.Unmarshal(data, &sns); err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal ServiceNodes from %q: %s", data, err)
|
||||
}
|
||||
return sns, nil
|
||||
}
|
||||
|
||||
func (sn *ServiceNode) appendTargetLabels(ms []map[string]string, tagSeparator string) []map[string]string {
|
||||
var addr string
|
||||
if sn.Service.Address != "" {
|
||||
addr = discoveryutils.JoinHostPort(sn.Service.Address, sn.Service.Port)
|
||||
} else {
|
||||
addr = discoveryutils.JoinHostPort(sn.Node.Address, sn.Service.Port)
|
||||
}
|
||||
m := map[string]string{
|
||||
"__address__": addr,
|
||||
"__meta_consul_address": sn.Node.Address,
|
||||
"__meta_consul_dc": sn.Node.Datacenter,
|
||||
"__meta_consul_health": aggregatedStatus(sn.Checks),
|
||||
"__meta_consul_node": sn.Node.Node,
|
||||
"__meta_consul_service": sn.Service.Service,
|
||||
"__meta_consul_service_address": sn.Service.Address,
|
||||
"__meta_consul_service_id": sn.Service.ID,
|
||||
"__meta_consul_service_port": strconv.Itoa(sn.Service.Port),
|
||||
}
|
||||
// We surround the separated list with the separator as well. This way regular expressions
|
||||
// in relabeling rules don't have to consider tag positions.
|
||||
m["__meta_consul_tags"] = tagSeparator + strings.Join(sn.Service.Tags, tagSeparator) + tagSeparator
|
||||
|
||||
for k, v := range sn.Node.Meta {
|
||||
key := discoveryutils.SanitizeLabelName(k)
|
||||
m["__meta_consul_metadata_"+key] = v
|
||||
}
|
||||
for k, v := range sn.Service.Meta {
|
||||
key := discoveryutils.SanitizeLabelName(k)
|
||||
m["__meta_consul_service_metadata_"+key] = v
|
||||
}
|
||||
for k, v := range sn.Node.TaggedAddresses {
|
||||
key := discoveryutils.SanitizeLabelName(k)
|
||||
m["__meta_consul_tagged_address_"+key] = v
|
||||
}
|
||||
ms = append(ms, m)
|
||||
return ms
|
||||
}
|
||||
|
||||
func aggregatedStatus(checks []Check) string {
|
||||
// The code has been copy-pasted from HealthChecks.AggregatedStatus in Consul
|
||||
var passing, warning, critical, maintenance bool
|
||||
for _, check := range checks {
|
||||
id := check.CheckID
|
||||
if id == "_node_maintenance" || strings.HasPrefix(id, "_service_maintenance:") {
|
||||
maintenance = true
|
||||
continue
|
||||
}
|
||||
|
||||
switch check.Status {
|
||||
case "passing":
|
||||
passing = true
|
||||
case "warning":
|
||||
warning = true
|
||||
case "critical":
|
||||
critical = true
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
switch {
|
||||
case maintenance:
|
||||
return "maintenance"
|
||||
case critical:
|
||||
return "critical"
|
||||
case warning:
|
||||
return "warning"
|
||||
case passing:
|
||||
return "passing"
|
||||
default:
|
||||
return "passing"
|
||||
}
|
||||
}
|
135
lib/promscrape/discovery/consul/service_node_test.go
Normal file
135
lib/promscrape/discovery/consul/service_node_test.go
Normal file
@ -0,0 +1,135 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
func TestParseServiceNodesFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
sns, err := parseServiceNodes([]byte(s))
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
if sns != nil {
|
||||
t.Fatalf("unexpected non-nil ServiceNodes: %v", sns)
|
||||
}
|
||||
}
|
||||
f(``)
|
||||
f(`[1,23]`)
|
||||
f(`{"items":[{"metadata":1}]}`)
|
||||
}
|
||||
|
||||
func TestParseServiceNodesSuccess(t *testing.T) {
|
||||
data := `
|
||||
[
|
||||
{
|
||||
"Node": {
|
||||
"ID": "40e4a748-2192-161a-0510-9bf59fe950b5",
|
||||
"Node": "foobar",
|
||||
"Address": "10.1.10.12",
|
||||
"Datacenter": "dc1",
|
||||
"TaggedAddresses": {
|
||||
"lan": "10.1.10.12",
|
||||
"wan": "10.1.10.12"
|
||||
},
|
||||
"Meta": {
|
||||
"instance_type": "t2.medium"
|
||||
}
|
||||
},
|
||||
"Service": {
|
||||
"ID": "redis",
|
||||
"Service": "redis",
|
||||
"Tags": ["primary"],
|
||||
"Address": "10.1.10.12",
|
||||
"TaggedAddresses": {
|
||||
"lan": {
|
||||
"address": "10.1.10.12",
|
||||
"port": 8000
|
||||
},
|
||||
"wan": {
|
||||
"address": "198.18.1.2",
|
||||
"port": 80
|
||||
}
|
||||
},
|
||||
"Meta": {
|
||||
"redis_version": "4.0"
|
||||
},
|
||||
"Port": 8000,
|
||||
"Weights": {
|
||||
"Passing": 10,
|
||||
"Warning": 1
|
||||
},
|
||||
"Namespace": "default"
|
||||
},
|
||||
"Checks": [
|
||||
{
|
||||
"Node": "foobar",
|
||||
"CheckID": "service:redis",
|
||||
"Name": "Service 'redis' check",
|
||||
"Status": "passing",
|
||||
"Notes": "",
|
||||
"Output": "",
|
||||
"ServiceID": "redis",
|
||||
"ServiceName": "redis",
|
||||
"ServiceTags": ["primary"],
|
||||
"Namespace": "default"
|
||||
},
|
||||
{
|
||||
"Node": "foobar",
|
||||
"CheckID": "serfHealth",
|
||||
"Name": "Serf Health Status",
|
||||
"Status": "passing",
|
||||
"Notes": "",
|
||||
"Output": "",
|
||||
"ServiceID": "",
|
||||
"ServiceName": "",
|
||||
"ServiceTags": [],
|
||||
"Namespace": "default"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
`
|
||||
sns, err := parseServiceNodes([]byte(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if len(sns) != 1 {
|
||||
t.Fatalf("unexpected length of ServiceNodes; got %d; want %d", len(sns), 1)
|
||||
}
|
||||
sn := sns[0]
|
||||
|
||||
// Check sn.appendTargetLabels()
|
||||
tagSeparator := ","
|
||||
labelss := sn.appendTargetLabels(nil, tagSeparator)
|
||||
var sortedLabelss [][]prompbmarshal.Label
|
||||
for _, labels := range labelss {
|
||||
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
|
||||
}
|
||||
expectedLabelss := [][]prompbmarshal.Label{
|
||||
discoveryutils.GetSortedLabels(map[string]string{
|
||||
"__address__": "10.1.10.12:8000",
|
||||
"__meta_consul_address": "10.1.10.12",
|
||||
"__meta_consul_dc": "dc1",
|
||||
"__meta_consul_health": "passing",
|
||||
"__meta_consul_metadata_instance_type": "t2.medium",
|
||||
"__meta_consul_node": "foobar",
|
||||
"__meta_consul_service": "redis",
|
||||
"__meta_consul_service_address": "10.1.10.12",
|
||||
"__meta_consul_service_id": "redis",
|
||||
"__meta_consul_service_metadata_redis_version": "4.0",
|
||||
"__meta_consul_service_port": "8000",
|
||||
"__meta_consul_tagged_address_lan": "10.1.10.12",
|
||||
"__meta_consul_tagged_address_wan": "10.1.10.12",
|
||||
"__meta_consul_tags": ",primary,",
|
||||
}),
|
||||
}
|
||||
if !reflect.DeepEqual(sortedLabelss, expectedLabelss) {
|
||||
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss)
|
||||
}
|
||||
}
|
@ -1,25 +1,17 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/fasthttp"
|
||||
)
|
||||
|
||||
// apiConfig contains config for API server
|
||||
type apiConfig struct {
|
||||
client *fasthttp.HostClient
|
||||
server string
|
||||
hostPort string
|
||||
authConfig *promauth.Config
|
||||
client *discoveryutils.Client
|
||||
namespaces []string
|
||||
selectors []Selector
|
||||
}
|
||||
@ -39,67 +31,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse auth config: %s", err)
|
||||
}
|
||||
hcv, err := newHostClient(sdc.APIServer, ac)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %s", sdc.APIServer, err)
|
||||
}
|
||||
cfg := &apiConfig{
|
||||
client: hcv.hc,
|
||||
server: hcv.apiServer,
|
||||
hostPort: hcv.hostPort,
|
||||
authConfig: hcv.ac,
|
||||
namespaces: sdc.Namespaces.Names,
|
||||
selectors: sdc.Selectors,
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) {
|
||||
query := joinSelectors(role, cfg.namespaces, cfg.selectors)
|
||||
if len(query) > 0 {
|
||||
path += "?" + query
|
||||
}
|
||||
requestURL := cfg.server + path
|
||||
var u fasthttp.URI
|
||||
u.Update(requestURL)
|
||||
var req fasthttp.Request
|
||||
req.SetRequestURIBytes(u.RequestURI())
|
||||
req.SetHost(cfg.hostPort)
|
||||
req.Header.Set("Accept-Encoding", "gzip")
|
||||
if cfg.authConfig != nil && cfg.authConfig.Authorization != "" {
|
||||
req.Header.Set("Authorization", cfg.authConfig.Authorization)
|
||||
}
|
||||
var resp fasthttp.Response
|
||||
// There is no need in calling DoTimeout, since the timeout is already set in hc.ReadTimeout above.
|
||||
if err := cfg.client.Do(&req, &resp); err != nil {
|
||||
return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err)
|
||||
}
|
||||
var data []byte
|
||||
if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" {
|
||||
dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err)
|
||||
}
|
||||
data = dst
|
||||
} else {
|
||||
data = append(data[:0], resp.Body()...)
|
||||
}
|
||||
statusCode := resp.StatusCode()
|
||||
if statusCode != fasthttp.StatusOK {
|
||||
return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
|
||||
requestURL, statusCode, fasthttp.StatusOK, data)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
type hcValue struct {
|
||||
hc *fasthttp.HostClient
|
||||
ac *promauth.Config
|
||||
apiServer string
|
||||
hostPort string
|
||||
}
|
||||
|
||||
func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) {
|
||||
apiServer := sdc.APIServer
|
||||
if len(apiServer) == 0 {
|
||||
// Assume we run at k8s pod.
|
||||
// Discover apiServer and auth config according to k8s docs.
|
||||
@ -124,36 +56,22 @@ func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) {
|
||||
}
|
||||
ac = acNew
|
||||
}
|
||||
|
||||
var u fasthttp.URI
|
||||
u.Update(apiServer)
|
||||
hostPort := string(u.Host())
|
||||
isTLS := string(u.Scheme()) == "https"
|
||||
var tlsCfg *tls.Config
|
||||
if isTLS && ac != nil {
|
||||
tlsCfg = ac.NewTLSConfig()
|
||||
client, err := discoveryutils.NewClient(apiServer, ac)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %s", apiServer, err)
|
||||
}
|
||||
if !strings.Contains(hostPort, ":") {
|
||||
port := "80"
|
||||
if isTLS {
|
||||
port = "443"
|
||||
}
|
||||
hostPort = net.JoinHostPort(hostPort, port)
|
||||
cfg := &apiConfig{
|
||||
client: client,
|
||||
namespaces: sdc.Namespaces.Names,
|
||||
selectors: sdc.Selectors,
|
||||
}
|
||||
hc := &fasthttp.HostClient{
|
||||
Addr: hostPort,
|
||||
Name: "vm_promscrape/discovery",
|
||||
DialDualStack: netutil.TCP6Enabled(),
|
||||
IsTLS: isTLS,
|
||||
TLSConfig: tlsCfg,
|
||||
ReadTimeout: time.Minute,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
MaxResponseBodySize: 300 * 1024 * 1024,
|
||||
}
|
||||
return &hcValue{
|
||||
hc: hc,
|
||||
ac: ac,
|
||||
apiServer: apiServer,
|
||||
hostPort: hostPort,
|
||||
}, nil
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) {
|
||||
query := joinSelectors(role, cfg.namespaces, cfg.selectors)
|
||||
if len(query) > 0 {
|
||||
path += "?" + query
|
||||
}
|
||||
return cfg.client.GetAPIResponse(path)
|
||||
}
|
||||
|
@ -1,8 +1,16 @@
|
||||
package discoveryutils
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/fasthttp"
|
||||
)
|
||||
|
||||
var defaultClient = &http.Client{
|
||||
@ -13,3 +21,81 @@ var defaultClient = &http.Client{
|
||||
func GetHTTPClient() *http.Client {
|
||||
return defaultClient
|
||||
}
|
||||
|
||||
// Client is http client, which talks to the given apiServer.
|
||||
type Client struct {
|
||||
hc *fasthttp.HostClient
|
||||
ac *promauth.Config
|
||||
apiServer string
|
||||
hostPort string
|
||||
}
|
||||
|
||||
// NewClient returns new Client for the given apiServer and the given ac.
|
||||
func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
|
||||
var u fasthttp.URI
|
||||
u.Update(apiServer)
|
||||
hostPort := string(u.Host())
|
||||
isTLS := string(u.Scheme()) == "https"
|
||||
var tlsCfg *tls.Config
|
||||
if isTLS && ac != nil {
|
||||
tlsCfg = ac.NewTLSConfig()
|
||||
}
|
||||
if !strings.Contains(hostPort, ":") {
|
||||
port := "80"
|
||||
if isTLS {
|
||||
port = "443"
|
||||
}
|
||||
hostPort = net.JoinHostPort(hostPort, port)
|
||||
}
|
||||
hc := &fasthttp.HostClient{
|
||||
Addr: hostPort,
|
||||
Name: "vm_promscrape/discovery",
|
||||
DialDualStack: netutil.TCP6Enabled(),
|
||||
IsTLS: isTLS,
|
||||
TLSConfig: tlsCfg,
|
||||
ReadTimeout: time.Minute,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
MaxResponseBodySize: 300 * 1024 * 1024,
|
||||
}
|
||||
return &Client{
|
||||
hc: hc,
|
||||
ac: ac,
|
||||
apiServer: apiServer,
|
||||
hostPort: hostPort,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetAPIResponse returns response for the given absolute path.
|
||||
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
||||
requestURL := c.apiServer + path
|
||||
var u fasthttp.URI
|
||||
u.Update(requestURL)
|
||||
var req fasthttp.Request
|
||||
req.SetRequestURIBytes(u.RequestURI())
|
||||
req.SetHost(c.hostPort)
|
||||
req.Header.Set("Accept-Encoding", "gzip")
|
||||
if c.ac != nil && c.ac.Authorization != "" {
|
||||
req.Header.Set("Authorization", c.ac.Authorization)
|
||||
}
|
||||
var resp fasthttp.Response
|
||||
// There is no need in calling DoTimeout, since the timeout is already set in c.hc.ReadTimeout above.
|
||||
if err := c.hc.Do(&req, &resp); err != nil {
|
||||
return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err)
|
||||
}
|
||||
var data []byte
|
||||
if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" {
|
||||
dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err)
|
||||
}
|
||||
data = dst
|
||||
} else {
|
||||
data = append(data[:0], resp.Body()...)
|
||||
}
|
||||
statusCode := resp.StatusCode()
|
||||
if statusCode != fasthttp.StatusOK {
|
||||
return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
|
||||
requestURL, statusCode, fasthttp.StatusOK, data)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
@ -23,6 +23,9 @@ var (
|
||||
kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+
|
||||
"This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+
|
||||
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details")
|
||||
consulSDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+
|
||||
"This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+
|
||||
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details")
|
||||
ec2SDCheckInterval = flag.Duration("promscrape.ec2SDCheckInterval", time.Minute, "Interval for checking for changes in ec2. "+
|
||||
"This works only if `ec2_sd_configs` is configured in '-promscrape.config' file. "+
|
||||
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config for details")
|
||||
@ -72,6 +75,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
|
||||
scs.add("static_configs", 0, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getStaticScrapeWork() })
|
||||
scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
|
||||
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork() })
|
||||
scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork() })
|
||||
scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork() })
|
||||
scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork() })
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user