lib/promscrape: add initial support for kubernetes_sd_config

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/334
This commit is contained in:
Aliaksandr Valialkin 2020-04-13 21:02:27 +03:00
parent 4017163393
commit 7c4fb038e3
14 changed files with 2284 additions and 1 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"gopkg.in/yaml.v2"
)
@ -57,6 +58,7 @@ type ScrapeConfig struct {
TLSConfig *promauth.TLSConfig `yaml:"tls_config"`
StaticConfigs []StaticConfig `yaml:"static_configs"`
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"`
KubernetesSDConfigs []KubernetesSDConfig `yaml:"kubernetes_sd_configs"`
RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs"`
MetricRelabelConfigs []promrelabel.RelabelConfig `yaml:"metric_relabel_configs"`
ScrapeLimit int `yaml:"scrape_limit"`
@ -73,6 +75,25 @@ type FileSDConfig struct {
// `refresh_interval` is ignored. See `-prometheus.fileSDCheckInterval`
}
// KubernetesSDConfig represents kubernetes-based service discovery config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
type KubernetesSDConfig struct {
APIServer string `yaml:"api_server"`
Role string `yaml:"role"`
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth"`
BearerToken string `yaml:"bearer_token"`
BearerTokenFile string `yaml:"bearer_token_file"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config"`
Namespaces KubernetesNamespaces `yaml:"namespaces"`
Selectors []kubernetes.Selector `yaml:"selectors"`
}
// KubernetesNamespaces represents namespaces for KubernetesSDConfig
type KubernetesNamespaces struct {
Names []string `yaml:"names"`
}
// StaticConfig represents essential parts for `static_config` section of Prometheus config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config
@ -136,6 +157,14 @@ func unmarshalMaybeStrict(data []byte, dst interface{}) error {
return err
}
func (cfg *Config) kubernetesSDConfigsCount() int {
n := 0
for i := range cfg.ScrapeConfigs {
n += len(cfg.ScrapeConfigs[i].KubernetesSDConfigs)
}
return n
}
func (cfg *Config) fileSDConfigsCount() int {
n := 0
for i := range cfg.ScrapeConfigs {
@ -144,6 +173,17 @@ func (cfg *Config) fileSDConfigsCount() int {
return n
}
// getKubernetesSDcrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork {
var dst []ScrapeWork
for _, sc := range cfg.ScrapeConfigs {
for _, sdc := range sc.KubernetesSDConfigs {
dst = sdc.appendScrapeWork(dst, cfg.baseDir, sc.swc)
}
}
return dst
}
// getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
// Create a map for the previous scrape work.
@ -259,6 +299,74 @@ type scrapeWorkConfig struct {
scrapeLimit int
}
func (sdc *KubernetesSDConfig) appendScrapeWork(dst []ScrapeWork, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil {
logger.Errorf("cannot parse auth config for `kubernetes_sd_config` for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
}
cfg := &kubernetes.APIConfig{
Server: sdc.APIServer,
AuthConfig: ac,
Namespaces: sdc.Namespaces.Names,
Selectors: sdc.Selectors,
}
switch sdc.Role {
case "node":
targetLabels, err := kubernetes.GetNodesLabels(cfg)
if err != nil {
logger.Errorf("error when discovering kubernetes nodes for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
}
return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role)
case "service":
targetLabels, err := kubernetes.GetServicesLabels(cfg)
if err != nil {
logger.Errorf("error when discovering kubernetes services for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
}
return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role)
case "pod":
targetLabels, err := kubernetes.GetPodsLabels(cfg)
if err != nil {
logger.Errorf("error when discovering kubernetes pods for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
}
return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role)
case "endpoints":
targetLabels, err := kubernetes.GetEndpointsLabels(cfg)
if err != nil {
logger.Errorf("error when discovering kubernetes endpoints for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
}
return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role)
case "ingress":
targetLabels, err := kubernetes.GetIngressesLabels(cfg)
if err != nil {
logger.Errorf("error when discovering kubernetes ingresses for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
}
return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role)
default:
logger.Errorf("unexpected `role`: %q; must be one of `node`, `service`, `pod`, `endpoints` or `ingress`; skipping it", sdc.Role)
return dst
}
}
func appendKubernetesScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, role string) []ScrapeWork {
for _, metaLabels := range targetLabels {
target := metaLabels["__address__"]
var err error
dst, err = appendScrapeWork(dst, swc, target, nil, metaLabels)
if err != nil {
logger.Errorf("error when parsing `kubernetes_sd_config` target %q with role %q for `job_name` %q: %s; skipping it",
target, role, swc.jobName, err)
continue
}
}
return dst
}
func (sdc *FileSDConfig) appendScrapeWork(dst []ScrapeWork, swPrev map[string][]ScrapeWork, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
for _, file := range sdc.Files {
pathPattern := getFilepath(baseDir, file)

View File

@ -0,0 +1,167 @@
package kubernetes
import (
"crypto/tls"
"fmt"
"net"
"os"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/valyala/fasthttp"
)
func getAPIResponse(cfg *APIConfig, role, path string) ([]byte, error) {
hcv, err := getHostClient(cfg.Server, cfg.AuthConfig)
if err != nil {
return nil, err
}
query := joinSelectors(role, cfg.Namespaces, cfg.Selectors)
if len(query) > 0 {
path += "?" + query
}
requestURL := hcv.apiServer + path
var u fasthttp.URI
u.Update(requestURL)
var req fasthttp.Request
req.SetRequestURIBytes(u.RequestURI())
req.SetHost(hcv.hostPort)
req.Header.Set("Accept-Encoding", "gzip")
if hcv.ac != nil && hcv.ac.Authorization != "" {
req.Header.Set("Authorization", hcv.ac.Authorization)
}
var resp fasthttp.Response
// There is no need in calling DoTimeout, since the timeout is already set in hc.ReadTimeout above.
if err := hcv.hc.Do(&req, &resp); err != nil {
return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err)
}
var data []byte
if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" {
dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body())
if err != nil {
return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err)
}
data = dst
} else {
data = append(data[:0], resp.Body()...)
}
statusCode := resp.StatusCode()
if statusCode != fasthttp.StatusOK {
return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
requestURL, statusCode, fasthttp.StatusOK, data)
}
return data, nil
}
func getHostClient(apiServer string, ac *promauth.Config) (hcValue, error) {
k := hcKey{
apiServer: apiServer,
ac: ac,
}
hcMapLock.Lock()
defer hcMapLock.Unlock()
if !hasHCMapCleaner {
go hcMapCleaner()
hasHCMapCleaner = true
}
hcv, ok := hcMap[k]
if !ok {
hcvNew, err := newHostClient(apiServer, ac)
if err != nil {
return hcv, fmt.Errorf("cannot create new HTTP client for %q: %s", apiServer, err)
}
hcMap[k] = hcvNew
hcv = hcvNew
}
return hcv, nil
}
func hcMapCleaner() {
tc := time.NewTicker(15 * time.Minute)
for range tc.C {
hcMapLock.Lock()
hcMap = make(map[hcKey]hcValue)
hcMapLock.Unlock()
}
}
type hcKey struct {
apiServer string
ac *promauth.Config
}
type hcValue struct {
hc *fasthttp.HostClient
ac *promauth.Config
apiServer string
hostPort string
}
var (
hasHCMapCleaner bool
hcMap = make(map[hcKey]hcValue)
hcMapLock sync.Mutex
)
func newHostClient(apiServer string, ac *promauth.Config) (hcValue, error) {
var hcv hcValue
if len(apiServer) == 0 {
// Assume we run at k8s pod.
// Discover apiServer and auth config according to k8s docs.
// See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
if len(host) == 0 {
return hcv, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " +
"probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?")
}
if len(port) == 0 {
return hcv, fmt.Errorf("cannot find KUBERNETES_SERVICE_PORT env var; it must be defined when running in k8s; "+
"KUBERNETES_SERVICE_HOST=%q", host)
}
apiServer = "https://" + net.JoinHostPort(host, port)
tlsConfig := promauth.TLSConfig{
CAFile: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
}
acNew, err := promauth.NewConfig("/", nil, "", "/var/run/secrets/kubernetes.io/serviceaccount/token", &tlsConfig)
if err != nil {
return hcv, fmt.Errorf("cannot initialize service account auth: %s; probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?", err)
}
ac = acNew
}
var u fasthttp.URI
u.Update(apiServer)
hostPort := string(u.Host())
isTLS := string(u.Scheme()) == "https"
var tlsCfg *tls.Config
if isTLS && ac != nil {
tlsCfg = ac.NewTLSConfig()
}
if !strings.Contains(hostPort, ":") {
port := "80"
if isTLS {
port = "443"
}
hostPort = net.JoinHostPort(hostPort, port)
}
hc := &fasthttp.HostClient{
Addr: hostPort,
Name: "vm_promscrape/discovery",
DialDualStack: netutil.TCP6Enabled(),
IsTLS: isTLS,
TLSConfig: tlsCfg,
ReadTimeout: time.Minute,
WriteTimeout: 10 * time.Second,
MaxResponseBodySize: 300 * 1024 * 1024,
}
return hcValue{
hc: hc,
ac: ac,
apiServer: apiServer,
hostPort: hostPort,
}, nil
}

