package kubernetes import ( "encoding/json" "errors" "flag" "fmt" "io" "io/ioutil" "net" "net/http" "net/url" "os" "strconv" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" "github.com/VictoriaMetrics/metrics" ) var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 10*time.Minute, "How frequently to reload the full state from Kuberntes API server") // apiConfig contains config for API server type apiConfig struct { aw *apiWatcher } var configMap = discoveryutils.NewConfigMap() func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) }) if err != nil { return nil, err } return v.(*apiConfig), nil } func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) if err != nil { return nil, fmt.Errorf("cannot parse auth config: %w", err) } apiServer := sdc.APIServer if len(apiServer) == 0 { // Assume we run at k8s pod. // Discover apiServer and auth config according to k8s docs. // See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller host := os.Getenv("KUBERNETES_SERVICE_HOST") port := os.Getenv("KUBERNETES_SERVICE_PORT") if len(host) == 0 { return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " + "probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?") } if len(port) == 0 { return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_PORT env var; it must be defined when running in k8s; "+ "KUBERNETES_SERVICE_HOST=%q", host) } apiServer = "https://" + net.JoinHostPort(host, port) tlsConfig := promauth.TLSConfig{ CAFile: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", } acNew, err := promauth.NewConfig(".", nil, "", "/var/run/secrets/kubernetes.io/serviceaccount/token", &tlsConfig) if err != nil { return nil, fmt.Errorf("cannot initialize service account auth: %w; probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?", err) } ac = acNew } if !strings.Contains(apiServer, "://") { proto := "http" if sdc.TLSConfig != nil { proto = "https" } apiServer = proto + "://" + apiServer } for strings.HasSuffix(apiServer, "/") { apiServer = apiServer[:len(apiServer)-1] } var proxy func(*http.Request) (*url.URL, error) if proxyURL := sdc.ProxyURL.URL(); proxyURL != nil { proxy = http.ProxyURL(proxyURL) } client := &http.Client{ Transport: &http.Transport{ TLSClientConfig: ac.NewTLSConfig(), Proxy: proxy, TLSHandshakeTimeout: 10 * time.Second, IdleConnTimeout: *apiServerTimeout, }, Timeout: *apiServerTimeout, } aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors) cfg := &apiConfig{ aw: aw, } return cfg, nil } // WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set. // // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes type WatchEvent struct { Type string Object json.RawMessage } // object is any Kubernetes object. type object interface { key() string getTargetLabels(aw *apiWatcher) []map[string]string } // parseObjectFunc must parse object from the given data. type parseObjectFunc func(data []byte) (object, error) // parseObjectListFunc must parse objectList from the given data. type parseObjectListFunc func(data []byte) (map[string]object, ListMeta, error) // apiWatcher is used for watching for Kuberntes object changes and caching their latest states. type apiWatcher struct { // The client used for watching for object changes client *http.Client // Kubenetes API server address in the form http://api-server apiServer string // The contents for `Authorization` HTTP request header authorization string // Namespaces to watch namespaces []string // Selectors to apply during watch selectors []Selector // mu protects watchersByURL and lastAccessTime mu sync.Mutex // a map of watchers keyed by request paths watchersByURL map[string]*urlWatcher // The last time the apiWatcher was queried for cached objects. // It is used for stopping unused watchers. lastAccessTime uint64 } func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector) *apiWatcher { return &apiWatcher{ apiServer: apiServer, authorization: authorization, client: client, namespaces: namespaces, selectors: selectors, watchersByURL: make(map[string]*urlWatcher), lastAccessTime: fasttime.UnixTimestamp(), } } // getLabelsForRole returns all the sets of labels for the given role. func (aw *apiWatcher) getLabelsForRole(role string) []map[string]string { aw.startWatchersForRole(role) var ms []map[string]string aw.mu.Lock() for _, uw := range aw.watchersByURL { if uw.role != role { continue } uw.mu.Lock() for _, labels := range uw.labelsByKey { ms = append(ms, labels...) } uw.mu.Unlock() } aw.lastAccessTime = fasttime.UnixTimestamp() aw.mu.Unlock() return ms } // getObjectByRole returns an object with the given (namespace, name) key and the given role. func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { if aw == nil { return nil } key := namespace + "/" + name aw.startWatchersForRole(role) var o object aw.mu.Lock() for _, uw := range aw.watchersByURL { if uw.role != role { continue } uw.mu.Lock() o = uw.objectsByKey[key] uw.mu.Unlock() if o != nil { break } } aw.lastAccessTime = fasttime.UnixTimestamp() aw.mu.Unlock() return o } func (aw *apiWatcher) startWatchersForRole(role string) { parseObject, parseObjectList := getObjectParsersForRole(role) paths := getAPIPaths(role, aw.namespaces, aw.selectors) for _, path := range paths { apiURL := aw.apiServer + path aw.startWatcherForURL(role, apiURL, parseObject, parseObjectList) } } func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) { aw.mu.Lock() if aw.watchersByURL[apiURL] != nil { // Watcher for the given path already exists. aw.mu.Unlock() return } uw := aw.newURLWatcher(role, apiURL, parseObject, parseObjectList) aw.watchersByURL[apiURL] = uw aw.mu.Unlock() uw.watchersCount.Inc() uw.watchersCreated.Inc() resourceVersion := uw.reloadObjects() go func() { uw.watchForUpdates(resourceVersion) aw.mu.Lock() delete(aw.watchersByURL, apiURL) aw.mu.Unlock() uw.watchersCount.Dec() uw.watchersStopped.Inc() }() } // needStop returns true if aw wasn't used for long time. func (aw *apiWatcher) needStop() bool { aw.mu.Lock() defer aw.mu.Unlock() return fasttime.UnixTimestamp() > aw.lastAccessTime+5*60 } // doRequest performs http request to the given requestURL. func (aw *apiWatcher) doRequest(requestURL string) (*http.Response, error) { req, err := http.NewRequest("GET", requestURL, nil) if err != nil { logger.Fatalf("cannot create a request for %q: %s", requestURL, err) } if aw.authorization != "" { req.Header.Set("Authorization", aw.authorization) } return aw.client.Do(req) } // urlWatcher watches for an apiURL and updates object states in objectsByKey. type urlWatcher struct { role string apiURL string parseObject parseObjectFunc parseObjectList parseObjectListFunc // mu protects objectsByKey and labelsByKey mu sync.Mutex // objectsByKey contains the latest state for objects obtained from apiURL objectsByKey map[string]object labelsByKey map[string][]map[string]string // the parent apiWatcher aw *apiWatcher watchersCount *metrics.Counter watchersCreated *metrics.Counter watchersStopped *metrics.Counter objectsCount *metrics.Counter objectsAdded *metrics.Counter objectsRemoved *metrics.Counter } func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher { return &urlWatcher{ role: role, apiURL: apiURL, parseObject: parseObject, parseObjectList: parseObjectList, objectsByKey: make(map[string]object), labelsByKey: make(map[string][]map[string]string), aw: aw, watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)), watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)), watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)), objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), } } // reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. func (uw *urlWatcher) reloadObjects() string { requestURL := uw.apiURL resp, err := uw.aw.doRequest(requestURL) if err != nil { logger.Errorf("error when performing a request to %q: %s", requestURL, err) return "" } body, _ := ioutil.ReadAll(resp.Body) _ = resp.Body.Close() if resp.StatusCode != http.StatusOK { logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) return "" } objectsByKey, metadata, err := uw.parseObjectList(body) if err != nil { logger.Errorf("cannot parse response from %q: %s", requestURL, err) return "" } labelsByKey := make(map[string][]map[string]string, len(objectsByKey)) for k, o := range objectsByKey { labelsByKey[k] = o.getTargetLabels(uw.aw) } uw.mu.Lock() uw.objectsRemoved.Add(-len(uw.objectsByKey)) uw.objectsAdded.Add(len(objectsByKey)) uw.objectsCount.Add(len(objectsByKey) - len(uw.objectsByKey)) uw.objectsByKey = objectsByKey uw.labelsByKey = labelsByKey uw.mu.Unlock() return metadata.ResourceVersion } // watchForUpdates watches for object updates starting from resourceVersion and updates the corresponding objects to the latest state. // // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes func (uw *urlWatcher) watchForUpdates(resourceVersion string) { aw := uw.aw backoffDelay := time.Second maxBackoffDelay := 30 * time.Second backoffSleep := func() { time.Sleep(backoffDelay) backoffDelay *= 2 if backoffDelay > maxBackoffDelay { backoffDelay = maxBackoffDelay } } apiURL := uw.apiURL delimiter := "?" if strings.Contains(apiURL, "?") { delimiter = "&" } timeoutSeconds := time.Duration(0.9 * float64(aw.client.Timeout)).Seconds() apiURL += delimiter + "watch=1&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) logger.Infof("started watcher for %q", apiURL) for { if aw.needStop() { logger.Infof("stopped unused watcher for %q", apiURL) return } requestURL := apiURL if resourceVersion != "" { requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion) } resp, err := aw.doRequest(requestURL) if err != nil { logger.Errorf("error when performing a request to %q: %s", requestURL, err) backoffSleep() resourceVersion = uw.reloadObjects() continue } if resp.StatusCode != http.StatusOK { body, _ := ioutil.ReadAll(resp.Body) _ = resp.Body.Close() logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) if resp.StatusCode == 410 { // There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses backoffDelay = time.Second } else { backoffSleep() } resourceVersion = uw.reloadObjects() continue } backoffDelay = time.Second err = uw.readObjectUpdateStream(resp.Body) _ = resp.Body.Close() if err != nil { if !errors.Is(err, io.EOF) { logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err) } backoffSleep() resourceVersion = uw.reloadObjects() continue } } } // readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events. func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { d := json.NewDecoder(r) var we WatchEvent for { if err := d.Decode(&we); err != nil { return err } o, err := uw.parseObject(we.Object) if err != nil { return err } key := o.key() switch we.Type { case "ADDED", "MODIFIED": uw.mu.Lock() if uw.objectsByKey[key] == nil { uw.objectsAdded.Inc() uw.objectsCount.Inc() } uw.objectsByKey[key] = o uw.mu.Unlock() labels := o.getTargetLabels(uw.aw) uw.mu.Lock() uw.labelsByKey[key] = labels uw.mu.Unlock() case "DELETED": uw.mu.Lock() if uw.objectsByKey[key] != nil { uw.objectsRemoved.Inc() uw.objectsCount.Dec() } delete(uw.objectsByKey, key) delete(uw.labelsByKey, key) uw.mu.Unlock() default: return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role) } } } func getAPIPaths(role string, namespaces []string, selectors []Selector) []string { objectName := getObjectNameByRole(role) if objectName == "nodes" || len(namespaces) == 0 { query := joinSelectors(role, selectors) path := getAPIPath(objectName, "", query) return []string{path} } query := joinSelectors(role, selectors) paths := make([]string, len(namespaces)) for i, namespace := range namespaces { paths[i] = getAPIPath(objectName, namespace, query) } return paths } func getAPIPath(objectName, namespace, query string) string { suffix := objectName if namespace != "" { suffix = "namespaces/" + namespace + "/" + objectName } if len(query) > 0 { suffix += "?" + query } if objectName == "endpointslices" { return "/apis/discovery.k8s.io/v1beta1/" + suffix } return "/api/v1/" + suffix } func joinSelectors(role string, selectors []Selector) string { var labelSelectors, fieldSelectors []string for _, s := range selectors { if s.Role != role { continue } if s.Label != "" { labelSelectors = append(labelSelectors, s.Label) } if s.Field != "" { fieldSelectors = append(fieldSelectors, s.Field) } } var args []string if len(labelSelectors) > 0 { args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ","))) } if len(fieldSelectors) > 0 { args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ","))) } return strings.Join(args, "&") } func getObjectNameByRole(role string) string { switch role { case "node": return "nodes" case "pod": return "pods" case "service": return "services" case "endpoints": return "endpoints" case "endpointslices": return "endpointslices" case "ingress": return "ingresses" default: logger.Panicf("BUG: unknonw role=%q", role) return "" } } func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) { switch role { case "node": return parseNode, parseNodeList case "pod": return parsePod, parsePodList case "service": return parseService, parseServiceList case "endpoints": return parseEndpoints, parseEndpointsList case "endpointslices": return parseEndpointSlice, parseEndpointSliceList case "ingress": return parseIngress, parseIngressList default: logger.Panicf("BUG: unsupported role=%q", role) return nil, nil } }