mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 23:39:48 +01:00
Changes consul discovery api (#921)
* adds consul watch api, it must reduce load on consul service with blocking wait requests, changed discoveryClient api with fetchResponseMeta callback. * small fix * fix after master merge * adds watch client at discovery utils * fixes consul watcher, changes namings, fixes data race * small typo fix * sanity fix * fix naming and service node update
This commit is contained in:
parent
fca915dcf3
commit
0b302d33cb
@ -3,23 +3,21 @@ package consul
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/url"
|
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||||
|
"github.com/VictoriaMetrics/fasthttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
// apiConfig contains config for API server
|
// apiConfig contains config for API server
|
||||||
|
// with consulWatch service.
|
||||||
type apiConfig struct {
|
type apiConfig struct {
|
||||||
client *discoveryutils.Client
|
tagSeparator string
|
||||||
tagSeparator string
|
consulWatcher *consulWatch
|
||||||
services []string
|
|
||||||
tags []string
|
|
||||||
datacenter string
|
|
||||||
allowStale bool
|
|
||||||
nodeMeta map[string]string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var configMap = discoveryutils.NewConfigMap()
|
var configMap = discoveryutils.NewConfigMap()
|
||||||
@ -72,15 +70,14 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cfg := &apiConfig{
|
|
||||||
client: client,
|
|
||||||
|
|
||||||
tagSeparator: tagSeparator,
|
cw, err := newConsulWatch(client, sdc, dc)
|
||||||
services: sdc.Services,
|
if err != nil {
|
||||||
tags: sdc.Tags,
|
return nil, fmt.Errorf("cannot start consul watcher: %w", err)
|
||||||
datacenter: dc,
|
}
|
||||||
allowStale: sdc.AllowStale,
|
cfg := &apiConfig{
|
||||||
nodeMeta: sdc.NodeMeta,
|
tagSeparator: tagSeparator,
|
||||||
|
consulWatcher: cw,
|
||||||
}
|
}
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
@ -117,20 +114,49 @@ func getDatacenter(client *discoveryutils.Client, dc string) (string, error) {
|
|||||||
return a.Config.Datacenter, nil
|
return a.Config.Datacenter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getAPIResponse(cfg *apiConfig, path string) ([]byte, error) {
|
// returns ServiceNodesState and version index.
|
||||||
separator := "?"
|
func getServiceState(client *discoveryutils.Client, svc, baseArgs string, index uint64) ([]ServiceNode, uint64, error) {
|
||||||
if strings.Contains(path, "?") {
|
path := fmt.Sprintf("/v1/health/service/%s%s", svc, baseArgs)
|
||||||
separator = "&"
|
|
||||||
|
data, newIndex, err := getBlockingAPIResponse(client, path, index)
|
||||||
|
if err != nil {
|
||||||
|
return nil, index, err
|
||||||
}
|
}
|
||||||
path += fmt.Sprintf("%sdc=%s", separator, url.QueryEscape(cfg.datacenter))
|
sns, err := parseServiceNodes(data)
|
||||||
if cfg.allowStale {
|
if err != nil {
|
||||||
// See https://www.consul.io/api/features/consistency
|
return nil, index, err
|
||||||
path += "&stale"
|
|
||||||
}
|
}
|
||||||
if len(cfg.nodeMeta) > 0 {
|
return sns, newIndex, nil
|
||||||
for k, v := range cfg.nodeMeta {
|
}
|
||||||
path += fmt.Sprintf("&node-meta=%s", url.QueryEscape(k+":"+v))
|
|
||||||
|
// returns consul api response with new index version of object.
|
||||||
|
// https://www.consul.io/api-docs/features/blocking
|
||||||
|
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index uint64) ([]byte, uint64, error) {
|
||||||
|
path += "&index=" + strconv.FormatUint(index, 10)
|
||||||
|
path = path + fmt.Sprintf("&wait=%s", watchTime)
|
||||||
|
getMeta := func(resp *fasthttp.Response) {
|
||||||
|
if ind := resp.Header.Peek("X-Consul-Index"); len(ind) > 0 {
|
||||||
|
newIndex, err := strconv.ParseUint(string(ind), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("failed to parse consul index: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// reset index
|
||||||
|
// https://www.consul.io/api-docs/features/blocking#implementation-details
|
||||||
|
if newIndex < 1 {
|
||||||
|
index = 1
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if index > newIndex {
|
||||||
|
index = 0
|
||||||
|
return
|
||||||
|
}
|
||||||
|
index = newIndex
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return cfg.client.GetAPIResponse(path)
|
data, err := client.GetBlockingAPIResponse(path, getMeta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, index, fmt.Errorf("failed query consul api path=%q, err=%w", path, err)
|
||||||
|
}
|
||||||
|
return data, index, nil
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,25 @@
|
|||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// SDCheckInterval - check interval for consul discovery.
|
||||||
|
SDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+
|
||||||
|
"This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+
|
||||||
|
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details")
|
||||||
|
// duration for consul blocking request, maximum wait time is 10 min.
|
||||||
|
// But fasthttp client has readTimeout for 1 min, so we use 50s timeout.
|
||||||
|
// also consul adds random delay up to wait/16, so there is no need in jitter.
|
||||||
|
// https://www.consul.io/api-docs/features/blocking
|
||||||
|
watchTime = time.Second * 50
|
||||||
|
)
|
||||||
|
|
||||||
// SDConfig represents service discovery config for Consul.
|
// SDConfig represents service discovery config for Consul.
|
||||||
//
|
//
|
||||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config
|
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config
|
||||||
|
@ -3,19 +3,15 @@ package consul
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// getServiceNodesLabels returns labels for Consul service nodes obtained from the given cfg
|
// getServiceNodesLabels returns labels for Consul service nodes with given tagSeparator.
|
||||||
func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) {
|
func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) {
|
||||||
sns, err := getAllServiceNodes(cfg)
|
sns := cfg.consulWatcher.getServiceNodes()
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var ms []map[string]string
|
var ms []map[string]string
|
||||||
for _, sn := range sns {
|
for _, sn := range sns {
|
||||||
ms = sn.appendTargetLabels(ms, cfg.tagSeparator)
|
ms = sn.appendTargetLabels(ms, cfg.tagSeparator)
|
||||||
@ -23,113 +19,6 @@ func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) {
|
|||||||
return ms, nil
|
return ms, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getAllServiceNodes(cfg *apiConfig) ([]ServiceNode, error) {
|
|
||||||
// Obtain a list of services
|
|
||||||
// See https://www.consul.io/api/catalog.html#list-services
|
|
||||||
data, err := getAPIResponse(cfg, "/v1/catalog/services")
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot obtain services: %w", err)
|
|
||||||
}
|
|
||||||
var m map[string][]string
|
|
||||||
if err := json.Unmarshal(data, &m); err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot parse services response %q: %w", data, err)
|
|
||||||
}
|
|
||||||
serviceNames := make(map[string]bool)
|
|
||||||
for serviceName, tags := range m {
|
|
||||||
if !shouldCollectServiceByName(cfg.services, serviceName) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !shouldCollectServiceByTags(cfg.tags, tags) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
serviceNames[serviceName] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query all the serviceNames in parallel
|
|
||||||
type response struct {
|
|
||||||
sns []ServiceNode
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
responsesCh := make(chan response, len(serviceNames))
|
|
||||||
for serviceName := range serviceNames {
|
|
||||||
go func(serviceName string) {
|
|
||||||
sns, err := getServiceNodes(cfg, serviceName)
|
|
||||||
responsesCh <- response{
|
|
||||||
sns: sns,
|
|
||||||
err: err,
|
|
||||||
}
|
|
||||||
}(serviceName)
|
|
||||||
}
|
|
||||||
var sns []ServiceNode
|
|
||||||
err = nil
|
|
||||||
for i := 0; i < len(serviceNames); i++ {
|
|
||||||
resp := <-responsesCh
|
|
||||||
if resp.err != nil && err == nil {
|
|
||||||
err = resp.err
|
|
||||||
}
|
|
||||||
sns = append(sns, resp.sns...)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return sns, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func shouldCollectServiceByName(filterServices []string, service string) bool {
|
|
||||||
if len(filterServices) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
for _, filterService := range filterServices {
|
|
||||||
if filterService == service {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func shouldCollectServiceByTags(filterTags, tags []string) bool {
|
|
||||||
if len(filterTags) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
for _, filterTag := range filterTags {
|
|
||||||
hasTag := false
|
|
||||||
for _, tag := range tags {
|
|
||||||
if tag == filterTag {
|
|
||||||
hasTag = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !hasTag {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func getServiceNodes(cfg *apiConfig, serviceName string) ([]ServiceNode, error) {
|
|
||||||
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
|
||||||
path := fmt.Sprintf("/v1/health/service/%s", serviceName)
|
|
||||||
// The /v1/health/service/:service endpoint supports background refresh caching,
|
|
||||||
// which guarantees fresh results obtained from local Consul agent.
|
|
||||||
// See https://www.consul.io/api-docs/health#list-nodes-for-service
|
|
||||||
// and https://www.consul.io/api/features/caching for details.
|
|
||||||
// Query cached results in order to reduce load on Consul cluster.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/574 .
|
|
||||||
path += "?cached"
|
|
||||||
var tagsArgs []string
|
|
||||||
for _, tag := range cfg.tags {
|
|
||||||
tagsArgs = append(tagsArgs, fmt.Sprintf("tag=%s", url.QueryEscape(tag)))
|
|
||||||
}
|
|
||||||
if len(tagsArgs) > 0 {
|
|
||||||
path += "&" + strings.Join(tagsArgs, "&")
|
|
||||||
}
|
|
||||||
data, err := getAPIResponse(cfg, path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot obtain instances for serviceName=%q: %w", serviceName, err)
|
|
||||||
}
|
|
||||||
return parseServiceNodes(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServiceNode is Consul service node.
|
// ServiceNode is Consul service node.
|
||||||
//
|
//
|
||||||
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
||||||
|
235
lib/promscrape/discovery/consul/watch.go
Normal file
235
lib/promscrape/discovery/consul/watch.go
Normal file
@ -0,0 +1,235 @@
|
|||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type serviceWatch struct {
|
||||||
|
stopCh chan struct{}
|
||||||
|
serviceNodes []ServiceNode
|
||||||
|
}
|
||||||
|
|
||||||
|
// watcher for consul api, updates targets in background with long-polling.
|
||||||
|
type consulWatch struct {
|
||||||
|
baseQueryArgs string
|
||||||
|
client *discoveryutils.Client
|
||||||
|
lastAccessTime atomic.Value
|
||||||
|
nodeMeta string
|
||||||
|
shouldWatchServices []string
|
||||||
|
shouldWatchTags []string
|
||||||
|
// guards services
|
||||||
|
servicesLock sync.Mutex
|
||||||
|
services map[string]serviceWatch
|
||||||
|
stopCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// init new watcher and start background service discovery for Consul.
|
||||||
|
func newConsulWatch(client *discoveryutils.Client, sdc *SDConfig, datacenter string) (*consulWatch, error) {
|
||||||
|
baseQueryArgs := fmt.Sprintf("?sdc=%s", url.QueryEscape(datacenter))
|
||||||
|
var nodeMeta string
|
||||||
|
if len(sdc.NodeMeta) > 0 {
|
||||||
|
for k, v := range sdc.NodeMeta {
|
||||||
|
nodeMeta += fmt.Sprintf("&node-meta=%s", url.QueryEscape(k+":"+v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if sdc.AllowStale {
|
||||||
|
baseQueryArgs += "&stale"
|
||||||
|
}
|
||||||
|
cw := consulWatch{
|
||||||
|
client: client,
|
||||||
|
baseQueryArgs: baseQueryArgs,
|
||||||
|
shouldWatchServices: sdc.Services,
|
||||||
|
shouldWatchTags: sdc.Tags,
|
||||||
|
services: make(map[string]serviceWatch),
|
||||||
|
}
|
||||||
|
|
||||||
|
watchServiceNames, _, err := cw.getServiceNames(0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
cw.servicesLock.Lock()
|
||||||
|
for serviceName := range watchServiceNames {
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
cw.services[serviceName] = serviceWatch{stopCh: stopCh}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(serviceName string) {
|
||||||
|
defer wg.Done()
|
||||||
|
cw.watchForServiceUpdates(serviceName, stopCh)
|
||||||
|
}(serviceName)
|
||||||
|
}
|
||||||
|
cw.servicesLock.Unlock()
|
||||||
|
// wait for first init.
|
||||||
|
wg.Wait()
|
||||||
|
go cw.watchForServices()
|
||||||
|
return &cw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// stops all service watchers.
|
||||||
|
func (cw *consulWatch) stopServiceWatchersAll() {
|
||||||
|
cw.servicesLock.Lock()
|
||||||
|
for _, sw := range cw.services {
|
||||||
|
close(sw.stopCh)
|
||||||
|
}
|
||||||
|
cw.servicesLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// getServiceNames returns serviceNames and index version.
|
||||||
|
func (cw *consulWatch) getServiceNames(index uint64) (map[string]struct{}, uint64, error) {
|
||||||
|
sns := make(map[string]struct{})
|
||||||
|
path := fmt.Sprintf("/v1/catalog/services%s", cw.baseQueryArgs)
|
||||||
|
if len(cw.nodeMeta) > 0 {
|
||||||
|
path += cw.nodeMeta
|
||||||
|
}
|
||||||
|
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
|
||||||
|
if err != nil {
|
||||||
|
return nil, index, err
|
||||||
|
}
|
||||||
|
var m map[string][]string
|
||||||
|
if err := json.Unmarshal(data, &m); err != nil {
|
||||||
|
return nil, index, fmt.Errorf("cannot parse services response=%q, err=%w", data, err)
|
||||||
|
}
|
||||||
|
for k, tags := range m {
|
||||||
|
if !shouldCollectServiceByName(cw.shouldWatchServices, k) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !shouldCollectServiceByTags(cw.shouldWatchTags, tags) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sns[k] = struct{}{}
|
||||||
|
}
|
||||||
|
return sns, newIndex, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// listen for new services and update it.
|
||||||
|
func (cw *consulWatch) watchForServices() {
|
||||||
|
ticker := time.NewTicker(*SDCheckInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
var index uint64
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-cw.stopCh:
|
||||||
|
cw.stopServiceWatchersAll()
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if time.Since(cw.lastAccessTime.Load().(time.Time)) > *SDCheckInterval*2 {
|
||||||
|
// exit watch and stop all background watchers.
|
||||||
|
cw.stopServiceWatchersAll()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m, newIndex, err := cw.getServiceNames(index)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("failed get serviceNames from consul api: err=%v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// nothing changed.
|
||||||
|
if index == newIndex {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cw.servicesLock.Lock()
|
||||||
|
// start new services watchers.
|
||||||
|
for svc := range m {
|
||||||
|
if _, ok := cw.services[svc]; !ok {
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
cw.services[svc] = serviceWatch{stopCh: stopCh}
|
||||||
|
go cw.watchForServiceUpdates(svc, stopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// stop watch for removed services.
|
||||||
|
for svc, s := range cw.services {
|
||||||
|
if _, ok := m[svc]; !ok {
|
||||||
|
close(s.stopCh)
|
||||||
|
delete(cw.services, svc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cw.servicesLock.Unlock()
|
||||||
|
index = newIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// start watching for consul service changes.
|
||||||
|
func (cw *consulWatch) watchForServiceUpdates(svc string, stopCh chan struct{}) {
|
||||||
|
ticker := time.NewTicker(*SDCheckInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
updateServiceState := func(index uint64) uint64 {
|
||||||
|
sns, newIndex, err := getServiceState(cw.client, svc, cw.baseQueryArgs, index)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("failed update service state, service_name=%q, err=%v", svc, err)
|
||||||
|
return index
|
||||||
|
}
|
||||||
|
if newIndex == index {
|
||||||
|
return index
|
||||||
|
}
|
||||||
|
cw.servicesLock.Lock()
|
||||||
|
if s, ok := cw.services[svc]; ok {
|
||||||
|
s.serviceNodes = sns
|
||||||
|
cw.services[svc] = s
|
||||||
|
}
|
||||||
|
cw.servicesLock.Unlock()
|
||||||
|
return newIndex
|
||||||
|
}
|
||||||
|
watchIndex := updateServiceState(0)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
watchIndex = updateServiceState(watchIndex)
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns ServiceNodes.
|
||||||
|
func (cw *consulWatch) getServiceNodes() []ServiceNode {
|
||||||
|
var sns []ServiceNode
|
||||||
|
cw.servicesLock.Lock()
|
||||||
|
for _, v := range cw.services {
|
||||||
|
sns = append(sns, v.serviceNodes...)
|
||||||
|
}
|
||||||
|
cw.servicesLock.Unlock()
|
||||||
|
cw.lastAccessTime.Store(time.Now())
|
||||||
|
return sns
|
||||||
|
}
|
||||||
|
|
||||||
|
func shouldCollectServiceByName(filterServices []string, service string) bool {
|
||||||
|
if len(filterServices) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
for _, filterService := range filterServices {
|
||||||
|
if filterService == service {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func shouldCollectServiceByTags(filterTags, tags []string) bool {
|
||||||
|
if len(filterTags) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
for _, filterTag := range filterTags {
|
||||||
|
hasTag := false
|
||||||
|
for _, tag := range tags {
|
||||||
|
if tag == filterTag {
|
||||||
|
hasTag = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !hasTag {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
@ -33,10 +33,12 @@ func GetHTTPClient() *http.Client {
|
|||||||
|
|
||||||
// Client is http client, which talks to the given apiServer.
|
// Client is http client, which talks to the given apiServer.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
hc *fasthttp.HostClient
|
hc *fasthttp.HostClient
|
||||||
ac *promauth.Config
|
// blockingClient is used for performing long-polling requests.
|
||||||
apiServer string
|
blockingClient *fasthttp.HostClient
|
||||||
hostPort string
|
ac *promauth.Config
|
||||||
|
apiServer string
|
||||||
|
hostPort string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient returns new Client for the given apiServer and the given ac.
|
// NewClient returns new Client for the given apiServer and the given ac.
|
||||||
@ -80,11 +82,24 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
|
|||||||
MaxConns: 2 * *maxConcurrency,
|
MaxConns: 2 * *maxConcurrency,
|
||||||
Dial: dialFunc,
|
Dial: dialFunc,
|
||||||
}
|
}
|
||||||
|
wc := &fasthttp.HostClient{
|
||||||
|
Addr: hostPort,
|
||||||
|
Name: "vm_promscrape/discovery",
|
||||||
|
DialDualStack: netutil.TCP6Enabled(),
|
||||||
|
IsTLS: isTLS,
|
||||||
|
TLSConfig: tlsCfg,
|
||||||
|
ReadTimeout: time.Minute * 3,
|
||||||
|
WriteTimeout: 10 * time.Second,
|
||||||
|
MaxResponseBodySize: 300 * 1024 * 1024,
|
||||||
|
MaxConns: 20 * *maxConcurrency,
|
||||||
|
Dial: dialFunc,
|
||||||
|
}
|
||||||
return &Client{
|
return &Client{
|
||||||
hc: hc,
|
hc: hc,
|
||||||
ac: ac,
|
blockingClient: wc,
|
||||||
apiServer: apiServer,
|
ac: ac,
|
||||||
hostPort: hostPort,
|
apiServer: apiServer,
|
||||||
|
hostPort: hostPort,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,6 +112,12 @@ func concurrencyLimitChInit() {
|
|||||||
concurrencyLimitCh = make(chan struct{}, *maxConcurrency)
|
concurrencyLimitCh = make(chan struct{}, *maxConcurrency)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// APIRequestParams modifies api request with given params.
|
||||||
|
type APIRequestParams struct {
|
||||||
|
FetchFromResponse func(resp *fasthttp.Response)
|
||||||
|
SetToRequest func(req *fasthttp.Request)
|
||||||
|
}
|
||||||
|
|
||||||
// GetAPIResponse returns response for the given absolute path.
|
// GetAPIResponse returns response for the given absolute path.
|
||||||
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
||||||
// Limit the number of concurrent API requests.
|
// Limit the number of concurrent API requests.
|
||||||
@ -111,7 +132,17 @@ func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
|||||||
c.apiServer, *maxWaitTime, *maxConcurrency)
|
c.apiServer, *maxWaitTime, *maxConcurrency)
|
||||||
}
|
}
|
||||||
defer func() { <-concurrencyLimitCh }()
|
defer func() { <-concurrencyLimitCh }()
|
||||||
|
return c.getAPIResponseWithParamsAndClient(c.hc, path, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
|
||||||
|
// inspectResponse - should never reference data from response.
|
||||||
|
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *fasthttp.Response)) ([]byte, error) {
|
||||||
|
return c.getAPIResponseWithParamsAndClient(c.blockingClient, path, inspectResponse)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for response.
|
||||||
|
func (c *Client) getAPIResponseWithParamsAndClient(client *fasthttp.HostClient, path string, inspectResponse func(resp *fasthttp.Response)) ([]byte, error) {
|
||||||
requestURL := c.apiServer + path
|
requestURL := c.apiServer + path
|
||||||
var u fasthttp.URI
|
var u fasthttp.URI
|
||||||
u.Update(requestURL)
|
u.Update(requestURL)
|
||||||
@ -122,9 +153,10 @@ func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
|||||||
if c.ac != nil && c.ac.Authorization != "" {
|
if c.ac != nil && c.ac.Authorization != "" {
|
||||||
req.Header.Set("Authorization", c.ac.Authorization)
|
req.Header.Set("Authorization", c.ac.Authorization)
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp fasthttp.Response
|
var resp fasthttp.Response
|
||||||
deadline := time.Now().Add(c.hc.ReadTimeout)
|
deadline := time.Now().Add(client.ReadTimeout)
|
||||||
if err := doRequestWithPossibleRetry(c.hc, &req, &resp, deadline); err != nil {
|
if err := doRequestWithPossibleRetry(client, &req, &resp, deadline); err != nil {
|
||||||
return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
|
return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
|
||||||
}
|
}
|
||||||
var data []byte
|
var data []byte
|
||||||
@ -137,6 +169,9 @@ func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
|||||||
} else {
|
} else {
|
||||||
data = append(data[:0], resp.Body()...)
|
data = append(data[:0], resp.Body()...)
|
||||||
}
|
}
|
||||||
|
if inspectResponse != nil {
|
||||||
|
inspectResponse(&resp)
|
||||||
|
}
|
||||||
statusCode := resp.StatusCode()
|
statusCode := resp.StatusCode()
|
||||||
if statusCode != fasthttp.StatusOK {
|
if statusCode != fasthttp.StatusOK {
|
||||||
return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
|
return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -25,9 +26,6 @@ var (
|
|||||||
openstackSDCheckInterval = flag.Duration("promscrape.openstackSDCheckInterval", 30*time.Second, "Interval for checking for changes in openstack API server. "+
|
openstackSDCheckInterval = flag.Duration("promscrape.openstackSDCheckInterval", 30*time.Second, "Interval for checking for changes in openstack API server. "+
|
||||||
"This works only if `openstack_sd_configs` is configured in '-promscrape.config' file. "+
|
"This works only if `openstack_sd_configs` is configured in '-promscrape.config' file. "+
|
||||||
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config for details")
|
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config for details")
|
||||||
consulSDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+
|
|
||||||
"This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+
|
|
||||||
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details")
|
|
||||||
eurekaSDCheckInterval = flag.Duration("promscrape.eurekaSDCheckInterval", 30*time.Second, "Interval for checking for changes in eureka. "+
|
eurekaSDCheckInterval = flag.Duration("promscrape.eurekaSDCheckInterval", 30*time.Second, "Interval for checking for changes in eureka. "+
|
||||||
"This works only if `eureka_sd_configs` is configured in '-promscrape.config' file. "+
|
"This works only if `eureka_sd_configs` is configured in '-promscrape.config' file. "+
|
||||||
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config for details")
|
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config for details")
|
||||||
@ -101,7 +99,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
|
|||||||
scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
|
scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
|
||||||
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
|
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
|
||||||
scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
|
scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
|
||||||
scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
|
scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
|
||||||
scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) })
|
scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) })
|
||||||
scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
|
scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
|
||||||
scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
|
scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
|
||||||
|
Loading…
Reference in New Issue
Block a user