View File

@ -0,0 +1,144 @@
package kubernetes
import (
"encoding/json"
"fmt"
"net"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// ObjectMeta represents ObjectMeta from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#objectmeta-v1-meta
type ObjectMeta struct {
Name string
Namespace string
UID string
Labels SortedLabels
Annotations SortedLabels
OwnerReferences []OwnerReference
}
func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m map[string]string) {
for _, lb := range om.Labels {
ln := sanitizeLabelName(lb.Name)
m[fmt.Sprintf("%s_label_%s", prefix, ln)] = lb.Value
m[fmt.Sprintf("%s_labelpresent_%s", prefix, ln)] = "true"
}
for _, a := range om.Annotations {
an := sanitizeLabelName(a.Name)
m[fmt.Sprintf("%s_annotation_%s", prefix, an)] = a.Value
m[fmt.Sprintf("%s_annotationpresent_%s", prefix, an)] = "true"
}
}
// sanitizeLabelName replaces anything that doesn't match
// client_label.LabelNameRE with an underscore.
//
// This has been copied from Prometheus sources at util/strutil/strconv.go
func sanitizeLabelName(name string) string {
return invalidLabelCharRE.ReplaceAllString(name, "_")
}
var (
invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
)
// SortedLabels represents sorted labels.
type SortedLabels []prompbmarshal.Label
// UnmarshalJSON unmarshals JSON from data.
func (sls *SortedLabels) UnmarshalJSON(data []byte) error {
var m map[string]string
if err := json.Unmarshal(data, &m); err != nil {
return err
}
*sls = getSortedLabels(m)
return nil
}
func getSortedLabels(m map[string]string) SortedLabels {
a := make([]prompbmarshal.Label, 0, len(m))
for k, v := range m {
a = append(a, prompbmarshal.Label{
Name: k,
Value: v,
})
}
sort.Slice(a, func(i, j int) bool {
return a[i].Name < a[j].Name
})
return a
}
// OwnerReference represents OwnerReferense from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ownerreference-v1-meta
type OwnerReference struct {
Name string
Controller bool
Kind string
}
// DaemonEndpoint represents DaemonEndpoint from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#daemonendpoint-v1-core
type DaemonEndpoint struct {
Port int
}
func joinHostPort(host string, port int) string {
portStr := strconv.Itoa(port)
return net.JoinHostPort(host, portStr)
}
// APIConfig contains config for API server
type APIConfig struct {
Server string
AuthConfig *promauth.Config
Namespaces []string
Selectors []Selector
}
// Selector represents kubernetes selector.
//
// See https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/
// and https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
type Selector struct {
Role string `yaml:"role"`
Label string `yaml:"label"`
Field string `yaml:"field"`
}
func joinSelectors(role string, namespaces []string, selectors []Selector) string {
var labelSelectors, fieldSelectors []string
for _, ns := range namespaces {
fieldSelectors = append(fieldSelectors, "metadata.namespace="+ns)
}
for _, s := range selectors {
if s.Role != role {
continue
}
if s.Label != "" {
labelSelectors = append(labelSelectors, s.Label)
}
if s.Field != "" {
fieldSelectors = append(fieldSelectors, s.Field)
}
}
var args []string
if len(labelSelectors) > 0 {
args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ",")))
}
if len(fieldSelectors) > 0 {
args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ",")))
}
return strings.Join(args, "&")
}

View File

@ -0,0 +1,189 @@
package kubernetes
import (
"encoding/json"
"fmt"
)
// GetEndpointsLabels returns labels for k8s endpoints obtained from the given apiServer
func GetEndpointsLabels(cfg *APIConfig) ([]map[string]string, error) {
data, err := getAPIResponse(cfg, "endpoints", "/api/v1/endpoints")
if err != nil {
return nil, fmt.Errorf("cannot obtain endpoints data from API server: %s", err)
}
epl, err := parseEndpointsList(data)
if err != nil {
return nil, fmt.Errorf("cannot parse endpoints response from API server: %s", err)
}
pods, err := getPods(cfg)
if err != nil {
return nil, err
}
svcs, err := getServices(cfg)
if err != nil {
return nil, err
}
var ms []map[string]string
for _, ep := range epl.Items {
ms = ep.appendTargetLabels(ms, pods, svcs)
}
return ms, nil
}
// EndpointsList implements k8s endpoints list.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointslist-v1-core
type EndpointsList struct {
Items []Endpoints
}
// Endpoints implements k8s endpoints.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpoints-v1-core
type Endpoints struct {
Metadata ObjectMeta
Subsets []EndpointSubset
}
// EndpointSubset implements k8s endpoint subset.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointsubset-v1-core
type EndpointSubset struct {
Addresses []EndpointAddress
NotReadyAddresses []EndpointAddress
Ports []EndpointPort
}
// EndpointAddress implements k8s endpoint address.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointaddress-v1-core
type EndpointAddress struct {
Hostname string
IP string
NodeName string
TargetRef ObjectReference
}
// ObjectReference implements k8s object reference.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#objectreference-v1-core
type ObjectReference struct {
Kind string
Name string
Namespace string
}
// EndpointPort implements k8s endpoint port.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointport-v1beta1-discovery-k8s-io
type EndpointPort struct {
AppProtocol string
Name string
Port int
Protocol string
}
// parseEndpointsList parses EndpointsList from data.
func parseEndpointsList(data []byte) (*EndpointsList, error) {
var esl EndpointsList
if err := json.Unmarshal(data, &esl); err != nil {
return nil, fmt.Errorf("cannot unmarshal EndpointsList from %q: %s", data, err)
}
return &esl, nil
}
// appendTargetLabels appends labels for each endpoint in eps to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints
func (eps *Endpoints) appendTargetLabels(ms []map[string]string, pods []Pod, svcs []Service) []map[string]string {
svc := getService(svcs, eps.Metadata.Namespace, eps.Metadata.Name)
podPortsSeen := make(map[*Pod][]int)
for _, ess := range eps.Subsets {
for _, epp := range ess.Ports {
ms = appendEndpointLabelsForAddresses(ms, podPortsSeen, eps, ess.Addresses, epp, pods, svc, "true")
ms = appendEndpointLabelsForAddresses(ms, podPortsSeen, eps, ess.NotReadyAddresses, epp, pods, svc, "false")
}
}
// Append labels for skipped ports on seen pods.
portSeen := func(port int, ports []int) bool {
for _, p := range ports {
if p == port {
return true
}
}
return false
}
for p, ports := range podPortsSeen {
for _, c := range p.Spec.Containers {
for _, cp := range c.Ports {
if portSeen(cp.ContainerPort, ports) {
continue
}
addr := joinHostPort(p.Status.PodIP, cp.ContainerPort)
m := map[string]string{
"__address__": addr,
}
p.appendCommonLabels(m)
p.appendContainerLabels(m, c, &cp)
ms = append(ms, m)
}
}
}
return ms
}
func appendEndpointLabelsForAddresses(ms []map[string]string, podPortsSeen map[*Pod][]int, eps *Endpoints, eas []EndpointAddress, epp EndpointPort,
pods []Pod, svc *Service, ready string) []map[string]string {
for _, ea := range eas {
p := getPod(pods, ea.TargetRef.Namespace, ea.TargetRef.Name)
m := getEndpointLabelsForAddressAndPort(podPortsSeen, eps, ea, epp, p, svc, ready)
ms = append(ms, m)
}
return ms
}
func getEndpointLabelsForAddressAndPort(podPortsSeen map[*Pod][]int, eps *Endpoints, ea EndpointAddress, epp EndpointPort, p *Pod, svc *Service, ready string) map[string]string {
m := getEndpointLabels(eps.Metadata, ea, epp, ready)
if svc != nil {
svc.appendCommonLabels(m)
}
if ea.TargetRef.Kind != "Pod" || p == nil {
return m
}
p.appendCommonLabels(m)
for _, c := range p.Spec.Containers {
for _, cp := range c.Ports {
if cp.ContainerPort == epp.Port {
p.appendContainerLabels(m, c, &cp)
podPortsSeen[p] = append(podPortsSeen[p], cp.ContainerPort)
break
}
}
}
return m
}
func getEndpointLabels(om ObjectMeta, ea EndpointAddress, epp EndpointPort, ready string) map[string]string {
addr := joinHostPort(ea.IP, epp.Port)
m := map[string]string{
"__address__": addr,
"__meta_kubernetes_namespace": om.Namespace,
"__meta_kubernetes_endpoints_name": om.Name,
"__meta_kubernetes_endpoint_ready": ready,
"__meta_kubernetes_endpoint_port_name": epp.Name,
"__meta_kubernetes_endpoint_port_protocol": epp.Protocol,
}
if ea.TargetRef.Kind != "" {
m["__meta_kubernetes_endpoint_address_target_kind"] = ea.TargetRef.Kind
m["__meta_kubernetes_endpoint_address_target_name"] = ea.TargetRef.Name
}
if ea.NodeName != "" {
m["__meta_kubernetes_endpoint_node_name"] = ea.NodeName
}
if ea.Hostname != "" {
m["__meta_kubernetes_endpoint_hostname"] = ea.Hostname
}
return m
}

