lib/promscrape/discovery/kubernetes: allow attaching node-level labels and annotations to discovered pod targets in the same way as Prometheus 2.35 does

See https://github.com/prometheus/prometheus/issues/9510
and https://github.com/prometheus/prometheus/pull/10080
This commit is contained in:
Aliaksandr Valialkin 2022-04-22 19:39:34 +03:00
parent cc6eae6992
commit a89e31b304
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
9 changed files with 153 additions and 66 deletions

View File

@ -18,6 +18,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow filtering targets by target url and by target labels with [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) on `http://vmagent:8429/targets` page. This may be useful when `vmagent` scrapes big number of targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1796).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce `-promscrape.config` reload duration when the config contains big number of jobs (aka [scrape_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) sections) and only a few of them are changed. Previously all the jobs were restarted. Now only the jobs with changed configs are restarted. This should reduce the probability of data miss because of slow config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2270).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve service discovery speed for big number of scrape targets. This should help when `vmagent` discovers big number of targets (e.g. thousands) in Kubernetes cluster. The service discovery speed now should scale with the number of CPU cores available to `vmagent`.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to attach node-level labels and annotations to discovered Kubernetes pod targets in the same way as Prometheus 2.35 does. See [this feature request](https://github.com/prometheus/prometheus/issues/9510) and [this pull request](https://github.com/prometheus/prometheus/pull/10080).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add support for DNS-based discovery for notifiers in the same way as Prometheus does. See [these docs](https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2460).
* FEATURE: allow specifying TLS cipher suites for incoming https requests via `-tlsCipherSuites` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2404).
* FEATURE: allow specifying TLS cipher suites for mTLS connections between cluster components via `-cluster.tlsCipherSuites` command-line flag. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection).

View File

