diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 54969b1601..f26bbb48d7 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes" "gopkg.in/yaml.v2" ) @@ -57,6 +58,7 @@ type ScrapeConfig struct { TLSConfig *promauth.TLSConfig `yaml:"tls_config"` StaticConfigs []StaticConfig `yaml:"static_configs"` FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"` + KubernetesSDConfigs []KubernetesSDConfig `yaml:"kubernetes_sd_configs"` RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs"` MetricRelabelConfigs []promrelabel.RelabelConfig `yaml:"metric_relabel_configs"` ScrapeLimit int `yaml:"scrape_limit"` @@ -73,6 +75,25 @@ type FileSDConfig struct { // `refresh_interval` is ignored. See `-prometheus.fileSDCheckInterval` } +// KubernetesSDConfig represents kubernetes-based service discovery config. +// +// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config +type KubernetesSDConfig struct { + APIServer string `yaml:"api_server"` + Role string `yaml:"role"` + BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth"` + BearerToken string `yaml:"bearer_token"` + BearerTokenFile string `yaml:"bearer_token_file"` + TLSConfig *promauth.TLSConfig `yaml:"tls_config"` + Namespaces KubernetesNamespaces `yaml:"namespaces"` + Selectors []kubernetes.Selector `yaml:"selectors"` +} + +// KubernetesNamespaces represents namespaces for KubernetesSDConfig +type KubernetesNamespaces struct { + Names []string `yaml:"names"` +} + // StaticConfig represents essential parts for `static_config` section of Prometheus config. 
// // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config @@ -136,6 +157,14 @@ func unmarshalMaybeStrict(data []byte, dst interface{}) error { return err } +func (cfg *Config) kubernetesSDConfigsCount() int { + n := 0 + for i := range cfg.ScrapeConfigs { + n += len(cfg.ScrapeConfigs[i].KubernetesSDConfigs) + } + return n +} + func (cfg *Config) fileSDConfigsCount() int { n := 0 for i := range cfg.ScrapeConfigs { @@ -144,6 +173,17 @@ func (cfg *Config) fileSDConfigsCount() int { return n } +// getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg. +func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork { + var dst []ScrapeWork + for _, sc := range cfg.ScrapeConfigs { + for _, sdc := range sc.KubernetesSDConfigs { + dst = sdc.appendScrapeWork(dst, cfg.baseDir, sc.swc) + } + } + return dst +} + // getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg. func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork { // Create a map for the previous scrape work. 
@@ -259,6 +299,74 @@ type scrapeWorkConfig struct { scrapeLimit int } +func (sdc *KubernetesSDConfig) appendScrapeWork(dst []ScrapeWork, baseDir string, swc *scrapeWorkConfig) []ScrapeWork { + ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) + if err != nil { + logger.Errorf("cannot parse auth config for `kubernetes_sd_config` for `job_name` %q: %s; skipping it", swc.jobName, err) + return dst + } + cfg := &kubernetes.APIConfig{ + Server: sdc.APIServer, + AuthConfig: ac, + Namespaces: sdc.Namespaces.Names, + Selectors: sdc.Selectors, + } + switch sdc.Role { + case "node": + targetLabels, err := kubernetes.GetNodesLabels(cfg) + if err != nil { + logger.Errorf("error when discovering kubernetes nodes for `job_name` %q: %s; skipping it", swc.jobName, err) + return dst + } + return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role) + case "service": + targetLabels, err := kubernetes.GetServicesLabels(cfg) + if err != nil { + logger.Errorf("error when discovering kubernetes services for `job_name` %q: %s; skipping it", swc.jobName, err) + return dst + } + return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role) + case "pod": + targetLabels, err := kubernetes.GetPodsLabels(cfg) + if err != nil { + logger.Errorf("error when discovering kubernetes pods for `job_name` %q: %s; skipping it", swc.jobName, err) + return dst + } + return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role) + case "endpoints": + targetLabels, err := kubernetes.GetEndpointsLabels(cfg) + if err != nil { + logger.Errorf("error when discovering kubernetes endpoints for `job_name` %q: %s; skipping it", swc.jobName, err) + return dst + } + return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role) + case "ingress": + targetLabels, err := kubernetes.GetIngressesLabels(cfg) + if err != nil { + logger.Errorf("error when discovering kubernetes ingresses for `job_name` %q: %s; skipping it", swc.jobName, err) + 
return dst + } + return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role) + default: + logger.Errorf("unexpected `role`: %q; must be one of `node`, `service`, `pod`, `endpoints` or `ingress`; skipping it", sdc.Role) + return dst + } +} + +func appendKubernetesScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, role string) []ScrapeWork { + for _, metaLabels := range targetLabels { + target := metaLabels["__address__"] + var err error + dst, err = appendScrapeWork(dst, swc, target, nil, metaLabels) + if err != nil { + logger.Errorf("error when parsing `kubernetes_sd_config` target %q with role %q for `job_name` %q: %s; skipping it", + target, role, swc.jobName, err) + continue + } + } + return dst +} + func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swPrev map[string][]ScrapeWork, baseDir string, swc *scrapeWorkConfig) []ScrapeWork { for _, file := range sdc.Files { pathPattern := getFilepath(baseDir, file) diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go new file mode 100644 index 0000000000..ba5c7feaf0 --- /dev/null +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -0,0 +1,167 @@ +package kubernetes + +import ( + "crypto/tls" + "fmt" + "net" + "os" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/valyala/fasthttp" +) + +func getAPIResponse(cfg *APIConfig, role, path string) ([]byte, error) { + hcv, err := getHostClient(cfg.Server, cfg.AuthConfig) + if err != nil { + return nil, err + } + query := joinSelectors(role, cfg.Namespaces, cfg.Selectors) + if len(query) > 0 { + path += "?" 
+ query + } + requestURL := hcv.apiServer + path + var u fasthttp.URI + u.Update(requestURL) + var req fasthttp.Request + req.SetRequestURIBytes(u.RequestURI()) + req.SetHost(hcv.hostPort) + req.Header.Set("Accept-Encoding", "gzip") + if hcv.ac != nil && hcv.ac.Authorization != "" { + req.Header.Set("Authorization", hcv.ac.Authorization) + } + var resp fasthttp.Response + // There is no need in calling DoTimeout, since the timeout is already set in hc.ReadTimeout above. + if err := hcv.hc.Do(&req, &resp); err != nil { + return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err) + } + var data []byte + if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" { + dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body()) + if err != nil { + return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err) + } + data = dst + } else { + data = append(data[:0], resp.Body()...) + } + statusCode := resp.StatusCode() + if statusCode != fasthttp.StatusOK { + return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q", + requestURL, statusCode, fasthttp.StatusOK, data) + } + return data, nil +} + +func getHostClient(apiServer string, ac *promauth.Config) (hcValue, error) { + k := hcKey{ + apiServer: apiServer, + ac: ac, + } + hcMapLock.Lock() + defer hcMapLock.Unlock() + + if !hasHCMapCleaner { + go hcMapCleaner() + hasHCMapCleaner = true + } + hcv, ok := hcMap[k] + if !ok { + hcvNew, err := newHostClient(apiServer, ac) + if err != nil { + return hcv, fmt.Errorf("cannot create new HTTP client for %q: %s", apiServer, err) + } + hcMap[k] = hcvNew + hcv = hcvNew + } + return hcv, nil +} + +func hcMapCleaner() { + tc := time.NewTicker(15 * time.Minute) + for range tc.C { + hcMapLock.Lock() + hcMap = make(map[hcKey]hcValue) + hcMapLock.Unlock() + } +} + +type hcKey struct { + apiServer string + ac *promauth.Config +} + +type hcValue struct { + hc *fasthttp.HostClient + ac *promauth.Config + apiServer string + 
hostPort string +} + +var ( + hasHCMapCleaner bool + hcMap = make(map[hcKey]hcValue) + hcMapLock sync.Mutex +) + +func newHostClient(apiServer string, ac *promauth.Config) (hcValue, error) { + var hcv hcValue + if len(apiServer) == 0 { + // Assume we run at k8s pod. + // Discover apiServer and auth config according to k8s docs. + // See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller + host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT") + if len(host) == 0 { + return hcv, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " + + "probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?") + } + if len(port) == 0 { + return hcv, fmt.Errorf("cannot find KUBERNETES_SERVICE_PORT env var; it must be defined when running in k8s; "+ + "KUBERNETES_SERVICE_HOST=%q", host) + } + apiServer = "https://" + net.JoinHostPort(host, port) + tlsConfig := promauth.TLSConfig{ + CAFile: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", + } + acNew, err := promauth.NewConfig("/", nil, "", "/var/run/secrets/kubernetes.io/serviceaccount/token", &tlsConfig) + if err != nil { + return hcv, fmt.Errorf("cannot initialize service account auth: %s; probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?", err) + } + ac = acNew + } + + var u fasthttp.URI + u.Update(apiServer) + hostPort := string(u.Host()) + isTLS := string(u.Scheme()) == "https" + var tlsCfg *tls.Config + if isTLS && ac != nil { + tlsCfg = ac.NewTLSConfig() + } + if !strings.Contains(hostPort, ":") { + port := "80" + if isTLS { + port = "443" + } + hostPort = net.JoinHostPort(hostPort, port) + } + hc := &fasthttp.HostClient{ + Addr: hostPort, + Name: "vm_promscrape/discovery", + DialDualStack: netutil.TCP6Enabled(), + IsTLS: isTLS, + TLSConfig: tlsCfg, + ReadTimeout: time.Minute, + WriteTimeout: 10 * time.Second, + 
MaxResponseBodySize: 300 * 1024 * 1024, + } + return hcValue{ + hc: hc, + ac: ac, + apiServer: apiServer, + hostPort: hostPort, + }, nil +} diff --git a/lib/promscrape/discovery/kubernetes/common_types.go b/lib/promscrape/discovery/kubernetes/common_types.go new file mode 100644 index 0000000000..a32ef5c512 --- /dev/null +++ b/lib/promscrape/discovery/kubernetes/common_types.go @@ -0,0 +1,144 @@ +package kubernetes + +import ( + "encoding/json" + "fmt" + "net" + "net/url" + "regexp" + "sort" + "strconv" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +// ObjectMeta represents ObjectMeta from k8s API. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#objectmeta-v1-meta +type ObjectMeta struct { + Name string + Namespace string + UID string + Labels SortedLabels + Annotations SortedLabels + OwnerReferences []OwnerReference +} + +func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m map[string]string) { + for _, lb := range om.Labels { + ln := sanitizeLabelName(lb.Name) + m[fmt.Sprintf("%s_label_%s", prefix, ln)] = lb.Value + m[fmt.Sprintf("%s_labelpresent_%s", prefix, ln)] = "true" + } + for _, a := range om.Annotations { + an := sanitizeLabelName(a.Name) + m[fmt.Sprintf("%s_annotation_%s", prefix, an)] = a.Value + m[fmt.Sprintf("%s_annotationpresent_%s", prefix, an)] = "true" + } +} + +// sanitizeLabelName replaces anything that doesn't match +// client_label.LabelNameRE with an underscore. +// +// This has been copied from Prometheus sources at util/strutil/strconv.go +func sanitizeLabelName(name string) string { + return invalidLabelCharRE.ReplaceAllString(name, "_") +} + +var ( + invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) +) + +// SortedLabels represents sorted labels. +type SortedLabels []prompbmarshal.Label + +// UnmarshalJSON unmarshals JSON from data. 
+func (sls *SortedLabels) UnmarshalJSON(data []byte) error { + var m map[string]string + if err := json.Unmarshal(data, &m); err != nil { + return err + } + *sls = getSortedLabels(m) + return nil +} + +func getSortedLabels(m map[string]string) SortedLabels { + a := make([]prompbmarshal.Label, 0, len(m)) + for k, v := range m { + a = append(a, prompbmarshal.Label{ + Name: k, + Value: v, + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].Name < a[j].Name + }) + return a +} + +// OwnerReference represents OwnerReference from k8s API. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ownerreference-v1-meta +type OwnerReference struct { + Name string + Controller bool + Kind string +} + +// DaemonEndpoint represents DaemonEndpoint from k8s API. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#daemonendpoint-v1-core +type DaemonEndpoint struct { + Port int +} + +func joinHostPort(host string, port int) string { + portStr := strconv.Itoa(port) + return net.JoinHostPort(host, portStr) +} + +// APIConfig contains config for API server +type APIConfig struct { + Server string + AuthConfig *promauth.Config + Namespaces []string + Selectors []Selector +} + +// Selector represents kubernetes selector. 
+// +// See https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/ +// and https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ +type Selector struct { + Role string `yaml:"role"` + Label string `yaml:"label"` + Field string `yaml:"field"` +} + +func joinSelectors(role string, namespaces []string, selectors []Selector) string { + var labelSelectors, fieldSelectors []string + for _, ns := range namespaces { + fieldSelectors = append(fieldSelectors, "metadata.namespace="+ns) + } + for _, s := range selectors { + if s.Role != role { + continue + } + if s.Label != "" { + labelSelectors = append(labelSelectors, s.Label) + } + if s.Field != "" { + fieldSelectors = append(fieldSelectors, s.Field) + } + } + var args []string + if len(labelSelectors) > 0 { + args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ","))) + } + if len(fieldSelectors) > 0 { + args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ","))) + } + return strings.Join(args, "&") +} diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go new file mode 100644 index 0000000000..c657a3ae04 --- /dev/null +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -0,0 +1,189 @@ +package kubernetes + +import ( + "encoding/json" + "fmt" +) + +// GetEndpointsLabels returns labels for k8s endpoints obtained from the given apiServer +func GetEndpointsLabels(cfg *APIConfig) ([]map[string]string, error) { + data, err := getAPIResponse(cfg, "endpoints", "/api/v1/endpoints") + if err != nil { + return nil, fmt.Errorf("cannot obtain endpoints data from API server: %s", err) + } + epl, err := parseEndpointsList(data) + if err != nil { + return nil, fmt.Errorf("cannot parse endpoints response from API server: %s", err) + } + pods, err := getPods(cfg) + if err != nil { + return nil, err + } + svcs, err := getServices(cfg) + if err != nil { + return nil, err + } + 
var ms []map[string]string + for _, ep := range epl.Items { + ms = ep.appendTargetLabels(ms, pods, svcs) + } + return ms, nil +} + +// EndpointsList implements k8s endpoints list. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointslist-v1-core +type EndpointsList struct { + Items []Endpoints +} + +// Endpoints implements k8s endpoints. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpoints-v1-core +type Endpoints struct { + Metadata ObjectMeta + Subsets []EndpointSubset +} + +// EndpointSubset implements k8s endpoint subset. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointsubset-v1-core +type EndpointSubset struct { + Addresses []EndpointAddress + NotReadyAddresses []EndpointAddress + Ports []EndpointPort +} + +// EndpointAddress implements k8s endpoint address. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointaddress-v1-core +type EndpointAddress struct { + Hostname string + IP string + NodeName string + TargetRef ObjectReference +} + +// ObjectReference implements k8s object reference. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#objectreference-v1-core +type ObjectReference struct { + Kind string + Name string + Namespace string +} + +// EndpointPort implements k8s endpoint port. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointport-v1beta1-discovery-k8s-io +type EndpointPort struct { + AppProtocol string + Name string + Port int + Protocol string +} + +// parseEndpointsList parses EndpointsList from data. 
+func parseEndpointsList(data []byte) (*EndpointsList, error) { + var esl EndpointsList + if err := json.Unmarshal(data, &esl); err != nil { + return nil, fmt.Errorf("cannot unmarshal EndpointsList from %q: %s", data, err) + } + return &esl, nil +} + +// appendTargetLabels appends labels for each endpoint in eps to ms and returns the result. +// +// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints +func (eps *Endpoints) appendTargetLabels(ms []map[string]string, pods []Pod, svcs []Service) []map[string]string { + svc := getService(svcs, eps.Metadata.Namespace, eps.Metadata.Name) + podPortsSeen := make(map[*Pod][]int) + for _, ess := range eps.Subsets { + for _, epp := range ess.Ports { + ms = appendEndpointLabelsForAddresses(ms, podPortsSeen, eps, ess.Addresses, epp, pods, svc, "true") + ms = appendEndpointLabelsForAddresses(ms, podPortsSeen, eps, ess.NotReadyAddresses, epp, pods, svc, "false") + } + } + + // Append labels for skipped ports on seen pods. 
+ portSeen := func(port int, ports []int) bool { + for _, p := range ports { + if p == port { + return true + } + } + return false + } + for p, ports := range podPortsSeen { + for _, c := range p.Spec.Containers { + for _, cp := range c.Ports { + if portSeen(cp.ContainerPort, ports) { + continue + } + addr := joinHostPort(p.Status.PodIP, cp.ContainerPort) + m := map[string]string{ + "__address__": addr, + } + p.appendCommonLabels(m) + p.appendContainerLabels(m, c, &cp) + ms = append(ms, m) + } + } + } + return ms +} + +func appendEndpointLabelsForAddresses(ms []map[string]string, podPortsSeen map[*Pod][]int, eps *Endpoints, eas []EndpointAddress, epp EndpointPort, + pods []Pod, svc *Service, ready string) []map[string]string { + for _, ea := range eas { + p := getPod(pods, ea.TargetRef.Namespace, ea.TargetRef.Name) + m := getEndpointLabelsForAddressAndPort(podPortsSeen, eps, ea, epp, p, svc, ready) + ms = append(ms, m) + } + return ms +} + +func getEndpointLabelsForAddressAndPort(podPortsSeen map[*Pod][]int, eps *Endpoints, ea EndpointAddress, epp EndpointPort, p *Pod, svc *Service, ready string) map[string]string { + m := getEndpointLabels(eps.Metadata, ea, epp, ready) + if svc != nil { + svc.appendCommonLabels(m) + } + if ea.TargetRef.Kind != "Pod" || p == nil { + return m + } + p.appendCommonLabels(m) + for _, c := range p.Spec.Containers { + for _, cp := range c.Ports { + if cp.ContainerPort == epp.Port { + p.appendContainerLabels(m, c, &cp) + podPortsSeen[p] = append(podPortsSeen[p], cp.ContainerPort) + break + } + } + } + return m +} + +func getEndpointLabels(om ObjectMeta, ea EndpointAddress, epp EndpointPort, ready string) map[string]string { + addr := joinHostPort(ea.IP, epp.Port) + m := map[string]string{ + "__address__": addr, + "__meta_kubernetes_namespace": om.Namespace, + "__meta_kubernetes_endpoints_name": om.Name, + + "__meta_kubernetes_endpoint_ready": ready, + "__meta_kubernetes_endpoint_port_name": epp.Name, + 
"__meta_kubernetes_endpoint_port_protocol": epp.Protocol, + } + if ea.TargetRef.Kind != "" { + m["__meta_kubernetes_endpoint_address_target_kind"] = ea.TargetRef.Kind + m["__meta_kubernetes_endpoint_address_target_name"] = ea.TargetRef.Name + } + if ea.NodeName != "" { + m["__meta_kubernetes_endpoint_node_name"] = ea.NodeName + } + if ea.Hostname != "" { + m["__meta_kubernetes_endpoint_hostname"] = ea.Hostname + } + return m +} diff --git a/lib/promscrape/discovery/kubernetes/endpoints_test.go b/lib/promscrape/discovery/kubernetes/endpoints_test.go new file mode 100644 index 0000000000..cb8bb698e0 --- /dev/null +++ b/lib/promscrape/discovery/kubernetes/endpoints_test.go @@ -0,0 +1,107 @@ +package kubernetes + +import ( + "reflect" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestParseEndpointsListFailure(t *testing.T) { + f := func(s string) { + t.Helper() + els, err := parseEndpointsList([]byte(s)) + if err == nil { + t.Fatalf("expecting non-nil error") + } + if els != nil { + t.Fatalf("unexpected non-nil EnpointsList: %v", els) + } + } + f(``) + f(`[1,23]`) + f(`{"items":[{"metadata":1}]}`) + f(`{"items":[{"metadata":{"labels":[1]}}]}`) +} + +func TestParseEndpointsListSuccess(t *testing.T) { + data := ` +{ + "kind": "EndpointsList", + "apiVersion": "v1", + "metadata": { + "selfLink": "/api/v1/endpoints", + "resourceVersion": "128055" + }, + "items": [ + { + "metadata": { + "name": "kubernetes", + "namespace": "default", + "selfLink": "/api/v1/namespaces/default/endpoints/kubernetes", + "uid": "0972c7d9-c267-4b93-a090-a417eeb9b385", + "resourceVersion": "150", + "creationTimestamp": "2020-03-16T20:44:25Z" + }, + "subsets": [ + { + "addresses": [ + { + "hostname": "aaa.bbb", + "nodeName": "foobar", + "ip": "172.17.0.2", + "targetRef": { + "kind": "Pod", + "namespace": "kube-system", + "name": "coredns-6955765f44-lnp6t", + "uid": "cbddb2b6-5b85-40f1-8819-9a59385169bb", + "resourceVersion": "124878" + } + } + ], + 
"ports": [ + { + "name": "https", + "port": 8443, + "protocol": "TCP" + } + ] + } + ] + } + ] +} +` + els, err := parseEndpointsList([]byte(data)) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(els.Items) != 1 { + t.Fatalf("unexpected length of EndpointsList.Items; got %d; want %d", len(els.Items), 1) + } + endpoint := els.Items[0] + + // Check endpoint.appendTargetLabels() + labelss := endpoint.appendTargetLabels(nil, nil, nil) + var sortedLabelss [][]prompbmarshal.Label + for _, labels := range labelss { + sortedLabelss = append(sortedLabelss, getSortedLabels(labels)) + } + expectedLabelss := [][]prompbmarshal.Label{ + getSortedLabels(map[string]string{ + "__address__": "172.17.0.2:8443", + "__meta_kubernetes_endpoint_address_target_kind": "Pod", + "__meta_kubernetes_endpoint_address_target_name": "coredns-6955765f44-lnp6t", + "__meta_kubernetes_endpoint_hostname": "aaa.bbb", + "__meta_kubernetes_endpoint_node_name": "foobar", + "__meta_kubernetes_endpoint_port_name": "https", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + "__meta_kubernetes_endpoints_name": "kubernetes", + "__meta_kubernetes_namespace": "default", + }), + } + if !reflect.DeepEqual(sortedLabelss, expectedLabelss) { + t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss) + } +} diff --git a/lib/promscrape/discovery/kubernetes/ingress.go b/lib/promscrape/discovery/kubernetes/ingress.go new file mode 100644 index 0000000000..542111e31a --- /dev/null +++ b/lib/promscrape/discovery/kubernetes/ingress.go @@ -0,0 +1,136 @@ +package kubernetes + +import ( + "encoding/json" + "fmt" +) + +// GetIngressesLabels returns labels for k8s ingresses obtained from the given apiServer +func GetIngressesLabels(cfg *APIConfig) ([]map[string]string, error) { + data, err := getAPIResponse(cfg, "ingress", "/apis/extensions/v1beta1/ingresses") + if err != nil { + return nil, fmt.Errorf("cannot obtain ingresses data 
from API server: %s", err) + } + igl, err := parseIngressList(data) + if err != nil { + return nil, fmt.Errorf("cannot parse ingresses response from API server: %s", err) + } + var ms []map[string]string + for _, ig := range igl.Items { + ms = ig.appendTargetLabels(ms) + } + return ms, nil +} + +// IngressList represents ingress list in k8s. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingresslist-v1beta1-extensions +type IngressList struct { + Items []Ingress +} + +// Ingress represents ingress in k8s. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingress-v1beta1-extensions +type Ingress struct { + Metadata ObjectMeta + Spec IngressSpec +} + +// IngressSpec represents ingress spec in k8s. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingressspec-v1beta1-extensions +type IngressSpec struct { + TLS []IngressTLS `json:"tls"` + Rules []IngressRule +} + +// IngressTLS represents ingress TLS spec in k8s. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingresstls-v1beta1-extensions +type IngressTLS struct { + Hosts []string +} + +// IngressRule represents ingress rule in k8s. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingressrule-v1beta1-extensions +type IngressRule struct { + Host string + HTTP HTTPIngressRuleValue `json:"http"` +} + +// HTTPIngressRuleValue represents HTTP ingress rule value in k8s. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#httpingressrulevalue-v1beta1-extensions +type HTTPIngressRuleValue struct { + Paths []HTTPIngressPath +} + +// HTTPIngressPath represents HTTP ingress path in k8s. +// +// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#httpingresspath-v1beta1-extensions +type HTTPIngressPath struct { + Path string +} + +// parseIngressList parses IngressList from data. 
+func parseIngressList(data []byte) (*IngressList, error) { + var il IngressList + if err := json.Unmarshal(data, &il); err != nil { + return nil, fmt.Errorf("cannot unmarshal IngressList from %q: %s", data, err) + } + return &il, nil +} + +// appendTargetLabels appends labels for Ingress ig to ms and returns the result. +// +// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress +func (ig *Ingress) appendTargetLabels(ms []map[string]string) []map[string]string { + tlsHosts := make(map[string]bool) + for _, tls := range ig.Spec.TLS { + for _, host := range tls.Hosts { + tlsHosts[host] = true + } + } + for _, r := range ig.Spec.Rules { + paths := getIngressRulePaths(r.HTTP.Paths) + scheme := "http" + if tlsHosts[r.Host] { + scheme = "https" + } + for _, path := range paths { + m := getLabelsForIngressPath(ig, scheme, r.Host, path) + ms = append(ms, m) + } + } + return ms +} + +func getLabelsForIngressPath(ig *Ingress, scheme, host, path string) map[string]string { + m := map[string]string{ + "__address__": host, + "__meta_kubernetes_namespace": ig.Metadata.Namespace, + "__meta_kubernetes_ingress_name": ig.Metadata.Name, + "__meta_kubernetes_ingress_scheme": scheme, + "__meta_kubernetes_ingress_host": host, + "__meta_kubernetes_ingress_path": path, + } + ig.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_ingress", m) + return m +} + +func getIngressRulePaths(paths []HTTPIngressPath) []string { + if len(paths) == 0 { + return []string{"/"} + } + var result []string + for _, p := range paths { + path := p.Path + if path == "" { + path = "/" + } + result = append(result, path) + } + return result +} diff --git a/lib/promscrape/discovery/kubernetes/ingress_test.go b/lib/promscrape/discovery/kubernetes/ingress_test.go new file mode 100644 index 0000000000..8a1fbae757 --- /dev/null +++ b/lib/promscrape/discovery/kubernetes/ingress_test.go @@ -0,0 +1,103 @@ +package kubernetes + +import ( + "reflect" + "testing" + + 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestParseIngressListFailure(t *testing.T) { + f := func(s string) { + t.Helper() + nls, err := parseIngressList([]byte(s)) + if err == nil { + t.Fatalf("expecting non-nil error") + } + if nls != nil { + t.Fatalf("unexpected non-nil IngressList: %v", nls) + } + } + f(``) + f(`[1,23]`) + f(`{"items":[{"metadata":1}]}`) + f(`{"items":[{"metadata":{"labels":[1]}}]}`) +} + +func TestParseIngressListSuccess(t *testing.T) { + data := ` +{ + "kind": "IngressList", + "apiVersion": "extensions/v1beta1", + "metadata": { + "selfLink": "/apis/extensions/v1beta1/ingresses", + "resourceVersion": "351452" + }, + "items": [ + { + "metadata": { + "name": "test-ingress", + "namespace": "default", + "selfLink": "/apis/extensions/v1beta1/namespaces/default/ingresses/test-ingress", + "uid": "6d3f38f9-de89-4bc9-b273-c8faf74e8a27", + "resourceVersion": "351445", + "generation": 1, + "creationTimestamp": "2020-04-13T16:43:52Z", + "annotations": { + "kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"networking.k8s.io/v1beta1\",\"kind\":\"Ingress\",\"metadata\":{\"annotations\":{},\"name\":\"test-ingress\",\"namespace\":\"default\"},\"spec\":{\"backend\":{\"serviceName\":\"testsvc\",\"servicePort\":80}}}\n" + } + }, + "spec": { + "backend": { + "serviceName": "testsvc", + "servicePort": 80 + }, + "rules": [ + { + "host": "foobar" + } + ] + }, + "status": { + "loadBalancer": { + "ingress": [ + { + "ip": "172.17.0.2" + } + ] + } + } + } + ] +}` + igs, err := parseIngressList([]byte(data)) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(igs.Items) != 1 { + t.Fatalf("unexpected length of IngressList.Items; got %d; want %d", len(igs.Items), 1) + } + ig := igs.Items[0] + + // Check ig.appendTargetLabels() + labelss := ig.appendTargetLabels(nil) + var sortedLabelss [][]prompbmarshal.Label + for _, labels := range labelss { + sortedLabelss = append(sortedLabelss, 
getSortedLabels(labels)) + } + expectedLabelss := [][]prompbmarshal.Label{ + getSortedLabels(map[string]string{ + "__address__": "foobar", + "__meta_kubernetes_ingress_annotation_kubectl_kubernetes_io_last_applied_configuration": `{"apiVersion":"networking.k8s.io/v1beta1","kind":"Ingress","metadata":{"annotations":{},"name":"test-ingress","namespace":"default"},"spec":{"backend":{"serviceName":"testsvc","servicePort":80}}}` + "\n", + "__meta_kubernetes_ingress_annotationpresent_kubectl_kubernetes_io_last_applied_configuration": "true", + "__meta_kubernetes_ingress_host": "foobar", + "__meta_kubernetes_ingress_name": "test-ingress", + "__meta_kubernetes_ingress_path": "/", + "__meta_kubernetes_ingress_scheme": "http", + "__meta_kubernetes_namespace": "default", + }), + } + if !reflect.DeepEqual(sortedLabelss, expectedLabelss) { + t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss) + } +} diff --git a/lib/promscrape/discovery/kubernetes/node.go b/lib/promscrape/discovery/kubernetes/node.go new file mode 100644 index 0000000000..d8c65b406b --- /dev/null +++ b/lib/promscrape/discovery/kubernetes/node.go @@ -0,0 +1,131 @@ +package kubernetes + +import ( + "encoding/json" + "fmt" +) + +// GetNodesLabels returns labels for k8s nodes obtained from the given apiServer. +func GetNodesLabels(cfg *APIConfig) ([]map[string]string, error) { + data, err := getAPIResponse(cfg, "node", "/api/v1/nodes") + if err != nil { + return nil, fmt.Errorf("cannot obtain nodes data from API server: %s", err) + } + nl, err := parseNodeList(data) + if err != nil { + return nil, fmt.Errorf("cannot parse nodes response from API server: %s", err) + } + var ms []map[string]string + for _, n := range nl.Items { + // Do not apply namespaces, since they are missing in nodes. + ms = n.appendTargetLabels(ms) + } + return ms, nil +} + +// NodeList represents NodeList from k8s API. 
+//
+// Only a subset of the API object is declared. The structs carry no `json`
+// tags, so decoding relies on encoding/json matching object keys to field
+// names case-insensitively (e.g. "items" -> Items); undeclared keys are ignored.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodelist-v1-core
+type NodeList struct {
+	Items []Node
+}
+
+// Node represents Node from k8s API.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#node-v1-core
+type Node struct {
+	Metadata ObjectMeta
+	Status   NodeStatus
+}
+
+// NodeStatus represents NodeStatus from k8s API.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodestatus-v1-core
+type NodeStatus struct {
+	Addresses       []NodeAddress
+	DaemonEndpoints NodeDaemonEndpoints
+}
+
+// NodeAddress represents NodeAddress from k8s API.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodeaddress-v1-core
+type NodeAddress struct {
+	Type    string
+	Address string
+}
+
+// NodeDaemonEndpoints represents NodeDaemonEndpoints from k8s API.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodedaemonendpoints-v1-core
+type NodeDaemonEndpoints struct {
+	KubeletEndpoint DaemonEndpoint
+}
+
+// parseNodeList parses NodeList from data.
+//
+// On failure it returns a nil list together with an error that quotes the
+// offending payload and wraps the json error with %w, so callers can still
+// match the cause (e.g. *json.SyntaxError) via errors.As.
+func parseNodeList(data []byte) (*NodeList, error) {
+	var nl NodeList
+	if err := json.Unmarshal(data, &nl); err != nil {
+		return nil, fmt.Errorf("cannot unmarshal NodeList from %q: %w", data, err)
+	}
+	return &nl, nil
+}
+
+// appendTargetLabels appends labels for the given Node n to ms and returns the result.
+//
+// A node without any usable address contributes nothing. Otherwise a single
+// label set is appended whose `__address__` is the preferred node address
+// joined with the kubelet endpoint port. Only the first address of every
+// address type is exported as `__meta_kubernetes_node_address_<type>`.
+//
+// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node
+func (n *Node) appendTargetLabels(ms []map[string]string) []map[string]string {
+	host := getNodeAddr(n.Status.Addresses)
+	if host == "" {
+		// Skip node without address
+		return ms
+	}
+	name := n.Metadata.Name
+	labels := map[string]string{
+		"__address__":                 joinHostPort(host, n.Status.DaemonEndpoints.KubeletEndpoint.Port),
+		"instance":                    name,
+		"__meta_kubernetes_node_name": name,
+	}
+	n.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_node", labels)
+	seenTypes := make(map[string]bool, len(n.Status.Addresses))
+	for i := range n.Status.Addresses {
+		na := &n.Status.Addresses[i]
+		if seenTypes[na.Type] {
+			// Later addresses of an already exported type are ignored.
+			continue
+		}
+		seenTypes[na.Type] = true
+		labels["__meta_kubernetes_node_address_"+sanitizeLabelName(na.Type)] = na.Address
+	}
+	return append(ms, labels)
+}
+
+// getNodeAddr picks the address used for `__address__` from nas.
+//
+// Address types are tried in a fixed order of preference; the first type
+// that yields a non-empty address wins. An empty string is returned when
+// none of the known types is present.
+func getNodeAddr(nas []NodeAddress) string {
+	preferredTypes := [...]string{
+		"InternalIP",
+		"InternalDNS",
+		"ExternalIP",
+		"ExternalDNS",
+		"LegacyHostIP",
+		"Hostname",
+	}
+	for _, typ := range preferredTypes {
+		if addr := getAddrByType(nas, typ); addr != "" {
+			return addr
+		}
+	}
+	return ""
+}
+
+// getAddrByType returns the address of the first entry in nas whose type is typ.
+//
+// It returns an empty string if there is no such entry.
+func getAddrByType(nas []NodeAddress, typ string) string {
+	for i := range nas {
+		if nas[i].Type == typ {
+			return nas[i].Address
+		}
+	}
+	return ""
+}
diff --git a/lib/promscrape/discovery/kubernetes/node_test.go b/lib/promscrape/discovery/kubernetes/node_test.go
new file mode 100644
index 0000000000..00ce2c89e1
--- /dev/null
+++ b/lib/promscrape/discovery/kubernetes/node_test.go
@@ -0,0 +1,320 @@
+package kubernetes
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestParseNodeListFailure(t *testing.T) {
+	f := func(s string) {
+		t.Helper()
+		nls, err := parseNodeList([]byte(s))
+		if err == nil {
+			t.Fatalf("expecting non-nil error")
+		}
+		if nls != nil {
+			t.Fatalf("unexpected non-nil NodeList: %v", nls)
+		}
+	}
+	f(``)
+	f(`[1,23]`)
+	f(`{"items":[{"metadata":1}]}`)
+	f(`{"items":[{"metadata":{"labels":[1]}}]}`)
+}
+
+func TestParseNodeListSuccess(t *testing.T) {
+	data := `{
+  "kind": "NodeList",
+  "apiVersion": "v1",
+  "metadata": {
+    "selfLink": "/api/v1/nodes",
+    "resourceVersion": "22627"
+  },
+  "items": [
+    {
+      "metadata": {
+        "name": "m01",
+        "selfLink": "/api/v1/nodes/m01",
+        "uid": "b48dd901-ead0-476a-b209-d2d908d65109",
+        "resourceVersion": "22309",
+        "creationTimestamp": "2020-03-16T20:44:23Z",
+        "labels": {
+          "beta.kubernetes.io/arch": "amd64",
+          "beta.kubernetes.io/os": "linux",
+          "kubernetes.io/arch": "amd64",
+          "kubernetes.io/hostname": "m01",
+          "kubernetes.io/os": "linux",
+          "minikube.k8s.io/commit": "eb13446e786c9ef70cb0a9f85a633194e62396a1",
+          "minikube.k8s.io/name": "minikube",
+          "minikube.k8s.io/updated_at": "2020_03_16T22_44_27_0700",
+          "minikube.k8s.io/version": "v1.8.2",
+          "node-role.kubernetes.io/master": ""
+        },
+        "annotations": {
+          "kubeadm.alpha.kubernetes.io/cri-socket": "/var/run/dockershim.sock",
+          "node.alpha.kubernetes.io/ttl": "0",
+          "volumes.kubernetes.io/controller-managed-attach-detach": "true"
+        }
+      },
+      "spec": {
+        "podCIDR": "10.244.0.0/24",
+        "podCIDRs": [
+          "10.244.0.0/24"
+        ]
+      },
+      "status": {
+        "capacity": {
+          "cpu": "4",
+          "ephemeral-storage": "474705032Ki",
+          "hugepages-1Gi": "0",
+          "hugepages-2Mi": "0",
+          "memory": "16314884Ki",
+          "pods": "110"
+        },
+        "allocatable": {
+          "cpu": "4",
+          "ephemeral-storage": "437488156767",
+          "hugepages-1Gi": "0",
+          "hugepages-2Mi": "0",
+          "memory": "16212484Ki",
+          "pods": "110"
+        },
+        "conditions": [
+          {
+            "type": "MemoryPressure",
+            "status": "False",
+            "lastHeartbeatTime": "2020-03-20T13:30:38Z",
+            "lastTransitionTime": "2020-03-16T20:44:18Z",
+            "reason": "KubeletHasSufficientMemory",
+            "message": "kubelet has sufficient memory available"
+          },
+          {
+            "type": "DiskPressure",
+            "status": "False",
+            "lastHeartbeatTime": "2020-03-20T13:30:38Z",
+            "lastTransitionTime": "2020-03-16T20:44:18Z",
+            "reason": "KubeletHasNoDiskPressure",
+            "message": "kubelet has no disk pressure"
+          },
+          {
+            "type": "PIDPressure",
+            "status": "False",
+            "lastHeartbeatTime": "2020-03-20T13:30:38Z",
+            "lastTransitionTime": "2020-03-16T20:44:18Z",
+            "reason": "KubeletHasSufficientPID",
+            "message": "kubelet has sufficient PID available"
+          },
+          {
+            "type": "Ready",
+            "status": "True",
+            "lastHeartbeatTime": "2020-03-20T13:30:38Z",
+            "lastTransitionTime": "2020-03-16T20:44:39Z",
+            "reason": "KubeletReady",
+            "message": "kubelet is posting ready status"
+          }
+        ],
+        "addresses": [
+          {
+            "type": "InternalIP",
+            "address": "172.17.0.2"
+          },
+          {
+            "type": "Hostname",
+            "address": "m01"
+          }
+        ],
+        "daemonEndpoints": {
+          "kubeletEndpoint": {
+            "Port": 10250
+          }
+        },
+        "nodeInfo": {
+          "machineID": "e64aad27e586485b9a9cbd699840c0ab",
+          "systemUUID": "4d9f5caa-25de-46c6-8d54-d1c5141b78cc",
+          "bootID": "947ffc57-db48-4a03-b7c6-18ce0b85238d",
+          "kernelVersion": "4.15.0-91-generic",
+          "osImage": "Ubuntu 19.10",
+          "containerRuntimeVersion": "docker://19.3.2",
+          "kubeletVersion": "v1.17.3",
+          "kubeProxyVersion": "v1.17.3",
+          "operatingSystem": "linux",
+          "architecture": "amd64"
+        },
+        "images": [
+          {
+            "names": [
+              "k8s.gcr.io/etcd@sha256:4afb99b4690b418ffc2ceb67e1a17376457e441c1f09ab55447f0aaf992fa646",
+              "k8s.gcr.io/etcd:3.4.3-0"
+            ],
+            "sizeBytes": 288426917
+          },
+          {
+            "names": [
+              "k8s.gcr.io/kube-apiserver@sha256:33400ea29255bd20714b6b8092b22ebb045ae134030d6bf476bddfed9d33e900",
+              "k8s.gcr.io/kube-apiserver:v1.17.3"
+            ],
+            "sizeBytes": 170986003
+          },
+          {
+            "names": [
+              "k8s.gcr.io/kube-controller-manager@sha256:2f0bf4d08e72a1fd6327c8eca3a72ad21af3a608283423bb3c10c98e68759844",
+              "k8s.gcr.io/kube-controller-manager:v1.17.3"
+            ],
+            "sizeBytes": 160918035
+          },
+          {
+            "names": [
+              "k8s.gcr.io/kube-proxy@sha256:3a70e2ab8d1d623680191a1a1f1dcb0bdbfd388784b1f153d5630a7397a63fd4",
+              "k8s.gcr.io/kube-proxy:v1.17.3"
+            ],
+            "sizeBytes": 115964919
+          },
+          {
+            "names": [
+              "k8s.gcr.io/kube-scheduler@sha256:b091f0db3bc61a3339fd3ba7ebb06c984c4ded32e1f2b1ef0fbdfab638e88462",
+              "k8s.gcr.io/kube-scheduler:v1.17.3"
+            ],
+            "sizeBytes": 94435859
+          },
+          {
+            "names": [
+              "kubernetesui/dashboard@sha256:fc90baec4fb62b809051a3227e71266c0427240685139bbd5673282715924ea7",
+              "kubernetesui/dashboard:v2.0.0-beta8"
+            ],
+            "sizeBytes": 90835427
+          },
+          {
+            "names": [
+              "gcr.io/k8s-minikube/storage-provisioner@sha256:088daa9fcbccf04c3f415d77d5a6360d2803922190b675cb7fc88a9d2d91985a",
+              "gcr.io/k8s-minikube/storage-provisioner:v1.8.1"
+            ],
+            "sizeBytes": 80815640
+          },
+          {
+            "names": [
+              "kindest/kindnetd@sha256:bc1833b3da442bb639008dd5a62861a0419d3f64b58fce6fb38b749105232555",
+              "kindest/kindnetd:0.5.3"
+            ],
+            "sizeBytes": 78486107
+          },
+          {
+            "names": [
+              "k8s.gcr.io/coredns@sha256:7ec975f167d815311a7136c32e70735f0d00b73781365df1befd46ed35bd4fe7",
+              "k8s.gcr.io/coredns:1.6.5"
+            ],
+            "sizeBytes": 41578211
+          },
+          {
+            "names": [
+              "kubernetesui/metrics-scraper@sha256:2026f9f7558d0f25cc6bab74ea201b4e9d5668fbc378ef64e13fddaea570efc0",
+              "kubernetesui/metrics-scraper:v1.0.2"
+            ],
+            "sizeBytes": 40101552
+          },
+          {
+            "names": [
+              "k8s.gcr.io/pause@sha256:f78411e19d84a252e53bff71a4407a5686c46983a2c2eeed83929b888179acea",
+              "k8s.gcr.io/pause:3.1"
+            ],
+            "sizeBytes": 742472
+          }
+        ]
+      }
+    }
+  ]
+}
+`
+	nls, err := parseNodeList([]byte(data))
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	if len(nls.Items) != 1 {
+		t.Fatalf("unexpected length of NodeList.Items; got %d; want %d", len(nls.Items), 1)
+	}
+	node := nls.Items[0]
+	meta := node.Metadata
+	if meta.Name != "m01" {
+		t.Fatalf("unexpected ObjectMeta.Name; got %q; want %q", meta.Name, "m01")
+	}
+	expectedLabels := getSortedLabels(map[string]string{
+		"beta.kubernetes.io/arch":        "amd64",
+		"beta.kubernetes.io/os":          "linux",
+		"kubernetes.io/arch":             "amd64",
+		"kubernetes.io/hostname":         "m01",
+		"kubernetes.io/os":               "linux",
+		"minikube.k8s.io/commit":         "eb13446e786c9ef70cb0a9f85a633194e62396a1",
+		"minikube.k8s.io/name":           "minikube",
+		"minikube.k8s.io/updated_at":     "2020_03_16T22_44_27_0700",
+		"minikube.k8s.io/version":        "v1.8.2",
+		"node-role.kubernetes.io/master": "",
+	})
+	if !reflect.DeepEqual(meta.Labels, expectedLabels) {
+		t.Fatalf("unexpected ObjectMeta.Labels\ngot\n%v\nwant\n%v", meta.Labels, expectedLabels)
+	}
+	expectedAnnotations := getSortedLabels(map[string]string{
+		"kubeadm.alpha.kubernetes.io/cri-socket":                 "/var/run/dockershim.sock",
+		"node.alpha.kubernetes.io/ttl":                           "0",
+		"volumes.kubernetes.io/controller-managed-attach-detach": "true",
+	})
+	if !reflect.DeepEqual(meta.Annotations, expectedAnnotations) {
+		t.Fatalf("unexpected ObjectMeta.Annotations\ngot\n%v\nwant\n%v", meta.Annotations, expectedAnnotations)
+	}
+	status := node.Status
+	expectedAddresses := []NodeAddress{
+		{
+			Type:    "InternalIP",
+			Address: "172.17.0.2",
+		},
+		{
+			Type:    "Hostname",
+			Address: "m01",
+		},
+	}
+	if !reflect.DeepEqual(status.Addresses, expectedAddresses) {
+		t.Fatalf("unexpected addresses\ngot\n%v\nwant\n%v", status.Addresses, expectedAddresses)
+	}
+
+	// Check node.appendTargetLabels()
+	labels := getSortedLabels(node.appendTargetLabels(nil)[0])
+	expectedLabels = getSortedLabels(map[string]string{
+		"instance":                    "m01",
+		"__address__":                 "172.17.0.2:10250",
+		"__meta_kubernetes_node_name": "m01",
+
+		"__meta_kubernetes_node_label_beta_kubernetes_io_arch":        "amd64",
+		"__meta_kubernetes_node_label_beta_kubernetes_io_os":          "linux",
+		"__meta_kubernetes_node_label_kubernetes_io_arch":             "amd64",
+		"__meta_kubernetes_node_label_kubernetes_io_hostname":         "m01",
+		"__meta_kubernetes_node_label_kubernetes_io_os":               "linux",
+		"__meta_kubernetes_node_label_minikube_k8s_io_commit":         "eb13446e786c9ef70cb0a9f85a633194e62396a1",
+		"__meta_kubernetes_node_label_minikube_k8s_io_name":           "minikube",
+		"__meta_kubernetes_node_label_minikube_k8s_io_updated_at":     "2020_03_16T22_44_27_0700",
+		"__meta_kubernetes_node_label_minikube_k8s_io_version":        "v1.8.2",
+		"__meta_kubernetes_node_label_node_role_kubernetes_io_master": "",
+
+		"__meta_kubernetes_node_labelpresent_beta_kubernetes_io_arch":        "true",
+		"__meta_kubernetes_node_labelpresent_beta_kubernetes_io_os":          "true",
+		"__meta_kubernetes_node_labelpresent_kubernetes_io_arch":             "true",
+		"__meta_kubernetes_node_labelpresent_kubernetes_io_hostname":         "true",
+		"__meta_kubernetes_node_labelpresent_kubernetes_io_os":               "true",
+		"__meta_kubernetes_node_labelpresent_minikube_k8s_io_commit":         "true",
+		"__meta_kubernetes_node_labelpresent_minikube_k8s_io_name":           "true",
+		"__meta_kubernetes_node_labelpresent_minikube_k8s_io_updated_at":     "true",
+		"__meta_kubernetes_node_labelpresent_minikube_k8s_io_version":        "true",
+		"__meta_kubernetes_node_labelpresent_node_role_kubernetes_io_master": "true",
+
+		"__meta_kubernetes_node_annotation_kubeadm_alpha_kubernetes_io_cri_socket":                 "/var/run/dockershim.sock",
+		"__meta_kubernetes_node_annotation_node_alpha_kubernetes_io_ttl":                           "0",
+		"__meta_kubernetes_node_annotation_volumes_kubernetes_io_controller_managed_attach_detach": "true",
+
+		"__meta_kubernetes_node_annotationpresent_kubeadm_alpha_kubernetes_io_cri_socket":                 "true",
+		"__meta_kubernetes_node_annotationpresent_node_alpha_kubernetes_io_ttl":                           "true",
+		"__meta_kubernetes_node_annotationpresent_volumes_kubernetes_io_controller_managed_attach_detach": "true",
+
+		"__meta_kubernetes_node_address_InternalIP": "172.17.0.2",
+		"__meta_kubernetes_node_address_Hostname":   "m01",
+	})
+	if !reflect.DeepEqual(labels, expectedLabels) {
+		t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", labels, expectedLabels)
+	}
+}
diff --git a/lib/promscrape/discovery/kubernetes/pod.go b/lib/promscrape/discovery/kubernetes/pod.go
new file mode 100644
index 0000000000..cae95e73cc
--- /dev/null
+++ b/lib/promscrape/discovery/kubernetes/pod.go
@@ -0,0 +1,198
@@
+package kubernetes
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+// GetPodsLabels returns labels for k8s pods obtained from the API server described by cfg.
+//
+// Every pod returned by getPods is passed to Pod.appendTargetLabels; the
+// error from getPods is returned unchanged.
+func GetPodsLabels(cfg *APIConfig) ([]map[string]string, error) {
+	pods, err := getPods(cfg)
+	if err != nil {
+		return nil, err
+	}
+	var ms []map[string]string
+	for _, p := range pods {
+		ms = p.appendTargetLabels(ms)
+	}
+	return ms, nil
+}
+
+// getPods requests `/api/v1/pods` from the API server described by cfg and returns the parsed pods.
+//
+// Request and parse failures are wrapped with %w, so the underlying error
+// stays reachable through errors.Is and errors.As.
+func getPods(cfg *APIConfig) ([]Pod, error) {
+	data, err := getAPIResponse(cfg, "pod", "/api/v1/pods")
+	if err != nil {
+		return nil, fmt.Errorf("cannot obtain pods data from API server: %w", err)
+	}
+	pl, err := parsePodList(data)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse pods response from API server: %w", err)
+	}
+	return pl.Items, nil
+}
+
+// PodList implements k8s pod list.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podlist-v1-core
+type PodList struct {
+	Items []Pod
+}
+
+// Pod implements k8s pod.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#pod-v1-core
+type Pod struct {
+	Metadata ObjectMeta
+	Spec     PodSpec
+	Status   PodStatus
+}
+
+// PodSpec implements k8s pod spec.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podspec-v1-core
+type PodSpec struct {
+	NodeName       string
+	Containers     []Container
+	InitContainers []Container
+}
+
+// Container implements k8s container.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#container-v1-core
+type Container struct {
+	Name  string
+	Ports []ContainerPort
+}
+
+// ContainerPort implements k8s container port.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#containerport-v1-core
+type ContainerPort struct {
+	Name          string
+	ContainerPort int
+	Protocol      string
+}
+
+// PodStatus implements k8s pod status.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podstatus-v1-core
+type PodStatus struct {
+	Phase      string
+	PodIP      string
+	HostIP     string
+	Conditions []PodCondition
+}
+
+// PodCondition implements k8s pod condition.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podcondition-v1-core
+type PodCondition struct {
+	Type   string
+	Status string
+}
+
+// parsePodList parses PodList from data.
+//
+// On failure it returns a nil list together with an error that quotes the
+// offending payload and wraps the json error with %w, so callers can still
+// match the cause via errors.As.
+func parsePodList(data []byte) (*PodList, error) {
+	var pl PodList
+	if err := json.Unmarshal(data, &pl); err != nil {
+		return nil, fmt.Errorf("cannot unmarshal PodList from %q: %w", data, err)
+	}
+	return &pl, nil
+}
+
+// appendTargetLabels appends labels for each port of the given Pod p to ms and returns the result.
+//
+// Pods without an IP are skipped. Regular containers are labeled with
+// `__meta_kubernetes_pod_container_init="false"`, init containers with "true".
+//
+// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#pod
+func (p *Pod) appendTargetLabels(ms []map[string]string) []map[string]string {
+	if len(p.Status.PodIP) == 0 {
+		// Skip pod without IP
+		return ms
+	}
+	ms = appendPodLabels(ms, p, p.Spec.Containers, "false")
+	ms = appendPodLabels(ms, p, p.Spec.InitContainers, "true")
+	return ms
+}
+
+// appendPodLabels appends to ms one label set per declared port of every container in cs.
+//
+// A container without declared ports still yields a single label set; its
+// `__address__` is the bare pod IP and it carries no port labels.
+func appendPodLabels(ms []map[string]string, p *Pod, cs []Container, isInit string) []map[string]string {
+	for _, c := range cs {
+		for i := range c.Ports {
+			// Point at the slice element instead of a per-iteration copy of it.
+			m := getPodLabels(p, c, &c.Ports[i], isInit)
+			ms = append(ms, m)
+		}
+		if len(c.Ports) == 0 {
+			m := getPodLabels(p, c, nil, isInit)
+			ms = append(ms, m)
+		}
+	}
+	return ms
+}
+
+// getPodLabels builds the label set for container c of pod p.
+//
+// cp may be nil, in which case `__address__` is the pod IP without a port.
+func getPodLabels(p *Pod, c Container, cp *ContainerPort, isInit string) map[string]string {
+	addr := p.Status.PodIP
+	if cp != nil {
+		addr = joinHostPort(addr, cp.ContainerPort)
+	}
+	m := map[string]string{
+		"__address__":                          addr,
+		"__meta_kubernetes_pod_container_init": isInit,
+	}
+	p.appendCommonLabels(m)
+	p.appendContainerLabels(m, c, cp)
+	return m
+}
+
+// appendContainerLabels adds the container name and, when cp is non-nil, the port labels to m.
+func (p *Pod) appendContainerLabels(m map[string]string, c Container, cp *ContainerPort) {
+	m["__meta_kubernetes_pod_container_name"] = c.Name
+	if cp != nil {
+		m["__meta_kubernetes_pod_container_port_name"] = cp.Name
+		m["__meta_kubernetes_pod_container_port_number"] = strconv.Itoa(cp.ContainerPort)
+		m["__meta_kubernetes_pod_container_port_protocol"] = cp.Protocol
+	}
+}
+
+// appendCommonLabels adds the pod-level labels shared by all targets of the pod to m.
+//
+// Controller labels are set only when an owner reference is marked as
+// controller, and only for its non-empty kind and name.
+func (p *Pod) appendCommonLabels(m map[string]string) {
+	m["__meta_kubernetes_pod_name"] = p.Metadata.Name
+	m["__meta_kubernetes_pod_ip"] = p.Status.PodIP
+	m["__meta_kubernetes_pod_ready"] = getPodReadyStatus(p.Status.Conditions)
+	m["__meta_kubernetes_pod_phase"] = p.Status.Phase
+	m["__meta_kubernetes_pod_node_name"] = p.Spec.NodeName
+	m["__meta_kubernetes_pod_host_ip"] = p.Status.HostIP
+	m["__meta_kubernetes_pod_uid"] = p.Metadata.UID
+	m["__meta_kubernetes_namespace"] = p.Metadata.Namespace
+	if pc := getPodController(p.Metadata.OwnerReferences); pc != nil {
+		if pc.Kind != "" {
+			m["__meta_kubernetes_pod_controller_kind"] = pc.Kind
+		}
+		if pc.Name != "" {
+			m["__meta_kubernetes_pod_controller_name"] = pc.Name
+		}
+	}
+	p.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_pod", m)
+}
+
+// getPodController returns the first owner reference in ors marked as controller.
+//
+// The returned pointer refers to the element inside ors (no copy is made),
+// mirroring getPod. It returns nil if no reference is a controller.
+func getPodController(ors []OwnerReference) *OwnerReference {
+	for i := range ors {
+		if ors[i].Controller {
+			return &ors[i]
+		}
+	}
+	return nil
+}
+
+// getPodReadyStatus returns the lower-cased status of the "Ready" condition in conds.
+//
+// It returns "unknown" if conds has no "Ready" condition.
+func getPodReadyStatus(conds []PodCondition) string {
+	for _, c := range conds {
+		if c.Type == "Ready" {
+			return strings.ToLower(c.Status)
+		}
+	}
+	return "unknown"
+}
+
+// getPod returns a pointer to the pod with the given namespace and name inside pods.
+//
+// It returns nil if there is no such pod.
+func getPod(pods []Pod, namespace, name string) *Pod {
+	for i := range pods {
+		pod := &pods[i]
+		if pod.Metadata.Name == name && pod.Metadata.Namespace == namespace {
+			return pod
+		}
+	}
+	return nil
+}
diff --git a/lib/promscrape/discovery/kubernetes/pod_test.go b/lib/promscrape/discovery/kubernetes/pod_test.go
new file mode 100644
index 0000000000..fb38428281
--- /dev/null
+++ b/lib/promscrape/discovery/kubernetes/pod_test.go
@@ -0,0 +1,285 @@
+package kubernetes
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
+)
+
+func TestParsePodListFailure(t *testing.T) {
+	f := func(s string) {
+		t.Helper()
+		pls, err := parsePodList([]byte(s))
+		if err == nil {
+			t.Fatalf("expecting non-nil error")
+		}
+		if pls != nil {
+			t.Fatalf("unexpected non-nil PodList: %v", pls)
+		}
+	}
+	f(``)
+	f(`[1,23]`)
+	f(`{"items":[{"metadata":1}]}`)
+	f(`{"items":[{"metadata":{"labels":[1]}}]}`)
+}
+
+func TestParsePodListSuccess(t *testing.T) {
+	data := `
+{
+  "kind": "PodList",
+  "apiVersion": "v1",
+  "metadata": {
+    "selfLink": "/api/v1/pods",
+    "resourceVersion": "72425"
+  },
+  "items": [
+    {
+      "metadata": {
+        "name": "etcd-m01",
+        "namespace": "kube-system",
+        "selfLink": "/api/v1/namespaces/kube-system/pods/etcd-m01",
+        "uid": "9d328156-75d1-411a-bdd0-aeacb53a38de",
+        "resourceVersion": "22318",
+        "creationTimestamp": "2020-03-16T20:44:30Z",
+        "labels": {
+          "component": "etcd",
+          "tier": "control-plane"
+        },
+        "annotations": {
+          "kubernetes.io/config.hash": "3ec997b76fb6ed3b78da8e0b5676dac4",
+          "kubernetes.io/config.mirror": "3ec997b76fb6ed3b78da8e0b5676dac4",
+          "kubernetes.io/config.seen": "2020-03-16T20:44:26.538136233Z",
+          "kubernetes.io/config.source": "file"
+        },
+        "ownerReferences": [
+          {
+            "apiVersion": "v1",
+            "kind": "Node",
+            "name": "m01",
+            "uid": "b48dd901-ead0-476a-b209-d2d908d65109",
+            "controller": true
+          }
+        ]
+      },
+      "spec": {
+        "volumes": [
+          {
+            "name": "etcd-certs",
+            "hostPath": {
+              "path": "/var/lib/minikube/certs/etcd",
+              "type": "DirectoryOrCreate"
+            }
+          },
+          {
+            "name": "etcd-data",
+            "hostPath": {
+              "path": "/var/lib/minikube/etcd",
+              "type": "DirectoryOrCreate"
+            }
+          }
+        ],
+        "containers": [
+          {
+            "name": "etcd",
+            "image": "k8s.gcr.io/etcd:3.4.3-0",
+            "command": [
+              "etcd",
+              "--advertise-client-urls=https://172.17.0.2:2379",
+              "--cert-file=/var/lib/minikube/certs/etcd/server.crt",
+              "--client-cert-auth=true",
+              "--data-dir=/var/lib/minikube/etcd",
+              "--initial-advertise-peer-urls=https://172.17.0.2:2380",
+              "--initial-cluster=m01=https://172.17.0.2:2380",
+              "--key-file=/var/lib/minikube/certs/etcd/server.key",
+              "--listen-client-urls=https://127.0.0.1:2379,https://172.17.0.2:2379",
+              "--listen-metrics-urls=http://127.0.0.1:2381",
+              "--listen-peer-urls=https://172.17.0.2:2380",
+              "--name=m01",
+              "--peer-cert-file=/var/lib/minikube/certs/etcd/peer.crt",
+              "--peer-client-cert-auth=true",
+              "--peer-key-file=/var/lib/minikube/certs/etcd/peer.key",
+              "--peer-trusted-ca-file=/var/lib/minikube/certs/etcd/ca.crt",
+              "--snapshot-count=10000",
+              "--trusted-ca-file=/var/lib/minikube/certs/etcd/ca.crt"
+            ],
+            "resources": {
+
+            },
+            "ports": [
+              {
+                "name": "foobar",
+                "containerPort": 1234,
+                "protocol": "TCP"
+              }
+            ],
+            "volumeMounts": [
+              {
+                "name": "etcd-data",
+                "mountPath": "/var/lib/minikube/etcd"
+              },
+              {
+                "name": "etcd-certs",
+                "mountPath": "/var/lib/minikube/certs/etcd"
+              }
+            ],
+            "livenessProbe": {
+              "httpGet": {
+                "path": "/health",
+                "port": 2381,
+                "host": "127.0.0.1",
+                "scheme": "HTTP"
+              },
+              "initialDelaySeconds": 15,
+              "timeoutSeconds": 15,
+              "periodSeconds": 10,
+              "successThreshold": 1,
+              "failureThreshold": 8
+            },
+            "terminationMessagePath": "/dev/termination-log",
+            "terminationMessagePolicy": "File",
+            "imagePullPolicy": "IfNotPresent"
+          }
+        ],
+        "restartPolicy": "Always",
+        "terminationGracePeriodSeconds": 30,
+        "dnsPolicy": "ClusterFirst",
+        "nodeName": "m01",
+        "hostNetwork": true,
+        "securityContext": {
+
+        },
+        "schedulerName": "default-scheduler",
+        "tolerations": [
+          {
+            "operator": "Exists",
+            "effect": "NoExecute"
+          }
+        ],
+        "priorityClassName": "system-cluster-critical",
+        "priority": 2000000000,
+        "enableServiceLinks": true
+      },
+      "status": {
+        "phase": "Running",
+        "conditions": [
+          {
+            "type": "Initialized",
+            "status": "True",
+            "lastProbeTime": null,
+            "lastTransitionTime": "2020-03-20T13:30:29Z"
+          },
+          {
+            "type": "Ready",
+            "status": "True",
+            "lastProbeTime": null,
+            "lastTransitionTime": "2020-03-20T13:30:32Z"
+          },
+          {
+            "type": "ContainersReady",
+            "status": "True",
+            "lastProbeTime": null,
+            "lastTransitionTime": "2020-03-20T13:30:32Z"
+          },
+          {
+            "type": "PodScheduled",
+            "status": "True",
+            "lastProbeTime": null,
+            "lastTransitionTime": "2020-03-20T13:30:29Z"
+          }
+        ],
+        "hostIP": "172.17.0.2",
+        "podIP": "172.17.0.2",
+        "podIPs": [
+          {
+            "ip": "172.17.0.2"
+          }
+        ],
+        "startTime": "2020-03-20T13:30:29Z",
+        "containerStatuses": [
+          {
+            "name": "etcd",
+            "state": {
+              "running": {
+                "startedAt": "2020-03-20T13:30:30Z"
+              }
+            },
+            "lastState": {
+              "terminated": {
+                "exitCode": 0,
+                "reason": "Completed",
+                "startedAt": "2020-03-17T18:56:24Z",
+                "finishedAt": "2020-03-20T13:29:54Z",
+                "containerID": "docker://24eea6f192d4598fcc129b5f163a02d1457137f4ec34e8c80c6049a65604cb07"
+              }
+            },
+            "ready": true,
+            "restartCount": 2,
+            "image": "k8s.gcr.io/etcd:3.4.3-0",
+            "imageID": "docker-pullable://k8s.gcr.io/etcd@sha256:4afb99b4690b418ffc2ceb67e1a17376457e441c1f09ab55447f0aaf992fa646",
+            "containerID": "docker://a28f0800855008485376c1eece1cf61de97cb7026b9188d138b0d55d92fc2f5c",
+            "started": true
+          }
+        ],
+        "qosClass": "BestEffort"
+      }
+    }
+  ]
+}
+`
+	pls, err := parsePodList([]byte(data))
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	if len(pls.Items) != 1 {
+		t.Fatalf("unexpected length of PodList.Items; got %d; want %d", len(pls.Items), 1)
+	}
+	pod := pls.Items[0]
+
+	// Check pod.appendTargetLabels()
+	labelss := pod.appendTargetLabels(nil)
+	var sortedLabelss [][]prompbmarshal.Label
+	for _, labels := range labelss {
+		sortedLabelss = append(sortedLabelss, getSortedLabels(labels))
+	}
+	expectedLabels := [][]prompbmarshal.Label{
+		getSortedLabels(map[string]string{
+			"__address__": "172.17.0.2:1234",
+
+			"__meta_kubernetes_namespace":                   "kube-system",
+			"__meta_kubernetes_pod_name":                    "etcd-m01",
+			"__meta_kubernetes_pod_ip":                      "172.17.0.2",
+			"__meta_kubernetes_pod_container_name":          "etcd",
+			"__meta_kubernetes_pod_container_port_name":     "foobar",
+			"__meta_kubernetes_pod_container_port_number":   "1234",
+			"__meta_kubernetes_pod_container_port_protocol": "TCP",
+			"__meta_kubernetes_pod_ready":                   "true",
+			"__meta_kubernetes_pod_phase":                   "Running",
+			"__meta_kubernetes_pod_node_name":               "m01",
+			"__meta_kubernetes_pod_host_ip":                 "172.17.0.2",
+			"__meta_kubernetes_pod_uid":                     "9d328156-75d1-411a-bdd0-aeacb53a38de",
+			"__meta_kubernetes_pod_controller_kind":         "Node",
+			"__meta_kubernetes_pod_controller_name":         "m01",
+			"__meta_kubernetes_pod_container_init":          "false",
+
+			"__meta_kubernetes_pod_label_component": "etcd",
+			"__meta_kubernetes_pod_label_tier":      "control-plane",
+
+			"__meta_kubernetes_pod_labelpresent_component": "true",
+			"__meta_kubernetes_pod_labelpresent_tier":      "true",
+
+			"__meta_kubernetes_pod_annotation_kubernetes_io_config_hash":   "3ec997b76fb6ed3b78da8e0b5676dac4",
+			"__meta_kubernetes_pod_annotation_kubernetes_io_config_mirror": "3ec997b76fb6ed3b78da8e0b5676dac4",
+			"__meta_kubernetes_pod_annotation_kubernetes_io_config_seen":   "2020-03-16T20:44:26.538136233Z",
+			"__meta_kubernetes_pod_annotation_kubernetes_io_config_source": "file",
+
+			"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_hash":   "true",
+			"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_mirror": "true",
+			"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_seen":   "true",
+			"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_source": "true",
+		}),
+	}
+	if !reflect.DeepEqual(sortedLabelss, expectedLabels) {
+		t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabels)
+	}
+}
diff --git a/lib/promscrape/discovery/kubernetes/service.go b/lib/promscrape/discovery/kubernetes/service.go
new file mode 100644
index 0000000000..b1830e9ce9
--- /dev/null
+++ b/lib/promscrape/discovery/kubernetes/service.go
@@ -0,0 +1,114 @@
+package kubernetes
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+// GetServicesLabels returns labels for k8s
+// services obtained from the given apiServer.
+//
+// Every service returned by getServices is passed to
+// Service.appendTargetLabels; the error from getServices is returned unchanged.
+func GetServicesLabels(cfg *APIConfig) ([]map[string]string, error) {
+	svcs, err := getServices(cfg)
+	if err != nil {
+		return nil, err
+	}
+	var ms []map[string]string
+	for _, svc := range svcs {
+		ms = svc.appendTargetLabels(ms)
+	}
+	return ms, nil
+}
+
+// getServices requests `/api/v1/services` from the API server described by cfg and returns the parsed services.
+//
+// Request and parse failures are wrapped with %w, so the underlying error
+// stays reachable through errors.Is and errors.As.
+func getServices(cfg *APIConfig) ([]Service, error) {
+	data, err := getAPIResponse(cfg, "service", "/api/v1/services")
+	if err != nil {
+		return nil, fmt.Errorf("cannot obtain services data from API server: %w", err)
+	}
+	sl, err := parseServiceList(data)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse services response from API server: %w", err)
+	}
+	return sl.Items, nil
+}
+
+// ServiceList is k8s service list.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicelist-v1-core
+type ServiceList struct {
+	Items []Service
+}
+
+// Service is k8s service.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#service-v1-core
+type Service struct {
+	Metadata ObjectMeta
+	Spec     ServiceSpec
+}
+
+// ServiceSpec is k8s service spec.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicespec-v1-core
+type ServiceSpec struct {
+	ClusterIP    string
+	ExternalName string
+	Type         string
+	Ports        []ServicePort
+}
+
+// ServicePort is k8s service port.
+//
+// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#serviceport-v1-core
+type ServicePort struct {
+	Name     string
+	Protocol string
+	Port     int
+}
+
+// parseServiceList parses ServiceList from data.
+//
+// On failure it returns a nil list together with an error that quotes the
+// offending payload and wraps the json error with %w, so callers can still
+// match the cause via errors.As.
+func parseServiceList(data []byte) (*ServiceList, error) {
+	var sl ServiceList
+	if err := json.Unmarshal(data, &sl); err != nil {
+		return nil, fmt.Errorf("cannot unmarshal ServiceList from %q: %w", data, err)
+	}
+	return &sl, nil
+}
+
+// appendTargetLabels appends labels for each port of the given Service s to ms and returns the result.
+// +// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service +func (s *Service) appendTargetLabels(ms []map[string]string) []map[string]string { + host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace) + for _, sp := range s.Spec.Ports { + addr := joinHostPort(host, sp.Port) + m := map[string]string{ + "__address__": addr, + "__meta_kubernetes_service_port_name": sp.Name, + "__meta_kubernetes_service_port_protocol": sp.Protocol, + } + s.appendCommonLabels(m) + ms = append(ms, m) + } + return ms +} + +func (s *Service) appendCommonLabels(m map[string]string) { + m["__meta_kubernetes_namespace"] = s.Metadata.Namespace + m["__meta_kubernetes_service_name"] = s.Metadata.Name + m["__meta_kubernetes_service_type"] = s.Spec.Type + if s.Spec.Type != "ExternalName" { + m["__meta_kubernetes_service_cluster_ip"] = s.Spec.ClusterIP + } else { + m["__meta_kubernetes_service_external_name"] = s.Spec.ExternalName + } + s.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_service", m) +} + +func getService(svcs []Service, namespace, name string) *Service { + for i := range svcs { + svc := &svcs[i] + if svc.Metadata.Name == name && svc.Metadata.Namespace == namespace { + return svc + } + } + return nil +} diff --git a/lib/promscrape/discovery/kubernetes/service_test.go b/lib/promscrape/discovery/kubernetes/service_test.go new file mode 100644 index 0000000000..caf7215e39 --- /dev/null +++ b/lib/promscrape/discovery/kubernetes/service_test.go @@ -0,0 +1,227 @@ +package kubernetes + +import ( + "reflect" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestParseServiceListFailure(t *testing.T) { + f := func(s string) { + t.Helper() + nls, err := parseServiceList([]byte(s)) + if err == nil { + t.Fatalf("expecting non-nil error") + } + if nls != nil { + t.Fatalf("unexpected non-nil ServiceList: %v", nls) + } + } + f(``) + f(`[1,23]`) + f(`{"items":[{"metadata":1}]}`) + 
f(`{"items":[{"metadata":{"labels":[1]}}]}`) +} + +func TestParseServiceListSuccess(t *testing.T) { + data := `{ + "kind": "ServiceList", + "apiVersion": "v1", + "metadata": { + "selfLink": "/api/v1/services", + "resourceVersion": "60485" + }, + "items": [ + { + "metadata": { + "name": "kube-dns", + "namespace": "kube-system", + "selfLink": "/api/v1/namespaces/kube-system/services/kube-dns", + "uid": "38a396f1-17fe-46c2-a5f4-3b225c18dcdf", + "resourceVersion": "177", + "creationTimestamp": "2020-03-16T20:44:26Z", + "labels": { + "k8s-app": "kube-dns", + "kubernetes.io/cluster-service": "true", + "kubernetes.io/name": "KubeDNS" + }, + "annotations": { + "prometheus.io/port": "9153", + "prometheus.io/scrape": "true" + } + }, + "spec": { + "ports": [ + { + "name": "dns", + "protocol": "UDP", + "port": 53, + "targetPort": 53 + }, + { + "name": "dns-tcp", + "protocol": "TCP", + "port": 53, + "targetPort": 53 + }, + { + "name": "metrics", + "protocol": "TCP", + "port": 9153, + "targetPort": 9153 + } + ], + "selector": { + "k8s-app": "kube-dns" + }, + "clusterIP": "10.96.0.10", + "type": "ClusterIP", + "sessionAffinity": "None" + }, + "status": { + "loadBalancer": { + + } + } + } + ] +} +` + sls, err := parseServiceList([]byte(data)) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(sls.Items) != 1 { + t.Fatalf("unexpected length of ServiceList.Items; got %d; want %d", len(sls.Items), 1) + } + service := sls.Items[0] + meta := service.Metadata + if meta.Name != "kube-dns" { + t.Fatalf("unexpected ObjectMeta.Name; got %q; want %q", meta.Name, "kube-dns") + } + expectedLabels := getSortedLabels(map[string]string{ + "k8s-app": "kube-dns", + "kubernetes.io/cluster-service": "true", + "kubernetes.io/name": "KubeDNS", + }) + if !reflect.DeepEqual(meta.Labels, expectedLabels) { + t.Fatalf("unexpected ObjectMeta.Labels\ngot\n%v\nwant\n%v", meta.Labels, expectedLabels) + } + expectedAnnotations := getSortedLabels(map[string]string{ + "prometheus.io/port": 
"9153", + "prometheus.io/scrape": "true", + }) + if !reflect.DeepEqual(meta.Annotations, expectedAnnotations) { + t.Fatalf("unexpected ObjectMeta.Annotations\ngot\n%v\nwant\n%v", meta.Annotations, expectedAnnotations) + } + spec := service.Spec + expectedClusterIP := "10.96.0.10" + if spec.ClusterIP != expectedClusterIP { + t.Fatalf("unexpected clusterIP; got %q; want %q", spec.ClusterIP, expectedClusterIP) + } + if spec.Type != "ClusterIP" { + t.Fatalf("unexpected type; got %q; want %q", spec.Type, "ClusterIP") + } + expectedPorts := []ServicePort{ + { + Name: "dns", + Protocol: "UDP", + Port: 53, + }, + { + Name: "dns-tcp", + Protocol: "TCP", + Port: 53, + }, + { + Name: "metrics", + Protocol: "TCP", + Port: 9153, + }, + } + if !reflect.DeepEqual(spec.Ports, expectedPorts) { + t.Fatalf("unexpected ports\ngot\n%v\nwant\n%v", spec.Ports, expectedPorts) + } + + // Check service.appendTargetLabels() + labelss := service.appendTargetLabels(nil) + var sortedLabelss [][]prompbmarshal.Label + for _, labels := range labelss { + sortedLabelss = append(sortedLabelss, getSortedLabels(labels)) + } + expectedLabelss := [][]prompbmarshal.Label{ + getSortedLabels(map[string]string{ + "__address__": "kube-dns.kube-system.svc:53", + "__meta_kubernetes_namespace": "kube-system", + "__meta_kubernetes_service_name": "kube-dns", + "__meta_kubernetes_service_type": "ClusterIP", + "__meta_kubernetes_service_port_name": "dns", + "__meta_kubernetes_service_port_protocol": "UDP", + "__meta_kubernetes_service_cluster_ip": "10.96.0.10", + + "__meta_kubernetes_service_label_k8s_app": "kube-dns", + "__meta_kubernetes_service_label_kubernetes_io_cluster_service": "true", + "__meta_kubernetes_service_label_kubernetes_io_name": "KubeDNS", + + "__meta_kubernetes_service_labelpresent_k8s_app": "true", + "__meta_kubernetes_service_labelpresent_kubernetes_io_cluster_service": "true", + "__meta_kubernetes_service_labelpresent_kubernetes_io_name": "true", + + 
"__meta_kubernetes_service_annotation_prometheus_io_port": "9153", + "__meta_kubernetes_service_annotation_prometheus_io_scrape": "true", + + "__meta_kubernetes_service_annotationpresent_prometheus_io_port": "true", + "__meta_kubernetes_service_annotationpresent_prometheus_io_scrape": "true", + }), + getSortedLabels(map[string]string{ + "__address__": "kube-dns.kube-system.svc:53", + "__meta_kubernetes_namespace": "kube-system", + "__meta_kubernetes_service_name": "kube-dns", + "__meta_kubernetes_service_type": "ClusterIP", + "__meta_kubernetes_service_port_name": "dns-tcp", + "__meta_kubernetes_service_port_protocol": "TCP", + "__meta_kubernetes_service_cluster_ip": "10.96.0.10", + + "__meta_kubernetes_service_label_k8s_app": "kube-dns", + "__meta_kubernetes_service_label_kubernetes_io_cluster_service": "true", + "__meta_kubernetes_service_label_kubernetes_io_name": "KubeDNS", + + "__meta_kubernetes_service_labelpresent_k8s_app": "true", + "__meta_kubernetes_service_labelpresent_kubernetes_io_cluster_service": "true", + "__meta_kubernetes_service_labelpresent_kubernetes_io_name": "true", + + "__meta_kubernetes_service_annotation_prometheus_io_port": "9153", + "__meta_kubernetes_service_annotation_prometheus_io_scrape": "true", + + "__meta_kubernetes_service_annotationpresent_prometheus_io_port": "true", + "__meta_kubernetes_service_annotationpresent_prometheus_io_scrape": "true", + }), + getSortedLabels(map[string]string{ + "__address__": "kube-dns.kube-system.svc:9153", + "__meta_kubernetes_namespace": "kube-system", + "__meta_kubernetes_service_name": "kube-dns", + "__meta_kubernetes_service_type": "ClusterIP", + "__meta_kubernetes_service_port_name": "metrics", + "__meta_kubernetes_service_port_protocol": "TCP", + "__meta_kubernetes_service_cluster_ip": "10.96.0.10", + + "__meta_kubernetes_service_label_k8s_app": "kube-dns", + "__meta_kubernetes_service_label_kubernetes_io_cluster_service": "true", + "__meta_kubernetes_service_label_kubernetes_io_name": 
"KubeDNS", + + "__meta_kubernetes_service_labelpresent_k8s_app": "true", + "__meta_kubernetes_service_labelpresent_kubernetes_io_cluster_service": "true", + "__meta_kubernetes_service_labelpresent_kubernetes_io_name": "true", + + "__meta_kubernetes_service_annotation_prometheus_io_port": "9153", + "__meta_kubernetes_service_annotation_prometheus_io_scrape": "true", + + "__meta_kubernetes_service_annotationpresent_prometheus_io_port": "true", + "__meta_kubernetes_service_annotationpresent_prometheus_io_scrape": "true", + }), + } + if !reflect.DeepEqual(sortedLabelss, expectedLabelss) { + t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss) + } +} diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index e832a8c9ad..22cc273ad5 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -14,8 +14,11 @@ import ( ) var ( - fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", time.Minute, "Interval for checking for changes in 'file_sd_config'. "+ + fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", 30*time.Second, "Interval for checking for changes in 'file_sd_config'. "+ "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config") + kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+ + "This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+ + "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details") promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. 
"+ "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config for details") ) @@ -58,6 +61,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) } swsStatic := cfg.getStaticScrapeWork() swsFileSD := cfg.getFileSDScrapeWork(nil) + swsK8S := cfg.getKubernetesSDScrapeWork() mustStop := false for !mustStop { @@ -73,6 +77,11 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) defer wg.Done() runFileSDScrapers(swsFileSD, cfg, pushData, stopCh) }() + wg.Add(1) + go func() { + defer wg.Done() + runKubernetesSDScrapers(swsK8S, cfg, pushData, stopCh) + }() waitForChans: select { @@ -86,6 +95,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) cfg = cfgNew swsStatic = cfg.getStaticScrapeWork() swsFileSD = cfg.getFileSDScrapeWork(swsFileSD) + swsK8S = cfg.getKubernetesSDScrapeWork() case <-globalStopCh: mustStop = true } @@ -114,6 +124,50 @@ func runStaticScrapers(sws []ScrapeWork, pushData func(wr *prompbmarshal.WriteRe var staticTargets = metrics.NewCounter(`vm_promscrape_targets{type="static"}`) +func runKubernetesSDScrapers(sws []ScrapeWork, cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { + if cfg.kubernetesSDConfigsCount() == 0 { + return + } + ticker := time.NewTicker(*kubernetesSDCheckInterval) + defer ticker.Stop() + mustStop := false + for !mustStop { + localStopCh := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func(sws []ScrapeWork) { + defer wg.Done() + logger.Infof("starting %d scrapers for `kubernetes_sd_config` targets", len(sws)) + kubernetesSDTargets.Set(uint64(len(sws))) + runScrapeWorkers(sws, pushData, localStopCh) + kubernetesSDTargets.Set(0) + logger.Infof("stopped all the %d scrapers for `kubernetes_sd_config` targets", len(sws)) + }(sws) + waitForChans: + select { + case <-ticker.C: + swsNew := cfg.getKubernetesSDScrapeWork() + if equalStaticConfigForScrapeWorks(swsNew, 
sws) { + // Nothing changed, continue waiting for updated scrape work + goto waitForChans + } + logger.Infof("restarting scrapers for changed `kubernetes_sd_config` targets") + sws = swsNew + case <-stopCh: + mustStop = true + } + + close(localStopCh) + wg.Wait() + kubernetesSDReloads.Inc() + } +} + +var ( + kubernetesSDTargets = metrics.NewCounter(`vm_promscrape_targets{type="kubernetes_sd"}`) + kubernetesSDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="kubernetes_sd"}`) +) + func runFileSDScrapers(sws []ScrapeWork, cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { if cfg.fileSDConfigsCount() == 0 { return