View File

@ -0,0 +1,107 @@
package kubernetes
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestParseEndpointsListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
els, err := parseEndpointsList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if els != nil {
t.Fatalf("unexpected non-nil EnpointsList: %v", els)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
}
func TestParseEndpointsListSuccess(t *testing.T) {
data := `
{
"kind": "EndpointsList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/endpoints",
"resourceVersion": "128055"
},
"items": [
{
"metadata": {
"name": "kubernetes",
"namespace": "default",
"selfLink": "/api/v1/namespaces/default/endpoints/kubernetes",
"uid": "0972c7d9-c267-4b93-a090-a417eeb9b385",
"resourceVersion": "150",
"creationTimestamp": "2020-03-16T20:44:25Z"
},
"subsets": [
{
"addresses": [
{
"hostname": "aaa.bbb",
"nodeName": "foobar",
"ip": "172.17.0.2",
"targetRef": {
"kind": "Pod",
"namespace": "kube-system",
"name": "coredns-6955765f44-lnp6t",
"uid": "cbddb2b6-5b85-40f1-8819-9a59385169bb",
"resourceVersion": "124878"
}
}
],
"ports": [
{
"name": "https",
"port": 8443,
"protocol": "TCP"
}
]
}
]
}
]
}
`
els, err := parseEndpointsList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(els.Items) != 1 {
t.Fatalf("unexpected length of EndpointsList.Items; got %d; want %d", len(els.Items), 1)
}
endpoint := els.Items[0]
// Check endpoint.appendTargetLabels()
labelss := endpoint.appendTargetLabels(nil, nil, nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, getSortedLabels(labels))
}
expectedLabelss := [][]prompbmarshal.Label{
getSortedLabels(map[string]string{
"__address__": "172.17.0.2:8443",
"__meta_kubernetes_endpoint_address_target_kind": "Pod",
"__meta_kubernetes_endpoint_address_target_name": "coredns-6955765f44-lnp6t",
"__meta_kubernetes_endpoint_hostname": "aaa.bbb",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "https",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
"__meta_kubernetes_endpoints_name": "kubernetes",
"__meta_kubernetes_namespace": "default",
}),
}
if !reflect.DeepEqual(sortedLabelss, expectedLabelss) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss)
}
}

View File

@ -0,0 +1,136 @@
package kubernetes
import (
"encoding/json"
"fmt"
)
// GetIngressesLabels returns labels for k8s ingresses obtained from the given apiServer
func GetIngressesLabels(cfg *APIConfig) ([]map[string]string, error) {
data, err := getAPIResponse(cfg, "ingress", "/apis/extensions/v1beta1/ingresses")
if err != nil {
return nil, fmt.Errorf("cannot obtain ingresses data from API server: %s", err)
}
igl, err := parseIngressList(data)
if err != nil {
return nil, fmt.Errorf("cannot parse ingresses response from API server: %s", err)
}
var ms []map[string]string
for _, ig := range igl.Items {
ms = ig.appendTargetLabels(ms)
}
return ms, nil
}
// IngressList represents ingress list in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingresslist-v1beta1-extensions
type IngressList struct {
Items []Ingress
}
// Ingress represents ingress in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingress-v1beta1-extensions
type Ingress struct {
Metadata ObjectMeta
Spec IngressSpec
}
// IngressSpec represents ingress spec in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingressspec-v1beta1-extensions
type IngressSpec struct {
TLS []IngressTLS `json:"tls"`
Rules []IngressRule
}
// IngressTLS represents ingress TLS spec in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingresstls-v1beta1-extensions
type IngressTLS struct {
Hosts []string
}
// IngressRule represents ingress rule in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingressrule-v1beta1-extensions
type IngressRule struct {
Host string
HTTP HTTPIngressRuleValue `json:"http"`
}
// HTTPIngressRuleValue represents HTTP ingress rule value in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#httpingressrulevalue-v1beta1-extensions
type HTTPIngressRuleValue struct {
Paths []HTTPIngressPath
}
// HTTPIngressPath represents HTTP ingress path in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#httpingresspath-v1beta1-extensions
type HTTPIngressPath struct {
Path string
}
// parseIngressList parses IngressList from data.
func parseIngressList(data []byte) (*IngressList, error) {
var il IngressList
if err := json.Unmarshal(data, &il); err != nil {
return nil, fmt.Errorf("cannot unmarshal IngressList from %q: %s", data, err)
}
return &il, nil
}
// appendTargetLabels appends labels for Ingress ig to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress
func (ig *Ingress) appendTargetLabels(ms []map[string]string) []map[string]string {
tlsHosts := make(map[string]bool)
for _, tls := range ig.Spec.TLS {
for _, host := range tls.Hosts {
tlsHosts[host] = true
}
}
for _, r := range ig.Spec.Rules {
paths := getIngressRulePaths(r.HTTP.Paths)
scheme := "http"
if tlsHosts[r.Host] {
scheme = "https"
}
for _, path := range paths {
m := getLabelsForIngressPath(ig, scheme, r.Host, path)
ms = append(ms, m)
}
}
return ms
}
func getLabelsForIngressPath(ig *Ingress, scheme, host, path string) map[string]string {
m := map[string]string{
"__address__": host,
"__meta_kubernetes_namespace": ig.Metadata.Namespace,
"__meta_kubernetes_ingress_name": ig.Metadata.Name,
"__meta_kubernetes_ingress_scheme": scheme,
"__meta_kubernetes_ingress_host": host,
"__meta_kubernetes_ingress_path": path,
}
ig.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_ingress", m)
return m
}
func getIngressRulePaths(paths []HTTPIngressPath) []string {
if len(paths) == 0 {
return []string{"/"}
}
var result []string
for _, p := range paths {
path := p.Path
if path == "" {
path = "/"
}
result = append(result, path)
}
return result
}

View File

@ -0,0 +1,103 @@
package kubernetes
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestParseIngressListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
nls, err := parseIngressList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if nls != nil {
t.Fatalf("unexpected non-nil IngressList: %v", nls)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
}
func TestParseIngressListSuccess(t *testing.T) {
data := `
{
"kind": "IngressList",
"apiVersion": "extensions/v1beta1",
"metadata": {
"selfLink": "/apis/extensions/v1beta1/ingresses",
"resourceVersion": "351452"
},
"items": [
{
"metadata": {
"name": "test-ingress",
"namespace": "default",
"selfLink": "/apis/extensions/v1beta1/namespaces/default/ingresses/test-ingress",
"uid": "6d3f38f9-de89-4bc9-b273-c8faf74e8a27",
"resourceVersion": "351445",
"generation": 1,
"creationTimestamp": "2020-04-13T16:43:52Z",
"annotations": {
"kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"networking.k8s.io/v1beta1\",\"kind\":\"Ingress\",\"metadata\":{\"annotations\":{},\"name\":\"test-ingress\",\"namespace\":\"default\"},\"spec\":{\"backend\":{\"serviceName\":\"testsvc\",\"servicePort\":80}}}\n"
}
},
"spec": {
"backend": {
"serviceName": "testsvc",
"servicePort": 80
},
"rules": [
{
"host": "foobar"
}
]
},
"status": {
"loadBalancer": {
"ingress": [
{
"ip": "172.17.0.2"
}
]
}
}
}
]
}`
igs, err := parseIngressList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(igs.Items) != 1 {
t.Fatalf("unexpected length of IngressList.Items; got %d; want %d", len(igs.Items), 1)
}
ig := igs.Items[0]
// Check ig.appendTargetLabels()
labelss := ig.appendTargetLabels(nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, getSortedLabels(labels))
}
expectedLabelss := [][]prompbmarshal.Label{
getSortedLabels(map[string]string{
"__address__": "foobar",
"__meta_kubernetes_ingress_annotation_kubectl_kubernetes_io_last_applied_configuration": `{"apiVersion":"networking.k8s.io/v1beta1","kind":"Ingress","metadata":{"annotations":{},"name":"test-ingress","namespace":"default"},"spec":{"backend":{"serviceName":"testsvc","servicePort":80}}}` + "\n",
"__meta_kubernetes_ingress_annotationpresent_kubectl_kubernetes_io_last_applied_configuration": "true",
"__meta_kubernetes_ingress_host": "foobar",
"__meta_kubernetes_ingress_name": "test-ingress",
"__meta_kubernetes_ingress_path": "/",
"__meta_kubernetes_ingress_scheme": "http",
"__meta_kubernetes_namespace": "default",
}),
}
if !reflect.DeepEqual(sortedLabelss, expectedLabelss) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss)
}
}