@ -74,8 +74,9 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc
}
}
selectors := sdc.Selectors
attachNodeMetadata := sdc.AttachMetadata.Node
proxyURL := sdc.ProxyURL.URL()
gw := getGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL)
gw := getGroupWatcher(apiServer, ac, namespaces, selectors, attachNodeMetadata, proxyURL)
role := sdc.role()
return &apiWatcher{
role: role,
@ -163,7 +164,7 @@ func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
}
// groupWatcher watches for Kubernetes objects on the given apiServer with the given namespaces,
// selectors using the given client.
// selectors and attachNodeMetadata using the given client.
type groupWatcher struct {
// Old Kubernetes doesn't support /apis/networking.k8s.io/v1/, so /apis/networking.k8s.io/v1beta1/ must be used instead.
// This flag is used for automatic substitution of v1 API path with v1beta1 API path during requests to apiServer.
@ -176,6 +177,8 @@ type groupWatcher struct {
apiServer string
namespaces []string
selectors []Selector
attachNodeMetadata bool
getAuthHeader func() string
client *http.Client
@ -183,7 +186,7 @@ type groupWatcher struct {
m map[string]*urlWatcher
}
func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher {
func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) *groupWatcher {
var proxy func(*http.Request) (*url.URL, error)
if proxyURL != nil {
proxy = http.ProxyURL(proxyURL)
@ -200,25 +203,27 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
}
return &groupWatcher{
apiServer: apiServer,
getAuthHeader: ac.GetAuthHeader,
namespaces: namespaces,
selectors: selectors,
attachNodeMetadata: attachNodeMetadata,
getAuthHeader: ac.GetAuthHeader,
client: client,
m: make(map[string]*urlWatcher),
}
}
func getGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher {
func getGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) *groupWatcher {
proxyURLStr := "<nil>"
if proxyURL != nil {
proxyURLStr = proxyURL.String()
}
key := fmt.Sprintf("apiServer=%s, namespaces=%s, selectors=%s, proxyURL=%s, authConfig=%s",
apiServer, namespaces, selectorsKey(selectors), proxyURLStr, ac.String())
key := fmt.Sprintf("apiServer=%s, namespaces=%s, selectors=%s, attachNodeMetadata=%v, proxyURL=%s, authConfig=%s",
apiServer, namespaces, selectorsKey(selectors), attachNodeMetadata, proxyURLStr, ac.String())
groupWatchersLock.Lock()
gw := groupWatchers[key]
if gw == nil {
gw = newGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL)
gw = newGroupWatcher(apiServer, ac, namespaces, selectors, attachNodeMetadata, proxyURL)
groupWatchers[key] = gw
}
groupWatchersLock.Unlock()
@ -246,9 +251,9 @@ var (
)
func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) object {
if gw == nil {
// this is needed for testing
return nil
if role == "node" {
// Node objects have no namespace
namespace = ""
}
key := namespace + "/" + name
for _, uw := range gw.m {
@ -256,7 +261,7 @@ func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) obje
// Role mismatch
continue
}
if uw.namespace != "" && uw.namespace != namespace {
if namespace != "" && uw.namespace != "" && uw.namespace != namespace {
// Namespace mismatch
continue
}
@ -273,6 +278,9 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
gw.startWatchersForRole("pod", nil)
gw.startWatchersForRole("service", nil)
}
if gw.attachNodeMetadata && role == "pod" {
gw.startWatchersForRole("node", nil)
}
paths := getAPIPathsWithNamespaces(role, gw.namespaces, gw.selectors)
for _, path := range paths {
apiURL := gw.apiServer + path
@ -290,8 +298,10 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
if needStart {
uw.reloadObjects()
go uw.watchForUpdates()
if role == "endpoints" || role == "endpointslice" {
// Refresh endpoints and enpointslices targets in background, since they depend on other object types such as pod and service.
if role == "endpoints" || role == "endpointslice" || (gw.attachNodeMetadata && role == "pod") {
// Refresh targets in background, since they depend on other object types such as pod, service or node.
// This should guarantee that the ScrapeWork objects for these objects are properly updated
// as soon as the objects they depend on are updated.
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
go func() {
sleepTime := 20 * time.Second
@ -562,7 +572,7 @@ func (uw *urlWatcher) reloadObjects() string {
uw.updateScrapeWorksLocked(objectsAdded, uw.aws)
uw.needUpdateScrapeWorks = false
if len(objectsRemoved) > 0 || len(objectsUpdated) > 0 || len(objectsAdded) > 0 {
uw.maybeUpdateEndpointsScrapeWorksLocked()
uw.maybeUpdateDependedScrapeWorksLocked()
}
uw.gw.mu.Unlock()
@ -711,7 +721,7 @@ func (uw *urlWatcher) updateObjectLocked(key string, o object) {
aw.setScrapeWorks(uw, key, labels)
}
}
uw.maybeUpdateEndpointsScrapeWorksLocked()
uw.maybeUpdateDependedScrapeWorksLocked()
}
func (uw *urlWatcher) removeObjectLocked(key string) {
@ -724,23 +734,32 @@ func (uw *urlWatcher) removeObjectLocked(key string) {
for aw := range uw.aws {
aw.removeScrapeWorks(uw, key)
}
uw.maybeUpdateEndpointsScrapeWorksLocked()
uw.maybeUpdateDependedScrapeWorksLocked()
}
func (uw *urlWatcher) maybeUpdateEndpointsScrapeWorksLocked() {
if uw.role != "pod" && uw.role != "service" {
func (uw *urlWatcher) maybeUpdateDependedScrapeWorksLocked() {
role := uw.role
attachNodeMetadata := uw.gw.attachNodeMetadata
if !(role == "pod" || role == "service" || (attachNodeMetadata && role == "node")) {
// Nothing to update
return
}
namespace := uw.namespace
for _, uwx := range uw.gw.m {
if uwx.role != "endpoints" && uwx.role != "endpointslice" {
continue
}
if uwx.namespace != "" && uwx.namespace != namespace {
if namespace != "" && uwx.namespace != "" && uwx.namespace != namespace {
// Namespace mismatch
continue
}
if (role == "pod" || role == "service") && (uwx.role == "endpoints" || uwx.role == "endpointslice") {
// endpoints and endpointslice objects depend on pods and service objects
uwx.needUpdateScrapeWorks = true
continue
}
if attachNodeMetadata && role == "node" && uwx.role == "pod" {
// pod objects depend on node objects if attachNodeMetadata is set
uwx.needUpdateScrapeWorks = true
continue
}
}
}

View File

@ -132,7 +132,11 @@ func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []map[string]string {
m := map[string]string{
"__address__": addr,
}
p.appendCommonLabels(m)
if !p.appendCommonLabels(m, gw) {
// The corresponding node is filtered out with label or field selectors.
// Do not generate endpoint labels in this case.
continue
}
p.appendContainerLabels(m, c, &cp)
if svc != nil {
svc.appendCommonLabels(m)
@ -153,13 +157,16 @@ func appendEndpointLabelsForAddresses(ms []map[string]string, gw *groupWatcher,
p = o.(*Pod)
}
}
m := getEndpointLabelsForAddressAndPort(podPortsSeen, eps, ea, epp, p, svc, ready)
m := getEndpointLabelsForAddressAndPort(gw, podPortsSeen, eps, ea, epp, p, svc, ready)
if m != nil {
ms = append(ms, m)
}
}
return ms
}
func getEndpointLabelsForAddressAndPort(podPortsSeen map[*Pod][]int, eps *Endpoints, ea EndpointAddress, epp EndpointPort, p *Pod, svc *Service, ready string) map[string]string {
func getEndpointLabelsForAddressAndPort(gw *groupWatcher, podPortsSeen map[*Pod][]int, eps *Endpoints, ea EndpointAddress, epp EndpointPort,
p *Pod, svc *Service, ready string) map[string]string {
m := getEndpointLabels(eps.Metadata, ea, epp, ready)
if svc != nil {
svc.appendCommonLabels(m)
@ -169,7 +176,11 @@ func getEndpointLabelsForAddressAndPort(podPortsSeen map[*Pod][]int, eps *Endpoi
if ea.TargetRef.Kind != "Pod" || p == nil {
return m
}
p.appendCommonLabels(m)
if !p.appendCommonLabels(m, gw) {
// The corresponding node is filtered out with label or field selectors.
// Do not generate endpoint labels in this case.
return nil
}
// always add pod targetRef, even if epp port doesn't match container port
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2134
if _, ok := podPortsSeen[p]; !ok {

View File

@ -56,7 +56,7 @@ func TestParseEndpointsListSuccess(t *testing.T) {
"addresses": [
{
"hostname": "aaa.bbb",
"nodeName": "foobar",
"nodeName": "test-node",
"ip": "172.17.0.2",
"targetRef": {
"kind": "Pod",
@ -97,7 +97,7 @@ func TestParseEndpointsListSuccess(t *testing.T) {
"__meta_kubernetes_endpoint_address_target_kind": "Pod",
"__meta_kubernetes_endpoint_address_target_name": "coredns-6955765f44-lnp6t",
"__meta_kubernetes_endpoint_hostname": "aaa.bbb",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_node_name": "test-node",
"__meta_kubernetes_endpoint_port_name": "https",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",

View File

@ -51,7 +51,10 @@ func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string
}
for _, epp := range eps.Ports {
for _, addr := range ess.Addresses {
ms = append(ms, getEndpointSliceLabelsForAddressAndPort(podPortsSeen, addr, eps, ess, epp, p, svc))
m := getEndpointSliceLabelsForAddressAndPort(gw, podPortsSeen, addr, eps, ess, epp, p, svc)
if m != nil {
ms = append(ms, m)
}
}
}
@ -76,7 +79,11 @@ func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string
m := map[string]string{
"__address__": addr,
}
p.appendCommonLabels(m)
if !p.appendCommonLabels(m, gw) {
// The corresponding node is filtered out with label or field selectors.
// Do not generate endpointslice labels in this case.
continue
}
p.appendContainerLabels(m, c, &cp)
if svc != nil {
svc.appendCommonLabels(m)
@ -93,7 +100,8 @@ func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string
// enriches labels with TargetRef
// p appended to seen Ports
// if TargetRef matches
func getEndpointSliceLabelsForAddressAndPort(podPortsSeen map[*Pod][]int, addr string, eps *EndpointSlice, ea Endpoint, epp EndpointPort, p *Pod, svc *Service) map[string]string {
func getEndpointSliceLabelsForAddressAndPort(gw *groupWatcher, podPortsSeen map[*Pod][]int, addr string, eps *EndpointSlice, ea Endpoint, epp EndpointPort,
p *Pod, svc *Service) map[string]string {
m := getEndpointSliceLabels(eps, addr, ea, epp)
if svc != nil {
svc.appendCommonLabels(m)
@ -108,7 +116,11 @@ func getEndpointSliceLabelsForAddressAndPort(podPortsSeen map[*Pod][]int, addr s
if _, ok := podPortsSeen[p]; !ok {
podPortsSeen[p] = []int{}
}
p.appendCommonLabels(m)
if !p.appendCommonLabels(m, gw) {
// The corresponding node is filtered out with label or field selectors.
// Do not generate endpointslice labels in this case.
return nil
}
for _, c := range p.Spec.Containers {
for _, cp := range c.Ports {
if cp.ContainerPort == epp.Port {

View File

@ -27,6 +27,7 @@ type SDConfig struct {
ProxyURL *proxy.URL `yaml:"proxy_url,omitempty"`
Namespaces Namespaces `yaml:"namespaces,omitempty"`
Selectors []Selector `yaml:"selectors,omitempty"`
AttachMetadata AttachMetadata `yaml:"attach_metadata,omitempty"`
cfg *apiConfig
startErr error
@ -41,6 +42,13 @@ func (sdc *SDConfig) role() string {
return sdc.Role
}
// AttachMetadata represents `attach_metadata` option at `kuberentes_sd_config`.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
type AttachMetadata struct {
Node bool `yaml:"node"`
}
// Namespaces represents namespaces for SDConfig
type Namespaces struct {
OwnNamespace bool `yaml:"own_namespace"`

View File

@ -289,9 +289,28 @@ func TestParseNodeListSuccess(t *testing.T) {
}
func getSortedLabelss(objectsByKey map[string]object) [][]prompbmarshal.Label {
var gw groupWatcher
gw.m = map[string]*urlWatcher{
"node": {
role: "node",
objectsByKey: map[string]object{
"/test-node": &Node{
Metadata: ObjectMeta{
Labels: []prompbmarshal.Label{
{
Name: "node-label",
Value: "xyz",
},
},
},
},
},
},
}
gw.attachNodeMetadata = true
var result [][]prompbmarshal.Label
for _, o := range objectsByKey {
labelss := o.getTargetLabels(nil)
labelss := o.getTargetLabels(&gw)
for _, labels := range labelss {
result = append(result, discoveryutils.GetSortedLabels(labels))
}

View File

@ -103,26 +103,24 @@ func (p *Pod) getTargetLabels(gw *groupWatcher) []map[string]string {
return nil
}
var ms []map[string]string
ms = appendPodLabels(ms, p, p.Spec.Containers, "false")
ms = appendPodLabels(ms, p, p.Spec.InitContainers, "true")
ms = appendPodLabels(ms, gw, p, p.Spec.Containers, "false")
ms = appendPodLabels(ms, gw, p, p.Spec.InitContainers, "true")
return ms
}
func appendPodLabels(ms []map[string]string, p *Pod, cs []Container, isInit string) []map[string]string {
func appendPodLabels(ms []map[string]string, gw *groupWatcher, p *Pod, cs []Container, isInit string) []map[string]string {
for _, c := range cs {
for _, cp := range c.Ports {
m := getPodLabels(p, c, &cp, isInit)
ms = append(ms, m)
ms = appendPodLabelsInternal(ms, gw, p, c, &cp, isInit)
}
if len(c.Ports) == 0 {
m := getPodLabels(p, c, nil, isInit)
ms = append(ms, m)
ms = appendPodLabelsInternal(ms, gw, p, c, nil, isInit)
}
}
return ms
}
func getPodLabels(p *Pod, c Container, cp *ContainerPort, isInit string) map[string]string {
func appendPodLabelsInternal(ms []map[string]string, gw *groupWatcher, p *Pod, c Container, cp *ContainerPort, isInit string) []map[string]string {
addr := p.Status.PodIP
if cp != nil {
addr = discoveryutils.JoinHostPort(addr, cp.ContainerPort)
@ -131,9 +129,13 @@ func getPodLabels(p *Pod, c Container, cp *ContainerPort, isInit string) map[str
"__address__": addr,
"__meta_kubernetes_pod_container_init": isInit,
}
p.appendCommonLabels(m)
if !p.appendCommonLabels(m, gw) {
// The corresponding node is filtered out with label or field selectors.
// Do not generate pod labels in this case.
return ms
}
p.appendContainerLabels(m, c, cp)
return m
return append(ms, m)
}
func (p *Pod) appendContainerLabels(m map[string]string, c Container, cp *ContainerPort) {
@ -145,7 +147,18 @@ func (p *Pod) appendContainerLabels(m map[string]string, c Container, cp *Contai
}
}
func (p *Pod) appendCommonLabels(m map[string]string) {
func (p *Pod) appendCommonLabels(m map[string]string, gw *groupWatcher) bool {
if gw.attachNodeMetadata {
o := gw.getObjectByRoleLocked("node", p.Metadata.Namespace, p.Spec.NodeName)
if o == nil {
// The node associated with the pod is filtered out with label or field selectors,
// so do not generate labels for the pod.
return false
}
n := o.(*Node)
m["__meta_kubernetes_node_name"] = p.Spec.NodeName
n.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_node", m)
}
m["__meta_kubernetes_pod_name"] = p.Metadata.Name
m["__meta_kubernetes_pod_ip"] = p.Status.PodIP
m["__meta_kubernetes_pod_ready"] = getPodReadyStatus(p.Status.Conditions)
@ -163,6 +176,7 @@ func (p *Pod) appendCommonLabels(m map[string]string) {
}
}
p.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_pod", m)
return true
}
func getPodController(ors []OwnerReference) *OwnerReference {

View File

@ -146,7 +146,7 @@ func TestParsePodListSuccess(t *testing.T) {
"restartPolicy": "Always",
"terminationGracePeriodSeconds": 30,
"dnsPolicy": "ClusterFirst",
"nodeName": "m01",
"nodeName": "test-node",
"hostNetwork": true,
"securityContext": {
@ -244,6 +244,9 @@ func TestParsePodListSuccess(t *testing.T) {
"__address__": "172.17.0.2:1234",
"__meta_kubernetes_namespace": "kube-system",
"__meta_kubernetes_node_label_node_label": "xyz",
"__meta_kubernetes_node_labelpresent_node_label": "true",
"__meta_kubernetes_node_name": "test-node",
"__meta_kubernetes_pod_name": "etcd-m01",
"__meta_kubernetes_pod_ip": "172.17.0.2",
"__meta_kubernetes_pod_container_name": "etcd",
@ -252,7 +255,7 @@ func TestParsePodListSuccess(t *testing.T) {
"__meta_kubernetes_pod_container_port_protocol": "TCP",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_phase": "Running",
"__meta_kubernetes_pod_node_name": "m01",
"__meta_kubernetes_pod_node_name": "test-node",
"__meta_kubernetes_pod_host_ip": "172.17.0.2",
"__meta_kubernetes_pod_uid": "9d328156-75d1-411a-bdd0-aeacb53a38de",
"__meta_kubernetes_pod_controller_kind": "Node",