From 7a889f68502009190261216f8a587b98588e3049 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 3 Dec 2020 19:50:50 +0200 Subject: [PATCH] lib/promscrape: code cleanup after c6dee6c52dada09c4e45b807e805e291bf47d2d9 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/574 --- docs/CHANGELOG.md | 2 + lib/promscrape/discovery/consul/api.go | 52 ++- lib/promscrape/discovery/consul/consul.go | 19 +- .../discovery/consul/service_node.go | 8 +- lib/promscrape/discovery/consul/watch.go | 345 ++++++++++-------- lib/promscrape/discoveryutils/client.go | 22 +- 6 files changed, 235 insertions(+), 213 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 43c291865c..a09a3de684 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,8 @@ # tip +* FEATURE: optimize Consul service discovery speed when discovering big number of services. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/574 + * BUGFIX: properly parse timestamps in OpenMetrics format - they are exposed as floating-point number in seconds instead of integer milliseconds unlike in Prometheus exposition format. See [the docs](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#timestamps). * BUGFIX: return `nan` for `a >bool b` query when `a` equals to `nan` like Prometheus does. Previously `0` was returned in this case. This applies to any comparison operation diff --git a/lib/promscrape/discovery/consul/api.go b/lib/promscrape/discovery/consul/api.go index f041a06d1c..d761b3d0e3 100644 --- a/lib/promscrape/discovery/consul/api.go +++ b/lib/promscrape/discovery/consul/api.go @@ -6,6 +6,7 @@ import ( "os" "strconv" "strings" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" @@ -13,11 +14,10 @@ import ( "github.com/VictoriaMetrics/fasthttp" ) -// apiConfig contains config for API server -// with consulWatch service. +// apiConfig contains config for API server. type apiConfig struct { tagSeparator string - consulWatcher *consulWatch + consulWatcher *consulWatcher } var configMap = discoveryutils.NewConfigMap() @@ -71,10 +71,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { return nil, err } - cw, err := newConsulWatch(client, sdc, dc) - if err != nil { - return nil, fmt.Errorf("cannot start consul watcher: %w", err) - } + cw := newConsulWatcher(client, sdc, dc) cfg := &apiConfig{ tagSeparator: tagSeparator, consulWatcher: cw, @@ -114,35 +111,28 @@ func getDatacenter(client *discoveryutils.Client, dc string) (string, error) { return a.Config.Datacenter, nil } -// returns ServiceNodesState and version index. -func getServiceState(client *discoveryutils.Client, svc, baseArgs string, index uint64) ([]ServiceNode, uint64, error) { - path := fmt.Sprintf("/v1/health/service/%s%s", svc, baseArgs) - - data, newIndex, err := getBlockingAPIResponse(client, path, index) - if err != nil { - return nil, index, err - } - sns, err := parseServiceNodes(data) - if err != nil { - return nil, index, err - } - return sns, newIndex, nil -} - -// returns consul api response with new index version of object. +// maxWaitTime is duration for consul blocking request, maximum wait time is 10 min. +// But fasthttp client has readTimeout for 1 min, so we use 50s timeout. +// also consul adds random delay up to wait/16, so there is no need in jitter. // https://www.consul.io/api-docs/features/blocking -func getBlockingAPIResponse(client *discoveryutils.Client, path string, index uint64) ([]byte, uint64, error) { - path += "&index=" + strconv.FormatUint(index, 10) - path = path + fmt.Sprintf("&wait=%s", watchTime) +const maxWaitTime = 50 * time.Second + +var maxWaitTimeStr = maxWaitTime.String() + +// getBlockingAPIResponse perfoms blocking request to Consul via client and returns response. +// +// See https://www.consul.io/api-docs/features/blocking . +func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) { + path += "&index=" + strconv.FormatInt(index, 10) + path += "&wait=" + maxWaitTimeStr getMeta := func(resp *fasthttp.Response) { if ind := resp.Header.Peek("X-Consul-Index"); len(ind) > 0 { - newIndex, err := strconv.ParseUint(string(ind), 10, 64) + newIndex, err := strconv.ParseInt(string(ind), 10, 64) if err != nil { - logger.Errorf("failed to parse consul index: %v", err) + logger.Errorf("cannot parse X-Consul-Index header value in response from %q: %s", path, err) return } - // reset index - // https://www.consul.io/api-docs/features/blocking#implementation-details + // Properly handle the returned newIndex according to https://www.consul.io/api-docs/features/blocking#implementation-details if newIndex < 1 { index = 1 return @@ -156,7 +146,7 @@ func getBlockingAPIResponse(client *discoveryutils.Client, path string, index ui } data, err := client.GetBlockingAPIResponse(path, getMeta) if err != nil { - return nil, index, fmt.Errorf("failed query consul api path=%q, err=%w", path, err) + return nil, index, fmt.Errorf("cannot perform blocking Consul API request at %q: %w", path, err) } return data, index, nil } diff --git a/lib/promscrape/discovery/consul/consul.go b/lib/promscrape/discovery/consul/consul.go index 72ed464ac7..6376d40626 100644 --- a/lib/promscrape/discovery/consul/consul.go +++ b/lib/promscrape/discovery/consul/consul.go @@ -1,25 +1,11 @@ package consul import ( - "flag" "fmt" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" ) -var ( - // SDCheckInterval - check interval for consul discovery. - SDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+ - "This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+ - "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details") - // duration for consul blocking request, maximum wait time is 10 min. - // But fasthttp client has readTimeout for 1 min, so we use 50s timeout. - // also consul adds random delay up to wait/16, so there is no need in jitter. - // https://www.consul.io/api-docs/features/blocking - watchTime = time.Second * 50 -) - // SDConfig represents service discovery config for Consul. // // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config @@ -46,9 +32,6 @@ func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) { if err != nil { return nil, fmt.Errorf("cannot get API config: %w", err) } - ms, err := getServiceNodesLabels(cfg) - if err != nil { - return nil, fmt.Errorf("error when fetching service nodes data from Consul: %w", err) - } + ms := getServiceNodesLabels(cfg) return ms, nil } diff --git a/lib/promscrape/discovery/consul/service_node.go b/lib/promscrape/discovery/consul/service_node.go index ae332091be..4086bd3b78 100644 --- a/lib/promscrape/discovery/consul/service_node.go +++ b/lib/promscrape/discovery/consul/service_node.go @@ -9,14 +9,14 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) -// getServiceNodesLabels returns labels for Consul service nodes with given tagSeparator. -func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) { - sns := cfg.consulWatcher.getServiceNodes() +// getServiceNodesLabels returns labels for Consul service nodes with given cfg. +func getServiceNodesLabels(cfg *apiConfig) []map[string]string { + sns := cfg.consulWatcher.getServiceNodesSnapshot() var ms []map[string]string for _, sn := range sns { ms = sn.appendTargetLabels(ms, cfg.tagSeparator) } - return ms, nil + return ms } // ServiceNode is Consul service node. diff --git a/lib/promscrape/discovery/consul/watch.go b/lib/promscrape/discovery/consul/watch.go index 80dd39e70a..533b3edbc8 100644 --- a/lib/promscrape/discovery/consul/watch.go +++ b/lib/promscrape/discovery/consul/watch.go @@ -2,213 +2,250 @@ package consul import ( "encoding/json" + "flag" "fmt" "net/url" "sync" - "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" + "github.com/VictoriaMetrics/metrics" ) -type serviceWatch struct { - stopCh chan struct{} +// SDCheckInterval is check interval for Consul service discovery. +var SDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in Consul. "+ + "This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+ + "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details") + +// consulWatcher is a watcher for consul api, updates services map in background with long-polling. +type consulWatcher struct { + client *discoveryutils.Client + + serviceNamesQueryArgs string + serviceNodesQueryArgs string + watchServices []string + watchTags []string + + // servicesLock protects services and servicesLastAccessTime + servicesLock sync.Mutex + services map[string]*serviceWatcher + servicesLastAccessTime time.Time + + wg sync.WaitGroup +} + +type serviceWatcher struct { + serviceName string serviceNodes []ServiceNode -} - -// watcher for consul api, updates targets in background with long-polling. -type consulWatch struct { - baseQueryArgs string - client *discoveryutils.Client - lastAccessTime atomic.Value - nodeMeta string - shouldWatchServices []string - shouldWatchTags []string - // guards services - servicesLock sync.Mutex - services map[string]serviceWatch stopCh chan struct{} } -// init new watcher and start background service discovery for Consul. -func newConsulWatch(client *discoveryutils.Client, sdc *SDConfig, datacenter string) (*consulWatch, error) { - baseQueryArgs := fmt.Sprintf("?sdc=%s", url.QueryEscape(datacenter)) - var nodeMeta string - if len(sdc.NodeMeta) > 0 { - for k, v := range sdc.NodeMeta { - nodeMeta += fmt.Sprintf("&node-meta=%s", url.QueryEscape(k+":"+v)) - } - } +// newConsulWatcher creates new watcher and start background service discovery for Consul. +func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter string) *consulWatcher { + baseQueryArgs := "?sdc=" + url.QueryEscape(datacenter) if sdc.AllowStale { baseQueryArgs += "&stale" } - cw := consulWatch{ - client: client, - baseQueryArgs: baseQueryArgs, - shouldWatchServices: sdc.Services, - shouldWatchTags: sdc.Tags, - services: make(map[string]serviceWatch), + for k, v := range sdc.NodeMeta { + baseQueryArgs += "&node-meta=" + url.QueryEscape(k+":"+v) } - - watchServiceNames, _, err := cw.getServiceNames(0) - if err != nil { - return nil, err + serviceNodesQueryArgs := baseQueryArgs + for _, tag := range sdc.Tags { + serviceNodesQueryArgs += "&tag=" + url.QueryEscape(tag) } - var wg sync.WaitGroup - cw.servicesLock.Lock() - for serviceName := range watchServiceNames { - stopCh := make(chan struct{}) - cw.services[serviceName] = serviceWatch{stopCh: stopCh} - wg.Add(1) - go func(serviceName string) { - defer wg.Done() - cw.watchForServiceUpdates(serviceName, stopCh) - }(serviceName) + cw := &consulWatcher{ + client: client, + serviceNamesQueryArgs: baseQueryArgs, + serviceNodesQueryArgs: serviceNodesQueryArgs, + watchServices: sdc.Services, + watchTags: sdc.Tags, + services: make(map[string]*serviceWatcher), + servicesLastAccessTime: time.Now(), } - cw.servicesLock.Unlock() - // wait for first init. - wg.Wait() - go cw.watchForServices() - return &cw, nil + go cw.watchForServicesUpdates() + return cw } -// stops all service watchers. -func (cw *consulWatch) stopServiceWatchersAll() { - cw.servicesLock.Lock() - for _, sw := range cw.services { - close(sw.stopCh) +// watchForServicesUpdates watches for new services and updates it in cw. +func (cw *consulWatcher) watchForServicesUpdates() { + checkInterval := getCheckInterval() + ticker := time.NewTicker(checkInterval / 2) + defer ticker.Stop() + index := int64(0) + clientAddr := cw.client.Addr() + f := func() { + serviceNames, newIndex, err := cw.getBlockingServiceNames(index) + if err != nil { + logger.Errorf("cannot obtain Consul serviceNames from %q: %s", clientAddr, err) + return + } + if index == newIndex { + // Nothing changed. + return + } + + cw.servicesLock.Lock() + // Start watchers for new services. + for _, serviceName := range serviceNames { + if _, ok := cw.services[serviceName]; ok { + // The watcher for serviceName already exists. + continue + } + sw := &serviceWatcher{ + serviceName: serviceName, + stopCh: make(chan struct{}), + } + cw.services[serviceName] = sw + cw.wg.Add(1) + serviceWatchersCreated.Inc() + go func() { + serviceWatchersCount.Inc() + sw.watchForServiceNodesUpdates(cw) + serviceWatchersCount.Dec() + cw.wg.Done() + }() + } + // Stop watchers for removed services. + newServiceNamesMap := make(map[string]struct{}, len(serviceNames)) + for _, serviceName := range serviceNames { + newServiceNamesMap[serviceName] = struct{}{} + } + for serviceName, sw := range cw.services { + if _, ok := newServiceNamesMap[serviceName]; ok { + continue + } + close(sw.stopCh) + delete(cw.services, serviceName) + serviceWatchersStopped.Inc() + + // Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime + // if it is blocked in Consul API request. + } + cw.servicesLock.Unlock() + + index = newIndex + } + + logger.Infof("started Consul service watcher for %q", clientAddr) + f() + for range ticker.C { + cw.servicesLock.Lock() + lastAccessTime := cw.servicesLastAccessTime + cw.servicesLock.Unlock() + if time.Since(lastAccessTime) > 3*checkInterval { + // The given cw is no longer used. Stop all service watchers and exit. + logger.Infof("starting to stop Consul service watchers for %q", clientAddr) + cw.servicesLock.Lock() + for _, sw := range cw.services { + close(sw.stopCh) + } + cw.servicesLock.Unlock() + cw.wg.Wait() + logger.Infof("stopped Consul service watcher for %q", clientAddr) + return + } + + f() } - cw.servicesLock.Unlock() } -// getServiceNames returns serviceNames and index version. -func (cw *consulWatch) getServiceNames(index uint64) (map[string]struct{}, uint64, error) { - sns := make(map[string]struct{}) - path := fmt.Sprintf("/v1/catalog/services%s", cw.baseQueryArgs) - if len(cw.nodeMeta) > 0 { - path += cw.nodeMeta - } +var ( + serviceWatchersCreated = metrics.NewCounter("vm_promscrape_discovery_consul_service_watchers_created_total") + serviceWatchersStopped = metrics.NewCounter("vm_promscrape_discovery_consul_service_watchers_stopped_total") + serviceWatchersCount = metrics.NewCounter("vm_promscrape_discovery_consul_service_watchers") +) + +// getBlockingServiceNames obtains serviceNames via blocking request to Consul. +// +// It returns an empty serviceNames list if response contains the same index. +func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64, error) { + path := "/v1/catalog/services" + cw.serviceNamesQueryArgs data, newIndex, err := getBlockingAPIResponse(cw.client, path, index) if err != nil { return nil, index, err } + if index == newIndex { + // Nothing changed - return an empty serviceNames list. + return nil, index, nil + } var m map[string][]string if err := json.Unmarshal(data, &m); err != nil { - return nil, index, fmt.Errorf("cannot parse services response=%q, err=%w", data, err) + return nil, index, fmt.Errorf("cannot parse response from %q: %w; data=%q", path, err, data) } - for k, tags := range m { - if !shouldCollectServiceByName(cw.shouldWatchServices, k) { + serviceNames := make([]string, 0, len(m)) + for serviceName, tags := range m { + if !shouldCollectServiceByName(cw.watchServices, serviceName) { continue } - if !shouldCollectServiceByTags(cw.shouldWatchTags, tags) { + if !shouldCollectServiceByTags(cw.watchTags, tags) { continue } - sns[k] = struct{}{} + serviceNames = append(serviceNames, serviceName) } - return sns, newIndex, nil + return serviceNames, newIndex, nil } -// listen for new services and update it. -func (cw *consulWatch) watchForServices() { - ticker := time.NewTicker(*SDCheckInterval) +// watchForServiceNodesUpdates watches for Consul serviceNode changes for the given serviceName. +func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher) { + checkInterval := getCheckInterval() + ticker := time.NewTicker(checkInterval / 2) defer ticker.Stop() - var index uint64 + clientAddr := cw.client.Addr() + index := int64(0) + path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs + f := func() { + data, newIndex, err := getBlockingAPIResponse(cw.client, path, index) + if err != nil { + logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err) + return + } + if index == newIndex { + // Nothing changed. + return + } + sns, err := parseServiceNodes(data) + if err != nil { + logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err) + return + } + + cw.servicesLock.Lock() + sw.serviceNodes = sns + cw.servicesLock.Unlock() + + index = newIndex + } + + f() for { select { - case <-cw.stopCh: - cw.stopServiceWatchersAll() - return case <-ticker.C: - if time.Since(cw.lastAccessTime.Load().(time.Time)) > *SDCheckInterval*2 { - // exit watch and stop all background watchers. - cw.stopServiceWatchersAll() - return - } - m, newIndex, err := cw.getServiceNames(index) - if err != nil { - logger.Errorf("failed get serviceNames from consul api: err=%v", err) - continue - } - // nothing changed. - if index == newIndex { - continue - } - cw.servicesLock.Lock() - // start new services watchers. - for svc := range m { - if _, ok := cw.services[svc]; !ok { - stopCh := make(chan struct{}) - cw.services[svc] = serviceWatch{stopCh: stopCh} - go cw.watchForServiceUpdates(svc, stopCh) - } - } - // stop watch for removed services. - for svc, s := range cw.services { - if _, ok := m[svc]; !ok { - close(s.stopCh) - delete(cw.services, svc) - } - } - cw.servicesLock.Unlock() - index = newIndex + f() + case <-sw.stopCh: + return } } - } -// start watching for consul service changes. -func (cw *consulWatch) watchForServiceUpdates(svc string, stopCh chan struct{}) { - ticker := time.NewTicker(*SDCheckInterval) - defer ticker.Stop() - updateServiceState := func(index uint64) uint64 { - sns, newIndex, err := getServiceState(cw.client, svc, cw.baseQueryArgs, index) - if err != nil { - logger.Errorf("failed update service state, service_name=%q, err=%v", svc, err) - return index - } - if newIndex == index { - return index - } - cw.servicesLock.Lock() - if s, ok := cw.services[svc]; ok { - s.serviceNodes = sns - cw.services[svc] = s - } - cw.servicesLock.Unlock() - return newIndex - } - watchIndex := updateServiceState(0) - go func() { - for { - select { - case <-ticker.C: - watchIndex = updateServiceState(watchIndex) - case <-stopCh: - return - } - } - }() -} - -// returns ServiceNodes. -func (cw *consulWatch) getServiceNodes() []ServiceNode { +// getServiceNodesSnapshot returns a snapshot of discovered ServiceNodes. +func (cw *consulWatcher) getServiceNodesSnapshot() []ServiceNode { var sns []ServiceNode cw.servicesLock.Lock() - for _, v := range cw.services { - sns = append(sns, v.serviceNodes...) + for _, sw := range cw.services { + sns = append(sns, sw.serviceNodes...) } + cw.servicesLastAccessTime = time.Now() cw.servicesLock.Unlock() - cw.lastAccessTime.Store(time.Now()) return sns } -func shouldCollectServiceByName(filterServices []string, service string) bool { +func shouldCollectServiceByName(filterServices []string, serviceName string) bool { if len(filterServices) == 0 { return true } for _, filterService := range filterServices { - if filterService == service { + if filterService == serviceName { return true } } @@ -233,3 +270,11 @@ func shouldCollectServiceByTags(filterTags, tags []string) bool { } return true } + +func getCheckInterval() time.Duration { + d := *SDCheckInterval + if d <= time.Second { + return time.Second + } + return d +} diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index ab772dc1c2..bb27fe3469 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -33,12 +33,15 @@ func GetHTTPClient() *http.Client { // Client is http client, which talks to the given apiServer. type Client struct { + // hc is used for short requests. hc *fasthttp.HostClient - // blockingClient is used for performing long-polling requests. + + // blockingClient is used for long-polling requests. blockingClient *fasthttp.HostClient - ac *promauth.Config - apiServer string - hostPort string + + ac *promauth.Config + apiServer string + hostPort string } // NewClient returns new Client for the given apiServer and the given ac. @@ -82,7 +85,7 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { MaxConns: 2 * *maxConcurrency, Dial: dialFunc, } - wc := &fasthttp.HostClient{ + blockingClient := &fasthttp.HostClient{ Addr: hostPort, Name: "vm_promscrape/discovery", DialDualStack: netutil.TCP6Enabled(), @@ -96,7 +99,7 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { } return &Client{ hc: hc, - blockingClient: wc, + blockingClient: blockingClient, ac: ac, apiServer: apiServer, hostPort: hostPort, @@ -112,10 +115,9 @@ func concurrencyLimitChInit() { concurrencyLimitCh = make(chan struct{}, *maxConcurrency) } -// APIRequestParams modifies api request with given params. -type APIRequestParams struct { - FetchFromResponse func(resp *fasthttp.Response) - SetToRequest func(req *fasthttp.Request) +// Addr returns the address the client connects to. +func (c *Client) Addr() string { + return c.hc.Addr } // GetAPIResponse returns response for the given absolute path.