View File

@ -0,0 +1,131 @@
package kubernetes
import (
"encoding/json"
"fmt"
)
// GetNodesLabels returns labels for k8s nodes obtained from the given apiServer.
func GetNodesLabels(cfg *APIConfig) ([]map[string]string, error) {
data, err := getAPIResponse(cfg, "node", "/api/v1/nodes")
if err != nil {
return nil, fmt.Errorf("cannot obtain nodes data from API server: %s", err)
}
nl, err := parseNodeList(data)
if err != nil {
return nil, fmt.Errorf("cannot parse nodes response from API server: %s", err)
}
var ms []map[string]string
for _, n := range nl.Items {
// Do not apply namespaces, since they are missing in nodes.
ms = n.appendTargetLabels(ms)
}
return ms, nil
}
// NodeList represents NodeList from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodelist-v1-core
type NodeList struct {
Items []Node
}
// Node represents Node from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#node-v1-core
type Node struct {
Metadata ObjectMeta
Status NodeStatus
}
// NodeStatus represents NodeStatus from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodestatus-v1-core
type NodeStatus struct {
Addresses []NodeAddress
DaemonEndpoints NodeDaemonEndpoints
}
// NodeAddress represents NodeAddress from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodeaddress-v1-core
type NodeAddress struct {
Type string
Address string
}
// NodeDaemonEndpoints represents NodeDaemonEndpoints from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodedaemonendpoints-v1-core
type NodeDaemonEndpoints struct {
KubeletEndpoint DaemonEndpoint
}
// parseNodeList parses NodeList from data.
func parseNodeList(data []byte) (*NodeList, error) {
var nl NodeList
if err := json.Unmarshal(data, &nl); err != nil {
return nil, fmt.Errorf("cannot unmarshal NodeList from %q: %s", data, err)
}
return &nl, nil
}
// appendTargetLabels appends labels for the given Node n to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node
func (n *Node) appendTargetLabels(ms []map[string]string) []map[string]string {
addr := getNodeAddr(n.Status.Addresses)
if len(addr) == 0 {
// Skip node without address
return ms
}
addr = joinHostPort(addr, n.Status.DaemonEndpoints.KubeletEndpoint.Port)
m := map[string]string{
"__address__": addr,
"instance": n.Metadata.Name,
"__meta_kubernetes_node_name": n.Metadata.Name,
}
n.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_node", m)
addrTypesUsed := make(map[string]bool, len(n.Status.Addresses))
for _, a := range n.Status.Addresses {
if addrTypesUsed[a.Type] {
continue
}
addrTypesUsed[a.Type] = true
ln := sanitizeLabelName(a.Type)
m[fmt.Sprintf("__meta_kubernetes_node_address_%s", ln)] = a.Address
}
ms = append(ms, m)
return ms
}
func getNodeAddr(nas []NodeAddress) string {
if addr := getAddrByType(nas, "InternalIP"); len(addr) > 0 {
return addr
}
if addr := getAddrByType(nas, "InternalDNS"); len(addr) > 0 {
return addr
}
if addr := getAddrByType(nas, "ExternalIP"); len(addr) > 0 {
return addr
}
if addr := getAddrByType(nas, "ExternalDNS"); len(addr) > 0 {
return addr
}
if addr := getAddrByType(nas, "LegacyHostIP"); len(addr) > 0 {
return addr
}
if addr := getAddrByType(nas, "Hostname"); len(addr) > 0 {
return addr
}
return ""
}
func getAddrByType(nas []NodeAddress, typ string) string {
for _, na := range nas {
if na.Type == typ {
return na.Address
}
}
return ""
}

View File

