lib/promscrape: code cleanup after c6dee6c52d

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/574
This commit is contained in:
Aliaksandr Valialkin 2020-12-03 19:50:50 +02:00
parent 0b302d33cb
commit 7a889f6850
6 changed files with 235 additions and 213 deletions

View File

@ -2,6 +2,8 @@
# tip # tip
* FEATURE: optimize Consul service discovery speed when discovering big number of services. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/574
* BUGFIX: properly parse timestamps in OpenMetrics format - they are exposed as floating-point number in seconds instead of integer milliseconds * BUGFIX: properly parse timestamps in OpenMetrics format - they are exposed as floating-point number in seconds instead of integer milliseconds
unlike in Prometheus exposition format. See [the docs](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#timestamps). unlike in Prometheus exposition format. See [the docs](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#timestamps).
* BUGFIX: return `nan` for `a >bool b` query when `a` equals to `nan` like Prometheus does. Previously `0` was returned in this case. This applies to any comparison operation * BUGFIX: return `nan` for `a >bool b` query when `a` equals to `nan` like Prometheus does. Previously `0` was returned in this case. This applies to any comparison operation

View File

@ -6,6 +6,7 @@ import (
"os" "os"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
@ -13,11 +14,10 @@ import (
"github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/fasthttp"
) )
// apiConfig contains config for API server // apiConfig contains config for API server.
// with consulWatch service.
type apiConfig struct { type apiConfig struct {
tagSeparator string tagSeparator string
consulWatcher *consulWatch consulWatcher *consulWatcher
} }
var configMap = discoveryutils.NewConfigMap() var configMap = discoveryutils.NewConfigMap()
@ -71,10 +71,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
return nil, err return nil, err
} }
cw, err := newConsulWatch(client, sdc, dc) cw := newConsulWatcher(client, sdc, dc)
if err != nil {
return nil, fmt.Errorf("cannot start consul watcher: %w", err)
}
cfg := &apiConfig{ cfg := &apiConfig{
tagSeparator: tagSeparator, tagSeparator: tagSeparator,
consulWatcher: cw, consulWatcher: cw,
@ -114,35 +111,28 @@ func getDatacenter(client *discoveryutils.Client, dc string) (string, error) {
return a.Config.Datacenter, nil return a.Config.Datacenter, nil
} }
// returns ServiceNodesState and version index. // maxWaitTime is duration for consul blocking request, maximum wait time is 10 min.
func getServiceState(client *discoveryutils.Client, svc, baseArgs string, index uint64) ([]ServiceNode, uint64, error) { // But fasthttp client has readTimeout for 1 min, so we use 50s timeout.
path := fmt.Sprintf("/v1/health/service/%s%s", svc, baseArgs) // also consul adds random delay up to wait/16, so there is no need in jitter.
data, newIndex, err := getBlockingAPIResponse(client, path, index)
if err != nil {
return nil, index, err
}
sns, err := parseServiceNodes(data)
if err != nil {
return nil, index, err
}
return sns, newIndex, nil
}
// returns consul api response with new index version of object.
// https://www.consul.io/api-docs/features/blocking // https://www.consul.io/api-docs/features/blocking
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index uint64) ([]byte, uint64, error) { const maxWaitTime = 50 * time.Second
path += "&index=" + strconv.FormatUint(index, 10)
path = path + fmt.Sprintf("&wait=%s", watchTime) var maxWaitTimeStr = maxWaitTime.String()
// getBlockingAPIResponse perfoms blocking request to Consul via client and returns response.
//
// See https://www.consul.io/api-docs/features/blocking .
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
path += "&index=" + strconv.FormatInt(index, 10)
path += "&wait=" + maxWaitTimeStr
getMeta := func(resp *fasthttp.Response) { getMeta := func(resp *fasthttp.Response) {
if ind := resp.Header.Peek("X-Consul-Index"); len(ind) > 0 { if ind := resp.Header.Peek("X-Consul-Index"); len(ind) > 0 {
newIndex, err := strconv.ParseUint(string(ind), 10, 64) newIndex, err := strconv.ParseInt(string(ind), 10, 64)
if err != nil { if err != nil {
logger.Errorf("failed to parse consul index: %v", err) logger.Errorf("cannot parse X-Consul-Index header value in response from %q: %s", path, err)
return return
} }
// reset index // Properly handle the returned newIndex according to https://www.consul.io/api-docs/features/blocking#implementation-details
// https://www.consul.io/api-docs/features/blocking#implementation-details
if newIndex < 1 { if newIndex < 1 {
index = 1 index = 1
return return
@ -156,7 +146,7 @@ func getBlockingAPIResponse(client *discoveryutils.Client, path string, index ui
} }
data, err := client.GetBlockingAPIResponse(path, getMeta) data, err := client.GetBlockingAPIResponse(path, getMeta)
if err != nil { if err != nil {
return nil, index, fmt.Errorf("failed query consul api path=%q, err=%w", path, err) return nil, index, fmt.Errorf("cannot perform blocking Consul API request at %q: %w", path, err)
} }
return data, index, nil return data, index, nil
} }

View File

@ -1,25 +1,11 @@
package consul package consul
import ( import (
"flag"
"fmt" "fmt"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
) )
var (
// SDCheckInterval - check interval for consul discovery.
SDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+
"This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details")
// duration for consul blocking request, maximum wait time is 10 min.
// But fasthttp client has readTimeout for 1 min, so we use 50s timeout.
// also consul adds random delay up to wait/16, so there is no need in jitter.
// https://www.consul.io/api-docs/features/blocking
watchTime = time.Second * 50
)
// SDConfig represents service discovery config for Consul. // SDConfig represents service discovery config for Consul.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config
@ -46,9 +32,6 @@ func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot get API config: %w", err) return nil, fmt.Errorf("cannot get API config: %w", err)
} }
ms, err := getServiceNodesLabels(cfg) ms := getServiceNodesLabels(cfg)
if err != nil {
return nil, fmt.Errorf("error when fetching service nodes data from Consul: %w", err)
}
return ms, nil return ms, nil
} }

View File

@ -9,14 +9,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
) )
// getServiceNodesLabels returns labels for Consul service nodes with given tagSeparator. // getServiceNodesLabels returns labels for Consul service nodes with given cfg.
func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) { func getServiceNodesLabels(cfg *apiConfig) []map[string]string {
sns := cfg.consulWatcher.getServiceNodes() sns := cfg.consulWatcher.getServiceNodesSnapshot()
var ms []map[string]string var ms []map[string]string
for _, sn := range sns { for _, sn := range sns {
ms = sn.appendTargetLabels(ms, cfg.tagSeparator) ms = sn.appendTargetLabels(ms, cfg.tagSeparator)
} }
return ms, nil return ms
} }
// ServiceNode is Consul service node. // ServiceNode is Consul service node.

View File

@ -2,213 +2,250 @@ package consul
import ( import (
"encoding/json" "encoding/json"
"flag"
"fmt" "fmt"
"net/url" "net/url"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/metrics"
) )
type serviceWatch struct { // SDCheckInterval is check interval for Consul service discovery.
stopCh chan struct{} var SDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in Consul. "+
"This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details")
// consulWatcher is a watcher for consul api, updates services map in background with long-polling.
type consulWatcher struct {
client *discoveryutils.Client
serviceNamesQueryArgs string
serviceNodesQueryArgs string
watchServices []string
watchTags []string
// servicesLock protects services and servicesLastAccessTime
servicesLock sync.Mutex
services map[string]*serviceWatcher
servicesLastAccessTime time.Time
wg sync.WaitGroup
}
type serviceWatcher struct {
serviceName string
serviceNodes []ServiceNode serviceNodes []ServiceNode
}
// watcher for consul api, updates targets in background with long-polling.
type consulWatch struct {
baseQueryArgs string
client *discoveryutils.Client
lastAccessTime atomic.Value
nodeMeta string
shouldWatchServices []string
shouldWatchTags []string
// guards services
servicesLock sync.Mutex
services map[string]serviceWatch
stopCh chan struct{} stopCh chan struct{}
} }
// init new watcher and start background service discovery for Consul. // newConsulWatcher creates new watcher and start background service discovery for Consul.
func newConsulWatch(client *discoveryutils.Client, sdc *SDConfig, datacenter string) (*consulWatch, error) { func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter string) *consulWatcher {
baseQueryArgs := fmt.Sprintf("?sdc=%s", url.QueryEscape(datacenter)) baseQueryArgs := "?sdc=" + url.QueryEscape(datacenter)
var nodeMeta string
if len(sdc.NodeMeta) > 0 {
for k, v := range sdc.NodeMeta {
nodeMeta += fmt.Sprintf("&node-meta=%s", url.QueryEscape(k+":"+v))
}
}
if sdc.AllowStale { if sdc.AllowStale {
baseQueryArgs += "&stale" baseQueryArgs += "&stale"
} }
cw := consulWatch{ for k, v := range sdc.NodeMeta {
client: client, baseQueryArgs += "&node-meta=" + url.QueryEscape(k+":"+v)
baseQueryArgs: baseQueryArgs,
shouldWatchServices: sdc.Services,
shouldWatchTags: sdc.Tags,
services: make(map[string]serviceWatch),
} }
serviceNodesQueryArgs := baseQueryArgs
watchServiceNames, _, err := cw.getServiceNames(0) for _, tag := range sdc.Tags {
if err != nil { serviceNodesQueryArgs += "&tag=" + url.QueryEscape(tag)
return nil, err
} }
var wg sync.WaitGroup cw := &consulWatcher{
cw.servicesLock.Lock() client: client,
for serviceName := range watchServiceNames { serviceNamesQueryArgs: baseQueryArgs,
stopCh := make(chan struct{}) serviceNodesQueryArgs: serviceNodesQueryArgs,
cw.services[serviceName] = serviceWatch{stopCh: stopCh} watchServices: sdc.Services,
wg.Add(1) watchTags: sdc.Tags,
go func(serviceName string) { services: make(map[string]*serviceWatcher),
defer wg.Done() servicesLastAccessTime: time.Now(),
cw.watchForServiceUpdates(serviceName, stopCh)
}(serviceName)
} }
cw.servicesLock.Unlock() go cw.watchForServicesUpdates()
// wait for first init. return cw
wg.Wait()
go cw.watchForServices()
return &cw, nil
} }
// stops all service watchers. // watchForServicesUpdates watches for new services and updates it in cw.
func (cw *consulWatch) stopServiceWatchersAll() { func (cw *consulWatcher) watchForServicesUpdates() {
cw.servicesLock.Lock() checkInterval := getCheckInterval()
for _, sw := range cw.services { ticker := time.NewTicker(checkInterval / 2)
close(sw.stopCh) defer ticker.Stop()
index := int64(0)
clientAddr := cw.client.Addr()
f := func() {
serviceNames, newIndex, err := cw.getBlockingServiceNames(index)
if err != nil {
logger.Errorf("cannot obtain Consul serviceNames from %q: %s", clientAddr, err)
return
}
if index == newIndex {
// Nothing changed.
return
}
cw.servicesLock.Lock()
// Start watchers for new services.
for _, serviceName := range serviceNames {
if _, ok := cw.services[serviceName]; ok {
// The watcher for serviceName already exists.
continue
}
sw := &serviceWatcher{
serviceName: serviceName,
stopCh: make(chan struct{}),
}
cw.services[serviceName] = sw
cw.wg.Add(1)
serviceWatchersCreated.Inc()
go func() {
serviceWatchersCount.Inc()
sw.watchForServiceNodesUpdates(cw)
serviceWatchersCount.Dec()
cw.wg.Done()
}()
}
// Stop watchers for removed services.
newServiceNamesMap := make(map[string]struct{}, len(serviceNames))
for _, serviceName := range serviceNames {
newServiceNamesMap[serviceName] = struct{}{}
}
for serviceName, sw := range cw.services {
if _, ok := newServiceNamesMap[serviceName]; ok {
continue
}
close(sw.stopCh)
delete(cw.services, serviceName)
serviceWatchersStopped.Inc()
// Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime
// if it is blocked in Consul API request.
}
cw.servicesLock.Unlock()
index = newIndex
}
logger.Infof("started Consul service watcher for %q", clientAddr)
f()
for range ticker.C {
cw.servicesLock.Lock()
lastAccessTime := cw.servicesLastAccessTime
cw.servicesLock.Unlock()
if time.Since(lastAccessTime) > 3*checkInterval {
// The given cw is no longer used. Stop all service watchers and exit.
logger.Infof("starting to stop Consul service watchers for %q", clientAddr)
cw.servicesLock.Lock()
for _, sw := range cw.services {
close(sw.stopCh)
}
cw.servicesLock.Unlock()
cw.wg.Wait()
logger.Infof("stopped Consul service watcher for %q", clientAddr)
return
}
f()
} }
cw.servicesLock.Unlock()
} }
// getServiceNames returns serviceNames and index version. var (
func (cw *consulWatch) getServiceNames(index uint64) (map[string]struct{}, uint64, error) { serviceWatchersCreated = metrics.NewCounter("vm_promscrape_discovery_consul_service_watchers_created_total")
sns := make(map[string]struct{}) serviceWatchersStopped = metrics.NewCounter("vm_promscrape_discovery_consul_service_watchers_stopped_total")
path := fmt.Sprintf("/v1/catalog/services%s", cw.baseQueryArgs) serviceWatchersCount = metrics.NewCounter("vm_promscrape_discovery_consul_service_watchers")
if len(cw.nodeMeta) > 0 { )
path += cw.nodeMeta
} // getBlockingServiceNames obtains serviceNames via blocking request to Consul.
//
// It returns an empty serviceNames list if response contains the same index.
func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64, error) {
path := "/v1/catalog/services" + cw.serviceNamesQueryArgs
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index) data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
if err != nil { if err != nil {
return nil, index, err return nil, index, err
} }
if index == newIndex {
// Nothing changed - return an empty serviceNames list.
return nil, index, nil
}
var m map[string][]string var m map[string][]string
if err := json.Unmarshal(data, &m); err != nil { if err := json.Unmarshal(data, &m); err != nil {
return nil, index, fmt.Errorf("cannot parse services response=%q, err=%w", data, err) return nil, index, fmt.Errorf("cannot parse response from %q: %w; data=%q", path, err, data)
} }
for k, tags := range m { serviceNames := make([]string, 0, len(m))
if !shouldCollectServiceByName(cw.shouldWatchServices, k) { for serviceName, tags := range m {
if !shouldCollectServiceByName(cw.watchServices, serviceName) {
continue continue
} }
if !shouldCollectServiceByTags(cw.shouldWatchTags, tags) { if !shouldCollectServiceByTags(cw.watchTags, tags) {
continue continue
} }
sns[k] = struct{}{} serviceNames = append(serviceNames, serviceName)
} }
return sns, newIndex, nil return serviceNames, newIndex, nil
} }
// listen for new services and update it. // watchForServiceNodesUpdates watches for Consul serviceNode changes for the given serviceName.
func (cw *consulWatch) watchForServices() { func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher) {
ticker := time.NewTicker(*SDCheckInterval) checkInterval := getCheckInterval()
ticker := time.NewTicker(checkInterval / 2)
defer ticker.Stop() defer ticker.Stop()
var index uint64 clientAddr := cw.client.Addr()
index := int64(0)
path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs
f := func() {
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
if err != nil {
logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
return
}
if index == newIndex {
// Nothing changed.
return
}
sns, err := parseServiceNodes(data)
if err != nil {
logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
return
}
cw.servicesLock.Lock()
sw.serviceNodes = sns
cw.servicesLock.Unlock()
index = newIndex
}
f()
for { for {
select { select {
case <-cw.stopCh:
cw.stopServiceWatchersAll()
return
case <-ticker.C: case <-ticker.C:
if time.Since(cw.lastAccessTime.Load().(time.Time)) > *SDCheckInterval*2 { f()
// exit watch and stop all background watchers. case <-sw.stopCh:
cw.stopServiceWatchersAll() return
return
}
m, newIndex, err := cw.getServiceNames(index)
if err != nil {
logger.Errorf("failed get serviceNames from consul api: err=%v", err)
continue
}
// nothing changed.
if index == newIndex {
continue
}
cw.servicesLock.Lock()
// start new services watchers.
for svc := range m {
if _, ok := cw.services[svc]; !ok {
stopCh := make(chan struct{})
cw.services[svc] = serviceWatch{stopCh: stopCh}
go cw.watchForServiceUpdates(svc, stopCh)
}
}
// stop watch for removed services.
for svc, s := range cw.services {
if _, ok := m[svc]; !ok {
close(s.stopCh)
delete(cw.services, svc)
}
}
cw.servicesLock.Unlock()
index = newIndex
} }
} }
} }
// start watching for consul service changes. // getServiceNodesSnapshot returns a snapshot of discovered ServiceNodes.
func (cw *consulWatch) watchForServiceUpdates(svc string, stopCh chan struct{}) { func (cw *consulWatcher) getServiceNodesSnapshot() []ServiceNode {
ticker := time.NewTicker(*SDCheckInterval)
defer ticker.Stop()
updateServiceState := func(index uint64) uint64 {
sns, newIndex, err := getServiceState(cw.client, svc, cw.baseQueryArgs, index)
if err != nil {
logger.Errorf("failed update service state, service_name=%q, err=%v", svc, err)
return index
}
if newIndex == index {
return index
}
cw.servicesLock.Lock()
if s, ok := cw.services[svc]; ok {
s.serviceNodes = sns
cw.services[svc] = s
}
cw.servicesLock.Unlock()
return newIndex
}
watchIndex := updateServiceState(0)
go func() {
for {
select {
case <-ticker.C:
watchIndex = updateServiceState(watchIndex)
case <-stopCh:
return
}
}
}()
}
// returns ServiceNodes.
func (cw *consulWatch) getServiceNodes() []ServiceNode {
var sns []ServiceNode var sns []ServiceNode
cw.servicesLock.Lock() cw.servicesLock.Lock()
for _, v := range cw.services { for _, sw := range cw.services {
sns = append(sns, v.serviceNodes...) sns = append(sns, sw.serviceNodes...)
} }
cw.servicesLastAccessTime = time.Now()
cw.servicesLock.Unlock() cw.servicesLock.Unlock()
cw.lastAccessTime.Store(time.Now())
return sns return sns
} }
func shouldCollectServiceByName(filterServices []string, service string) bool { func shouldCollectServiceByName(filterServices []string, serviceName string) bool {
if len(filterServices) == 0 { if len(filterServices) == 0 {
return true return true
} }
for _, filterService := range filterServices { for _, filterService := range filterServices {
if filterService == service { if filterService == serviceName {
return true return true
} }
} }
@ -233,3 +270,11 @@ func shouldCollectServiceByTags(filterTags, tags []string) bool {
} }
return true return true
} }
func getCheckInterval() time.Duration {
d := *SDCheckInterval
if d <= time.Second {
return time.Second
}
return d
}

