added new consulagent service discovery (#3953) (#4217)

This commit is contained in:
Alexander Marshalov 2023-05-04 11:36:21 +02:00 committed by Aliaksandr Valialkin
parent f7dd084890
commit 26fc4afff8
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
19 changed files with 870 additions and 69 deletions

View File

@ -1351,6 +1351,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Wait time used by Consul service discovery. Default value is used if not set
-promscrape.consulSDCheckInterval duration
Interval for checking for changes in Consul. This works only if consul_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs for details (default 30s)
-promscrape.consulagentSDCheckInterval duration
Interval for checking for changes in Consul Agent. This works only if consulagent_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#consulagent_sd_configs for details (default 30s)
-promscrape.digitaloceanSDCheckInterval duration
Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#digitalocean_sd_configs for details (default 1m0s)
-promscrape.disableCompression

View File

@ -30,6 +30,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager.html): add `created_at` field to the output of `/api/v1/backups` API and `vmbackupmanager backup list` command. See this [doc](https://docs.victoriametrics.com/vmbackupmanager.html#api-methods) for data format details.
* FEATURE: deprecate `-bigMergeConcurrency` command-line flag, since improper configuration for this flag frequently led to uncontrolled growth of unmerged parts, which, in turn, could lead to queries slowdown and increased CPU usage. The concurrency for [background merges](https://docs.victoriametrics.com/#storage) can be controlled via `-smallMergeConcurrency` command-line flag, though it isn't recommended to do in general case.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support new filtering options `filter` and `node_filter` for [consul service discovery](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4183) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support new [consulagent service discovery](https://docs.victoriametrics.com/sd_configs.html#consulagent_sd_configs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3953) for details.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add support for the different time formats for `--vm-native-filter-time-start` and `--vm-native-filter-time-end` flags if the native binary protocol is used for migration. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4091).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): integrate WITH template playground. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3811).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add a comparison of data from the previous day with data from the current day to the `Cardinality Explorer`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3967).

View File

@ -18,6 +18,7 @@ aliases:
* `azure_sd_configs` is for scraping the targets registered in [Azure Cloud](https://azure.microsoft.com/en-us/). See [these docs](#azure_sd_configs).
* `consul_sd_configs` is for discovering and scraping targets registered in [Consul](https://www.consul.io/). See [these docs](#consul_sd_configs).
* `consulagent_sd_configs` is for discovering and scraping targets registered in [Consul Agent](https://developer.hashicorp.com/consul/api-docs/agent/service). See [these docs](#consulagent_sd_configs).
* `digitalocean_sd_configs` is for discovering and scraping targerts registered in [DigitalOcean](https://www.digitalocean.com/). See [these docs](#digitalocean_sd_configs).
* `dns_sd_configs` is for discovering and scraping targets from [DNS](https://it.wikipedia.org/wiki/Domain_Name_System) records (SRV, A and AAAA). See [these docs](#dns_sd_configs).
* `docker_sd_configs` is for discovering and scraping [Docker](https://www.docker.com/) targets. See [these docs](#docker_sd_configs).
@ -203,6 +204,89 @@ The following meta labels are available on discovered targets during [relabeling
The list of discovered Consul targets is refreshed at the interval, which can be configured via `-promscrape.consulSDCheckInterval` command-line flag.
If you have performance issues with consul_sd_configs on a large cluster, then consider using [consulagent_sd_configs](#consulagent_sd_configs) instead.
## consulagent_sd_configs
Consul Agent SD configuration allows retrieving scrape targets from [Consul's Agent API](https://developer.hashicorp.com/consul/api-docs/agent/service).
When using the Agent API, each running vmagent will only get services registered in the local Consul Agent running on the same node when discovering new targets.
It's suitable for huge clusters for which using the [Catalog API](https://developer.hashicorp.com/consul/api-docs/catalog#list-services) would be too slow or resource intensive,
in other cases we recommend to use [consul_sd_configs](#consul_sd_configs).
Configuration example:
```yaml
scrape_configs:
- job_name: consul
consulagent_sd_configs:
# server is an optional Consul agent to connect to. By default localhost:8500 is used
- server: "localhost:8500"
# token is an optional Consul API token.
# If the token isn't specified, then it is read from a file pointed by CONSUL_HTTP_TOKEN_FILE
# environment var or from the CONSUL_HTTP_TOKEN environment var.
# token: "..."
# datacenter is an optional Consul API datacenter.
# If the datacenter isn't specified, then it is read from Consul server.
# See https://www.consul.io/api-docs/agent#read-configuration
# datacenter: "..."
# namespace is an optional Consul namespace.
# See https://developer.hashicorp.com/consul/docs/enterprise/namespaces
# If the namespace isn't specified, then it is read from CONSUL_NAMESPACE environment var.
# namespace: "..."
# scheme is an optional scheme (http or https) to use for connecting to Consul server.
# By default http scheme is used.
# scheme: "..."
# services is an optional list of services for which targets are retrieved.
# If omitted, all services are scraped.
# See https://www.consul.io/api-docs/catalog#list-nodes-for-service .
# services: ["...", "..."]
# tag_separator is an optional string by which Consul tags are joined into the __meta_consul_tags label.
# By default "," is used as a tag separator.
# Individual tags are also available via __meta_consul_tag_<tagname> labels - see below.
# tag_separator: "..."
# filter is optional filter for service nodes discovery request.
# Replaces tags and node_metadata options.
# consul supports it since 1.14 version
# list of supported filters https://developer.hashicorp.com/consul/api-docs/catalog#filtering-1
# syntax examples https://developer.hashicorp.com/consul/api-docs/features/filtering
# filter: "..."
# Additional HTTP API client options can be specified here.
# See https://docs.victoriametrics.com/sd_configs.html#http-api-client-options
```
Each discovered target has an [`__address__`](https://docs.victoriametrics.com/relabeling.html#how-to-modify-scrape-urls-in-targets) label set
to `<service_or_node_addr>:<service_port>`, where `<service_or_node_addr>` is the service address. If the service address is empty,
then the node address is used instead. The `<service_port>` is the service port.
The following meta labels are available on discovered targets during [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling):
* `__meta_consulagent_address`: the address of the target
* `__meta_consulagent_dc`: the datacenter name for the target
* `__meta_consulagent_health`: the health status of the service
* `__meta_consulagent_metadata_<key>`: each node metadata key value of the target
* `__meta_consulagent_namespace`: namespace of the service - see [namespace docs](https://developer.hashicorp.com/consul/docs/enterprise/namespaces)
* `__meta_consulagent_node`: the node name defined for the target
* `__meta_consulagent_service_address`: the service address of the target
* `__meta_consulagent_service_id`: the service ID of the target
* `__meta_consulagent_service_metadata_<key>`: each service metadata key value of the target
* `__meta_consulagent_service_port`: the service port of the target
* `__meta_consulagent_service`: the name of the service the target belongs to
* `__meta_consulagent_tagged_address_<key>`: each node tagged address key value of the target
* `__meta_consulagent_tag_<tagname>`: the value for the given <tagname> tag of the target
* `__meta_consulagent_tagpresent_<tagname>`: "true" for every <tagname> tag of the target
* `__meta_consulagent_tags`: the list of tags of the target joined by the `tag_separator`
The list of discovered Consul Agent targets is refreshed at the interval, which can be configured via `-promscrape.consulagentSDCheckInterval` command-line flag.
## digitalocean_sd_configs
DigitalOcean SD configuration allows retrieving scrape targets from [DigitalOcean's Droplets API](https://docs.digitalocean.com/reference/api/api-reference/#tag/Droplets).

View File

@ -1355,6 +1355,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Wait time used by Consul service discovery. Default value is used if not set
-promscrape.consulSDCheckInterval duration
Interval for checking for changes in Consul. This works only if consul_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs for details (default 30s)
-promscrape.consulagentSDCheckInterval duration
Interval for checking for changes in Consul Agent. This works only if consulagent_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#consulagent_sd_configs for details (default 30s)
-promscrape.digitaloceanSDCheckInterval duration
Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#digitalocean_sd_configs for details (default 1m0s)
-promscrape.disableCompression

View File

@ -21,6 +21,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consulagent"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
@ -255,6 +256,7 @@ type ScrapeConfig struct {
AzureSDConfigs []azure.SDConfig `yaml:"azure_sd_configs,omitempty"`
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs,omitempty"`
ConsulAgentSDConfigs []consulagent.SDConfig `yaml:"consulagent_sd_configs,omitempty"`
DigitaloceanSDConfigs []digitalocean.SDConfig `yaml:"digitalocean_sd_configs,omitempty"`
DNSSDConfigs []dns.SDConfig `yaml:"dns_sd_configs,omitempty"`
DockerSDConfigs []docker.SDConfig `yaml:"docker_sd_configs,omitempty"`
@ -307,6 +309,9 @@ func (sc *ScrapeConfig) mustStop() {
for i := range sc.ConsulSDConfigs {
sc.ConsulSDConfigs[i].MustStop()
}
for i := range sc.ConsulAgentSDConfigs {
sc.ConsulAgentSDConfigs[i].MustStop()
}
for i := range sc.DigitaloceanSDConfigs {
sc.DigitaloceanSDConfigs[i].MustStop()
}
@ -547,6 +552,33 @@ func (cfg *Config) getConsulSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
return dst
}
// getConsulAgentSDScrapeWork returns `consulagent_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getConsulAgentSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.ConsulAgentSDConfigs {
sdc := &sc.ConsulAgentSDConfigs[j]
var okLocal bool
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "consulagent_sd_config")
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering consulagent targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
return dst
}
// getDigitalOceanDScrapeWork returns `digitalocean_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getDigitalOceanDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)

View File

@ -10,6 +10,8 @@ import (
// See https://www.consul.io/api/agent.html#read-configuration
type Agent struct {
Config AgentConfig
Member AgentMember
Meta map[string]string
}
// AgentConfig is Consul agent config.
@ -17,9 +19,18 @@ type Agent struct {
// See https://www.consul.io/api/agent.html#read-configuration
type AgentConfig struct {
Datacenter string
NodeName string
}
func parseAgent(data []byte) (*Agent, error) {
// AgentMember is Consul agent member info.
//
// See https://www.consul.io/api/agent.html#read-configuration
type AgentMember struct {
Addr string
}
// ParseAgent parses Consul agent information from bytes.
func ParseAgent(data []byte) (*Agent, error) {
var a Agent
if err := json.Unmarshal(data, &a); err != nil {
return nil, fmt.Errorf("cannot unmarshal agent info from %q: %w", data, err)

View File

@ -8,7 +8,7 @@ import (
func TestParseAgentFailure(t *testing.T) {
f := func(s string) {
t.Helper()
a, err := parseAgent([]byte(s))
a, err := ParseAgent([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -66,13 +66,21 @@ func TestParseAgentSuccess(t *testing.T) {
}
}
`
a, err := parseAgent([]byte(data))
a, err := ParseAgent([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
aExpected := &Agent{
Config: AgentConfig{
Datacenter: "dc1",
NodeName: "foobar",
},
Member: AgentMember{
Addr: "10.1.10.12",
},
Meta: map[string]string{
"instance_type": "i2.xlarge",
"os_version": "ubuntu_16.04",
},
}
if !reflect.DeepEqual(a, aExpected) {

View File

@ -39,7 +39,7 @@ func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
hcc := sdc.HTTPClientConfig
token, err := getToken(sdc.Token)
token, err := GetToken(sdc.Token)
if err != nil {
return nil, err
}
@ -107,7 +107,8 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
return cfg, nil
}
func getToken(token *promauth.Secret) (string, error) {
// GetToken returns Consul token.
func GetToken(token *promauth.Secret) (string, error) {
if token != nil {
return token.String(), nil
}
@ -123,20 +124,29 @@ func getToken(token *promauth.Secret) (string, error) {
return t, nil
}
// GetAgentInfo returns information about current consul agent.
func GetAgentInfo(client *discoveryutils.Client) (*Agent, error) {
// See https://www.consul.io/api/agent.html#read-configuration
data, err := client.GetAPIResponse("/v1/agent/self")
if err != nil {
return nil, fmt.Errorf("cannot query consul agent info: %w", err)
}
a, err := ParseAgent(data)
if err != nil {
return nil, err
}
return a, nil
}
func getDatacenter(client *discoveryutils.Client, dc string) (string, error) {
if dc != "" {
return dc, nil
}
// See https://www.consul.io/api/agent.html#read-configuration
data, err := client.GetAPIResponse("/v1/agent/self")
agent, err := GetAgentInfo(client)
if err != nil {
return "", fmt.Errorf("cannot query consul agent info: %w", err)
}
a, err := parseAgent(data)
if err != nil {
return "", err
}
return a.Config.Datacenter, nil
return agent.Config.Datacenter, nil
}
// maxWaitTime is duration for consul blocking request.

View File

@ -35,14 +35,24 @@ type ServiceNode struct {
//
// See https://www.consul.io/api/health.html#list-nodes-for-service
type Service struct {
ID string
Service string
Address string
Namespace string
Partition string
Port int
Tags []string
Meta map[string]string
ID string
Service string
Address string
Namespace string
Partition string
Port int
Tags []string
Meta map[string]string
TaggedAddresses map[string]ServiceTaggedAddress
Datacenter string
}
// ServiceTaggedAddress is Consul service.
//
// See https://www.consul.io/api/health.html#list-nodes-for-service
type ServiceTaggedAddress struct {
Address string
Port int
}
// Node is Consul node.
@ -64,7 +74,8 @@ type Check struct {
Status string
}
func parseServiceNodes(data []byte) ([]ServiceNode, error) {
// ParseServiceNodes return parsed slice of ServiceNode by data.
func ParseServiceNodes(data []byte) ([]ServiceNode, error) {
var sns []ServiceNode
if err := json.Unmarshal(data, &sns); err != nil {
return nil, fmt.Errorf("cannot unmarshal ServiceNodes from %q: %w", data, err)
@ -83,7 +94,7 @@ func (sn *ServiceNode) appendTargetLabels(ms []*promutils.Labels, serviceName, t
m.Add("__address__", addr)
m.Add("__meta_consul_address", sn.Node.Address)
m.Add("__meta_consul_dc", sn.Node.Datacenter)
m.Add("__meta_consul_health", aggregatedStatus(sn.Checks))
m.Add("__meta_consul_health", AggregatedStatus(sn.Checks))
m.Add("__meta_consul_namespace", sn.Service.Namespace)
m.Add("__meta_consul_partition", sn.Service.Partition)
m.Add("__meta_consul_node", sn.Node.Node)
@ -91,27 +102,8 @@ func (sn *ServiceNode) appendTargetLabels(ms []*promutils.Labels, serviceName, t
m.Add("__meta_consul_service_address", sn.Service.Address)
m.Add("__meta_consul_service_id", sn.Service.ID)
m.Add("__meta_consul_service_port", strconv.Itoa(sn.Service.Port))
// We surround the separated list with the separator as well. This way regular expressions
// in relabeling rules don't have to consider tag positions.
m.Add("__meta_consul_tags", tagSeparator+strings.Join(sn.Service.Tags, tagSeparator)+tagSeparator)
// Expose individual tags via __meta_consul_tag_* labels, so users could move all the tags
// into the discovered scrape target with the following relabeling rule in the way similar to kubernetes_sd_configs:
//
// - action: labelmap
// regex: __meta_consul_tag_(.+)
//
// This solves https://stackoverflow.com/questions/44339461/relabeling-in-prometheus
for _, tag := range sn.Service.Tags {
k := tag
v := ""
if n := strings.IndexByte(tag, '='); n >= 0 {
k = tag[:n]
v = tag[n+1:]
}
m.Add(discoveryutils.SanitizeLabelName("__meta_consul_tag_"+k), v)
m.Add(discoveryutils.SanitizeLabelName("__meta_consul_tagpresent_"+k), "true")
}
discoveryutils.AddTagsToLabels(m, sn.Service.Tags, "__meta_consul_", tagSeparator)
for k, v := range sn.Node.Meta {
m.Add(discoveryutils.SanitizeLabelName("__meta_consul_metadata_"+k), v)
@ -126,7 +118,8 @@ func (sn *ServiceNode) appendTargetLabels(ms []*promutils.Labels, serviceName, t
return ms
}
func aggregatedStatus(checks []Check) string {
// AggregatedStatus returns aggregated status of service node checks.
func AggregatedStatus(checks []Check) string {
// The code has been copy-pasted from HealthChecks.AggregatedStatus in Consul
var passing, warning, critical, maintenance bool
for _, check := range checks {

View File

@ -10,7 +10,7 @@ import (
func TestParseServiceNodesFailure(t *testing.T) {
f := func(s string) {
t.Helper()
sns, err := parseServiceNodes([]byte(s))
sns, err := ParseServiceNodes([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -95,7 +95,7 @@ func TestParseServiceNodesSuccess(t *testing.T) {
}
]
`
sns, err := parseServiceNodes([]byte(data))
sns, err := ParseServiceNodes([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View File

@ -235,7 +235,7 @@ func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64,
}
serviceNames := make([]string, 0, len(m))
for serviceName, tags := range m {
if !shouldCollectServiceByName(cw.watchServices, serviceName) {
if !ShouldCollectServiceByName(cw.watchServices, serviceName) {
continue
}
if !shouldCollectServiceByTags(cw.watchTags, tags) {
@ -265,7 +265,7 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG
// Nothing changed.
return
}
sns, err := parseServiceNodes(data)
sns, err := ParseServiceNodes(data)
if err != nil {
logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
return
@ -307,7 +307,8 @@ func (cw *consulWatcher) getServiceNodesSnapshot() map[string][]ServiceNode {
return sns
}
func shouldCollectServiceByName(filterServices []string, serviceName string) bool {
// ShouldCollectServiceByName returns true if the given serviceName must be collected (present in filterServices).
func ShouldCollectServiceByName(filterServices []string, serviceName string) bool {
if len(filterServices) == 0 {
return true
}

View File

@ -0,0 +1,107 @@
package consulagent
import (
"fmt"
"os"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// apiConfig contains config for API server.
type apiConfig struct {
tagSeparator string
consulWatcher *consulAgentWatcher
agent *consul.Agent
}
func (ac *apiConfig) mustStop() {
ac.consulWatcher.mustStop()
}
var configMap = discoveryutils.NewConfigMap()
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
if err != nil {
return nil, err
}
return v.(*apiConfig), nil
}
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
hcc := sdc.HTTPClientConfig
token, err := consul.GetToken(sdc.Token)
if err != nil {
return nil, err
}
if token != "" {
if hcc.BearerToken != nil {
return nil, fmt.Errorf("cannot set both token and bearer_token configs")
}
hcc.BearerToken = promauth.NewSecret(token)
}
if len(sdc.Username) > 0 {
if hcc.BasicAuth != nil {
return nil, fmt.Errorf("cannot set both username and basic_auth configs")
}
hcc.BasicAuth = &promauth.BasicAuthConfig{
Username: sdc.Username,
Password: sdc.Password,
}
}
ac, err := hcc.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse auth config: %w", err)
}
apiServer := sdc.Server
if apiServer == "" {
apiServer = "localhost:8500"
}
if !strings.Contains(apiServer, "://") {
scheme := sdc.Scheme
if scheme == "" {
scheme = "http"
if hcc.TLSConfig != nil {
scheme = "https"
}
}
apiServer = scheme + "://" + apiServer
}
proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
}
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL, proxyAC)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
}
tagSeparator := ","
if sdc.TagSeparator != nil {
tagSeparator = *sdc.TagSeparator
}
agent, err := consul.GetAgentInfo(client)
if err != nil {
return nil, fmt.Errorf("cannot obtain consul datacenter: %w", err)
}
dc := sdc.Datacenter
if dc == "" {
dc = agent.Config.Datacenter
}
namespace := sdc.Namespace
// default namespace can be detected from env var.
if namespace == "" {
namespace = os.Getenv("CONSUL_NAMESPACE")
}
cw := newConsulAgentWatcher(client, sdc, dc, namespace)
cfg := &apiConfig{
tagSeparator: tagSeparator,
consulWatcher: cw,
agent: agent,
}
return cfg, nil
}

View File

@ -0,0 +1,57 @@
package consulagent
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
// SDConfig represents service discovery config for Consul Agent.
//
// See https://grafana.com/docs/loki/latest/clients/promtail/configuration/#consulagent_sd_config
type SDConfig struct {
Server string `yaml:"server,omitempty"`
Token *promauth.Secret `yaml:"token"`
Datacenter string `yaml:"datacenter"`
// Namespace only supported at enterprise consul.
// https://www.consul.io/docs/enterprise/namespaces
Namespace string `yaml:"namespace,omitempty"`
Scheme string `yaml:"scheme,omitempty"`
Username string `yaml:"username"`
Password *promauth.Secret `yaml:"password"`
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
ProxyURL *proxy.URL `yaml:"proxy_url,omitempty"`
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
Services []string `yaml:"services,omitempty"`
TagSeparator *string `yaml:"tag_separator,omitempty"`
// See https://developer.hashicorp.com/consul/api-docs/features/filtering
// list of supported filters https://developer.hashicorp.com/consul/api-docs/catalog#filtering-1
Filter string `yaml:"filter,omitempty"`
// RefreshInterval time.Duration `yaml:"refresh_interval"`
// refresh_interval is obtained from `-promscrape.consulagentSDCheckInterval` command-line option.
}
// GetLabels returns Consul labels according to sdc.
func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
cfg, err := getAPIConfig(sdc, baseDir)
if err != nil {
return nil, fmt.Errorf("cannot get API config: %w", err)
}
ms := getServiceNodesLabels(cfg)
return ms, nil
}
// MustStop stops further usage for sdc.
func (sdc *SDConfig) MustStop() {
v := configMap.Delete(sdc)
if v != nil {
// v can be nil if GetLabels wasn't called yet.
cfg := v.(*apiConfig)
cfg.mustStop()
}
}

View File

@ -0,0 +1,65 @@
package consulagent
import (
"fmt"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
// getServiceNodesLabels returns labels for Consul service nodes with given cfg.
func getServiceNodesLabels(cfg *apiConfig) []*promutils.Labels {
sns := cfg.consulWatcher.getServiceNodesSnapshot()
var ms []*promutils.Labels
for svc, sn := range sns {
for i := range sn {
ms = appendTargetLabels(sn[i], ms, svc, cfg.tagSeparator, cfg.agent)
}
}
return ms
}
func appendTargetLabels(sn consul.ServiceNode, ms []*promutils.Labels, serviceName, tagSeparator string, agent *consul.Agent) []*promutils.Labels {
const metaPrefix = "__meta_consulagent_"
var addr string
// If the service address is not empty it should be used instead of the node address
// since the service may be registered remotely through a different node.
if sn.Service.Address != "" {
addr = discoveryutils.JoinHostPort(sn.Service.Address, sn.Service.Port)
} else {
addr = discoveryutils.JoinHostPort(agent.Member.Addr, sn.Service.Port)
}
m := promutils.NewLabels(16)
m.Add("__address__", addr)
m.Add(metaPrefix+"address", agent.Member.Addr)
m.Add(metaPrefix+"dc", agent.Config.Datacenter)
m.Add(metaPrefix+"health", consul.AggregatedStatus(sn.Checks))
m.Add(metaPrefix+"namespace", sn.Service.Namespace)
m.Add(metaPrefix+"node", agent.Config.NodeName)
m.Add(metaPrefix+"service", serviceName)
m.Add(metaPrefix+"service_address", sn.Service.Address)
m.Add(metaPrefix+"service_id", sn.Service.ID)
m.Add(metaPrefix+"service_port", strconv.Itoa(sn.Service.Port))
discoveryutils.AddTagsToLabels(m, sn.Service.Tags, metaPrefix, tagSeparator)
for k, v := range agent.Meta {
m.Add(discoveryutils.SanitizeLabelName(metaPrefix+"metadata_"+k), v)
}
for k, v := range sn.Service.Meta {
m.Add(discoveryutils.SanitizeLabelName(metaPrefix+"service_metadata_"+k), v)
}
for k, v := range sn.Node.TaggedAddresses {
m.Add(discoveryutils.SanitizeLabelName(metaPrefix+"tagged_address_"+k), v)
}
for k, v := range sn.Service.TaggedAddresses {
address := fmt.Sprintf("%s:%d", v.Address, v.Port)
m.Add(discoveryutils.SanitizeLabelName(metaPrefix+"tagged_address_"+k), address)
}
ms = append(ms, m)
return ms
}

View File

@ -0,0 +1,141 @@
package consulagent
import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
func TestParseServiceNodesFailure(t *testing.T) {
f := func(s string) {
t.Helper()
sns, err := consul.ParseServiceNodes([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if sns != nil {
t.Fatalf("unexpected non-nil ServiceNodes: %v", sns)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
}
func TestParseServiceNodesSuccess(t *testing.T) {
data := `
[
{
"Service": {
"ID": "redis",
"Service": "redis",
"Tags": ["primary","foo=bar"],
"Address": "10.1.10.12",
"TaggedAddresses": {
"lan": {
"address": "10.1.10.12",
"port": 8000
},
"wan": {
"address": "198.18.1.2",
"port": 80
}
},
"Meta": {
"redis_version": "4.0"
},
"Port": 8000,
"Weights": {
"Passing": 10,
"Warning": 1
},
"Namespace": "ns-dev",
"Partition": "part-foobar"
},
"Checks": [
{
"Node": "foobar",
"CheckID": "service:redis",
"Name": "Service 'redis' check",
"Status": "passing",
"Notes": "",
"Output": "",
"ServiceID": "redis",
"ServiceName": "redis",
"ServiceTags": ["primary"],
"Namespace": "default"
},
{
"Node": "foobar",
"CheckID": "serfHealth",
"Name": "Serf Health Status",
"Status": "passing",
"Notes": "",
"Output": "",
"ServiceID": "",
"ServiceName": "",
"ServiceTags": [],
"Namespace": "default"
}
]
}
]
`
sns, err := consul.ParseServiceNodes([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(sns) != 1 {
t.Fatalf("unexpected length of ServiceNodes; got %d; want %d", len(sns), 1)
}
sn := sns[0]
agentData := `
{
"Member": {
"Addr": "10.1.10.12"
},
"Config": {
"Datacenter": "dc1",
"NodeName": "foobar"
},
"Meta": {
"instance_type": "t2.medium"
}
}
`
agent, err := consul.ParseAgent([]byte(agentData))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Check sn.appendTargetLabels()
tagSeparator := ","
labelss := appendTargetLabels(sn, nil, "redis", tagSeparator, agent)
expectedLabelss := []*promutils.Labels{
promutils.NewLabelsFromMap(map[string]string{
"__address__": "10.1.10.12:8000",
"__meta_consulagent_address": "10.1.10.12",
"__meta_consulagent_dc": "dc1",
"__meta_consulagent_health": "passing",
"__meta_consulagent_metadata_instance_type": "t2.medium",
"__meta_consulagent_namespace": "ns-dev",
"__meta_consulagent_node": "foobar",
"__meta_consulagent_service": "redis",
"__meta_consulagent_service_address": "10.1.10.12",
"__meta_consulagent_service_id": "redis",
"__meta_consulagent_service_metadata_redis_version": "4.0",
"__meta_consulagent_service_port": "8000",
"__meta_consulagent_tagged_address_lan": "10.1.10.12:8000",
"__meta_consulagent_tagged_address_wan": "198.18.1.2:80",
"__meta_consulagent_tag_foo": "bar",
"__meta_consulagent_tag_primary": "",
"__meta_consulagent_tagpresent_foo": "true",
"__meta_consulagent_tagpresent_primary": "true",
"__meta_consulagent_tags": ",primary,foo=bar,",
}),
}
discoveryutils.TestEqualLabelss(t, labelss, expectedLabelss)
}

View File

@ -0,0 +1,280 @@
package consulagent
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"net/url"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/metrics"
)
// SDCheckInterval is check interval for Consul Agent service discovery.
var SDCheckInterval = flag.Duration("promscrape.consulagentSDCheckInterval", 30*time.Second, "Interval for checking for changes in Consul Agent. "+
"This works only if consulagent_sd_configs is configured in '-promscrape.config' file. "+
"See https://docs.victoriametrics.com/sd_configs.html#consulagent_sd_configs for details")
// consulAgentWatcher is a watcher for consul api, updates services map in background with long-polling.
type consulAgentWatcher struct {
client *discoveryutils.Client
servicesQueryArgs string
watchServices []string
watchDatacenter string
// servicesLock protects services
servicesLock sync.Mutex
services map[string]*serviceWatcher
stoppedCh chan struct{}
}
type serviceWatcher struct {
serviceName string
serviceNodes []consul.ServiceNode
stoppedCh chan struct{}
requestCtx context.Context
requestCancel context.CancelFunc
}
// newConsulAgentWatcher creates new watcher and starts background service discovery for Consul.
func newConsulAgentWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, namespace string) *consulAgentWatcher {
var qv = url.Values{}
if namespace != "" {
qv.Set("ns", namespace)
}
if len(sdc.Filter) > 0 {
qv.Set("filter", sdc.Filter)
}
cw := &consulAgentWatcher{
client: client,
servicesQueryArgs: "?" + qv.Encode(),
watchServices: sdc.Services,
watchDatacenter: datacenter,
services: make(map[string]*serviceWatcher),
stoppedCh: make(chan struct{}),
}
initCh := make(chan struct{})
go func() {
cw.watchForServicesUpdates(initCh)
close(cw.stoppedCh)
}()
// wait for initialization to complete
<-initCh
return cw
}
func (cw *consulAgentWatcher) mustStop() {
cw.client.Stop()
<-cw.stoppedCh
}
func (cw *consulAgentWatcher) updateServices(serviceNames []string) {
var initWG sync.WaitGroup
// Start watchers for new services.
cw.servicesLock.Lock()
for _, serviceName := range serviceNames {
if _, ok := cw.services[serviceName]; ok {
// The watcher for serviceName already exists.
continue
}
ctx, cancel := context.WithCancel(cw.client.Context())
sw := &serviceWatcher{
serviceName: serviceName,
stoppedCh: make(chan struct{}),
requestCtx: ctx,
requestCancel: cancel,
}
cw.services[serviceName] = sw
serviceWatchersCreated.Inc()
initWG.Add(1)
go func() {
serviceWatchersCount.Inc()
sw.watchForServiceNodesUpdates(cw, &initWG)
serviceWatchersCount.Dec()
close(sw.stoppedCh)
}()
}
// Stop watchers for removed services.
newServiceNamesMap := make(map[string]struct{}, len(serviceNames))
for _, serviceName := range serviceNames {
newServiceNamesMap[serviceName] = struct{}{}
}
var swsStopped []*serviceWatcher
for serviceName, sw := range cw.services {
if _, ok := newServiceNamesMap[serviceName]; ok {
continue
}
sw.requestCancel()
delete(cw.services, serviceName)
swsStopped = append(swsStopped, sw)
}
cw.servicesLock.Unlock()
// Wait until deleted service watchers are stopped.
for _, sw := range swsStopped {
<-sw.stoppedCh
serviceWatchersStopped.Inc()
}
// Wait until added service watchers are initialized.
initWG.Wait()
}
// watchForServicesUpdates watches for new services and updates it in cw.
//
// watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done.
func (cw *consulAgentWatcher) watchForServicesUpdates(initCh chan struct{}) {
apiServer := cw.client.APIServer()
f := func() {
serviceNames, err := cw.getServiceNames()
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot obtain Consul Agent services from %q: %s", apiServer, err)
}
return
}
cw.updateServices(serviceNames)
}
logger.Infof("started Consul Agent service watcher for %q", apiServer)
f()
// send signal that initialization is complete
close(initCh)
ticker := time.NewTicker(getCheckInterval())
defer ticker.Stop()
stopCh := cw.client.Context().Done()
for {
select {
case <-ticker.C:
f()
case <-stopCh:
logger.Infof("stopping Consul Agent service watchers for %q", apiServer)
startTime := time.Now()
var swsStopped []*serviceWatcher
cw.servicesLock.Lock()
for _, sw := range cw.services {
sw.requestCancel()
swsStopped = append(swsStopped, sw)
}
cw.servicesLock.Unlock()
for _, sw := range swsStopped {
<-sw.stoppedCh
serviceWatchersStopped.Inc()
}
logger.Infof("stopped Consul Agent service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
return
}
}
}
var (
serviceWatchersCreated = metrics.NewCounter("vm_promscrape_discovery_consulagent_service_watchers_created_total")
serviceWatchersStopped = metrics.NewCounter("vm_promscrape_discovery_consulagent_service_watchers_stopped_total")
serviceWatchersCount = metrics.NewCounter("vm_promscrape_discovery_consulagent_service_watchers")
)
// getServiceNames obtains serviceNames via request to Consul Agent.
//
// It returns an empty serviceNames list if response contains the same index.
func (cw *consulAgentWatcher) getServiceNames() ([]string, error) {
path := "/v1/agent/services" + cw.servicesQueryArgs
data, err := cw.client.GetAPIResponse(path)
if err != nil {
return nil, fmt.Errorf("cannot perform Consul Agent API request at %q: %w", path, err)
}
var m map[string]consul.Service
if err := json.Unmarshal(data, &m); err != nil {
return nil, fmt.Errorf("cannot parse response from %q: %w; data=%q", path, err, data)
}
serviceNames := make([]string, 0, len(m))
for serviceName, service := range m {
if service.Datacenter != cw.watchDatacenter {
continue
}
if !consul.ShouldCollectServiceByName(cw.watchServices, serviceName) {
continue
}
serviceNames = append(serviceNames, serviceName)
}
return serviceNames, nil
}
// watchForServiceNodesUpdates watches for Consul serviceNode changes for the given serviceName.
//
// watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done.
func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulAgentWatcher, initWG *sync.WaitGroup) {
apiServer := cw.client.APIServer()
path := "/v1/agent/health/service/name/" + sw.serviceName
f := func() {
data, err := cw.client.GetAPIResponseWithParamsCtx(sw.requestCtx, path, nil, nil)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot obtain Consul Agent serviceNodes for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
}
return
}
sns, err := consul.ParseServiceNodes(data)
if err != nil {
logger.Errorf("cannot parse Consul Agent serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
return
}
cw.servicesLock.Lock()
sw.serviceNodes = sns
cw.servicesLock.Unlock()
}
f()
// Notify caller that initialization is complete
initWG.Done()
ticker := time.NewTicker(getCheckInterval())
defer ticker.Stop()
stopCh := sw.requestCtx.Done()
for {
select {
case <-ticker.C:
f()
case <-stopCh:
return
}
}
}
// getServiceNodesSnapshot returns a snapshot of discovered ServiceNodes.
func (cw *consulAgentWatcher) getServiceNodesSnapshot() map[string][]consul.ServiceNode {
cw.servicesLock.Lock()
sns := make(map[string][]consul.ServiceNode, len(cw.services))
for svc, sw := range cw.services {
sns[svc] = sw.serviceNodes
}
cw.servicesLock.Unlock()
return sns
}
func getCheckInterval() time.Duration {
d := *SDCheckInterval
if d <= time.Second {
return time.Second
}
return d
}

View File

@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
@ -69,27 +68,8 @@ func (svc *Service) appendTargetLabels(ms []*promutils.Labels, tagSeparator stri
m.Add("__meta_nomad_service_id", svc.ID)
m.Add("__meta_nomad_service_job_id", svc.JobID)
m.Add("__meta_nomad_service_port", strconv.Itoa(svc.Port))
// We surround the separated list with the separator as well. This way regular expressions
// in relabeling rules don't have to consider tag positions.
m.Add("__meta_nomad_tags", tagSeparator+strings.Join(svc.Tags, tagSeparator)+tagSeparator)
// Expose individual tags via __meta_nomad_tag_* labels, so users could move all the tags
// into the discovered scrape target with the following relabeling rule in the way similar to kubernetes_sd_configs:
//
// - action: labelmap
// regex: __meta_nomad_tag_(.+)
//
// This solves https://stackoverflow.com/questions/44339461/relabeling-in-prometheus
for _, tag := range svc.Tags {
k := tag
v := ""
if n := strings.IndexByte(tag, '='); n >= 0 {
k = tag[:n]
v = tag[n+1:]
}
m.Add(discoveryutils.SanitizeLabelName("__meta_nomad_tag_"+k), v)
m.Add(discoveryutils.SanitizeLabelName("__meta_nomad_tagpresent_"+k), "true")
}
discoveryutils.AddTagsToLabels(m, svc.Tags, "__meta_nomad_", tagSeparator)
ms = append(ms, m)
return ms

View File

@ -62,3 +62,28 @@ func TestEqualLabelss(t *testing.T, got, want []*promutils.Labels) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", gotCopy, want)
}
}
// AddTagsToLabels adds <prefix>_tags (separated with tagSeparator) to labels
// and exposes individual tags via <prefix>_tag_* labels, so users could move all the tags
// into the discovered scrape target with the following relabeling rule in the way similar to kubernetes_sd_configs:
//
// - action: labelmap
// regex: <prefix>_tag_(.+)
//
// This solves https://stackoverflow.com/questions/44339461/relabeling-in-prometheus
func AddTagsToLabels(m *promutils.Labels, tags []string, prefix, tagSeparator string) {
// We surround the separated list with the separator as well. This way regular expressions
// in relabeling rules don't have to consider tag positions.
m.Add(prefix+"tags", tagSeparator+strings.Join(tags, tagSeparator)+tagSeparator)
for _, tag := range tags {
k := tag
v := ""
if n := strings.IndexByte(tag, '='); n >= 0 {
k = tag[:n]
v = tag[n+1:]
}
m.Add(SanitizeLabelName(prefix+"tag_"+k), v)
m.Add(SanitizeLabelName(prefix+"tagpresent_"+k), "true")
}
}

View File

@ -17,6 +17,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consulagent"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
@ -125,6 +126,7 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
scs := newScrapeConfigs(pushData, globalStopCh)
scs.add("azure_sd_configs", *azure.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getAzureSDScrapeWork(swsPrev) })
scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
scs.add("consulagent_sd_configs", *consulagent.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulAgentSDScrapeWork(swsPrev) })
scs.add("digitalocean_sd_configs", *digitalocean.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDigitalOceanDScrapeWork(swsPrev) })
scs.add("dns_sd_configs", *dns.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
scs.add("docker_sd_configs", *docker.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSDScrapeWork(swsPrev) })