@ -0,0 +1,320 @@
package kubernetes
import (
"reflect"
"testing"
)
func TestParseNodeListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
nls, err := parseNodeList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if nls != nil {
t.Fatalf("unexpected non-nil NodeList: %v", nls)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
}
func TestParseNodeListSuccess(t *testing.T) {
data := `{
"kind": "NodeList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/nodes",
"resourceVersion": "22627"
},
"items": [
{
"metadata": {
"name": "m01",
"selfLink": "/api/v1/nodes/m01",
"uid": "b48dd901-ead0-476a-b209-d2d908d65109",
"resourceVersion": "22309",
"creationTimestamp": "2020-03-16T20:44:23Z",
"labels": {
"beta.kubernetes.io/arch": "amd64",
"beta.kubernetes.io/os": "linux",
"kubernetes.io/arch": "amd64",
"kubernetes.io/hostname": "m01",
"kubernetes.io/os": "linux",
"minikube.k8s.io/commit": "eb13446e786c9ef70cb0a9f85a633194e62396a1",
"minikube.k8s.io/name": "minikube",
"minikube.k8s.io/updated_at": "2020_03_16T22_44_27_0700",
"minikube.k8s.io/version": "v1.8.2",
"node-role.kubernetes.io/master": ""
},
"annotations": {
"kubeadm.alpha.kubernetes.io/cri-socket": "/var/run/dockershim.sock",
"node.alpha.kubernetes.io/ttl": "0",
"volumes.kubernetes.io/controller-managed-attach-detach": "true"
}
},
"spec": {
"podCIDR": "10.244.0.0/24",
"podCIDRs": [
"10.244.0.0/24"
]
},
"status": {
"capacity": {
"cpu": "4",
"ephemeral-storage": "474705032Ki",
"hugepages-1Gi": "0",
"hugepages-2Mi": "0",
"memory": "16314884Ki",
"pods": "110"
},
"allocatable": {
"cpu": "4",
"ephemeral-storage": "437488156767",
"hugepages-1Gi": "0",
"hugepages-2Mi": "0",
"memory": "16212484Ki",
"pods": "110"
},
"conditions": [
{
"type": "MemoryPressure",
"status": "False",
"lastHeartbeatTime": "2020-03-20T13:30:38Z",
"lastTransitionTime": "2020-03-16T20:44:18Z",
"reason": "KubeletHasSufficientMemory",
"message": "kubelet has sufficient memory available"
},
{
"type": "DiskPressure",
"status": "False",
"lastHeartbeatTime": "2020-03-20T13:30:38Z",
"lastTransitionTime": "2020-03-16T20:44:18Z",
"reason": "KubeletHasNoDiskPressure",
"message": "kubelet has no disk pressure"
},
{
"type": "PIDPressure",
"status": "False",
"lastHeartbeatTime": "2020-03-20T13:30:38Z",
"lastTransitionTime": "2020-03-16T20:44:18Z",
"reason": "KubeletHasSufficientPID",
"message": "kubelet has sufficient PID available"
},
{
"type": "Ready",
"status": "True",
"lastHeartbeatTime": "2020-03-20T13:30:38Z",
"lastTransitionTime": "2020-03-16T20:44:39Z",
"reason": "KubeletReady",
"message": "kubelet is posting ready status"
}
],
"addresses": [
{
"type": "InternalIP",
"address": "172.17.0.2"
},
{
"type": "Hostname",
"address": "m01"
}
],
"daemonEndpoints": {
"kubeletEndpoint": {
"Port": 10250
}
},
"nodeInfo": {
"machineID": "e64aad27e586485b9a9cbd699840c0ab",
"systemUUID": "4d9f5caa-25de-46c6-8d54-d1c5141b78cc",
"bootID": "947ffc57-db48-4a03-b7c6-18ce0b85238d",
"kernelVersion": "4.15.0-91-generic",
"osImage": "Ubuntu 19.10",
"containerRuntimeVersion": "docker://19.3.2",
"kubeletVersion": "v1.17.3",
"kubeProxyVersion": "v1.17.3",
"operatingSystem": "linux",
"architecture": "amd64"
},
"images": [
{
"names": [
"k8s.gcr.io/etcd@sha256:4afb99b4690b418ffc2ceb67e1a17376457e441c1f09ab55447f0aaf992fa646",
"k8s.gcr.io/etcd:3.4.3-0"
],
"sizeBytes": 288426917
},
{
"names": [
"k8s.gcr.io/kube-apiserver@sha256:33400ea29255bd20714b6b8092b22ebb045ae134030d6bf476bddfed9d33e900",
"k8s.gcr.io/kube-apiserver:v1.17.3"
],
"sizeBytes": 170986003
},
{
"names": [
"k8s.gcr.io/kube-controller-manager@sha256:2f0bf4d08e72a1fd6327c8eca3a72ad21af3a608283423bb3c10c98e68759844",
"k8s.gcr.io/kube-controller-manager:v1.17.3"
],
"sizeBytes": 160918035
},
{
"names": [
"k8s.gcr.io/kube-proxy@sha256:3a70e2ab8d1d623680191a1a1f1dcb0bdbfd388784b1f153d5630a7397a63fd4",
"k8s.gcr.io/kube-proxy:v1.17.3"
],
"sizeBytes": 115964919
},
{
"names": [
"k8s.gcr.io/kube-scheduler@sha256:b091f0db3bc61a3339fd3ba7ebb06c984c4ded32e1f2b1ef0fbdfab638e88462",
"k8s.gcr.io/kube-scheduler:v1.17.3"
],
"sizeBytes": 94435859
},
{
"names": [
"kubernetesui/dashboard@sha256:fc90baec4fb62b809051a3227e71266c0427240685139bbd5673282715924ea7",
"kubernetesui/dashboard:v2.0.0-beta8"
],
"sizeBytes": 90835427
},
{
"names": [
"gcr.io/k8s-minikube/storage-provisioner@sha256:088daa9fcbccf04c3f415d77d5a6360d2803922190b675cb7fc88a9d2d91985a",
"gcr.io/k8s-minikube/storage-provisioner:v1.8.1"
],
"sizeBytes": 80815640
},
{
"names": [
"kindest/kindnetd@sha256:bc1833b3da442bb639008dd5a62861a0419d3f64b58fce6fb38b749105232555",
"kindest/kindnetd:0.5.3"
],
"sizeBytes": 78486107
},
{
"names": [
"k8s.gcr.io/coredns@sha256:7ec975f167d815311a7136c32e70735f0d00b73781365df1befd46ed35bd4fe7",
"k8s.gcr.io/coredns:1.6.5"
],
"sizeBytes": 41578211
},
{
"names": [
"kubernetesui/metrics-scraper@sha256:2026f9f7558d0f25cc6bab74ea201b4e9d5668fbc378ef64e13fddaea570efc0",
"kubernetesui/metrics-scraper:v1.0.2"
],
"sizeBytes": 40101552
},
{
"names": [
"k8s.gcr.io/pause@sha256:f78411e19d84a252e53bff71a4407a5686c46983a2c2eeed83929b888179acea",
"k8s.gcr.io/pause:3.1"
],
"sizeBytes": 742472
}
]
}
}
]
}
`
nls, err := parseNodeList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(nls.Items) != 1 {
t.Fatalf("unexpected length of NodeList.Items; got %d; want %d", len(nls.Items), 1)
}
node := nls.Items[0]
meta := node.Metadata
if meta.Name != "m01" {
t.Fatalf("unexpected ObjectMeta.Name; got %q; want %q", meta.Name, "m01")
}
expectedLabels := getSortedLabels(map[string]string{
"beta.kubernetes.io/arch": "amd64",
"beta.kubernetes.io/os": "linux",
"kubernetes.io/arch": "amd64",
"kubernetes.io/hostname": "m01",
"kubernetes.io/os": "linux",
"minikube.k8s.io/commit": "eb13446e786c9ef70cb0a9f85a633194e62396a1",
"minikube.k8s.io/name": "minikube",
"minikube.k8s.io/updated_at": "2020_03_16T22_44_27_0700",
"minikube.k8s.io/version": "v1.8.2",
"node-role.kubernetes.io/master": "",
})
if !reflect.DeepEqual(meta.Labels, expectedLabels) {
t.Fatalf("unexpected ObjectMeta.Labels\ngot\n%v\nwant\n%v", meta.Labels, expectedLabels)
}
expectedAnnotations := getSortedLabels(map[string]string{
"kubeadm.alpha.kubernetes.io/cri-socket": "/var/run/dockershim.sock",
"node.alpha.kubernetes.io/ttl": "0",
"volumes.kubernetes.io/controller-managed-attach-detach": "true",
})
if !reflect.DeepEqual(meta.Annotations, expectedAnnotations) {
t.Fatalf("unexpected ObjectMeta.Annotations\ngot\n%v\nwant\n%v", meta.Annotations, expectedAnnotations)
}
status := node.Status
expectedAddresses := []NodeAddress{
{
Type: "InternalIP",
Address: "172.17.0.2",
},
{
Type: "Hostname",
Address: "m01",
},
}
if !reflect.DeepEqual(status.Addresses, expectedAddresses) {
t.Fatalf("unexpected addresses\ngot\n%v\nwant\n%v", status.Addresses, expectedAddresses)
}
// Check node.appendTargetLabels()
labels := getSortedLabels(node.appendTargetLabels(nil)[0])
expectedLabels = getSortedLabels(map[string]string{
"instance": "m01",
"__address__": "172.17.0.2:10250",
"__meta_kubernetes_node_name": "m01",
"__meta_kubernetes_node_label_beta_kubernetes_io_arch": "amd64",
"__meta_kubernetes_node_label_beta_kubernetes_io_os": "linux",
"__meta_kubernetes_node_label_kubernetes_io_arch": "amd64",
"__meta_kubernetes_node_label_kubernetes_io_hostname": "m01",
"__meta_kubernetes_node_label_kubernetes_io_os": "linux",
"__meta_kubernetes_node_label_minikube_k8s_io_commit": "eb13446e786c9ef70cb0a9f85a633194e62396a1",
"__meta_kubernetes_node_label_minikube_k8s_io_name": "minikube",
"__meta_kubernetes_node_label_minikube_k8s_io_updated_at": "2020_03_16T22_44_27_0700",
"__meta_kubernetes_node_label_minikube_k8s_io_version": "v1.8.2",
"__meta_kubernetes_node_label_node_role_kubernetes_io_master": "",
"__meta_kubernetes_node_labelpresent_beta_kubernetes_io_arch": "true",
"__meta_kubernetes_node_labelpresent_beta_kubernetes_io_os": "true",
"__meta_kubernetes_node_labelpresent_kubernetes_io_arch": "true",
"__meta_kubernetes_node_labelpresent_kubernetes_io_hostname": "true",
"__meta_kubernetes_node_labelpresent_kubernetes_io_os": "true",
"__meta_kubernetes_node_labelpresent_minikube_k8s_io_commit": "true",
"__meta_kubernetes_node_labelpresent_minikube_k8s_io_name": "true",
"__meta_kubernetes_node_labelpresent_minikube_k8s_io_updated_at": "true",
"__meta_kubernetes_node_labelpresent_minikube_k8s_io_version": "true",
"__meta_kubernetes_node_labelpresent_node_role_kubernetes_io_master": "true",
"__meta_kubernetes_node_annotation_kubeadm_alpha_kubernetes_io_cri_socket": "/var/run/dockershim.sock",
"__meta_kubernetes_node_annotation_node_alpha_kubernetes_io_ttl": "0",
"__meta_kubernetes_node_annotation_volumes_kubernetes_io_controller_managed_attach_detach": "true",
"__meta_kubernetes_node_annotationpresent_kubeadm_alpha_kubernetes_io_cri_socket": "true",
"__meta_kubernetes_node_annotationpresent_node_alpha_kubernetes_io_ttl": "true",
"__meta_kubernetes_node_annotationpresent_volumes_kubernetes_io_controller_managed_attach_detach": "true",
"__meta_kubernetes_node_address_InternalIP": "172.17.0.2",
"__meta_kubernetes_node_address_Hostname": "m01",
})
if !reflect.DeepEqual(labels, expectedLabels) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", labels, expectedLabels)
}
}

View File

