mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-11 20:52:24 +01:00
1e364c992d
Already terminated pods and containers cannot be scraped and will never resurrect, so there is zero sense in creating scrape targets for them.
221 lines
7.1 KiB
Go
221 lines
7.1 KiB
Go
package kubernetes
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
|
)
|
|
|
|
func (eps *Endpoints) key() string {
|
|
return eps.Metadata.key()
|
|
}
|
|
|
|
func parseEndpointsList(r io.Reader) (map[string]object, ListMeta, error) {
|
|
var epsl EndpointsList
|
|
d := json.NewDecoder(r)
|
|
if err := d.Decode(&epsl); err != nil {
|
|
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointsList: %w", err)
|
|
}
|
|
objectsByKey := make(map[string]object)
|
|
for _, eps := range epsl.Items {
|
|
objectsByKey[eps.key()] = eps
|
|
}
|
|
return objectsByKey, epsl.Metadata, nil
|
|
}
|
|
|
|
func parseEndpoints(data []byte) (object, error) {
|
|
var eps Endpoints
|
|
if err := json.Unmarshal(data, &eps); err != nil {
|
|
return nil, err
|
|
}
|
|
return &eps, nil
|
|
}
|
|
|
|
// EndpointsList implements k8s endpoints list.
|
|
//
|
|
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointslist-v1-core
|
|
type EndpointsList struct {
|
|
Metadata ListMeta
|
|
Items []*Endpoints
|
|
}
|
|
|
|
// Endpoints implements k8s endpoints.
|
|
//
|
|
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpoints-v1-core
|
|
type Endpoints struct {
|
|
Metadata ObjectMeta
|
|
Subsets []EndpointSubset
|
|
}
|
|
|
|
// EndpointSubset implements k8s endpoint subset.
|
|
//
|
|
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointsubset-v1-core
|
|
type EndpointSubset struct {
|
|
Addresses []EndpointAddress
|
|
NotReadyAddresses []EndpointAddress
|
|
Ports []EndpointPort
|
|
}
|
|
|
|
// EndpointAddress implements k8s endpoint address.
|
|
//
|
|
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointaddress-v1-core
|
|
type EndpointAddress struct {
|
|
Hostname string
|
|
IP string
|
|
NodeName string
|
|
TargetRef ObjectReference
|
|
}
|
|
|
|
// ObjectReference implements k8s object reference.
|
|
//
|
|
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#objectreference-v1-core
|
|
type ObjectReference struct {
|
|
Kind string
|
|
Name string
|
|
Namespace string
|
|
}
|
|
|
|
// EndpointPort implements k8s endpoint port.
|
|
//
|
|
// See https://v1-21.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#endpointport-v1-discovery-k8s-io
|
|
type EndpointPort struct {
|
|
AppProtocol string
|
|
Name string
|
|
Port int
|
|
Protocol string
|
|
}
|
|
|
|
// getTargetLabels returns labels for each endpoint in eps.
|
|
//
|
|
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints
|
|
func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []*promutils.Labels {
|
|
var svc *Service
|
|
if o := gw.getObjectByRoleLocked("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
|
|
svc = o.(*Service)
|
|
}
|
|
podPortsSeen := make(map[*Pod][]int)
|
|
var ms []*promutils.Labels
|
|
for _, ess := range eps.Subsets {
|
|
for _, epp := range ess.Ports {
|
|
ms = appendEndpointLabelsForAddresses(ms, gw, podPortsSeen, eps, ess.Addresses, epp, svc, "true")
|
|
ms = appendEndpointLabelsForAddresses(ms, gw, podPortsSeen, eps, ess.NotReadyAddresses, epp, svc, "false")
|
|
}
|
|
}
|
|
// See https://kubernetes.io/docs/reference/labels-annotations-taints/#endpoints-kubernetes-io-over-capacity
|
|
// and https://github.com/kubernetes/kubernetes/pull/99975
|
|
switch eps.Metadata.Annotations.Get("endpoints.kubernetes.io/over-capacity") {
|
|
case "truncated":
|
|
logger.Warnf(`the number of targets for "role: endpoints" %q exceeds 1000 and has been truncated; please use "role: endpointslice" instead`, eps.Metadata.key())
|
|
case "warning":
|
|
logger.Warnf(`the number of targets for "role: endpoints" %q exceeds 1000 and will be truncated in the next k8s releases; please use "role: endpointslice" instead`, eps.Metadata.key())
|
|
}
|
|
|
|
// Append labels for skipped ports on seen pods.
|
|
portSeen := func(port int, ports []int) bool {
|
|
for _, p := range ports {
|
|
if p == port {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
for p, ports := range podPortsSeen {
|
|
for _, c := range p.Spec.Containers {
|
|
for _, cp := range c.Ports {
|
|
if portSeen(cp.ContainerPort, ports) {
|
|
continue
|
|
}
|
|
addr := discoveryutils.JoinHostPort(p.Status.PodIP, cp.ContainerPort)
|
|
m := promutils.GetLabels()
|
|
m.Add("__address__", addr)
|
|
p.appendCommonLabels(m, gw)
|
|
p.appendContainerLabels(m, &c, &cp)
|
|
|
|
// Prometheus sets endpoints_name and namespace labels for all endpoints
|
|
// Even if port is not matching service port.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4154
|
|
p.appendEndpointLabels(m, eps)
|
|
if svc != nil {
|
|
svc.appendCommonLabels(m)
|
|
}
|
|
// Remove possible duplicate labels, which can appear after appendCommonLabels() call
|
|
m.RemoveDuplicates()
|
|
ms = append(ms, m)
|
|
}
|
|
}
|
|
}
|
|
return ms
|
|
}
|
|
|
|
func appendEndpointLabelsForAddresses(ms []*promutils.Labels, gw *groupWatcher, podPortsSeen map[*Pod][]int, eps *Endpoints,
|
|
eas []EndpointAddress, epp EndpointPort, svc *Service, ready string) []*promutils.Labels {
|
|
for _, ea := range eas {
|
|
var p *Pod
|
|
if ea.TargetRef.Name != "" {
|
|
if o := gw.getObjectByRoleLocked("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil {
|
|
p = o.(*Pod)
|
|
}
|
|
}
|
|
m := getEndpointLabelsForAddressAndPort(gw, podPortsSeen, eps, ea, epp, p, svc, ready)
|
|
// Remove possible duplicate labels, which can appear inside getEndpointLabelsForAddressAndPort()
|
|
m.RemoveDuplicates()
|
|
ms = append(ms, m)
|
|
}
|
|
return ms
|
|
}
|
|
|
|
func getEndpointLabelsForAddressAndPort(gw *groupWatcher, podPortsSeen map[*Pod][]int, eps *Endpoints, ea EndpointAddress, epp EndpointPort,
|
|
p *Pod, svc *Service, ready string) *promutils.Labels {
|
|
m := getEndpointLabels(eps.Metadata, ea, epp, ready)
|
|
if svc != nil {
|
|
svc.appendCommonLabels(m)
|
|
}
|
|
// See https://github.com/prometheus/prometheus/issues/10284
|
|
eps.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_endpoints", m)
|
|
if ea.TargetRef.Kind != "Pod" || p == nil {
|
|
return m
|
|
}
|
|
p.appendCommonLabels(m, gw)
|
|
// always add pod targetRef, even if epp port doesn't match container port
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2134
|
|
if _, ok := podPortsSeen[p]; !ok {
|
|
podPortsSeen[p] = []int{}
|
|
}
|
|
for _, c := range p.Spec.Containers {
|
|
for _, cp := range c.Ports {
|
|
if cp.ContainerPort == epp.Port {
|
|
podPortsSeen[p] = append(podPortsSeen[p], cp.ContainerPort)
|
|
p.appendContainerLabels(m, &c, &cp)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return m
|
|
}
|
|
|
|
func getEndpointLabels(om ObjectMeta, ea EndpointAddress, epp EndpointPort, ready string) *promutils.Labels {
|
|
addr := discoveryutils.JoinHostPort(ea.IP, epp.Port)
|
|
m := promutils.GetLabels()
|
|
m.Add("__address__", addr)
|
|
m.Add("__meta_kubernetes_namespace", om.Namespace)
|
|
m.Add("__meta_kubernetes_endpoints_name", om.Name)
|
|
m.Add("__meta_kubernetes_endpoint_ready", ready)
|
|
m.Add("__meta_kubernetes_endpoint_port_name", epp.Name)
|
|
m.Add("__meta_kubernetes_endpoint_port_protocol", epp.Protocol)
|
|
if ea.TargetRef.Kind != "" {
|
|
m.Add("__meta_kubernetes_endpoint_address_target_kind", ea.TargetRef.Kind)
|
|
m.Add("__meta_kubernetes_endpoint_address_target_name", ea.TargetRef.Name)
|
|
}
|
|
if ea.NodeName != "" {
|
|
m.Add("__meta_kubernetes_endpoint_node_name", ea.NodeName)
|
|
}
|
|
if ea.Hostname != "" {
|
|
m.Add("__meta_kubernetes_endpoint_hostname", ea.Hostname)
|
|
}
|
|
return m
|
|
}
|