View File

@ -33,12 +33,15 @@ func GetHTTPClient() *http.Client {
// Client is http client, which talks to the given apiServer. // Client is http client, which talks to the given apiServer.
type Client struct { type Client struct {
// hc is used for short requests.
hc *fasthttp.HostClient hc *fasthttp.HostClient
// blockingClient is used for performing long-polling requests.
// blockingClient is used for long-polling requests.
blockingClient *fasthttp.HostClient blockingClient *fasthttp.HostClient
ac *promauth.Config
apiServer string ac *promauth.Config
hostPort string apiServer string
hostPort string
} }
// NewClient returns new Client for the given apiServer and the given ac. // NewClient returns new Client for the given apiServer and the given ac.
@ -82,7 +85,7 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
MaxConns: 2 * *maxConcurrency, MaxConns: 2 * *maxConcurrency,
Dial: dialFunc, Dial: dialFunc,
} }
wc := &fasthttp.HostClient{ blockingClient := &fasthttp.HostClient{
Addr: hostPort, Addr: hostPort,
Name: "vm_promscrape/discovery", Name: "vm_promscrape/discovery",
DialDualStack: netutil.TCP6Enabled(), DialDualStack: netutil.TCP6Enabled(),
@ -96,7 +99,7 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
} }
return &Client{ return &Client{
hc: hc, hc: hc,
blockingClient: wc, blockingClient: blockingClient,
ac: ac, ac: ac,
apiServer: apiServer, apiServer: apiServer,
hostPort: hostPort, hostPort: hostPort,
@ -112,10 +115,9 @@ func concurrencyLimitChInit() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrency) concurrencyLimitCh = make(chan struct{}, *maxConcurrency)
} }
// APIRequestParams modifies api request with given params. // Addr returns the address the client connects to.
type APIRequestParams struct { func (c *Client) Addr() string {
FetchFromResponse func(resp *fasthttp.Response) return c.hc.Addr
SetToRequest func(req *fasthttp.Request)
} }
// GetAPIResponse returns response for the given absolute path. // GetAPIResponse returns response for the given absolute path.