@ -0,0 +1,198 @@
package kubernetes
import (
"encoding/json"
"fmt"
"strconv"
"strings"
)
// GetPodsLabels returns labels for k8s pods obtained from the given apiServer
func GetPodsLabels(cfg *APIConfig) ([]map[string]string, error) {
pods, err := getPods(cfg)
if err != nil {
return nil, err
}
var ms []map[string]string
for _, p := range pods {
ms = p.appendTargetLabels(ms)
}
return ms, nil
}
func getPods(cfg *APIConfig) ([]Pod, error) {
data, err := getAPIResponse(cfg, "pod", "/api/v1/pods")
if err != nil {
return nil, fmt.Errorf("cannot obtain pods data from API server: %s", err)
}
pl, err := parsePodList(data)
if err != nil {
return nil, fmt.Errorf("cannot parse pods response from API server: %s", err)
}
return pl.Items, nil
}
// PodList implements k8s pod list.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podlist-v1-core
type PodList struct {
Items []Pod
}
// Pod implements k8s pod.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#pod-v1-core
type Pod struct {
Metadata ObjectMeta
Spec PodSpec
Status PodStatus
}
// PodSpec implements k8s pod spec.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podspec-v1-core
type PodSpec struct {
NodeName string
Containers []Container
InitContainers []Container
}
// Container implements k8s container.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#container-v1-core
type Container struct {
Name string
Ports []ContainerPort
}
// ContainerPort implements k8s container port.
type ContainerPort struct {
Name string
ContainerPort int
Protocol string
}
// PodStatus implements k8s pod status.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podstatus-v1-core
type PodStatus struct {
Phase string
PodIP string
HostIP string
Conditions []PodCondition
}
// PodCondition implements k8s pod condition.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podcondition-v1-core
type PodCondition struct {
Type string
Status string
}
// parsePodList parses PodList from data.
func parsePodList(data []byte) (*PodList, error) {
var pl PodList
if err := json.Unmarshal(data, &pl); err != nil {
return nil, fmt.Errorf("cannot unmarshal PodList from %q: %s", data, err)
}
return &pl, nil
}
// appendTargetLabels appends labels for each port of the given Pod p to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#pod
func (p *Pod) appendTargetLabels(ms []map[string]string) []map[string]string {
if len(p.Status.PodIP) == 0 {
// Skip pod without IP
return ms
}
ms = appendPodLabels(ms, p, p.Spec.Containers, "false")
ms = appendPodLabels(ms, p, p.Spec.InitContainers, "true")
return ms
}
func appendPodLabels(ms []map[string]string, p *Pod, cs []Container, isInit string) []map[string]string {
for _, c := range cs {
for _, cp := range c.Ports {
m := getPodLabels(p, c, &cp, isInit)
ms = append(ms, m)
}
if len(c.Ports) == 0 {
m := getPodLabels(p, c, nil, isInit)
ms = append(ms, m)
}
}
return ms
}
func getPodLabels(p *Pod, c Container, cp *ContainerPort, isInit string) map[string]string {
addr := p.Status.PodIP
if cp != nil {
addr = joinHostPort(addr, cp.ContainerPort)
}
m := map[string]string{
"__address__": addr,
"__meta_kubernetes_pod_container_init": isInit,
}
p.appendCommonLabels(m)
p.appendContainerLabels(m, c, cp)
return m
}
func (p *Pod) appendContainerLabels(m map[string]string, c Container, cp *ContainerPort) {
m["__meta_kubernetes_pod_container_name"] = c.Name
if cp != nil {
m["__meta_kubernetes_pod_container_port_name"] = cp.Name
m["__meta_kubernetes_pod_container_port_number"] = strconv.Itoa(cp.ContainerPort)
m["__meta_kubernetes_pod_container_port_protocol"] = cp.Protocol
}
}
func (p *Pod) appendCommonLabels(m map[string]string) {
m["__meta_kubernetes_pod_name"] = p.Metadata.Name
m["__meta_kubernetes_pod_ip"] = p.Status.PodIP
m["__meta_kubernetes_pod_ready"] = getPodReadyStatus(p.Status.Conditions)
m["__meta_kubernetes_pod_phase"] = p.Status.Phase
m["__meta_kubernetes_pod_node_name"] = p.Spec.NodeName
m["__meta_kubernetes_pod_host_ip"] = p.Status.HostIP
m["__meta_kubernetes_pod_uid"] = p.Metadata.UID
m["__meta_kubernetes_namespace"] = p.Metadata.Namespace
if pc := getPodController(p.Metadata.OwnerReferences); pc != nil {
if pc.Kind != "" {
m["__meta_kubernetes_pod_controller_kind"] = pc.Kind
}
if pc.Name != "" {
m["__meta_kubernetes_pod_controller_name"] = pc.Name
}
}
p.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_pod", m)
}
func getPodController(ors []OwnerReference) *OwnerReference {
for _, or := range ors {
if or.Controller {
return &or
}
}
return nil
}
func getPodReadyStatus(conds []PodCondition) string {
for _, c := range conds {
if c.Type == "Ready" {
return strings.ToLower(c.Status)
}
}
return "unknown"
}
func getPod(pods []Pod, namespace, name string) *Pod {
for i := range pods {
pod := &pods[i]
if pod.Metadata.Name == name && pod.Metadata.Namespace == namespace {
return pod
}
}
return nil
}

View File

@ -0,0 +1,285 @@
package kubernetes
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestParsePodListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
nls, err := parsePodList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if nls != nil {
t.Fatalf("unexpected non-nil PodList: %v", nls)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
}
func TestParsePodListSuccess(t *testing.T) {
data := `
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/pods",
"resourceVersion": "72425"
},
"items": [
{
"metadata": {
"name": "etcd-m01",
"namespace": "kube-system",
"selfLink": "/api/v1/namespaces/kube-system/pods/etcd-m01",
"uid": "9d328156-75d1-411a-bdd0-aeacb53a38de",
"resourceVersion": "22318",
"creationTimestamp": "2020-03-16T20:44:30Z",
"labels": {
"component": "etcd",
"tier": "control-plane"
},
"annotations": {
"kubernetes.io/config.hash": "3ec997b76fb6ed3b78da8e0b5676dac4",
"kubernetes.io/config.mirror": "3ec997b76fb6ed3b78da8e0b5676dac4",
"kubernetes.io/config.seen": "2020-03-16T20:44:26.538136233Z",
"kubernetes.io/config.source": "file"
},
"ownerReferences": [
{
"apiVersion": "v1",
"kind": "Node",
"name": "m01",
"uid": "b48dd901-ead0-476a-b209-d2d908d65109",
"controller": true
}
]
},
"spec": {
"volumes": [
{
"name": "etcd-certs",
"hostPath": {
"path": "/var/lib/minikube/certs/etcd",
"type": "DirectoryOrCreate"
}
},
{
"name": "etcd-data",
"hostPath": {
"path": "/var/lib/minikube/etcd",
"type": "DirectoryOrCreate"
}
}
],
"containers": [
{
"name": "etcd",
"image": "k8s.gcr.io/etcd:3.4.3-0",
"command": [
"etcd",
"--advertise-client-urls=https://172.17.0.2:2379",
"--cert-file=/var/lib/minikube/certs/etcd/server.crt",
"--client-cert-auth=true",
"--data-dir=/var/lib/minikube/etcd",
"--initial-advertise-peer-urls=https://172.17.0.2:2380",
"--initial-cluster=m01=https://172.17.0.2:2380",
"--key-file=/var/lib/minikube/certs/etcd/server.key",
"--listen-client-urls=https://127.0.0.1:2379,https://172.17.0.2:2379",
"--listen-metrics-urls=http://127.0.0.1:2381",
"--listen-peer-urls=https://172.17.0.2:2380",
"--name=m01",
"--peer-cert-file=/var/lib/minikube/certs/etcd/peer.crt",
"--peer-client-cert-auth=true",
"--peer-key-file=/var/lib/minikube/certs/etcd/peer.key",
"--peer-trusted-ca-file=/var/lib/minikube/certs/etcd/ca.crt",
"--snapshot-count=10000",
"--trusted-ca-file=/var/lib/minikube/certs/etcd/ca.crt"
],
"resources": {
},
"ports": [
{
"name": "foobar",
"containerPort": 1234,
"protocol": "TCP"
}
],
"volumeMounts": [
{
"name": "etcd-data",
"mountPath": "/var/lib/minikube/etcd"
},
{
"name": "etcd-certs",
"mountPath": "/var/lib/minikube/certs/etcd"
}
],
"livenessProbe": {
"httpGet": {
"path": "/health",
"port": 2381,
"host": "127.0.0.1",
"scheme": "HTTP"
},
"initialDelaySeconds": 15,
"timeoutSeconds": 15,
"periodSeconds": 10,
"successThreshold": 1,
"failureThreshold": 8
},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File",
"imagePullPolicy": "IfNotPresent"
}
],
"restartPolicy": "Always",
"terminationGracePeriodSeconds": 30,
"dnsPolicy": "ClusterFirst",
"nodeName": "m01",
"hostNetwork": true,
"securityContext": {
},
"schedulerName": "default-scheduler",
"tolerations": [
{
"operator": "Exists",
"effect": "NoExecute"
}
],
"priorityClassName": "system-cluster-critical",
"priority": 2000000000,
"enableServiceLinks": true
},
"status": {
"phase": "Running",
"conditions": [
{
"type": "Initialized",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2020-03-20T13:30:29Z"
},
{
"type": "Ready",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2020-03-20T13:30:32Z"
},
{
"type": "ContainersReady",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2020-03-20T13:30:32Z"
},
{
"type": "PodScheduled",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2020-03-20T13:30:29Z"
}
],
"hostIP": "172.17.0.2",
"podIP": "172.17.0.2",
"podIPs": [
{
"ip": "172.17.0.2"
}
],
"startTime": "2020-03-20T13:30:29Z",
"containerStatuses": [
{
"name": "etcd",
"state": {
"running": {
"startedAt": "2020-03-20T13:30:30Z"
}
},
"lastState": {
"terminated": {
"exitCode": 0,
"reason": "Completed",
"startedAt": "2020-03-17T18:56:24Z",
"finishedAt": "2020-03-20T13:29:54Z",
"containerID": "docker://24eea6f192d4598fcc129b5f163a02d1457137f4ec34e8c80c6049a65604cb07"
}
},
"ready": true,
"restartCount": 2,
"image": "k8s.gcr.io/etcd:3.4.3-0",
"imageID": "docker-pullable://k8s.gcr.io/etcd@sha256:4afb99b4690b418ffc2ceb67e1a17376457e441c1f09ab55447f0aaf992fa646",
"containerID": "docker://a28f0800855008485376c1eece1cf61de97cb7026b9188d138b0d55d92fc2f5c",
"started": true
}
],
"qosClass": "BestEffort"
}
}
]
}
`
pls, err := parsePodList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(pls.Items) != 1 {
t.Fatalf("unexpected length of PodList.Items; got %d; want %d", len(pls.Items), 1)
}
pod := pls.Items[0]
// Check pod.appendTargetLabels()
labelss := pod.appendTargetLabels(nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, getSortedLabels(labels))
}
expectedLabels := [][]prompbmarshal.Label{
getSortedLabels(map[string]string{
"__address__": "172.17.0.2:1234",
"__meta_kubernetes_namespace": "kube-system",
"__meta_kubernetes_pod_name": "etcd-m01",
"__meta_kubernetes_pod_ip": "172.17.0.2",
"__meta_kubernetes_pod_container_name": "etcd",
"__meta_kubernetes_pod_container_port_name": "foobar",
"__meta_kubernetes_pod_container_port_number": "1234",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_phase": "Running",
"__meta_kubernetes_pod_node_name": "m01",
"__meta_kubernetes_pod_host_ip": "172.17.0.2",
"__meta_kubernetes_pod_uid": "9d328156-75d1-411a-bdd0-aeacb53a38de",
"__meta_kubernetes_pod_controller_kind": "Node",
"__meta_kubernetes_pod_controller_name": "m01",
"__meta_kubernetes_pod_container_init": "false",
"__meta_kubernetes_pod_label_component": "etcd",
"__meta_kubernetes_pod_label_tier": "control-plane",
"__meta_kubernetes_pod_labelpresent_component": "true",
"__meta_kubernetes_pod_labelpresent_tier": "true",
"__meta_kubernetes_pod_annotation_kubernetes_io_config_hash": "3ec997b76fb6ed3b78da8e0b5676dac4",
"__meta_kubernetes_pod_annotation_kubernetes_io_config_mirror": "3ec997b76fb6ed3b78da8e0b5676dac4",
"__meta_kubernetes_pod_annotation_kubernetes_io_config_seen": "2020-03-16T20:44:26.538136233Z",
"__meta_kubernetes_pod_annotation_kubernetes_io_config_source": "file",
"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_hash": "true",
"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_mirror": "true",
"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_seen": "true",
"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_source": "true",
}),
}
if !reflect.DeepEqual(sortedLabelss, expectedLabels) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabels)
}
}

