diff --git a/lib/promscrape/discovery/consul/api.go b/lib/promscrape/discovery/consul/api.go index 8010616ac5..f041a06d1c 100644 --- a/lib/promscrape/discovery/consul/api.go +++ b/lib/promscrape/discovery/consul/api.go @@ -3,23 +3,21 @@ package consul import ( "fmt" "io/ioutil" - "net/url" "os" + "strconv" "strings" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" + "github.com/VictoriaMetrics/fasthttp" ) // apiConfig contains config for API server +// with consulWatch service. type apiConfig struct { - client *discoveryutils.Client - tagSeparator string - services []string - tags []string - datacenter string - allowStale bool - nodeMeta map[string]string + tagSeparator string + consulWatcher *consulWatch } var configMap = discoveryutils.NewConfigMap() @@ -72,15 +70,14 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { if err != nil { return nil, err } - cfg := &apiConfig{ - client: client, - tagSeparator: tagSeparator, - services: sdc.Services, - tags: sdc.Tags, - datacenter: dc, - allowStale: sdc.AllowStale, - nodeMeta: sdc.NodeMeta, + cw, err := newConsulWatch(client, sdc, dc) + if err != nil { + return nil, fmt.Errorf("cannot start consul watcher: %w", err) + } + cfg := &apiConfig{ + tagSeparator: tagSeparator, + consulWatcher: cw, } return cfg, nil } @@ -117,20 +114,49 @@ func getDatacenter(client *discoveryutils.Client, dc string) (string, error) { return a.Config.Datacenter, nil } -func getAPIResponse(cfg *apiConfig, path string) ([]byte, error) { - separator := "?" - if strings.Contains(path, "?") { - separator = "&" +// returns ServiceNodesState and version index. +func getServiceState(client *discoveryutils.Client, svc, baseArgs string, index uint64) ([]ServiceNode, uint64, error) { + path := fmt.Sprintf("/v1/health/service/%s%s", svc, baseArgs) + + data, newIndex, err := getBlockingAPIResponse(client, path, index) + if err != nil { + return nil, index, err } - path += fmt.Sprintf("%sdc=%s", separator, url.QueryEscape(cfg.datacenter)) - if cfg.allowStale { - // See https://www.consul.io/api/features/consistency - path += "&stale" + sns, err := parseServiceNodes(data) + if err != nil { + return nil, index, err } - if len(cfg.nodeMeta) > 0 { - for k, v := range cfg.nodeMeta { - path += fmt.Sprintf("&node-meta=%s", url.QueryEscape(k+":"+v)) + return sns, newIndex, nil +} + +// returns consul api response with new index version of object. +// https://www.consul.io/api-docs/features/blocking +func getBlockingAPIResponse(client *discoveryutils.Client, path string, index uint64) ([]byte, uint64, error) { + path += "&index=" + strconv.FormatUint(index, 10) + path = path + fmt.Sprintf("&wait=%s", watchTime) + getMeta := func(resp *fasthttp.Response) { + if ind := resp.Header.Peek("X-Consul-Index"); len(ind) > 0 { + newIndex, err := strconv.ParseUint(string(ind), 10, 64) + if err != nil { + logger.Errorf("failed to parse consul index: %v", err) + return + } + // reset index + // https://www.consul.io/api-docs/features/blocking#implementation-details + if newIndex < 1 { + index = 1 + return + } + if index > newIndex { + index = 0 + return + } + index = newIndex } } - return cfg.client.GetAPIResponse(path) + data, err := client.GetBlockingAPIResponse(path, getMeta) + if err != nil { + return nil, index, fmt.Errorf("failed query consul api path=%q, err=%w", path, err) + } + return data, index, nil } diff --git a/lib/promscrape/discovery/consul/consul.go b/lib/promscrape/discovery/consul/consul.go index 735190839a..72ed464ac7 100644 --- a/lib/promscrape/discovery/consul/consul.go +++ b/lib/promscrape/discovery/consul/consul.go @@ -1,11 +1,25 @@ package consul import ( + "flag" "fmt" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" ) +var ( + // SDCheckInterval - check interval for consul discovery. + SDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+ + "This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+ + "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details") + // duration for consul blocking request, maximum wait time is 10 min. + // But fasthttp client has readTimeout for 1 min, so we use 50s timeout. + // also consul adds random delay up to wait/16, so there is no need in jitter. + // https://www.consul.io/api-docs/features/blocking + watchTime = time.Second * 50 +) + // SDConfig represents service discovery config for Consul. // // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config diff --git a/lib/promscrape/discovery/consul/service_node.go b/lib/promscrape/discovery/consul/service_node.go index 2b3b9821a1..ae332091be 100644 --- a/lib/promscrape/discovery/consul/service_node.go +++ b/lib/promscrape/discovery/consul/service_node.go @@ -3,19 +3,15 @@ package consul import ( "encoding/json" "fmt" - "net/url" "strconv" "strings" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) -// getServiceNodesLabels returns labels for Consul service nodes obtained from the given cfg +// getServiceNodesLabels returns labels for Consul service nodes with given tagSeparator. func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) { - sns, err := getAllServiceNodes(cfg) - if err != nil { - return nil, err - } + sns := cfg.consulWatcher.getServiceNodes() var ms []map[string]string for _, sn := range sns { ms = sn.appendTargetLabels(ms, cfg.tagSeparator) @@ -23,113 +19,6 @@ func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) { return ms, nil } -func getAllServiceNodes(cfg *apiConfig) ([]ServiceNode, error) { - // Obtain a list of services - // See https://www.consul.io/api/catalog.html#list-services - data, err := getAPIResponse(cfg, "/v1/catalog/services") - if err != nil { - return nil, fmt.Errorf("cannot obtain services: %w", err) - } - var m map[string][]string - if err := json.Unmarshal(data, &m); err != nil { - return nil, fmt.Errorf("cannot parse services response %q: %w", data, err) - } - serviceNames := make(map[string]bool) - for serviceName, tags := range m { - if !shouldCollectServiceByName(cfg.services, serviceName) { - continue - } - if !shouldCollectServiceByTags(cfg.tags, tags) { - continue - } - serviceNames[serviceName] = true - } - - // Query all the serviceNames in parallel - type response struct { - sns []ServiceNode - err error - } - responsesCh := make(chan response, len(serviceNames)) - for serviceName := range serviceNames { - go func(serviceName string) { - sns, err := getServiceNodes(cfg, serviceName) - responsesCh <- response{ - sns: sns, - err: err, - } - }(serviceName) - } - var sns []ServiceNode - err = nil - for i := 0; i < len(serviceNames); i++ { - resp := <-responsesCh - if resp.err != nil && err == nil { - err = resp.err - } - sns = append(sns, resp.sns...) - } - if err != nil { - return nil, err - } - return sns, nil -} - -func shouldCollectServiceByName(filterServices []string, service string) bool { - if len(filterServices) == 0 { - return true - } - for _, filterService := range filterServices { - if filterService == service { - return true - } - } - return false -} - -func shouldCollectServiceByTags(filterTags, tags []string) bool { - if len(filterTags) == 0 { - return true - } - for _, filterTag := range filterTags { - hasTag := false - for _, tag := range tags { - if tag == filterTag { - hasTag = true - break - } - } - if !hasTag { - return false - } - } - return true -} - -func getServiceNodes(cfg *apiConfig, serviceName string) ([]ServiceNode, error) { - // See https://www.consul.io/api/health.html#list-nodes-for-service - path := fmt.Sprintf("/v1/health/service/%s", serviceName) - // The /v1/health/service/:service endpoint supports background refresh caching, - // which guarantees fresh results obtained from local Consul agent. - // See https://www.consul.io/api-docs/health#list-nodes-for-service - // and https://www.consul.io/api/features/caching for details. - // Query cached results in order to reduce load on Consul cluster. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/574 . - path += "?cached" - var tagsArgs []string - for _, tag := range cfg.tags { - tagsArgs = append(tagsArgs, fmt.Sprintf("tag=%s", url.QueryEscape(tag))) - } - if len(tagsArgs) > 0 { - path += "&" + strings.Join(tagsArgs, "&") - } - data, err := getAPIResponse(cfg, path) - if err != nil { - return nil, fmt.Errorf("cannot obtain instances for serviceName=%q: %w", serviceName, err) - } - return parseServiceNodes(data) -} - // ServiceNode is Consul service node. // // See https://www.consul.io/api/health.html#list-nodes-for-service diff --git a/lib/promscrape/discovery/consul/watch.go b/lib/promscrape/discovery/consul/watch.go new file mode 100644 index 0000000000..80dd39e70a --- /dev/null +++ b/lib/promscrape/discovery/consul/watch.go @@ -0,0 +1,235 @@ +package consul + +import ( + "encoding/json" + "fmt" + "net/url" + "sync" + "sync/atomic" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" +) + +type serviceWatch struct { + stopCh chan struct{} + serviceNodes []ServiceNode +} + +// watcher for consul api, updates targets in background with long-polling. +type consulWatch struct { + baseQueryArgs string + client *discoveryutils.Client + lastAccessTime atomic.Value + nodeMeta string + shouldWatchServices []string + shouldWatchTags []string + // guards services + servicesLock sync.Mutex + services map[string]serviceWatch + stopCh chan struct{} +} + +// init new watcher and start background service discovery for Consul. +func newConsulWatch(client *discoveryutils.Client, sdc *SDConfig, datacenter string) (*consulWatch, error) { + baseQueryArgs := fmt.Sprintf("?sdc=%s", url.QueryEscape(datacenter)) + var nodeMeta string + if len(sdc.NodeMeta) > 0 { + for k, v := range sdc.NodeMeta { + nodeMeta += fmt.Sprintf("&node-meta=%s", url.QueryEscape(k+":"+v)) + } + } + if sdc.AllowStale { + baseQueryArgs += "&stale" + } + cw := consulWatch{ + client: client, + baseQueryArgs: baseQueryArgs, + shouldWatchServices: sdc.Services, + shouldWatchTags: sdc.Tags, + services: make(map[string]serviceWatch), + } + + watchServiceNames, _, err := cw.getServiceNames(0) + if err != nil { + return nil, err + } + var wg sync.WaitGroup + cw.servicesLock.Lock() + for serviceName := range watchServiceNames { + stopCh := make(chan struct{}) + cw.services[serviceName] = serviceWatch{stopCh: stopCh} + wg.Add(1) + go func(serviceName string) { + defer wg.Done() + cw.watchForServiceUpdates(serviceName, stopCh) + }(serviceName) + } + cw.servicesLock.Unlock() + // wait for first init. + wg.Wait() + go cw.watchForServices() + return &cw, nil +} + +// stops all service watchers. +func (cw *consulWatch) stopServiceWatchersAll() { + cw.servicesLock.Lock() + for _, sw := range cw.services { + close(sw.stopCh) + } + cw.servicesLock.Unlock() +} + +// getServiceNames returns serviceNames and index version. +func (cw *consulWatch) getServiceNames(index uint64) (map[string]struct{}, uint64, error) { + sns := make(map[string]struct{}) + path := fmt.Sprintf("/v1/catalog/services%s", cw.baseQueryArgs) + if len(cw.nodeMeta) > 0 { + path += cw.nodeMeta + } + data, newIndex, err := getBlockingAPIResponse(cw.client, path, index) + if err != nil { + return nil, index, err + } + var m map[string][]string + if err := json.Unmarshal(data, &m); err != nil { + return nil, index, fmt.Errorf("cannot parse services response=%q, err=%w", data, err) + } + for k, tags := range m { + if !shouldCollectServiceByName(cw.shouldWatchServices, k) { + continue + } + if !shouldCollectServiceByTags(cw.shouldWatchTags, tags) { + continue + } + sns[k] = struct{}{} + } + return sns, newIndex, nil +} + +// listen for new services and update it. +func (cw *consulWatch) watchForServices() { + ticker := time.NewTicker(*SDCheckInterval) + defer ticker.Stop() + var index uint64 + for { + select { + case <-cw.stopCh: + cw.stopServiceWatchersAll() + return + case <-ticker.C: + if time.Since(cw.lastAccessTime.Load().(time.Time)) > *SDCheckInterval*2 { + // exit watch and stop all background watchers. + cw.stopServiceWatchersAll() + return + } + m, newIndex, err := cw.getServiceNames(index) + if err != nil { + logger.Errorf("failed get serviceNames from consul api: err=%v", err) + continue + } + // nothing changed. + if index == newIndex { + continue + } + cw.servicesLock.Lock() + // start new services watchers. + for svc := range m { + if _, ok := cw.services[svc]; !ok { + stopCh := make(chan struct{}) + cw.services[svc] = serviceWatch{stopCh: stopCh} + go cw.watchForServiceUpdates(svc, stopCh) + } + } + // stop watch for removed services. + for svc, s := range cw.services { + if _, ok := m[svc]; !ok { + close(s.stopCh) + delete(cw.services, svc) + } + } + cw.servicesLock.Unlock() + index = newIndex + } + } + +} + +// start watching for consul service changes. +func (cw *consulWatch) watchForServiceUpdates(svc string, stopCh chan struct{}) { + ticker := time.NewTicker(*SDCheckInterval) + defer ticker.Stop() + updateServiceState := func(index uint64) uint64 { + sns, newIndex, err := getServiceState(cw.client, svc, cw.baseQueryArgs, index) + if err != nil { + logger.Errorf("failed update service state, service_name=%q, err=%v", svc, err) + return index + } + if newIndex == index { + return index + } + cw.servicesLock.Lock() + if s, ok := cw.services[svc]; ok { + s.serviceNodes = sns + cw.services[svc] = s + } + cw.servicesLock.Unlock() + return newIndex + } + watchIndex := updateServiceState(0) + go func() { + for { + select { + case <-ticker.C: + watchIndex = updateServiceState(watchIndex) + case <-stopCh: + return + } + } + }() +} + +// returns ServiceNodes. +func (cw *consulWatch) getServiceNodes() []ServiceNode { + var sns []ServiceNode + cw.servicesLock.Lock() + for _, v := range cw.services { + sns = append(sns, v.serviceNodes...) + } + cw.servicesLock.Unlock() + cw.lastAccessTime.Store(time.Now()) + return sns +} + +func shouldCollectServiceByName(filterServices []string, service string) bool { + if len(filterServices) == 0 { + return true + } + for _, filterService := range filterServices { + if filterService == service { + return true + } + } + return false +} + +func shouldCollectServiceByTags(filterTags, tags []string) bool { + if len(filterTags) == 0 { + return true + } + for _, filterTag := range filterTags { + hasTag := false + for _, tag := range tags { + if tag == filterTag { + hasTag = true + break + } + } + if !hasTag { + return false + } + } + return true +} diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 014edced41..ab772dc1c2 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -33,10 +33,12 @@ func GetHTTPClient() *http.Client { // Client is http client, which talks to the given apiServer. type Client struct { - hc *fasthttp.HostClient - ac *promauth.Config - apiServer string - hostPort string + hc *fasthttp.HostClient + // blockingClient is used for performing long-polling requests. + blockingClient *fasthttp.HostClient + ac *promauth.Config + apiServer string + hostPort string } // NewClient returns new Client for the given apiServer and the given ac. @@ -80,11 +82,24 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { MaxConns: 2 * *maxConcurrency, Dial: dialFunc, } + wc := &fasthttp.HostClient{ + Addr: hostPort, + Name: "vm_promscrape/discovery", + DialDualStack: netutil.TCP6Enabled(), + IsTLS: isTLS, + TLSConfig: tlsCfg, + ReadTimeout: time.Minute * 3, + WriteTimeout: 10 * time.Second, + MaxResponseBodySize: 300 * 1024 * 1024, + MaxConns: 20 * *maxConcurrency, + Dial: dialFunc, + } return &Client{ - hc: hc, - ac: ac, - apiServer: apiServer, - hostPort: hostPort, + hc: hc, + blockingClient: wc, + ac: ac, + apiServer: apiServer, + hostPort: hostPort, }, nil } @@ -97,6 +112,12 @@ func concurrencyLimitChInit() { concurrencyLimitCh = make(chan struct{}, *maxConcurrency) } +// APIRequestParams modifies api request with given params. +type APIRequestParams struct { + FetchFromResponse func(resp *fasthttp.Response) + SetToRequest func(req *fasthttp.Request) +} + // GetAPIResponse returns response for the given absolute path. func (c *Client) GetAPIResponse(path string) ([]byte, error) { // Limit the number of concurrent API requests. @@ -111,7 +132,17 @@ func (c *Client) GetAPIResponse(path string) ([]byte, error) { c.apiServer, *maxWaitTime, *maxConcurrency) } defer func() { <-concurrencyLimitCh }() + return c.getAPIResponseWithParamsAndClient(c.hc, path, nil) +} +// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response, +// inspectResponse - should never reference data from response. +func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *fasthttp.Response)) ([]byte, error) { + return c.getAPIResponseWithParamsAndClient(c.blockingClient, path, inspectResponse) +} + +// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for response. +func (c *Client) getAPIResponseWithParamsAndClient(client *fasthttp.HostClient, path string, inspectResponse func(resp *fasthttp.Response)) ([]byte, error) { requestURL := c.apiServer + path var u fasthttp.URI u.Update(requestURL) @@ -122,9 +153,10 @@ func (c *Client) GetAPIResponse(path string) ([]byte, error) { if c.ac != nil && c.ac.Authorization != "" { req.Header.Set("Authorization", c.ac.Authorization) } + var resp fasthttp.Response - deadline := time.Now().Add(c.hc.ReadTimeout) - if err := doRequestWithPossibleRetry(c.hc, &req, &resp, deadline); err != nil { + deadline := time.Now().Add(client.ReadTimeout) + if err := doRequestWithPossibleRetry(client, &req, &resp, deadline); err != nil { return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err) } var data []byte @@ -137,6 +169,9 @@ func (c *Client) GetAPIResponse(path string) ([]byte, error) { } else { data = append(data[:0], resp.Body()...) } + if inspectResponse != nil { + inspectResponse(&resp) + } statusCode := resp.StatusCode() if statusCode != fasthttp.StatusOK { return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q", diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 40f82860a3..ccb0d4aaff 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul" "github.com/VictoriaMetrics/metrics" ) @@ -25,9 +26,6 @@ var ( openstackSDCheckInterval = flag.Duration("promscrape.openstackSDCheckInterval", 30*time.Second, "Interval for checking for changes in openstack API server. "+ "This works only if `openstack_sd_configs` is configured in '-promscrape.config' file. "+ "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config for details") - consulSDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+ - "This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+ - "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details") eurekaSDCheckInterval = flag.Duration("promscrape.eurekaSDCheckInterval", 30*time.Second, "Interval for checking for changes in eureka. "+ "This works only if `eureka_sd_configs` is configured in '-promscrape.config' file. "+ "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config for details") @@ -101,7 +99,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) }) scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) }) scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) }) - scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) }) + scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) }) scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) }) scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) }) scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })