mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 16:30:55 +01:00
4ee6def68b
* fix access to nil *url.URL Signed-off-by: Megrez Lu <lujiajing1126@gmail.com> * Update lib/promscrape/discovery/kubernetes/api_watcher.go Co-authored-by: Aliaksandr Valialkin <valyala@gmail.com>
809 lines
24 KiB
Go
809 lines
24 KiB
Go
package kubernetes
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/url"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kuberntes API server")
|
|
|
|
// WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set.
|
|
//
|
|
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
|
|
type WatchEvent struct {
|
|
Type string
|
|
Object json.RawMessage
|
|
}
|
|
|
|
// object is any Kubernetes object.
|
|
type object interface {
|
|
key() string
|
|
getTargetLabels(gw *groupWatcher) []map[string]string
|
|
}
|
|
|
|
// parseObjectFunc must parse object from the given data.
|
|
type parseObjectFunc func(data []byte) (object, error)
|
|
|
|
// parseObjectListFunc must parse objectList from the given r.
|
|
type parseObjectListFunc func(r io.Reader) (map[string]object, ListMeta, error)
|
|
|
|
// apiWatcher is used for watching for Kuberntes object changes and caching their latest states.
|
|
type apiWatcher struct {
|
|
role string
|
|
|
|
// Constructor for creating ScrapeWork objects from labels
|
|
swcFunc ScrapeWorkConstructorFunc
|
|
|
|
gw *groupWatcher
|
|
|
|
// swos contains per-namepsace maps of ScrapeWork objects for the given apiWatcher
|
|
swosByNamespace map[string]map[string][]interface{}
|
|
swosByNamespaceLock sync.Mutex
|
|
|
|
swosCount *metrics.Counter
|
|
}
|
|
|
|
func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc ScrapeWorkConstructorFunc) *apiWatcher {
|
|
namespaces := sdc.Namespaces.Names
|
|
selectors := sdc.Selectors
|
|
proxyURL := sdc.ProxyURL.URL()
|
|
gw := getGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL)
|
|
return &apiWatcher{
|
|
role: sdc.Role,
|
|
swcFunc: swcFunc,
|
|
gw: gw,
|
|
swosByNamespace: make(map[string]map[string][]interface{}),
|
|
swosCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_scrape_works{role=%q}`, sdc.Role)),
|
|
}
|
|
}
|
|
|
|
func (aw *apiWatcher) mustStart() {
|
|
aw.gw.startWatchersForRole(aw.role, aw)
|
|
}
|
|
|
|
func (aw *apiWatcher) mustStop() {
|
|
aw.gw.unsubscribeAPIWatcher(aw)
|
|
aw.swosByNamespaceLock.Lock()
|
|
aw.swosByNamespace = make(map[string]map[string][]interface{})
|
|
aw.swosByNamespaceLock.Unlock()
|
|
}
|
|
|
|
func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByKey map[string][]interface{}) {
|
|
aw.swosByNamespaceLock.Lock()
|
|
aw.swosCount.Add(len(swosByKey) - len(aw.swosByNamespace[namespace]))
|
|
aw.swosByNamespace[namespace] = swosByKey
|
|
aw.swosByNamespaceLock.Unlock()
|
|
}
|
|
|
|
func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) {
|
|
swos := aw.getScrapeWorkObjectsForLabels(labels)
|
|
aw.swosByNamespaceLock.Lock()
|
|
swosByKey := aw.swosByNamespace[namespace]
|
|
if swosByKey == nil {
|
|
swosByKey = make(map[string][]interface{})
|
|
aw.swosByNamespace[namespace] = swosByKey
|
|
}
|
|
if len(swos) > 0 {
|
|
aw.swosCount.Add(len(swos) - len(swosByKey[key]))
|
|
swosByKey[key] = swos
|
|
} else {
|
|
aw.swosCount.Add(-len(swosByKey[key]))
|
|
delete(swosByKey, key)
|
|
}
|
|
aw.swosByNamespaceLock.Unlock()
|
|
}
|
|
|
|
func (aw *apiWatcher) removeScrapeWorks(namespace, key string) {
|
|
aw.swosByNamespaceLock.Lock()
|
|
swosByKey := aw.swosByNamespace[namespace]
|
|
if len(swosByKey) > 0 {
|
|
aw.swosCount.Add(-len(swosByKey[key]))
|
|
delete(swosByKey, key)
|
|
}
|
|
aw.swosByNamespaceLock.Unlock()
|
|
}
|
|
|
|
func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) []interface{} {
|
|
swos := make([]interface{}, 0, len(labelss))
|
|
for _, labels := range labelss {
|
|
swo := aw.swcFunc(labels)
|
|
// The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
|
|
if swo != nil && !reflect.ValueOf(swo).IsNil() {
|
|
swos = append(swos, swo)
|
|
}
|
|
}
|
|
return swos
|
|
}
|
|
|
|
// getScrapeWorkObjects returns all the ScrapeWork objects for the given aw.
|
|
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
|
|
aw.gw.registerPendingAPIWatchers()
|
|
|
|
aw.swosByNamespaceLock.Lock()
|
|
defer aw.swosByNamespaceLock.Unlock()
|
|
|
|
size := 0
|
|
for _, swosByKey := range aw.swosByNamespace {
|
|
for _, swosLocal := range swosByKey {
|
|
size += len(swosLocal)
|
|
}
|
|
}
|
|
swos := make([]interface{}, 0, size)
|
|
for _, swosByKey := range aw.swosByNamespace {
|
|
for _, swosLocal := range swosByKey {
|
|
swos = append(swos, swosLocal...)
|
|
}
|
|
}
|
|
return swos
|
|
}
|
|
|
|
// groupWatcher watches for Kubernetes objects on the given apiServer with the given namespaces,
|
|
// selectors and authorization using the given client.
|
|
type groupWatcher struct {
|
|
apiServer string
|
|
namespaces []string
|
|
selectors []Selector
|
|
authorization string
|
|
client *http.Client
|
|
|
|
mu sync.Mutex
|
|
m map[string]*urlWatcher
|
|
}
|
|
|
|
func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher {
|
|
var proxy func(*http.Request) (*url.URL, error)
|
|
if proxyURL != nil {
|
|
proxy = http.ProxyURL(proxyURL)
|
|
}
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: ac.NewTLSConfig(),
|
|
Proxy: proxy,
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
IdleConnTimeout: *apiServerTimeout,
|
|
},
|
|
Timeout: *apiServerTimeout,
|
|
}
|
|
return &groupWatcher{
|
|
apiServer: apiServer,
|
|
authorization: ac.Authorization,
|
|
namespaces: namespaces,
|
|
selectors: selectors,
|
|
client: client,
|
|
m: make(map[string]*urlWatcher),
|
|
}
|
|
}
|
|
|
|
func getGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher {
|
|
proxyURLStr := "<nil>"
|
|
if proxyURL != nil {
|
|
proxyURLStr = proxyURL.String()
|
|
}
|
|
key := fmt.Sprintf("apiServer=%s, namespaces=%s, selectors=%s, proxyURL=%s, authConfig=%s",
|
|
apiServer, namespaces, selectorsKey(selectors), proxyURLStr, ac.String())
|
|
groupWatchersLock.Lock()
|
|
gw := groupWatchers[key]
|
|
if gw == nil {
|
|
gw = newGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL)
|
|
groupWatchers[key] = gw
|
|
}
|
|
groupWatchersLock.Unlock()
|
|
return gw
|
|
}
|
|
|
|
func selectorsKey(selectors []Selector) string {
|
|
var sb strings.Builder
|
|
for _, s := range selectors {
|
|
fmt.Fprintf(&sb, "{role=%q, label=%q, field=%q}", s.Role, s.Label, s.Field)
|
|
}
|
|
return sb.String()
|
|
}
|
|
|
|
var (
|
|
groupWatchersLock sync.Mutex
|
|
groupWatchers = make(map[string]*groupWatcher)
|
|
|
|
_ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 {
|
|
groupWatchersLock.Lock()
|
|
n := len(groupWatchers)
|
|
groupWatchersLock.Unlock()
|
|
return float64(n)
|
|
})
|
|
)
|
|
|
|
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
|
|
func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object {
|
|
if gw == nil {
|
|
// this is needed for testing
|
|
return nil
|
|
}
|
|
o := gw.getCachedObjectByRole(role, namespace, name)
|
|
if o != nil {
|
|
// Fast path: the object has been found in the cache.
|
|
return o
|
|
}
|
|
|
|
// The object wasn't found in the cache. Try querying it directly from API server.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details.
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_loads_total{role=%q}`, role)).Inc()
|
|
objectType := getObjectTypeByRole(role)
|
|
path := getAPIPath(objectType, namespace, "")
|
|
path += "/" + name
|
|
requestURL := gw.apiServer + path
|
|
resp, err := gw.doRequest(requestURL)
|
|
if err != nil {
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
|
|
logger.Errorf("cannot obtain data for object %s (namespace=%q, name=%q): %s", role, namespace, name, err)
|
|
return nil
|
|
}
|
|
data, err := ioutil.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
if err != nil {
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
|
|
logger.Errorf("cannot read response from %q: %s", requestURL, err)
|
|
return nil
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
if resp.StatusCode == http.StatusNotFound {
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_misses_total{role=%q}`, role)).Inc()
|
|
return nil
|
|
}
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
|
|
logger.Errorf("unexpected status code when reading response from %q; got %d; want %d; response body: %q", requestURL, resp.StatusCode, http.StatusOK, data)
|
|
return nil
|
|
}
|
|
parseObject, _ := getObjectParsersForRole(role)
|
|
o, err = parseObject(data)
|
|
if err != nil {
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
|
|
logger.Errorf("cannot parse object obtained from %q: %s; response body: %q", requestURL, err, data)
|
|
return nil
|
|
}
|
|
// There is no need in storing the object in urlWatcher cache, since it should be eventually populated there by urlWatcher itself.
|
|
return o
|
|
}
|
|
|
|
func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object {
|
|
key := namespace + "/" + name
|
|
gw.startWatchersForRole(role, nil)
|
|
uws := gw.getURLWatchers()
|
|
for _, uw := range uws {
|
|
if uw.role != role {
|
|
// Role mismatch
|
|
continue
|
|
}
|
|
if uw.namespace != "" && uw.namespace != namespace {
|
|
// Namespace mismatch
|
|
continue
|
|
}
|
|
uw.mu.Lock()
|
|
o := uw.objectsByKey[key]
|
|
uw.mu.Unlock()
|
|
if o != nil {
|
|
return o
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
|
|
paths, namespaces := getAPIPathsWithNamespaces(role, gw.namespaces, gw.selectors)
|
|
for i, path := range paths {
|
|
apiURL := gw.apiServer + path
|
|
gw.mu.Lock()
|
|
uw := gw.m[apiURL]
|
|
needStart := uw == nil
|
|
if needStart {
|
|
uw = newURLWatcher(role, namespaces[i], apiURL, gw)
|
|
gw.m[apiURL] = uw
|
|
}
|
|
gw.mu.Unlock()
|
|
if needStart {
|
|
uw.reloadObjects()
|
|
go uw.watchForUpdates()
|
|
}
|
|
if aw != nil {
|
|
uw.subscribeAPIWatcher(aw)
|
|
}
|
|
}
|
|
}
|
|
|
|
// doRequest performs http request to the given requestURL.
|
|
func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
|
|
req, err := http.NewRequest("GET", requestURL, nil)
|
|
if err != nil {
|
|
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
|
|
}
|
|
if gw.authorization != "" {
|
|
req.Header.Set("Authorization", gw.authorization)
|
|
}
|
|
return gw.client.Do(req)
|
|
}
|
|
|
|
func (gw *groupWatcher) registerPendingAPIWatchers() {
|
|
uws := gw.getURLWatchers()
|
|
for _, uw := range uws {
|
|
uw.registerPendingAPIWatchers()
|
|
}
|
|
}
|
|
|
|
func (gw *groupWatcher) getURLWatchers() []*urlWatcher {
|
|
gw.mu.Lock()
|
|
uws := make([]*urlWatcher, 0, len(gw.m))
|
|
for _, uw := range gw.m {
|
|
uws = append(uws, uw)
|
|
}
|
|
gw.mu.Unlock()
|
|
return uws
|
|
}
|
|
|
|
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
|
|
uws := gw.getURLWatchers()
|
|
for _, uw := range uws {
|
|
uw.unsubscribeAPIWatcher(aw)
|
|
}
|
|
}
|
|
|
|
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
|
|
type urlWatcher struct {
|
|
role string
|
|
namespace string
|
|
apiURL string
|
|
gw *groupWatcher
|
|
|
|
parseObject parseObjectFunc
|
|
parseObjectList parseObjectListFunc
|
|
|
|
// mu protects aws, awsPending, objectsByKey and resourceVersion
|
|
mu sync.Mutex
|
|
|
|
// awsPending contains pending apiWatcher objects, which are registered in a batch.
|
|
// Batch registering saves CPU time needed for registering big number of Kubernetes objects
|
|
// shared among big number of scrape jobs, since per-object labels are generated only once
|
|
// for all the scrape jobs (each scrape job is associated with a single apiWatcher).
|
|
// See reloadScrapeWorksForAPIWatchers for details.
|
|
awsPending map[*apiWatcher]struct{}
|
|
|
|
// aws contains registered apiWatcher objects
|
|
aws map[*apiWatcher]struct{}
|
|
|
|
// objectsByKey contains the latest state for objects obtained from apiURL
|
|
objectsByKey map[string]object
|
|
|
|
resourceVersion string
|
|
|
|
objectsCount *metrics.Counter
|
|
objectsAdded *metrics.Counter
|
|
objectsRemoved *metrics.Counter
|
|
objectsUpdated *metrics.Counter
|
|
staleResourceVersions *metrics.Counter
|
|
}
|
|
|
|
func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher {
|
|
parseObject, parseObjectList := getObjectParsersForRole(role)
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()
|
|
uw := &urlWatcher{
|
|
role: role,
|
|
namespace: namespace,
|
|
apiURL: apiURL,
|
|
gw: gw,
|
|
|
|
parseObject: parseObject,
|
|
parseObjectList: parseObjectList,
|
|
|
|
awsPending: make(map[*apiWatcher]struct{}),
|
|
aws: make(map[*apiWatcher]struct{}),
|
|
objectsByKey: make(map[string]object),
|
|
|
|
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
|
|
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
|
|
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
|
|
objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
|
|
staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)),
|
|
}
|
|
logger.Infof("started %s watcher for %q", uw.role, uw.apiURL)
|
|
return uw
|
|
}
|
|
|
|
func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) {
|
|
uw.mu.Lock()
|
|
if _, ok := uw.aws[aw]; !ok {
|
|
if _, ok := uw.awsPending[aw]; !ok {
|
|
uw.awsPending[aw] = struct{}{}
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc()
|
|
}
|
|
}
|
|
uw.mu.Unlock()
|
|
}
|
|
|
|
func (uw *urlWatcher) registerPendingAPIWatchers() {
|
|
uw.mu.Lock()
|
|
awsPending := make([]*apiWatcher, 0, len(uw.awsPending))
|
|
for aw := range uw.awsPending {
|
|
awsPending = append(awsPending, aw)
|
|
delete(uw.awsPending, aw)
|
|
}
|
|
uw.reloadScrapeWorksForAPIWatchers(awsPending, uw.objectsByKey)
|
|
uw.mu.Unlock()
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending))
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending))
|
|
}
|
|
|
|
func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
|
|
uw.mu.Lock()
|
|
if _, ok := uw.awsPending[aw]; ok {
|
|
delete(uw.awsPending, aw)
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec()
|
|
}
|
|
if _, ok := uw.aws[aw]; ok {
|
|
delete(uw.aws, aw)
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec()
|
|
}
|
|
uw.mu.Unlock()
|
|
}
|
|
|
|
func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
|
|
uw.mu.Lock()
|
|
uw.resourceVersion = resourceVersion
|
|
uw.mu.Unlock()
|
|
}
|
|
|
|
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
|
|
func (uw *urlWatcher) reloadObjects() string {
|
|
uw.mu.Lock()
|
|
resourceVersion := uw.resourceVersion
|
|
uw.mu.Unlock()
|
|
if resourceVersion != "" {
|
|
// Fast path - there is no need in reloading the objects.
|
|
return resourceVersion
|
|
}
|
|
|
|
requestURL := uw.apiURL
|
|
resp, err := uw.gw.doRequest(requestURL)
|
|
if err != nil {
|
|
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
|
|
return ""
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := ioutil.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
|
|
return ""
|
|
}
|
|
objectsByKey, metadata, err := uw.parseObjectList(resp.Body)
|
|
_ = resp.Body.Close()
|
|
if err != nil {
|
|
logger.Errorf("cannot parse objects from %q: %s", requestURL, err)
|
|
return ""
|
|
}
|
|
|
|
uw.mu.Lock()
|
|
var updated, removed, added int
|
|
for key := range uw.objectsByKey {
|
|
if o, ok := objectsByKey[key]; ok {
|
|
uw.objectsByKey[key] = o
|
|
updated++
|
|
} else {
|
|
delete(uw.objectsByKey, key)
|
|
removed++
|
|
}
|
|
}
|
|
for key, o := range objectsByKey {
|
|
if _, ok := uw.objectsByKey[key]; !ok {
|
|
uw.objectsByKey[key] = o
|
|
added++
|
|
}
|
|
}
|
|
uw.objectsUpdated.Add(updated)
|
|
uw.objectsRemoved.Add(removed)
|
|
uw.objectsAdded.Add(added)
|
|
uw.objectsCount.Add(added - removed)
|
|
uw.resourceVersion = metadata.ResourceVersion
|
|
aws := getAPIWatchers(uw.aws)
|
|
uw.mu.Unlock()
|
|
|
|
uw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey)
|
|
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
|
|
return uw.resourceVersion
|
|
}
|
|
|
|
func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) {
|
|
if len(aws) == 0 {
|
|
return
|
|
}
|
|
swosByKey := make([]map[string][]interface{}, len(aws))
|
|
for i := range aws {
|
|
swosByKey[i] = make(map[string][]interface{})
|
|
}
|
|
for key, o := range objectsByKey {
|
|
labels := o.getTargetLabels(uw.gw)
|
|
for i, aw := range aws {
|
|
swos := aw.getScrapeWorkObjectsForLabels(labels)
|
|
if len(swos) > 0 {
|
|
swosByKey[i][key] = swos
|
|
}
|
|
}
|
|
}
|
|
for i, aw := range aws {
|
|
aw.reloadScrapeWorks(uw.namespace, swosByKey[i])
|
|
}
|
|
}
|
|
|
|
func getAPIWatchers(awsMap map[*apiWatcher]struct{}) []*apiWatcher {
|
|
aws := make([]*apiWatcher, 0, len(awsMap))
|
|
for aw := range awsMap {
|
|
aws = append(aws, aw)
|
|
}
|
|
return aws
|
|
}
|
|
|
|
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
|
|
//
|
|
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
|
|
func (uw *urlWatcher) watchForUpdates() {
|
|
backoffDelay := time.Second
|
|
maxBackoffDelay := 30 * time.Second
|
|
backoffSleep := func() {
|
|
time.Sleep(backoffDelay)
|
|
backoffDelay *= 2
|
|
if backoffDelay > maxBackoffDelay {
|
|
backoffDelay = maxBackoffDelay
|
|
}
|
|
}
|
|
apiURL := uw.apiURL
|
|
delimiter := "?"
|
|
if strings.Contains(apiURL, "?") {
|
|
delimiter = "&"
|
|
}
|
|
timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
|
|
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
|
|
for {
|
|
resourceVersion := uw.reloadObjects()
|
|
if resourceVersion == "" {
|
|
backoffSleep()
|
|
continue
|
|
}
|
|
requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion)
|
|
resp, err := uw.gw.doRequest(requestURL)
|
|
if err != nil {
|
|
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
|
|
backoffSleep()
|
|
continue
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
if resp.StatusCode == 410 {
|
|
// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
|
|
backoffDelay = time.Second
|
|
uw.staleResourceVersions.Inc()
|
|
uw.setResourceVersion("")
|
|
} else {
|
|
body, _ := ioutil.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
|
|
backoffSleep()
|
|
}
|
|
continue
|
|
}
|
|
backoffDelay = time.Second
|
|
err = uw.readObjectUpdateStream(resp.Body)
|
|
_ = resp.Body.Close()
|
|
if err != nil {
|
|
if !errors.Is(err, io.EOF) {
|
|
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
|
|
}
|
|
backoffSleep()
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events.
|
|
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
|
|
d := json.NewDecoder(r)
|
|
var we WatchEvent
|
|
for {
|
|
if err := d.Decode(&we); err != nil {
|
|
return err
|
|
}
|
|
switch we.Type {
|
|
case "ADDED", "MODIFIED":
|
|
o, err := uw.parseObject(we.Object)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key := o.key()
|
|
uw.mu.Lock()
|
|
if _, ok := uw.objectsByKey[key]; !ok {
|
|
uw.objectsCount.Inc()
|
|
uw.objectsAdded.Inc()
|
|
} else {
|
|
uw.objectsUpdated.Inc()
|
|
}
|
|
uw.objectsByKey[key] = o
|
|
aws := getAPIWatchers(uw.aws)
|
|
uw.mu.Unlock()
|
|
labels := o.getTargetLabels(uw.gw)
|
|
for _, aw := range aws {
|
|
aw.setScrapeWorks(uw.namespace, key, labels)
|
|
}
|
|
case "DELETED":
|
|
o, err := uw.parseObject(we.Object)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key := o.key()
|
|
uw.mu.Lock()
|
|
if _, ok := uw.objectsByKey[key]; ok {
|
|
uw.objectsCount.Dec()
|
|
uw.objectsRemoved.Inc()
|
|
delete(uw.objectsByKey, key)
|
|
}
|
|
aws := getAPIWatchers(uw.aws)
|
|
uw.mu.Unlock()
|
|
for _, aw := range aws {
|
|
aw.removeScrapeWorks(uw.namespace, key)
|
|
}
|
|
case "BOOKMARK":
|
|
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
|
|
bm, err := parseBookmark(we.Object)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse bookmark from %q: %w", we.Object, err)
|
|
}
|
|
uw.setResourceVersion(bm.Metadata.ResourceVersion)
|
|
case "ERROR":
|
|
em, err := parseError(we.Object)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse error message from %q: %w", we.Object, err)
|
|
}
|
|
if em.Code == 410 {
|
|
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
|
|
uw.staleResourceVersions.Inc()
|
|
uw.setResourceVersion("")
|
|
return nil
|
|
}
|
|
return fmt.Errorf("unexpected error message: %q", we.Object)
|
|
default:
|
|
return fmt.Errorf("unexpected WatchEvent type %q: %q", we.Type, we.Object)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Bookmark is a bookmark message from Kubernetes Watch API.
|
|
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
|
|
type Bookmark struct {
|
|
Metadata struct {
|
|
ResourceVersion string
|
|
}
|
|
}
|
|
|
|
func parseBookmark(data []byte) (*Bookmark, error) {
|
|
var bm Bookmark
|
|
if err := json.Unmarshal(data, &bm); err != nil {
|
|
return nil, err
|
|
}
|
|
return &bm, nil
|
|
}
|
|
|
|
// Error is an error message from Kubernetes Watch API.
|
|
type Error struct {
|
|
Code int
|
|
}
|
|
|
|
func parseError(data []byte) (*Error, error) {
|
|
var em Error
|
|
if err := json.Unmarshal(data, &em); err != nil {
|
|
return nil, err
|
|
}
|
|
return &em, nil
|
|
}
|
|
|
|
func getAPIPathsWithNamespaces(role string, namespaces []string, selectors []Selector) ([]string, []string) {
|
|
objectType := getObjectTypeByRole(role)
|
|
if objectType == "nodes" || len(namespaces) == 0 {
|
|
query := joinSelectors(role, selectors)
|
|
path := getAPIPath(objectType, "", query)
|
|
return []string{path}, []string{""}
|
|
}
|
|
query := joinSelectors(role, selectors)
|
|
paths := make([]string, len(namespaces))
|
|
for i, namespace := range namespaces {
|
|
paths[i] = getAPIPath(objectType, namespace, query)
|
|
}
|
|
return paths, namespaces
|
|
}
|
|
|
|
func getAPIPath(objectType, namespace, query string) string {
|
|
suffix := objectType
|
|
if namespace != "" {
|
|
suffix = "namespaces/" + namespace + "/" + objectType
|
|
}
|
|
if len(query) > 0 {
|
|
suffix += "?" + query
|
|
}
|
|
if objectType == "ingresses" {
|
|
return "/apis/networking.k8s.io/v1beta1/" + suffix
|
|
}
|
|
if objectType == "endpointslices" {
|
|
return "/apis/discovery.k8s.io/v1beta1/" + suffix
|
|
}
|
|
return "/api/v1/" + suffix
|
|
}
|
|
|
|
func joinSelectors(role string, selectors []Selector) string {
|
|
var labelSelectors, fieldSelectors []string
|
|
for _, s := range selectors {
|
|
if s.Role != role {
|
|
continue
|
|
}
|
|
if s.Label != "" {
|
|
labelSelectors = append(labelSelectors, s.Label)
|
|
}
|
|
if s.Field != "" {
|
|
fieldSelectors = append(fieldSelectors, s.Field)
|
|
}
|
|
}
|
|
var args []string
|
|
if len(labelSelectors) > 0 {
|
|
args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ",")))
|
|
}
|
|
if len(fieldSelectors) > 0 {
|
|
args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ",")))
|
|
}
|
|
return strings.Join(args, "&")
|
|
}
|
|
|
|
func getObjectTypeByRole(role string) string {
|
|
switch role {
|
|
case "node":
|
|
return "nodes"
|
|
case "pod":
|
|
return "pods"
|
|
case "service":
|
|
return "services"
|
|
case "endpoints":
|
|
return "endpoints"
|
|
case "endpointslices":
|
|
return "endpointslices"
|
|
case "ingress":
|
|
return "ingresses"
|
|
default:
|
|
logger.Panicf("BUG: unknonw role=%q", role)
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) {
|
|
switch role {
|
|
case "node":
|
|
return parseNode, parseNodeList
|
|
case "pod":
|
|
return parsePod, parsePodList
|
|
case "service":
|
|
return parseService, parseServiceList
|
|
case "endpoints":
|
|
return parseEndpoints, parseEndpointsList
|
|
case "endpointslices":
|
|
return parseEndpointSlice, parseEndpointSliceList
|
|
case "ingress":
|
|
return parseIngress, parseIngressList
|
|
default:
|
|
logger.Panicf("BUG: unsupported role=%q", role)
|
|
return nil, nil
|
|
}
|
|
}
|