View File

@ -0,0 +1,114 @@
package kubernetes
import (
"encoding/json"
"fmt"
)
// GetServicesLabels returns labels for k8s services obtained from the given apiServer
func GetServicesLabels(cfg *APIConfig) ([]map[string]string, error) {
svcs, err := getServices(cfg)
if err != nil {
return nil, err
}
var ms []map[string]string
for _, svc := range svcs {
ms = svc.appendTargetLabels(ms)
}
return ms, nil
}
func getServices(cfg *APIConfig) ([]Service, error) {
data, err := getAPIResponse(cfg, "service", "/api/v1/services")
if err != nil {
return nil, fmt.Errorf("cannot obtain services data from API server: %s", err)
}
sl, err := parseServiceList(data)
if err != nil {
return nil, fmt.Errorf("cannot parse services response from API server: %s", err)
}
return sl.Items, nil
}
// ServiceList is k8s service list.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicelist-v1-core
type ServiceList struct {
Items []Service
}
// Service is k8s service.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#service-v1-core
type Service struct {
Metadata ObjectMeta
Spec ServiceSpec
}
// ServiceSpec is k8s service spec.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicespec-v1-core
type ServiceSpec struct {
ClusterIP string
ExternalName string
Type string
Ports []ServicePort
}
// ServicePort is k8s service port.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#serviceport-v1-core
type ServicePort struct {
Name string
Protocol string
Port int
}
// parseServiceList parses ServiceList from data.
func parseServiceList(data []byte) (*ServiceList, error) {
var sl ServiceList
if err := json.Unmarshal(data, &sl); err != nil {
return nil, fmt.Errorf("cannot unmarshal ServiceList from %q: %s", data, err)
}
return &sl, nil
}
// appendTargetLabels appends labels for each port of the given Service s to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service
func (s *Service) appendTargetLabels(ms []map[string]string) []map[string]string {
host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace)
for _, sp := range s.Spec.Ports {
addr := joinHostPort(host, sp.Port)
m := map[string]string{
"__address__": addr,
"__meta_kubernetes_service_port_name": sp.Name,
"__meta_kubernetes_service_port_protocol": sp.Protocol,
}
s.appendCommonLabels(m)
ms = append(ms, m)
}
return ms
}
func (s *Service) appendCommonLabels(m map[string]string) {
m["__meta_kubernetes_namespace"] = s.Metadata.Namespace
m["__meta_kubernetes_service_name"] = s.Metadata.Name
m["__meta_kubernetes_service_type"] = s.Spec.Type
if s.Spec.Type != "ExternalName" {
m["__meta_kubernetes_service_cluster_ip"] = s.Spec.ClusterIP
} else {
m["__meta_kubernetes_service_external_name"] = s.Spec.ExternalName
}
s.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_service", m)
}
func getService(svcs []Service, namespace, name string) *Service {
for i := range svcs {
svc := &svcs[i]
if svc.Metadata.Name == name && svc.Metadata.Namespace == namespace {
return svc
}
}
return nil
}

View File

@ -0,0 +1,227 @@
package kubernetes
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestParseServiceListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
nls, err := parseServiceList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if nls != nil {
t.Fatalf("unexpected non-nil ServiceList: %v", nls)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
}
func TestParseServiceListSuccess(t *testing.T) {
data := `{
"kind": "ServiceList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/services",
"resourceVersion": "60485"
},
"items": [
{
"metadata": {
"name": "kube-dns",
"namespace": "kube-system",
"selfLink": "/api/v1/namespaces/kube-system/services/kube-dns",
"uid": "38a396f1-17fe-46c2-a5f4-3b225c18dcdf",
"resourceVersion": "177",
"creationTimestamp": "2020-03-16T20:44:26Z",
"labels": {
"k8s-app": "kube-dns",
"kubernetes.io/cluster-service": "true",
"kubernetes.io/name": "KubeDNS"
},
"annotations": {
"prometheus.io/port": "9153",
"prometheus.io/scrape": "true"
}
},
"spec": {
"ports": [
{
"name": "dns",
"protocol": "UDP",
"port": 53,
"targetPort": 53
},
{
"name": "dns-tcp",
"protocol": "TCP",
"port": 53,
"targetPort": 53
},
{
"name": "metrics",
"protocol": "TCP",
"port": 9153,
"targetPort": 9153
}
],
"selector": {
"k8s-app": "kube-dns"
},
"clusterIP": "10.96.0.10",
"type": "ClusterIP",
"sessionAffinity": "None"
},
"status": {
"loadBalancer": {
}
}
}
]
}
`
sls, err := parseServiceList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(sls.Items) != 1 {
t.Fatalf("unexpected length of ServiceList.Items; got %d; want %d", len(sls.Items), 1)
}
service := sls.Items[0]
meta := service.Metadata
if meta.Name != "kube-dns" {
t.Fatalf("unexpected ObjectMeta.Name; got %q; want %q", meta.Name, "kube-dns")
}
expectedLabels := getSortedLabels(map[string]string{
"k8s-app": "kube-dns",
"kubernetes.io/cluster-service": "true",
"kubernetes.io/name": "KubeDNS",
})
if !reflect.DeepEqual(meta.Labels, expectedLabels) {
t.Fatalf("unexpected ObjectMeta.Labels\ngot\n%v\nwant\n%v", meta.Labels, expectedLabels)
}
expectedAnnotations := getSortedLabels(map[string]string{
"prometheus.io/port": "9153",
"prometheus.io/scrape": "true",
})
if !reflect.DeepEqual(meta.Annotations, expectedAnnotations) {
t.Fatalf("unexpected ObjectMeta.Annotations\ngot\n%v\nwant\n%v", meta.Annotations, expectedAnnotations)
}
spec := service.Spec
expectedClusterIP := "10.96.0.10"
if spec.ClusterIP != expectedClusterIP {
t.Fatalf("unexpected clusterIP; got %q; want %q", spec.ClusterIP, expectedClusterIP)
}
if spec.Type != "ClusterIP" {
t.Fatalf("unexpected type; got %q; want %q", spec.Type, "ClusterIP")
}
expectedPorts := []ServicePort{
{
Name: "dns",
Protocol: "UDP",
Port: 53,
},
{
Name: "dns-tcp",
Protocol: "TCP",
Port: 53,
},
{
Name: "metrics",
Protocol: "TCP",
Port: 9153,
},
}
if !reflect.DeepEqual(spec.Ports, expectedPorts) {
t.Fatalf("unexpected ports\ngot\n%v\nwant\n%v", spec.Ports, expectedPorts)
}
// Check service.appendTargetLabels()
labelss := service.appendTargetLabels(nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, getSortedLabels(labels))
}
expectedLabelss := [][]prompbmarshal.Label{
getSortedLabels(map[string]string{
"__address__": "kube-dns.kube-system.svc:53",
"__meta_kubernetes_namespace": "kube-system",
"__meta_kubernetes_service_name": "kube-dns",
"__meta_kubernetes_service_type": "ClusterIP",
"__meta_kubernetes_service_port_name": "dns",
"__meta_kubernetes_service_port_protocol": "UDP",
"__meta_kubernetes_service_cluster_ip": "10.96.0.10",
"__meta_kubernetes_service_label_k8s_app": "kube-dns",
"__meta_kubernetes_service_label_kubernetes_io_cluster_service": "true",
"__meta_kubernetes_service_label_kubernetes_io_name": "KubeDNS",
"__meta_kubernetes_service_labelpresent_k8s_app": "true",
"__meta_kubernetes_service_labelpresent_kubernetes_io_cluster_service": "true",
"__meta_kubernetes_service_labelpresent_kubernetes_io_name": "true",
"__meta_kubernetes_service_annotation_prometheus_io_port": "9153",
"__meta_kubernetes_service_annotation_prometheus_io_scrape": "true",
"__meta_kubernetes_service_annotationpresent_prometheus_io_port": "true",
"__meta_kubernetes_service_annotationpresent_prometheus_io_scrape": "true",
}),
getSortedLabels(map[string]string{
"__address__": "kube-dns.kube-system.svc:53",
"__meta_kubernetes_namespace": "kube-system",
"__meta_kubernetes_service_name": "kube-dns",
"__meta_kubernetes_service_type": "ClusterIP",
"__meta_kubernetes_service_port_name": "dns-tcp",
"__meta_kubernetes_service_port_protocol": "TCP",
"__meta_kubernetes_service_cluster_ip": "10.96.0.10",
"__meta_kubernetes_service_label_k8s_app": "kube-dns",
"__meta_kubernetes_service_label_kubernetes_io_cluster_service": "true",
"__meta_kubernetes_service_label_kubernetes_io_name": "KubeDNS",
"__meta_kubernetes_service_labelpresent_k8s_app": "true",
"__meta_kubernetes_service_labelpresent_kubernetes_io_cluster_service": "true",
"__meta_kubernetes_service_labelpresent_kubernetes_io_name": "true",
"__meta_kubernetes_service_annotation_prometheus_io_port": "9153",
"__meta_kubernetes_service_annotation_prometheus_io_scrape": "true",
"__meta_kubernetes_service_annotationpresent_prometheus_io_port": "true",
"__meta_kubernetes_service_annotationpresent_prometheus_io_scrape": "true",
}),
getSortedLabels(map[string]string{
"__address__": "kube-dns.kube-system.svc:9153",
"__meta_kubernetes_namespace": "kube-system",
"__meta_kubernetes_service_name": "kube-dns",
"__meta_kubernetes_service_type": "ClusterIP",
"__meta_kubernetes_service_port_name": "metrics",
"__meta_kubernetes_service_port_protocol": "TCP",
"__meta_kubernetes_service_cluster_ip": "10.96.0.10",
"__meta_kubernetes_service_label_k8s_app": "kube-dns",
"__meta_kubernetes_service_label_kubernetes_io_cluster_service": "true",
"__meta_kubernetes_service_label_kubernetes_io_name": "KubeDNS",
"__meta_kubernetes_service_labelpresent_k8s_app": "true",
"__meta_kubernetes_service_labelpresent_kubernetes_io_cluster_service": "true",
"__meta_kubernetes_service_labelpresent_kubernetes_io_name": "true",
"__meta_kubernetes_service_annotation_prometheus_io_port": "9153",
"__meta_kubernetes_service_annotation_prometheus_io_scrape": "true",
"__meta_kubernetes_service_annotationpresent_prometheus_io_port": "true",
"__meta_kubernetes_service_annotationpresent_prometheus_io_scrape": "true",
}),
}
if !reflect.DeepEqual(sortedLabelss, expectedLabelss) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss)
}
}

View File

@ -14,8 +14,11 @@ import (
)
var (
fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", time.Minute, "Interval for checking for changes in 'file_sd_config'. "+
fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", 30*time.Second, "Interval for checking for changes in 'file_sd_config'. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config")
kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+
"This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details")
promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config for details")
)
@ -58,6 +61,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
}
swsStatic := cfg.getStaticScrapeWork()
swsFileSD := cfg.getFileSDScrapeWork(nil)
swsK8S := cfg.getKubernetesSDScrapeWork()
mustStop := false
for !mustStop {
@ -73,6 +77,11 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
defer wg.Done()
runFileSDScrapers(swsFileSD, cfg, pushData, stopCh)
}()
wg.Add(1)
go func() {
defer wg.Done()
runKubernetesSDScrapers(swsK8S, cfg, pushData, stopCh)
}()
waitForChans:
select {
@ -86,6 +95,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
cfg = cfgNew
swsStatic = cfg.getStaticScrapeWork()
swsFileSD = cfg.getFileSDScrapeWork(swsFileSD)
swsK8S = cfg.getKubernetesSDScrapeWork()
case <-globalStopCh:
mustStop = true
}
@ -114,6 +124,50 @@ func runStaticScrapers(sws []ScrapeWork, pushData func(wr *prompbmarshal.WriteRe
var staticTargets = metrics.NewCounter(`vm_promscrape_targets{type="static"}`)
func runKubernetesSDScrapers(sws []ScrapeWork, cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) {
if cfg.kubernetesSDConfigsCount() == 0 {
return
}
ticker := time.NewTicker(*kubernetesSDCheckInterval)
defer ticker.Stop()
mustStop := false
for !mustStop {
localStopCh := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func(sws []ScrapeWork) {
defer wg.Done()
logger.Infof("starting %d scrapers for `kubernetes_sd_config` targets", len(sws))
kubernetesSDTargets.Set(uint64(len(sws)))
runScrapeWorkers(sws, pushData, localStopCh)
kubernetesSDTargets.Set(0)
logger.Infof("stopped all the %d scrapers for `kubernetes_sd_config` targets", len(sws))
}(sws)
waitForChans:
select {
case <-ticker.C:
swsNew := cfg.getKubernetesSDScrapeWork()
if equalStaticConfigForScrapeWorks(swsNew, sws) {
// Nothing changed, continue waiting for updated scrape work
goto waitForChans
}
logger.Infof("restarting scrapers for changed `kubernetes_sd_config` targets")
sws = swsNew
case <-stopCh:
mustStop = true
}
close(localStopCh)
wg.Wait()
kubernetesSDReloads.Inc()
}
}
var (
kubernetesSDTargets = metrics.NewCounter(`vm_promscrape_targets{type="kubernetes_sd"}`)
kubernetesSDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="kubernetes_sd"}`)
)
func runFileSDScrapers(sws []ScrapeWork, cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) {
if cfg.fileSDConfigsCount() == 0 {
return