lib/promscrape: cleanup after 9b2246c29b

Main points:

* Revert changes outside lib/promscrape/discovery/kuberntes . These changes can be applied later in a separate commit
* Minimize changes in lib/promscrape/discovery/kubernetes compared to a93e644001
* Corner case fixes.
This commit is contained in:
Aliaksandr Valialkin 2021-02-26 16:54:03 +02:00
parent cf9262b01f
commit dc8c045378
23 changed files with 1166 additions and 1527 deletions

View File

@ -11,6 +11,7 @@
* `process_io_write_syscalls_total` - the number of write syscalls such as write and pwrite
* `process_io_storage_read_bytes_total` - the number of bytes read from storage layer
* `process_io_storage_written_bytes_total` - the number of bytes written to storage layer
* FEATURE: vmagent: use watch API for Kuberntes service discovery. This should reduce load on Kuberntes API server when it tracks big number of objects (for example, 10K pods). This should also reduce the time needed for k8s targets discovery.
* FEATURE: vmagent: export `vm_promscrape_target_relabel_duration_seconds` metric, which can be used for monitoring the time spend on relabeling for discovered targets.
* FEATURE: vmagent: optimize [relabeling](https://victoriametrics.github.io/vmagent.html#relabeling) performance for common cases.
* FEATURE: add `increase_pure(m[d])` function to MetricsQL. It works the same as `increase(m[d])` except of various edge cases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962) for details.

View File

@ -48,8 +48,6 @@ type Config struct {
// This is set to the directory from where the config has been loaded.
baseDir string
// used for data sync with kubernetes.
kwh *kubernetesWatchHandler
}
// GlobalConfig represents essential parts for `global` section of Prometheus config.
@ -141,7 +139,6 @@ func loadConfig(path string) (cfg *Config, data []byte, err error) {
if err := cfgObj.parse(data, path); err != nil {
return nil, nil, fmt.Errorf("cannot parse Prometheus config from %q: %w", path, err)
}
cfgObj.kwh = newKubernetesWatchHandler()
return &cfgObj, data, nil
}
@ -190,41 +187,30 @@ func getSWSByJob(sws []*ScrapeWork) map[string][]*ScrapeWork {
}
// getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg.
func getKubernetesSDScrapeWorkStream(cfg *Config, prev []*ScrapeWork) []*ScrapeWork {
cfg.kwh.startOnce.Do(func() {
go processKubernetesSyncEvents(cfg)
})
func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
// updated access time.
cfg.kwh.mu.Lock()
cfg.kwh.lastAccessTime = time.Now()
cfg.kwh.mu.Unlock()
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst)
ok := true
for j := range sc.KubernetesSDConfigs {
// generate set name
setKey := fmt.Sprintf("%d/%d/%s", i, j, sc.JobName)
cfg.kwh.mu.Lock()
cfg.kwh.sdcSet[setKey] = sc.swc
cfg.kwh.mu.Unlock()
sdc := &sc.KubernetesSDConfigs[j]
ms, err := kubernetes.StartWatchOnce(cfg.kwh.watchCfg, setKey, sdc, cfg.baseDir)
if err != nil {
logger.Errorf("got unexpected error: %v", err)
}
dst = appendScrapeWorkForTargetLabels(dst, sc.swc, ms, "kubernetes_sd_config")
var okLocal bool
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "kubernetes_sd_config")
if ok {
ok = okLocal
}
}
// dst will
if len(dst) > 0 {
return dst
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering kubernetes targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
// result from cache
cfg.kwh.mu.Lock()
for _, v := range cfg.kwh.swCache {
dst = append(dst, v...)
}
cfg.kwh.mu.Unlock()
return dst
}

View File

@ -1,42 +1,497 @@
package kubernetes
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 2*time.Minute, "Timeout for requests to Kuberntes API server")
// apiConfig contains config for API server
type apiConfig struct {
setName string
namespaces []string
selectors []Selector
wc *watchClient
targetChan chan SyncEvent
watchOnce sync.Once
aw *apiWatcher
}
var configMap = discoveryutils.NewConfigMap()
func getAPIConfig(watchCfg *WatchConfig, setName string, sdc *SDConfig, baseDir string) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(watchCfg, setName, sdc, baseDir) })
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
if err != nil {
return nil, err
}
return v.(*apiConfig), nil
}
func newAPIConfig(watchCfg *WatchConfig, setName string, sdc *SDConfig, baseDir string) (*apiConfig, error) {
wc, err := newWatchClient(watchCfg.WG, sdc, baseDir)
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot parse auth config: %w", err)
}
apiServer := sdc.APIServer
if len(apiServer) == 0 {
// Assume we run at k8s pod.
// Discover apiServer and auth config according to k8s docs.
// See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller
host := os.Getenv("KUBERNETES_SERVICE_HOST")
port := os.Getenv("KUBERNETES_SERVICE_PORT")
if len(host) == 0 {
return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " +
"probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?")
}
if len(port) == 0 {
return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_PORT env var; it must be defined when running in k8s; "+
"KUBERNETES_SERVICE_HOST=%q", host)
}
apiServer = "https://" + net.JoinHostPort(host, port)
tlsConfig := promauth.TLSConfig{
CAFile: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
}
acNew, err := promauth.NewConfig(".", nil, "", "/var/run/secrets/kubernetes.io/serviceaccount/token", &tlsConfig)
if err != nil {
return nil, fmt.Errorf("cannot initialize service account auth: %w; probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?", err)
}
ac = acNew
}
if !strings.Contains(apiServer, "://") {
proto := "http"
if sdc.TLSConfig != nil {
proto = "https"
}
apiServer = proto + "://" + apiServer
}
for strings.HasSuffix(apiServer, "/") {
apiServer = apiServer[:len(apiServer)-1]
}
var proxy func(*http.Request) (*url.URL, error)
if proxyURL := sdc.ProxyURL.URL(); proxyURL != nil {
proxy = http.ProxyURL(proxyURL)
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: ac.NewTLSConfig(),
Proxy: proxy,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: *apiServerTimeout,
},
Timeout: *apiServerTimeout,
}
aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors)
cfg := &apiConfig{
setName: setName,
targetChan: watchCfg.WatchChan,
wc: wc,
namespaces: sdc.Namespaces.Names,
selectors: sdc.Selectors,
aw: aw,
}
return cfg, nil
}
// WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
type WatchEvent struct {
Type string
Object json.RawMessage
}
// object is any Kubernetes object.
type object interface {
key() string
}
// parseObjectFunc must parse object from the given data.
type parseObjectFunc func(data []byte) (object, error)
// parseObjectListFunc must parse objectList from the given data.
type parseObjectListFunc func(data []byte) (map[string]object, ListMeta, error)
// apiWatcher is used for watching for Kuberntes object changes and caching their latest states.
type apiWatcher struct {
// The client used for watching for object changes
client *http.Client
// Kubenetes API server address in the form http://api-server
apiServer string
// The contents for `Authorization` HTTP request header
authorization string
// Namespaces to watch
namespaces []string
// Selectors to apply during watch
selectors []Selector
// mu protects watchersByURL and lastAccessTime
mu sync.Mutex
// a map of watchers keyed by request paths
watchersByURL map[string]*urlWatcher
// The last time the apiWatcher was queried for cached objects.
// It is used for stopping unused watchers.
lastAccessTime time.Time
}
func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector) *apiWatcher {
return &apiWatcher{
apiServer: apiServer,
authorization: authorization,
client: client,
namespaces: namespaces,
selectors: selectors,
watchersByURL: make(map[string]*urlWatcher),
lastAccessTime: time.Now(),
}
}
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
if aw == nil {
return nil
}
key := namespace + "/" + name
aw.startWatchersForRole(role)
var o object
aw.mu.Lock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
uw.mu.Lock()
o = uw.objectsByKey[key]
uw.mu.Unlock()
if o != nil {
break
}
}
aw.lastAccessTime = time.Now()
aw.mu.Unlock()
return o
}
// getObjectsByRole returns all the objects for the given role.
func (aw *apiWatcher) getObjectsByRole(role string) []object {
aw.startWatchersForRole(role)
var os []object
aw.mu.Lock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
uw.mu.Lock()
for _, o := range uw.objectsByKey {
os = append(os, o)
}
uw.mu.Unlock()
}
aw.lastAccessTime = time.Now()
aw.mu.Unlock()
return os
}
func (aw *apiWatcher) startWatchersForRole(role string) {
parseObject, parseObjectList := getObjectParsersForRole(role)
paths := getAPIPaths(role, aw.namespaces, aw.selectors)
for _, path := range paths {
apiURL := aw.apiServer + path
aw.startWatcherForURL(role, apiURL, parseObject, parseObjectList)
}
}
func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) {
aw.mu.Lock()
defer aw.mu.Unlock()
if aw.watchersByURL[apiURL] != nil {
// Watcher for the given path already exists.
return
}
uw := aw.newURLWatcher(role, apiURL, parseObject, parseObjectList)
resourceVersion := uw.reloadObjects()
aw.watchersByURL[apiURL] = uw
go func() {
uw.watchForUpdates(resourceVersion)
aw.mu.Lock()
delete(aw.watchersByURL, apiURL)
aw.mu.Unlock()
}()
}
// needStop returns true if aw wasn't used for long time.
func (aw *apiWatcher) needStop() bool {
aw.mu.Lock()
defer aw.mu.Unlock()
return time.Since(aw.lastAccessTime) > 5*time.Minute
}
// doRequest performs http request to the given requestURL.
func (aw *apiWatcher) doRequest(requestURL string) (*http.Response, error) {
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
}
if aw.authorization != "" {
req.Header.Set("Authorization", aw.authorization)
}
return aw.client.Do(req)
}
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
type urlWatcher struct {
role string
apiURL string
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// mu protects objectsByKey
mu sync.Mutex
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
// the parent apiWatcher
aw *apiWatcher
}
func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher {
return &urlWatcher{
role: role,
apiURL: apiURL,
parseObject: parseObject,
parseObjectList: parseObjectList,
objectsByKey: make(map[string]object),
aw: aw,
}
}
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
func (uw *urlWatcher) reloadObjects() string {
requestURL := uw.apiURL
resp, err := uw.aw.doRequest(requestURL)
if err != nil {
logger.Errorf("error when performing a request to %q: %s", requestURL, err)
return ""
}
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
return ""
}
objectsByKey, metadata, err := uw.parseObjectList(body)
if err != nil {
logger.Errorf("cannot parse response from %q: %s", requestURL, err)
return ""
}
uw.mu.Lock()
uw.objectsByKey = objectsByKey
uw.mu.Unlock()
return metadata.ResourceVersion
}
// watchForUpdates watches for object updates starting from resourceVersion and updates the corresponding objects to the latest state.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
func (uw *urlWatcher) watchForUpdates(resourceVersion string) {
aw := uw.aw
backoffDelay := time.Second
maxBackoffDelay := 30 * time.Second
backoffSleep := func() {
time.Sleep(backoffDelay)
backoffDelay *= 2
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
}
apiURL := uw.apiURL
delimiter := "?"
if strings.Contains(apiURL, "?") {
delimiter = "&"
}
timeoutSeconds := time.Duration(0.9 * float64(aw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
logger.Infof("started watcher for %q", apiURL)
for {
if aw.needStop() {
logger.Infof("stopped unused watcher for %q", apiURL)
return
}
requestURL := apiURL
if resourceVersion != "" {
requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion) + "&resourceVersionMatch=NotOlderThan"
}
resp, err := aw.doRequest(requestURL)
if err != nil {
logger.Errorf("error when performing a request to %q: %s", requestURL, err)
backoffSleep()
// There is no sense in reloading resources on non-http errors.
continue
}
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
if resp.StatusCode == 410 {
// Update stale resourceVersion. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
resourceVersion = uw.reloadObjects()
backoffDelay = time.Second
} else {
backoffSleep()
// There is no sense in reloading resources on non-410 status codes.
}
continue
}
backoffDelay = time.Second
err = uw.readObjectUpdateStream(resp.Body)
_ = resp.Body.Close()
if err != nil {
if errors.Is(err, io.EOF) {
// The stream has been closed (probably due to timeout)
backoffSleep()
continue
}
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
backoffSleep()
// There is no sense in reloading resources on non-http errors.
continue
}
}
}
// readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events.
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
d := json.NewDecoder(r)
var we WatchEvent
for {
if err := d.Decode(&we); err != nil {
return err
}
o, err := uw.parseObject(we.Object)
if err != nil {
return err
}
key := o.key()
switch we.Type {
case "ADDED", "MODIFIED":
uw.mu.Lock()
uw.objectsByKey[key] = o
uw.mu.Unlock()
case "DELETED":
uw.mu.Lock()
delete(uw.objectsByKey, key)
uw.mu.Unlock()
default:
return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)
}
}
}
func getAPIPaths(role string, namespaces []string, selectors []Selector) []string {
objectName := getObjectNameByRole(role)
if objectName == "nodes" || len(namespaces) == 0 {
query := joinSelectors(role, selectors)
path := getAPIPath(objectName, "", query)
return []string{path}
}
query := joinSelectors(role, selectors)
paths := make([]string, len(namespaces))
for i, namespace := range namespaces {
paths[i] = getAPIPath(objectName, namespace, query)
}
return paths
}
func getAPIPath(objectName, namespace, query string) string {
suffix := objectName
if namespace != "" {
suffix = "namespaces/" + namespace + "/" + objectName
}
if len(query) > 0 {
suffix += "?" + query
}
if objectName == "endpointslices" {
return "/apis/discovery.k8s.io/v1beta1/" + suffix
}
return "/api/v1/" + suffix
}
func joinSelectors(role string, selectors []Selector) string {
var labelSelectors, fieldSelectors []string
for _, s := range selectors {
if s.Role != role {
continue
}
if s.Label != "" {
labelSelectors = append(labelSelectors, s.Label)
}
if s.Field != "" {
fieldSelectors = append(fieldSelectors, s.Field)
}
}
var args []string
if len(labelSelectors) > 0 {
args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ",")))
}
if len(fieldSelectors) > 0 {
args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ",")))
}
return strings.Join(args, "&")
}
func getObjectNameByRole(role string) string {
switch role {
case "node":
return "nodes"
case "pod":
return "pods"
case "service":
return "services"
case "endpoints":
return "endpoints"
case "endpointslices":
return "endpointslices"
case "ingress":
return "ingresses"
default:
logger.Panicf("BUG: unknonw role=%q", role)
return ""
}
}
func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) {
switch role {
case "node":
return parseNode, parseNodeList
case "pod":
return parsePod, parsePodList
case "service":
return parseService, parseServiceList
case "endpoints":
return parseEndpoints, parseEndpointsList
case "endpointslices":
return parseEndpointSlice, parseEndpointSliceList
case "ingress":
return parseIngress, parseIngressList
default:
logger.Panicf("BUG: unsupported role=%q", role)
return nil, nil
}
}

View File

@ -0,0 +1,162 @@
package kubernetes
import (
"reflect"
"testing"
)
func TestGetAPIPaths(t *testing.T) {
f := func(role string, namespaces []string, selectors []Selector, expectedPaths []string) {
t.Helper()
paths := getAPIPaths(role, namespaces, selectors)
if !reflect.DeepEqual(paths, expectedPaths) {
t.Fatalf("unexpected paths; got\n%q\nwant\n%q", paths, expectedPaths)
}
}
// role=node
f("node", nil, nil, []string{"/api/v1/nodes"})
f("node", []string{"foo", "bar"}, nil, []string{"/api/v1/nodes"})
f("node", nil, []Selector{
{
Role: "pod",
Label: "foo",
Field: "bar",
},
}, []string{"/api/v1/nodes"})
f("node", nil, []Selector{
{
Role: "node",
Label: "foo",
Field: "bar",
},
}, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"})
f("node", []string{"x", "y"}, []Selector{
{
Role: "node",
Label: "foo",
Field: "bar",
},
}, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"})
// role=pod
f("pod", nil, nil, []string{"/api/v1/pods"})
f("pod", []string{"foo", "bar"}, nil, []string{
"/api/v1/namespaces/foo/pods",
"/api/v1/namespaces/bar/pods",
})
f("pod", nil, []Selector{
{
Role: "node",
Label: "foo",
},
}, []string{"/api/v1/pods"})
f("pod", nil, []Selector{
{
Role: "pod",
Label: "foo",
},
{
Role: "pod",
Label: "x",
Field: "y",
},
}, []string{"/api/v1/pods?labelSelector=foo%2Cx&fieldSelector=y"})
f("pod", []string{"x", "y"}, []Selector{
{
Role: "pod",
Label: "foo",
},
{
Role: "pod",
Label: "x",
Field: "y",
},
}, []string{
"/api/v1/namespaces/x/pods?labelSelector=foo%2Cx&fieldSelector=y",
"/api/v1/namespaces/y/pods?labelSelector=foo%2Cx&fieldSelector=y",
})
// role=service
f("service", nil, nil, []string{"/api/v1/services"})
f("service", []string{"x", "y"}, nil, []string{
"/api/v1/namespaces/x/services",
"/api/v1/namespaces/y/services",
})
f("service", nil, []Selector{
{
Role: "node",
Label: "foo",
},
{
Role: "service",
Field: "bar",
},
}, []string{"/api/v1/services?fieldSelector=bar"})
f("service", []string{"x", "y"}, []Selector{
{
Role: "service",
Label: "abc=de",
},
}, []string{
"/api/v1/namespaces/x/services?labelSelector=abc%3Dde",
"/api/v1/namespaces/y/services?labelSelector=abc%3Dde",
})
// role=endpoints
f("endpoints", nil, nil, []string{"/api/v1/endpoints"})
f("endpoints", []string{"x", "y"}, nil, []string{
"/api/v1/namespaces/x/endpoints",
"/api/v1/namespaces/y/endpoints",
})
f("endpoints", []string{"x", "y"}, []Selector{
{
Role: "endpoints",
Label: "bbb",
},
{
Role: "node",
Label: "aa",
},
}, []string{
"/api/v1/namespaces/x/endpoints?labelSelector=bbb",
"/api/v1/namespaces/y/endpoints?labelSelector=bbb",
})
// role=endpointslices
f("endpointslices", nil, nil, []string{"/apis/discovery.k8s.io/v1beta1/endpointslices"})
f("endpointslices", []string{"x", "y"}, []Selector{
{
Role: "endpointslices",
Field: "field",
Label: "label",
},
}, []string{
"/apis/discovery.k8s.io/v1beta1/namespaces/x/endpointslices?labelSelector=label&fieldSelector=field",
"/apis/discovery.k8s.io/v1beta1/namespaces/y/endpointslices?labelSelector=label&fieldSelector=field",
})
// role=ingress
f("ingress", nil, nil, []string{"/api/v1/ingresses"})
f("ingress", []string{"x", "y"}, []Selector{
{
Role: "node",
Field: "xyay",
},
{
Role: "ingress",
Field: "abc",
},
{
Role: "ingress",
Label: "cde",
},
{
Role: "ingress",
Label: "baaa",
},
}, []string{
"/api/v1/namespaces/x/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc",
"/api/v1/namespaces/y/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc",
})
}

View File

@ -1,9 +1,6 @@
package kubernetes
import (
"net/url"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
@ -19,10 +16,14 @@ type ObjectMeta struct {
OwnerReferences []OwnerReference
}
// listMetadata kubernetes list metadata
func (om *ObjectMeta) key() string {
return om.Namespace + "/" + om.Name
}
// ListMeta is a Kubernetes list metadata
// https://v1-17.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#listmeta-v1-meta
type listMetadata struct {
ResourceVersion string `json:"resourceVersion"`
type ListMeta struct {
ResourceVersion string
}
func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m map[string]string) {
@ -53,29 +54,3 @@ type OwnerReference struct {
type DaemonEndpoint struct {
Port int
}
func joinSelectors(role string, namespaces []string, selectors []Selector) string {
var labelSelectors, fieldSelectors []string
for _, ns := range namespaces {
fieldSelectors = append(fieldSelectors, "metadata.namespace="+ns)
}
for _, s := range selectors {
if s.Role != role {
continue
}
if s.Label != "" {
labelSelectors = append(labelSelectors, s.Label)
}
if s.Field != "" {
fieldSelectors = append(fieldSelectors, s.Field)
}
}
var args []string
if len(labelSelectors) > 0 {
args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ",")))
}
if len(fieldSelectors) > 0 {
args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ",")))
}
return strings.Join(args, "&")
}

View File

@ -3,18 +3,59 @@ package kubernetes
import (
"encoding/json"
"fmt"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getEndpointsLabels returns labels for k8s endpoints obtained from the given cfg.
func getEndpointsLabels(cfg *apiConfig) []map[string]string {
epss := getEndpoints(cfg)
var ms []map[string]string
for _, eps := range epss {
ms = eps.appendTargetLabels(ms, cfg.aw)
}
return ms
}
func getEndpoints(cfg *apiConfig) []*Endpoints {
os := cfg.aw.getObjectsByRole("endpoint")
epss := make([]*Endpoints, len(os))
for i, o := range os {
epss[i] = o.(*Endpoints)
}
return epss
}
func (eps *Endpoints) key() string {
return eps.Metadata.key()
}
func parseEndpointsList(data []byte) (map[string]object, ListMeta, error) {
var epsl EndpointsList
if err := json.Unmarshal(data, &epsl); err != nil {
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointsList from %q: %w", data, err)
}
objectsByKey := make(map[string]object)
for _, eps := range epsl.Items {
objectsByKey[eps.key()] = eps
}
return objectsByKey, epsl.Metadata, nil
}
func parseEndpoints(data []byte) (object, error) {
var eps Endpoints
if err := json.Unmarshal(data, &eps); err != nil {
return nil, err
}
return &eps, nil
}
// EndpointsList implements k8s endpoints list.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointslist-v1-core
type EndpointsList struct {
Items []Endpoints
Metadata listMetadata `json:"metadata"`
Metadata ListMeta
Items []*Endpoints
}
// Endpoints implements k8s endpoints.
@ -25,10 +66,6 @@ type Endpoints struct {
Subsets []EndpointSubset
}
func (eps *Endpoints) key() string {
return eps.Metadata.Namespace + "/" + eps.Metadata.Name
}
// EndpointSubset implements k8s endpoint subset.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointsubset-v1-core
@ -57,10 +94,6 @@ type ObjectReference struct {
Namespace string
}
func (or ObjectReference) key() string {
return or.Namespace + "/" + or.Name
}
// EndpointPort implements k8s endpoint port.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointport-v1beta1-discovery-k8s-io
@ -71,28 +104,19 @@ type EndpointPort struct {
Protocol string
}
// parseEndpointsList parses EndpointsList from data.
func parseEndpointsList(data []byte) (*EndpointsList, error) {
var esl EndpointsList
if err := json.Unmarshal(data, &esl); err != nil {
return nil, fmt.Errorf("cannot unmarshal EndpointsList from %q: %w", data, err)
}
return &esl, nil
}
// appendTargetLabels appends labels for each endpoint in eps to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints
func (eps *Endpoints) appendTargetLabels(ms []map[string]string, podsCache, servicesCache *sync.Map) []map[string]string {
func (eps *Endpoints) appendTargetLabels(ms []map[string]string, aw *apiWatcher) []map[string]string {
var svc *Service
if svco, ok := servicesCache.Load(eps.key()); ok {
svc = svco.(*Service)
if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
svc = o.(*Service)
}
podPortsSeen := make(map[*Pod][]int)
for _, ess := range eps.Subsets {
for _, epp := range ess.Ports {
ms = appendEndpointLabelsForAddresses(ms, podPortsSeen, eps, ess.Addresses, epp, podsCache, svc, "true")
ms = appendEndpointLabelsForAddresses(ms, podPortsSeen, eps, ess.NotReadyAddresses, epp, podsCache, svc, "false")
ms = appendEndpointLabelsForAddresses(ms, aw, podPortsSeen, eps, ess.Addresses, epp, svc, "true")
ms = appendEndpointLabelsForAddresses(ms, aw, podPortsSeen, eps, ess.NotReadyAddresses, epp, svc, "false")
}
}
@ -127,14 +151,13 @@ func (eps *Endpoints) appendTargetLabels(ms []map[string]string, podsCache, serv
return ms
}
func appendEndpointLabelsForAddresses(ms []map[string]string, podPortsSeen map[*Pod][]int, eps *Endpoints, eas []EndpointAddress, epp EndpointPort,
podsCache *sync.Map, svc *Service, ready string) []map[string]string {
func appendEndpointLabelsForAddresses(ms []map[string]string, aw *apiWatcher, podPortsSeen map[*Pod][]int, eps *Endpoints,
eas []EndpointAddress, epp EndpointPort, svc *Service, ready string) []map[string]string {
for _, ea := range eas {
var p *Pod
if po, ok := podsCache.Load(ea.TargetRef.key()); ok {
p = po.(*Pod)
if o := aw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil {
p = o.(*Pod)
}
//p := getPod(pods, ea.TargetRef.Namespace, ea.TargetRef.Name)
m := getEndpointLabelsForAddressAndPort(podPortsSeen, eps, ea, epp, p, svc, ready)
ms = append(ms, m)
}
@ -186,24 +209,3 @@ func getEndpointLabels(om ObjectMeta, ea EndpointAddress, epp EndpointPort, read
}
return m
}
func processEndpoints(cfg *apiConfig, sc *SharedKubernetesCache, p *Endpoints, action string) {
key := buildSyncKey("endpoints", cfg.setName, p.key())
switch action {
case "ADDED", "MODIFIED":
lbs := p.appendTargetLabels(nil, sc.Pods, sc.Services)
cfg.targetChan <- SyncEvent{
Labels: lbs,
Key: key,
ConfigSectionSet: cfg.setName,
}
case "DELETED":
cfg.targetChan <- SyncEvent{
Key: key,
ConfigSectionSet: cfg.setName,
}
case "ERROR":
default:
logger.Warnf("unexpected action: %s", action)
}
}

View File

@ -2,7 +2,6 @@ package kubernetes
import (
"reflect"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -12,12 +11,12 @@ import (
func TestParseEndpointsListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
els, err := parseEndpointsList([]byte(s))
objectsByKey, _, err := parseEndpointsList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if els != nil {
t.Fatalf("unexpected non-nil EnpointsList: %v", els)
if len(objectsByKey) != 0 {
t.Fatalf("unexpected non-empty objectsByKey: %v", objectsByKey)
}
}
f(``)
@ -80,22 +79,18 @@ func TestParseEndpointsListSuccess(t *testing.T) {
]
}
`
els, err := parseEndpointsList([]byte(data))
objectsByKey, meta, err := parseEndpointsList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(els.Items) != 1 {
t.Fatalf("unexpected length of EndpointsList.Items; got %d; want %d", len(els.Items), 1)
expectedResourceVersion := "128055"
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
endpoint := els.Items[0]
// Check endpoint.appendTargetLabels()
var pc, sc sync.Map
labelss := endpoint.appendTargetLabels(nil, &pc, &sc)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Endpoints).appendTargetLabels(nil, nil)
})
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.17.0.2:8443",

View File

@ -4,38 +4,70 @@ import (
"encoding/json"
"fmt"
"strconv"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// parseEndpointsList parses EndpointSliceList from data.
func parseEndpointSlicesList(data []byte) (*EndpointSliceList, error) {
var esl EndpointSliceList
if err := json.Unmarshal(data, &esl); err != nil {
return nil, fmt.Errorf("cannot unmarshal EndpointSliceList from %q: %w", data, err)
// getEndpointSlicesLabels returns labels for k8s endpointSlices obtained from the given cfg.
func getEndpointSlicesLabels(cfg *apiConfig) []map[string]string {
epss := getEndpointSlices(cfg)
var ms []map[string]string
for _, eps := range epss {
ms = eps.appendTargetLabels(ms, cfg.aw)
}
return ms
}
return &esl, nil
// getEndpointSlices retrieves endpointSlice with given apiConfig
func getEndpointSlices(cfg *apiConfig) []*EndpointSlice {
os := cfg.aw.getObjectsByRole("endpointslices")
epss := make([]*EndpointSlice, len(os))
for i, o := range os {
epss[i] = o.(*EndpointSlice)
}
return epss
}
func (eps *EndpointSlice) key() string {
return eps.Metadata.key()
}
func parseEndpointSliceList(data []byte) (map[string]object, ListMeta, error) {
var epsl EndpointSliceList
if err := json.Unmarshal(data, &epsl); err != nil {
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList from %q: %w", data, err)
}
objectsByKey := make(map[string]object)
for _, eps := range epsl.Items {
objectsByKey[eps.key()] = eps
}
return objectsByKey, epsl.Metadata, nil
}
func parseEndpointSlice(data []byte) (object, error) {
var eps EndpointSlice
if err := json.Unmarshal(data, &eps); err != nil {
return nil, err
}
return &eps, nil
}
// appendTargetLabels injects labels for endPointSlice to slice map
// follows TargetRef for enrich labels with pod and service metadata
func (eps *EndpointSlice) appendTargetLabels(ms []map[string]string, podsCache, servicesCache *sync.Map) []map[string]string {
func (eps *EndpointSlice) appendTargetLabels(ms []map[string]string, aw *apiWatcher) []map[string]string {
var svc *Service
if s, ok := servicesCache.Load(eps.key()); ok {
svc = s.(*Service)
if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
svc = o.(*Service)
}
podPortsSeen := make(map[*Pod][]int)
for _, ess := range eps.Endpoints {
var pod *Pod
if p, ok := podsCache.Load(ess.TargetRef.key()); ok {
pod = p.(*Pod)
var p *Pod
if o := aw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil {
p = o.(*Pod)
}
for _, epp := range eps.Ports {
for _, addr := range ess.Addresses {
ms = append(ms, getEndpointSliceLabelsForAddressAndPort(podPortsSeen, addr, eps, ess, epp, pod, svc))
ms = append(ms, getEndpointSliceLabelsForAddressAndPort(podPortsSeen, addr, eps, ess, epp, p, svc))
}
}
@ -70,13 +102,12 @@ func (eps *EndpointSlice) appendTargetLabels(ms []map[string]string, podsCache,
}
}
return ms
}
// getEndpointSliceLabelsForAddressAndPort gets labels for endpointSlice
// from address, Endpoint and EndpointPort
// enriches labels with TargetRef
// pod appended to seen Ports
// p appended to seen Ports
// if TargetRef matches
func getEndpointSliceLabelsForAddressAndPort(podPortsSeen map[*Pod][]int, addr string, eps *EndpointSlice, ea Endpoint, epp EndpointPort, p *Pod, svc *Service) map[string]string {
m := getEndpointSliceLabels(eps, addr, ea, epp)
@ -135,8 +166,8 @@ func getEndpointSliceLabels(eps *EndpointSlice, addr string, ea Endpoint, epp En
// that groups service endpoints slices.
// https://v1-17.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointslice-v1beta1-discovery-k8s-io
type EndpointSliceList struct {
Items []EndpointSlice
Metadata listMetadata `json:"metadata"`
Metadata ListMeta
Items []*EndpointSlice
}
// EndpointSlice - implements kubernetes endpoint slice.
@ -148,10 +179,6 @@ type EndpointSlice struct {
Ports []EndpointPort
}
func (eps EndpointSlice) key() string {
return eps.Metadata.Namespace + "/" + eps.Metadata.Name
}
// Endpoint implements kubernetes object endpoint for endpoint slice.
// https://v1-17.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpoint-v1beta1-discovery-k8s-io
type Endpoint struct {
@ -167,23 +194,3 @@ type Endpoint struct {
type EndpointConditions struct {
Ready bool
}
func processEndpointSlices(cfg *apiConfig, sc *SharedKubernetesCache, p *EndpointSlice, action string) {
key := buildSyncKey("endpointslices", cfg.setName, p.key())
switch action {
case "ADDED", "MODIFIED":
cfg.targetChan <- SyncEvent{
Labels: p.appendTargetLabels(nil, sc.Pods, sc.Services),
Key: key,
ConfigSectionSet: cfg.setName,
}
case "DELETED":
cfg.targetChan <- SyncEvent{
Key: key,
ConfigSectionSet: cfg.setName,
}
case "ERROR":
default:
logger.Warnf("unexpected action: %s", action)
}
}

View File

@ -2,21 +2,20 @@ package kubernetes
import (
"reflect"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func Test_parseEndpointSlicesListFail(t *testing.T) {
func TestParseEndpointSliceListFail(t *testing.T) {
f := func(data string) {
eslList, err := parseEndpointSlicesList([]byte(data))
objectsByKey, _, err := parseEndpointSliceList([]byte(data))
if err == nil {
t.Errorf("unexpected result, test must fail! data: %s", data)
}
if eslList != nil {
t.Errorf("endpointSliceList must be nil, got: %v", eslList)
if len(objectsByKey) != 0 {
t.Errorf("EndpointSliceList must be emptry, got: %v", objectsByKey)
}
}
@ -29,7 +28,7 @@ func Test_parseEndpointSlicesListFail(t *testing.T) {
}
func Test_parseEndpointSlicesListSuccess(t *testing.T) {
func TestParseEndpointSliceListSuccess(t *testing.T) {
data := `{
"kind": "EndpointSliceList",
"apiVersion": "discovery.k8s.io/v1beta1",
@ -177,23 +176,19 @@ func Test_parseEndpointSlicesListSuccess(t *testing.T) {
}
]
}`
esl, err := parseEndpointSlicesList([]byte(data))
objectsByKey, meta, err := parseEndpointSliceList([]byte(data))
if err != nil {
t.Errorf("cannot parse data for EndpointSliceList: %v", err)
return
}
if len(esl.Items) != 2 {
t.Fatalf("expected 2 items at endpointSliceList, got: %d", len(esl.Items))
expectedResourceVersion := "1177"
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
firstEsl := esl.Items[0]
var pc, sc sync.Map
got := firstEsl.appendTargetLabels(nil, &pc, &sc)
sortedLables := [][]prompbmarshal.Label{}
for _, labels := range got {
sortedLables = append(sortedLables, discoveryutils.GetSortedLabels(labels))
}
expectedLabels := [][]prompbmarshal.Label{
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*EndpointSlice).appendTargetLabels(nil, nil)
})
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.18.0.2:6443",
"__meta_kubernetes_endpointslice_address_type": "IPv4",
@ -203,263 +198,94 @@ func Test_parseEndpointSlicesListSuccess(t *testing.T) {
"__meta_kubernetes_endpointslice_port_name": "https",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
"__meta_kubernetes_namespace": "default",
})}
if !reflect.DeepEqual(sortedLables, expectedLabels) {
t.Fatalf("unexpected labels,\ngot:\n%v,\nwant:\n%v", sortedLables, expectedLabels)
}
}
func TestEndpointSlice_appendTargetLabels(t *testing.T) {
type fields struct {
Metadata ObjectMeta
Endpoints []Endpoint
AddressType string
Ports []EndpointPort
}
type args struct {
ms []map[string]string
pods []Pod
svcs []Service
}
tests := []struct {
name string
fields fields
args args
want [][]prompbmarshal.Label
}{
{
name: "simple eps",
args: args{},
fields: fields{
Metadata: ObjectMeta{
Name: "fake-esl",
Namespace: "default",
},
AddressType: "ipv4",
Endpoints: []Endpoint{
{Addresses: []string{"127.0.0.1"},
Hostname: "node-1",
Topology: map[string]string{"kubernetes.topoligy.io/zone": "gce-1"},
Conditions: EndpointConditions{Ready: true},
TargetRef: ObjectReference{
Kind: "Pod",
Namespace: "default",
Name: "main-pod",
},
},
},
Ports: []EndpointPort{
{
Name: "http",
Port: 8085,
AppProtocol: "http",
Protocol: "tcp",
},
},
},
want: [][]prompbmarshal.Label{
}),
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "127.0.0.1:8085",
"__address__": "10.244.0.3:53",
"__meta_kubernetes_endpointslice_address_target_kind": "Pod",
"__meta_kubernetes_endpointslice_address_target_name": "main-pod",
"__meta_kubernetes_endpointslice_address_type": "ipv4",
"__meta_kubernetes_endpointslice_address_target_name": "coredns-66bff467f8-z8czk",
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_topoligy_io_zone": "gce-1",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_topoligy_io_zone": "true",
"__meta_kubernetes_endpointslice_endpoint_hostname": "node-1",
"__meta_kubernetes_endpointslice_name": "fake-esl",
"__meta_kubernetes_endpointslice_port": "8085",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "http",
"__meta_kubernetes_endpointslice_port_protocol": "tcp",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_io_hostname": "kind-control-plane",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_io_hostname": "true",
"__meta_kubernetes_endpointslice_name": "kube-dns-22mvb",
"__meta_kubernetes_endpointslice_port": "53",
"__meta_kubernetes_endpointslice_port_name": "dns-tcp",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
"__meta_kubernetes_namespace": "kube-system",
}),
},
},
{
name: "eps with pods and services",
args: args{
pods: []Pod{
{
Metadata: ObjectMeta{
UID: "some-pod-uuid",
Namespace: "monitoring",
Name: "main-pod",
Labels: discoveryutils.GetSortedLabels(map[string]string{
"pod-label-1": "pod-value-1",
"pod-label-2": "pod-value-2",
}),
Annotations: discoveryutils.GetSortedLabels(map[string]string{
"pod-annotations-1": "annotation-value-1",
}),
},
Status: PodStatus{PodIP: "192.168.11.5", HostIP: "172.15.1.1"},
Spec: PodSpec{NodeName: "node-2", Containers: []Container{
{
Name: "container-1",
Ports: []ContainerPort{
{
ContainerPort: 8085,
Protocol: "tcp",
Name: "http",
},
{
ContainerPort: 8011,
Protocol: "udp",
Name: "dns",
},
},
},
}},
},
},
svcs: []Service{
{
Spec: ServiceSpec{Type: "ClusterIP", Ports: []ServicePort{
{
Name: "http",
Protocol: "tcp",
Port: 8085,
},
}},
Metadata: ObjectMeta{
Name: "custom-esl",
Namespace: "monitoring",
Labels: discoveryutils.GetSortedLabels(map[string]string{
"service-label-1": "value-1",
"service-label-2": "value-2",
}),
},
},
},
},
fields: fields{
Metadata: ObjectMeta{
Name: "custom-esl",
Namespace: "monitoring",
},
AddressType: "ipv4",
Endpoints: []Endpoint{
{Addresses: []string{"127.0.0.1"},
Hostname: "node-1",
Topology: map[string]string{"kubernetes.topoligy.io/zone": "gce-1"},
Conditions: EndpointConditions{Ready: true},
TargetRef: ObjectReference{
Kind: "Pod",
Namespace: "monitoring",
Name: "main-pod",
},
},
},
Ports: []EndpointPort{
{
Name: "http",
Port: 8085,
AppProtocol: "http",
Protocol: "tcp",
},
},
},
want: [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "127.0.0.1:8085",
"__address__": "10.244.0.3:9153",
"__meta_kubernetes_endpointslice_address_target_kind": "Pod",
"__meta_kubernetes_endpointslice_address_target_name": "main-pod",
"__meta_kubernetes_endpointslice_address_type": "ipv4",
"__meta_kubernetes_endpointslice_address_target_name": "coredns-66bff467f8-z8czk",
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_topoligy_io_zone": "gce-1",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_topoligy_io_zone": "true",
"__meta_kubernetes_endpointslice_endpoint_hostname": "node-1",
"__meta_kubernetes_endpointslice_name": "custom-esl",
"__meta_kubernetes_endpointslice_port": "8085",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "http",
"__meta_kubernetes_endpointslice_port_protocol": "tcp",
"__meta_kubernetes_namespace": "monitoring",
"__meta_kubernetes_pod_annotation_pod_annotations_1": "annotation-value-1",
"__meta_kubernetes_pod_annotationpresent_pod_annotations_1": "true",
"__meta_kubernetes_pod_container_name": "container-1",
"__meta_kubernetes_pod_container_port_name": "http",
"__meta_kubernetes_pod_container_port_number": "8085",
"__meta_kubernetes_pod_container_port_protocol": "tcp",
"__meta_kubernetes_pod_host_ip": "172.15.1.1",
"__meta_kubernetes_pod_ip": "192.168.11.5",
"__meta_kubernetes_pod_label_pod_label_1": "pod-value-1",
"__meta_kubernetes_pod_label_pod_label_2": "pod-value-2",
"__meta_kubernetes_pod_labelpresent_pod_label_1": "true",
"__meta_kubernetes_pod_labelpresent_pod_label_2": "true",
"__meta_kubernetes_pod_name": "main-pod",
"__meta_kubernetes_pod_node_name": "node-2",
"__meta_kubernetes_pod_phase": "",
"__meta_kubernetes_pod_ready": "unknown",
"__meta_kubernetes_pod_uid": "some-pod-uuid",
"__meta_kubernetes_service_cluster_ip": "",
"__meta_kubernetes_service_label_service_label_1": "value-1",
"__meta_kubernetes_service_label_service_label_2": "value-2",
"__meta_kubernetes_service_labelpresent_service_label_1": "true",
"__meta_kubernetes_service_labelpresent_service_label_2": "true",
"__meta_kubernetes_service_name": "custom-esl",
"__meta_kubernetes_service_type": "ClusterIP",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_io_hostname": "kind-control-plane",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_io_hostname": "true",
"__meta_kubernetes_endpointslice_name": "kube-dns-22mvb",
"__meta_kubernetes_endpointslice_port": "9153",
"__meta_kubernetes_endpointslice_port_name": "metrics",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
"__meta_kubernetes_namespace": "kube-system",
}),
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "192.168.11.5:8011",
"__meta_kubernetes_namespace": "monitoring",
"__meta_kubernetes_pod_annotation_pod_annotations_1": "annotation-value-1",
"__meta_kubernetes_pod_annotationpresent_pod_annotations_1": "true",
"__meta_kubernetes_pod_container_name": "container-1",
"__meta_kubernetes_pod_container_port_name": "dns",
"__meta_kubernetes_pod_container_port_number": "8011",
"__meta_kubernetes_pod_container_port_protocol": "udp",
"__meta_kubernetes_pod_host_ip": "172.15.1.1",
"__meta_kubernetes_pod_ip": "192.168.11.5",
"__meta_kubernetes_pod_label_pod_label_1": "pod-value-1",
"__meta_kubernetes_pod_label_pod_label_2": "pod-value-2",
"__meta_kubernetes_pod_labelpresent_pod_label_1": "true",
"__meta_kubernetes_pod_labelpresent_pod_label_2": "true",
"__meta_kubernetes_pod_name": "main-pod",
"__meta_kubernetes_pod_node_name": "node-2",
"__meta_kubernetes_pod_phase": "",
"__meta_kubernetes_pod_ready": "unknown",
"__meta_kubernetes_pod_uid": "some-pod-uuid",
"__meta_kubernetes_service_cluster_ip": "",
"__meta_kubernetes_service_label_service_label_1": "value-1",
"__meta_kubernetes_service_label_service_label_2": "value-2",
"__meta_kubernetes_service_labelpresent_service_label_1": "true",
"__meta_kubernetes_service_labelpresent_service_label_2": "true",
"__meta_kubernetes_service_name": "custom-esl",
"__meta_kubernetes_service_type": "ClusterIP",
"__address__": "10.244.0.3:53",
"__meta_kubernetes_endpointslice_address_target_kind": "Pod",
"__meta_kubernetes_endpointslice_address_target_name": "coredns-66bff467f8-z8czk",
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_io_hostname": "kind-control-plane",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_io_hostname": "true",
"__meta_kubernetes_endpointslice_name": "kube-dns-22mvb",
"__meta_kubernetes_endpointslice_port": "53",
"__meta_kubernetes_endpointslice_port_name": "dns",
"__meta_kubernetes_endpointslice_port_protocol": "UDP",
"__meta_kubernetes_namespace": "kube-system",
}),
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "10.244.0.4:53",
"__meta_kubernetes_endpointslice_address_target_kind": "Pod",
"__meta_kubernetes_endpointslice_address_target_name": "coredns-66bff467f8-kpbhk",
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_io_hostname": "kind-control-plane",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_io_hostname": "true",
"__meta_kubernetes_endpointslice_name": "kube-dns-22mvb",
"__meta_kubernetes_endpointslice_port": "53",
"__meta_kubernetes_endpointslice_port_name": "dns-tcp",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
"__meta_kubernetes_namespace": "kube-system",
}),
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "10.244.0.4:9153",
"__meta_kubernetes_endpointslice_address_target_kind": "Pod",
"__meta_kubernetes_endpointslice_address_target_name": "coredns-66bff467f8-kpbhk",
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_io_hostname": "kind-control-plane",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_io_hostname": "true",
"__meta_kubernetes_endpointslice_name": "kube-dns-22mvb",
"__meta_kubernetes_endpointslice_port": "9153",
"__meta_kubernetes_endpointslice_port_name": "metrics",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
"__meta_kubernetes_namespace": "kube-system",
}),
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "10.244.0.4:53",
"__meta_kubernetes_endpointslice_address_target_kind": "Pod",
"__meta_kubernetes_endpointslice_address_target_name": "coredns-66bff467f8-kpbhk",
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_io_hostname": "kind-control-plane",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_io_hostname": "true",
"__meta_kubernetes_endpointslice_name": "kube-dns-22mvb",
"__meta_kubernetes_endpointslice_port": "53",
"__meta_kubernetes_endpointslice_port_name": "dns",
"__meta_kubernetes_endpointslice_port_protocol": "UDP",
"__meta_kubernetes_namespace": "kube-system",
}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eps := &EndpointSlice{
Metadata: tt.fields.Metadata,
Endpoints: tt.fields.Endpoints,
AddressType: tt.fields.AddressType,
Ports: tt.fields.Ports,
}
pc := sync.Map{}
sc := sync.Map{}
for _, p := range tt.args.pods {
p := &p
pc.Store(p.key(), p)
}
for _, s := range tt.args.svcs {
s := &s
sc.Store(s.key(), s)
}
got := eps.appendTargetLabels(tt.args.ms, &pc, &sc)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range got {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
if !reflect.DeepEqual(sortedLabelss, expectedLabelss) {
t.Fatalf("unexpected labels,\ngot:\n%v,\nwant:\n%v", sortedLabelss, expectedLabelss)
}
if !reflect.DeepEqual(sortedLabelss, tt.want) {
t.Errorf("got unxpected labels: \ngot:\n %v, \nexpect:\n %v", sortedLabelss, tt.want)
}
})
}
}

View File

@ -1,56 +0,0 @@
package kubernetes
import (
"encoding/json"
"io"
)
type jsonFrameReader struct {
r io.ReadCloser
decoder *json.Decoder
remaining []byte
}
func newJSONFramedReader(r io.ReadCloser) io.ReadCloser {
return &jsonFrameReader{
r: r,
decoder: json.NewDecoder(r),
}
}
// ReadFrame decodes the next JSON object in the stream, or returns an error. The returned
// byte slice will be modified the next time ReadFrame is invoked and should not be altered.
func (r *jsonFrameReader) Read(data []byte) (int, error) {
// Return whatever remaining data exists from an in progress frame
if n := len(r.remaining); n > 0 {
if n <= len(data) {
data = append(data[0:0], r.remaining...)
r.remaining = nil
return n, nil
}
n = len(data)
data = append(data[0:0], r.remaining[:n]...)
r.remaining = r.remaining[n:]
return n, io.ErrShortBuffer
}
n := len(data)
m := json.RawMessage(data[:0])
if err := r.decoder.Decode(&m); err != nil {
return 0, err
}
// If capacity of data is less than length of the message, decoder will allocate a new slice
// and set m to it, which means we need to copy the partial result back into data and preserve
// the remaining result for subsequent reads.
if len(m) > n {
data = append(data[0:0], m[:n]...)
r.remaining = m[n:]
return n, io.ErrShortBuffer
}
return len(m), nil
}
func (r *jsonFrameReader) Close() error {
return r.r.Close()
}

View File

@ -3,16 +3,57 @@ package kubernetes
import (
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// getIngressesLabels returns labels for k8s ingresses obtained from the given cfg.
func getIngressesLabels(cfg *apiConfig) []map[string]string {
igs := getIngresses(cfg)
var ms []map[string]string
for _, ig := range igs {
ms = ig.appendTargetLabels(ms)
}
return ms
}
func getIngresses(cfg *apiConfig) []*Ingress {
os := cfg.aw.getObjectsByRole("ingress")
igs := make([]*Ingress, len(os))
for i, o := range os {
igs[i] = o.(*Ingress)
}
return igs
}
func (ig *Ingress) key() string {
return ig.Metadata.key()
}
func parseIngressList(data []byte) (map[string]object, ListMeta, error) {
var igl IngressList
if err := json.Unmarshal(data, &igl); err != nil {
return nil, igl.Metadata, fmt.Errorf("cannot unmarshal IngressList from %q: %w", data, err)
}
objectsByKey := make(map[string]object)
for _, ig := range igl.Items {
objectsByKey[ig.key()] = ig
}
return objectsByKey, igl.Metadata, nil
}
func parseIngress(data []byte) (object, error) {
var ig Ingress
if err := json.Unmarshal(data, &ig); err != nil {
return nil, err
}
return &ig, nil
}
// IngressList represents ingress list in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingresslist-v1beta1-extensions
type IngressList struct {
Items []Ingress
Metadata listMetadata `json:"metadata"`
Metadata ListMeta
Items []*Ingress
}
// Ingress represents ingress in k8s.
@ -23,10 +64,6 @@ type Ingress struct {
Spec IngressSpec
}
func (ig Ingress) key() string {
return ig.Metadata.Namespace + "/" + ig.Metadata.Name
}
// IngressSpec represents ingress spec in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingressspec-v1beta1-extensions
@ -64,15 +101,6 @@ type HTTPIngressPath struct {
Path string
}
// parseIngressList parses IngressList from data.
func parseIngressList(data []byte) (*IngressList, error) {
var il IngressList
if err := json.Unmarshal(data, &il); err != nil {
return nil, fmt.Errorf("cannot unmarshal IngressList from %q: %w", data, err)
}
return &il, nil
}
// appendTargetLabels appends labels for Ingress ig to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress
@ -124,23 +152,3 @@ func getIngressRulePaths(paths []HTTPIngressPath) []string {
}
return result
}
func processIngress(cfg *apiConfig, p *Ingress, action string) {
key := buildSyncKey("ingress", cfg.setName, p.key())
switch action {
case "ADDED", "MODIFIED":
cfg.targetChan <- SyncEvent{
Labels: p.appendTargetLabels(nil),
Key: key,
ConfigSectionSet: cfg.setName,
}
case "DELETED":
cfg.targetChan <- SyncEvent{
Key: key,
ConfigSectionSet: cfg.setName,
}
case "ERROR":
default:
logger.Infof("unexpected action: %s", action)
}
}

View File

@ -11,12 +11,12 @@ import (
func TestParseIngressListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
nls, err := parseIngressList([]byte(s))
objectsByKey, _, err := parseIngressList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if nls != nil {
t.Fatalf("unexpected non-nil IngressList: %v", nls)
if len(objectsByKey) != 0 {
t.Fatalf("unexpected non-empty IngressList: %v", objectsByKey)
}
}
f(``)
@ -71,21 +71,17 @@ func TestParseIngressListSuccess(t *testing.T) {
}
]
}`
igs, err := parseIngressList([]byte(data))
objectsByKey, meta, err := parseIngressList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(igs.Items) != 1 {
t.Fatalf("unexpected length of IngressList.Items; got %d; want %d", len(igs.Items), 1)
}
ig := igs.Items[0]
// Check ig.appendTargetLabels()
labelss := ig.appendTargetLabels(nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
expectedResourceVersion := "351452"
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Ingress).appendTargetLabels(nil)
})
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "foobar",

View File

@ -37,16 +37,26 @@ type Selector struct {
Field string `yaml:"field"`
}
// StartWatchOnce returns init labels for the given sdc and baseDir.
// and starts watching for changes.
func StartWatchOnce(watchCfg *WatchConfig, setName string, sdc *SDConfig, baseDir string) ([]map[string]string, error) {
cfg, err := getAPIConfig(watchCfg, setName, sdc, baseDir)
// GetLabels returns labels for the given sdc and baseDir.
func (sdc *SDConfig) GetLabels(baseDir string) ([]map[string]string, error) {
cfg, err := getAPIConfig(sdc, baseDir)
if err != nil {
return nil, fmt.Errorf("cannot create API config: %w", err)
}
var ms []map[string]string
cfg.watchOnce.Do(func() {
ms = startWatcherByRole(watchCfg.Ctx, sdc.Role, cfg, watchCfg.SC)
})
return ms, nil
switch sdc.Role {
case "node":
return getNodesLabels(cfg), nil
case "pod":
return getPodsLabels(cfg), nil
case "service":
return getServicesLabels(cfg), nil
case "endpoints":
return getEndpointsLabels(cfg), nil
case "endpointslices":
return getEndpointSlicesLabels(cfg), nil
case "ingress":
return getIngressesLabels(cfg), nil
default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`; skipping it", sdc.Role)
}
}

View File

@ -4,16 +4,58 @@ import (
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getNodesLabels returns labels for k8s nodes obtained from the given cfg
func getNodesLabels(cfg *apiConfig) []map[string]string {
nodes := getNodes(cfg)
var ms []map[string]string
for _, n := range nodes {
ms = n.appendTargetLabels(ms)
}
return ms
}
func getNodes(cfg *apiConfig) []*Node {
os := cfg.aw.getObjectsByRole("node")
ns := make([]*Node, len(os))
for i, o := range os {
ns[i] = o.(*Node)
}
return ns
}
func (n *Node) key() string {
return n.Metadata.key()
}
func parseNodeList(data []byte) (map[string]object, ListMeta, error) {
var nl NodeList
if err := json.Unmarshal(data, &nl); err != nil {
return nil, nl.Metadata, fmt.Errorf("cannot unmarshal NodeList from %q: %w", data, err)
}
objectsByKey := make(map[string]object)
for _, n := range nl.Items {
objectsByKey[n.key()] = n
}
return objectsByKey, nl.Metadata, nil
}
func parseNode(data []byte) (object, error) {
var n Node
if err := json.Unmarshal(data, &n); err != nil {
return nil, err
}
return &n, nil
}
// NodeList represents NodeList from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodelist-v1-core
type NodeList struct {
Items []Node
Metadata listMetadata `json:"metadata"`
Metadata ListMeta
Items []*Node
}
// Node represents Node from k8s API.
@ -24,10 +66,6 @@ type Node struct {
Status NodeStatus
}
func (n Node) key() string {
return n.Metadata.Name
}
// NodeStatus represents NodeStatus from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodestatus-v1-core
@ -51,15 +89,6 @@ type NodeDaemonEndpoints struct {
KubeletEndpoint DaemonEndpoint
}
// parseNodeList parses NodeList from data.
func parseNodeList(data []byte) (*NodeList, error) {
var nl NodeList
if err := json.Unmarshal(data, &nl); err != nil {
return nil, fmt.Errorf("cannot unmarshal NodeList from %q: %w", data, err)
}
return &nl, nil
}
// appendTargetLabels appends labels for the given Node n to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node
@ -119,24 +148,3 @@ func getAddrByType(nas []NodeAddress, typ string) string {
}
return ""
}
func processNode(cfg *apiConfig, n *Node, action string) {
key := buildSyncKey("nodes", cfg.setName, n.key())
switch action {
case "ADDED", "MODIFIED":
lbs := n.appendTargetLabels(nil)
cfg.targetChan <- SyncEvent{
Labels: lbs,
ConfigSectionSet: cfg.setName,
Key: key,
}
case "DELETED":
cfg.targetChan <- SyncEvent{
ConfigSectionSet: cfg.setName,
Key: key,
}
case "ERROR":
default:
logger.Warnf("unexpected action: %s", action)
}
}

View File

@ -4,18 +4,19 @@ import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func TestParseNodeListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
nls, err := parseNodeList([]byte(s))
objectsByKey, _, err := parseNodeList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if nls != nil {
t.Fatalf("unexpected non-nil NodeList: %v", nls)
if len(objectsByKey) != 0 {
t.Fatalf("unexpected non-empty objectsByKey: %v", objectsByKey)
}
}
f(``)
@ -226,59 +227,19 @@ func TestParseNodeListSuccess(t *testing.T) {
]
}
`
nls, err := parseNodeList([]byte(data))
objectsByKey, meta, err := parseNodeList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(nls.Items) != 1 {
t.Fatalf("unexpected length of NodeList.Items; got %d; want %d", len(nls.Items), 1)
expectedResourceVersion := "22627"
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
node := nls.Items[0]
meta := node.Metadata
if meta.Name != "m01" {
t.Fatalf("unexpected ObjectMeta.Name; got %q; want %q", meta.Name, "m01")
}
expectedLabels := discoveryutils.GetSortedLabels(map[string]string{
"beta.kubernetes.io/arch": "amd64",
"beta.kubernetes.io/os": "linux",
"kubernetes.io/arch": "amd64",
"kubernetes.io/hostname": "m01",
"kubernetes.io/os": "linux",
"minikube.k8s.io/commit": "eb13446e786c9ef70cb0a9f85a633194e62396a1",
"minikube.k8s.io/name": "minikube",
"minikube.k8s.io/updated_at": "2020_03_16T22_44_27_0700",
"minikube.k8s.io/version": "v1.8.2",
"node-role.kubernetes.io/master": "",
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Node).appendTargetLabels(nil)
})
if !reflect.DeepEqual(meta.Labels, expectedLabels) {
t.Fatalf("unexpected ObjectMeta.Labels\ngot\n%v\nwant\n%v", meta.Labels, expectedLabels)
}
expectedAnnotations := discoveryutils.GetSortedLabels(map[string]string{
"kubeadm.alpha.kubernetes.io/cri-socket": "/var/run/dockershim.sock",
"node.alpha.kubernetes.io/ttl": "0",
"volumes.kubernetes.io/controller-managed-attach-detach": "true",
})
if !reflect.DeepEqual(meta.Annotations, expectedAnnotations) {
t.Fatalf("unexpected ObjectMeta.Annotations\ngot\n%v\nwant\n%v", meta.Annotations, expectedAnnotations)
}
status := node.Status
expectedAddresses := []NodeAddress{
{
Type: "InternalIP",
Address: "172.17.0.2",
},
{
Type: "Hostname",
Address: "m01",
},
}
if !reflect.DeepEqual(status.Addresses, expectedAddresses) {
t.Fatalf("unexpected addresses\ngot\n%v\nwant\n%v", status.Addresses, expectedAddresses)
}
// Check node.appendTargetLabels()
labels := discoveryutils.GetSortedLabels(node.appendTargetLabels(nil)[0])
expectedLabels = discoveryutils.GetSortedLabels(map[string]string{
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"instance": "m01",
"__address__": "172.17.0.2:10250",
"__meta_kubernetes_node_name": "m01",
@ -315,8 +276,20 @@ func TestParseNodeListSuccess(t *testing.T) {
"__meta_kubernetes_node_address_InternalIP": "172.17.0.2",
"__meta_kubernetes_node_address_Hostname": "m01",
})
if !reflect.DeepEqual(labels, expectedLabels) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", labels, expectedLabels)
}),
}
if !reflect.DeepEqual(sortedLabelss, expectedLabelss) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss)
}
}
func getSortedLabelss(objectsByKey map[string]object, getLabelss func(o object) []map[string]string) [][]prompbmarshal.Label {
var result [][]prompbmarshal.Label
for _, o := range objectsByKey {
labelss := getLabelss(o)
for _, labels := range labelss {
result = append(result, discoveryutils.GetSortedLabels(labels))
}
}
return result
}

View File

@ -6,16 +6,58 @@ import (
"strconv"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getPodsLabels returns labels for k8s pods obtained from the given cfg
func getPodsLabels(cfg *apiConfig) []map[string]string {
pods := getPods(cfg)
var ms []map[string]string
for _, p := range pods {
ms = p.appendTargetLabels(ms)
}
return ms
}
func getPods(cfg *apiConfig) []*Pod {
os := cfg.aw.getObjectsByRole("pod")
ps := make([]*Pod, len(os))
for i, o := range os {
ps[i] = o.(*Pod)
}
return ps
}
func (p *Pod) key() string {
return p.Metadata.key()
}
func parsePodList(data []byte) (map[string]object, ListMeta, error) {
var pl PodList
if err := json.Unmarshal(data, &pl); err != nil {
return nil, pl.Metadata, fmt.Errorf("cannot unmarshal PodList from %q: %w", data, err)
}
objectsByKey := make(map[string]object)
for _, p := range pl.Items {
objectsByKey[p.key()] = p
}
return objectsByKey, pl.Metadata, nil
}
func parsePod(data []byte) (object, error) {
var p Pod
if err := json.Unmarshal(data, &p); err != nil {
return nil, err
}
return &p, nil
}
// PodList implements k8s pod list.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podlist-v1-core
type PodList struct {
Items []Pod
Metadata listMetadata `json:"metadata"`
Metadata ListMeta
Items []*Pod
}
// Pod implements k8s pod.
@ -27,10 +69,6 @@ type Pod struct {
Status PodStatus
}
func (p Pod) key() string {
return p.Metadata.Namespace + "/" + p.Metadata.Name
}
// PodSpec implements k8s pod spec.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podspec-v1-core
@ -73,15 +111,6 @@ type PodCondition struct {
Status string
}
// parsePodList parses PodList from data.
func parsePodList(data []byte) (*PodList, error) {
var pl PodList
if err := json.Unmarshal(data, &pl); err != nil {
return nil, fmt.Errorf("cannot unmarshal PodList from %q: %w", data, err)
}
return &pl, nil
}
// appendTargetLabels appends labels for each port of the given Pod p to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#pod
@ -169,23 +198,3 @@ func getPodReadyStatus(conds []PodCondition) string {
}
return "unknown"
}
func processPods(cfg *apiConfig, p *Pod, action string) {
key := buildSyncKey("pods", cfg.setName, p.key())
switch action {
case "ADDED", "MODIFIED":
cfg.targetChan <- SyncEvent{
Labels: p.appendTargetLabels(nil),
Key: key,
ConfigSectionSet: cfg.setName,
}
case "DELETED":
cfg.targetChan <- SyncEvent{
Key: key,
ConfigSectionSet: cfg.setName,
}
case "ERROR":
default:
logger.Warnf("unexpected action: %s", action)
}
}

View File

@ -11,12 +11,12 @@ import (
func TestParsePodListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
nls, err := parsePodList([]byte(s))
objectsByKey, _, err := parsePodList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if nls != nil {
t.Fatalf("unexpected non-nil PodList: %v", nls)
if len(objectsByKey) != 0 {
t.Fatalf("unexpected non-empty objectsByKey: %v", objectsByKey)
}
}
f(``)
@ -228,22 +228,18 @@ func TestParsePodListSuccess(t *testing.T) {
]
}
`
pls, err := parsePodList([]byte(data))
objectsByKey, meta, err := parsePodList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(pls.Items) != 1 {
t.Fatalf("unexpected length of PodList.Items; got %d; want %d", len(pls.Items), 1)
expectedResourceVersion := "72425"
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
pod := pls.Items[0]
// Check pod.appendTargetLabels()
labelss := pod.appendTargetLabels(nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
expectedLabels := [][]prompbmarshal.Label{
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Pod).appendTargetLabels(nil)
})
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.17.0.2:1234",
@ -280,7 +276,7 @@ func TestParsePodListSuccess(t *testing.T) {
"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_source": "true",
}),
}
if !reflect.DeepEqual(sortedLabelss, expectedLabels) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabels)
if !reflect.DeepEqual(sortedLabelss, expectedLabelss) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss)
}
}

View File

@ -4,16 +4,58 @@ import (
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getServicesLabels returns labels for k8s services obtained from the given cfg
func getServicesLabels(cfg *apiConfig) []map[string]string {
svcs := getServices(cfg)
var ms []map[string]string
for _, svc := range svcs {
ms = svc.appendTargetLabels(ms)
}
return ms
}
func getServices(cfg *apiConfig) []*Service {
os := cfg.aw.getObjectsByRole("service")
svcs := make([]*Service, len(os))
for i, o := range os {
svcs[i] = o.(*Service)
}
return svcs
}
func (svc *Service) key() string {
return svc.Metadata.key()
}
func parseServiceList(data []byte) (map[string]object, ListMeta, error) {
var sl ServiceList
if err := json.Unmarshal(data, &sl); err != nil {
return nil, sl.Metadata, fmt.Errorf("cannot unmarshal ServiceList from %q: %w", data, err)
}
objectsByKey := make(map[string]object)
for _, svc := range sl.Items {
objectsByKey[svc.key()] = svc
}
return objectsByKey, sl.Metadata, nil
}
func parseService(data []byte) (object, error) {
var svc Service
if err := json.Unmarshal(data, &svc); err != nil {
return nil, err
}
return &svc, nil
}
// ServiceList is k8s service list.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicelist-v1-core
type ServiceList struct {
Items []Service
Metadata listMetadata `json:"metadata"`
Metadata ListMeta
Items []*Service
}
// Service is k8s service.
@ -24,10 +66,6 @@ type Service struct {
Spec ServiceSpec
}
func (s Service) key() string {
return s.Metadata.Namespace + "/" + s.Metadata.Name
}
// ServiceSpec is k8s service spec.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicespec-v1-core
@ -47,15 +85,6 @@ type ServicePort struct {
Port int
}
// parseServiceList parses ServiceList from data.
func parseServiceList(data []byte) (*ServiceList, error) {
var sl ServiceList
if err := json.Unmarshal(data, &sl); err != nil {
return nil, fmt.Errorf("cannot unmarshal ServiceList from %q: %w", data, err)
}
return &sl, nil
}
// appendTargetLabels appends labels for each port of the given Service s to ms and returns the result.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service
@ -85,23 +114,3 @@ func (s *Service) appendCommonLabels(m map[string]string) {
}
s.Metadata.registerLabelsAndAnnotations("__meta_kubernetes_service", m)
}
func processService(cfg *apiConfig, svc *Service, action string) {
key := buildSyncKey("service", cfg.setName, svc.key())
switch action {
case "ADDED", "MODIFIED":
cfg.targetChan <- SyncEvent{
Labels: svc.appendTargetLabels(nil),
Key: key,
ConfigSectionSet: cfg.setName,
}
case "DELETED":
cfg.targetChan <- SyncEvent{
Key: key,
ConfigSectionSet: cfg.setName,
}
case "ERROR":
default:
logger.Warnf("unexpected action: %s", action)
}
}

View File

@ -11,12 +11,12 @@ import (
func TestParseServiceListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
nls, err := parseServiceList([]byte(s))
objectsByKey, _, err := parseServiceList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if nls != nil {
t.Fatalf("unexpected non-nil ServiceList: %v", nls)
if len(objectsByKey) != 0 {
t.Fatalf("unexpected non-empty objectsByKey: %v", objectsByKey)
}
}
f(``)
@ -89,68 +89,17 @@ func TestParseServiceListSuccess(t *testing.T) {
]
}
`
sls, err := parseServiceList([]byte(data))
objectsByKey, meta, err := parseServiceList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(sls.Items) != 1 {
t.Fatalf("unexpected length of ServiceList.Items; got %d; want %d", len(sls.Items), 1)
expectedResourceVersion := "60485"
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
service := sls.Items[0]
meta := service.Metadata
if meta.Name != "kube-dns" {
t.Fatalf("unexpected ObjectMeta.Name; got %q; want %q", meta.Name, "kube-dns")
}
expectedLabels := discoveryutils.GetSortedLabels(map[string]string{
"k8s-app": "kube-dns",
"kubernetes.io/cluster-service": "true",
"kubernetes.io/name": "KubeDNS",
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Service).appendTargetLabels(nil)
})
if !reflect.DeepEqual(meta.Labels, expectedLabels) {
t.Fatalf("unexpected ObjectMeta.Labels\ngot\n%v\nwant\n%v", meta.Labels, expectedLabels)
}
expectedAnnotations := discoveryutils.GetSortedLabels(map[string]string{
"prometheus.io/port": "9153",
"prometheus.io/scrape": "true",
})
if !reflect.DeepEqual(meta.Annotations, expectedAnnotations) {
t.Fatalf("unexpected ObjectMeta.Annotations\ngot\n%v\nwant\n%v", meta.Annotations, expectedAnnotations)
}
spec := service.Spec
expectedClusterIP := "10.96.0.10"
if spec.ClusterIP != expectedClusterIP {
t.Fatalf("unexpected clusterIP; got %q; want %q", spec.ClusterIP, expectedClusterIP)
}
if spec.Type != "ClusterIP" {
t.Fatalf("unexpected type; got %q; want %q", spec.Type, "ClusterIP")
}
expectedPorts := []ServicePort{
{
Name: "dns",
Protocol: "UDP",
Port: 53,
},
{
Name: "dns-tcp",
Protocol: "TCP",
Port: 53,
},
{
Name: "metrics",
Protocol: "TCP",
Port: 9153,
},
}
if !reflect.DeepEqual(spec.Ports, expectedPorts) {
t.Fatalf("unexpected ports\ngot\n%v\nwant\n%v", spec.Ports, expectedPorts)
}
// Check service.appendTargetLabels()
labelss := service.appendTargetLabels(nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "kube-dns.kube-system.svc:53",

View File

@ -1,76 +0,0 @@
package kubernetes
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// SharedKubernetesCache holds cache of kubernetes objects for current config.
type SharedKubernetesCache struct {
Endpoints *sync.Map
EndpointsSlices *sync.Map
Pods *sync.Map
Services *sync.Map
}
// NewSharedKubernetesCache returns new cache.
func NewSharedKubernetesCache() *SharedKubernetesCache {
return &SharedKubernetesCache{
Endpoints: new(sync.Map),
EndpointsSlices: new(sync.Map),
Pods: new(sync.Map),
Services: new(sync.Map),
}
}
func updatePodCache(cache *sync.Map, p *Pod, action string) {
switch action {
case "ADDED":
cache.Store(p.key(), p)
case "DELETED":
cache.Delete(p.key())
case "MODIFIED":
cache.Store(p.key(), p)
case "ERROR":
default:
logger.Warnf("unexpected action: %s", action)
}
}
func updateServiceCache(cache *sync.Map, p *Service, action string) {
switch action {
case "ADDED", "MODIFIED":
cache.Store(p.key(), p)
case "DELETED":
cache.Delete(p.key())
case "ERROR":
default:
logger.Warnf("unexpected action: %s", action)
}
}
func updateEndpointsCache(cache *sync.Map, p *Endpoints, action string) {
switch action {
case "ADDED", "MODIFIED":
cache.Store(p.key(), p)
case "DELETED":
cache.Delete(p.key())
case "ERROR":
default:
logger.Warnf("unexpected action: %s", action)
}
}
func updateEndpointsSliceCache(cache *sync.Map, p *EndpointSlice, action string) {
switch action {
case "ADDED", "MODIFIED":
cache.Store(p.key(), p)
case "DELETED":
cache.Delete(p.key())
case "ERROR":
default:
logger.Infof("unexpected action: %s", action)
}
}

View File

@ -1,510 +0,0 @@
package kubernetes
import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
// SyncEvent represent kubernetes resource watch event.
type SyncEvent struct {
// object type + set name + ns + name
// must be unique.
Key string
// Labels targets labels for given resource
Labels []map[string]string
// job name + position id
ConfigSectionSet string
}
type watchResponse struct {
Action string `json:"type"`
Object json.RawMessage `json:"object"`
}
// WatchConfig holds objects for watch handler start.
type WatchConfig struct {
Ctx context.Context
SC *SharedKubernetesCache
WG *sync.WaitGroup
WatchChan chan SyncEvent
}
// NewWatchConfig returns new config with given context.
func NewWatchConfig(ctx context.Context) *WatchConfig {
return &WatchConfig{
Ctx: ctx,
SC: NewSharedKubernetesCache(),
WG: new(sync.WaitGroup),
WatchChan: make(chan SyncEvent, 100),
}
}
func buildSyncKey(objType string, setName string, objKey string) string {
return objType + "/" + setName + "/" + objKey
}
func startWatcherByRole(ctx context.Context, role string, cfg *apiConfig, sc *SharedKubernetesCache) []map[string]string {
var ms []map[string]string
switch role {
case "pod":
startWatchForObject(ctx, cfg, "pods", func(wr *watchResponse) {
var p Pod
if err := json.Unmarshal(wr.Object, &p); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
processPods(cfg, &p, wr.Action)
}, func(bytes []byte) (string, error) {
pods, err := parsePodList(bytes)
if err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return "", err
}
for _, pod := range pods.Items {
ms = pod.appendTargetLabels(ms)
processPods(cfg, &pod, "ADDED")
}
return pods.Metadata.ResourceVersion, nil
})
case "node":
startWatchForObject(ctx, cfg, "nodes", func(wr *watchResponse) {
var n Node
if err := json.Unmarshal(wr.Object, &n); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
processNode(cfg, &n, wr.Action)
}, func(bytes []byte) (string, error) {
nodes, err := parseNodeList(bytes)
if err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return "", err
}
for _, node := range nodes.Items {
processNode(cfg, &node, "ADDED")
ms = node.appendTargetLabels(ms)
}
return nodes.Metadata.ResourceVersion, nil
})
case "endpoints":
startWatchForObject(ctx, cfg, "pods", func(wr *watchResponse) {
var p Pod
if err := json.Unmarshal(wr.Object, &p); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
updatePodCache(sc.Pods, &p, wr.Action)
if wr.Action == "MODIFIED" {
eps, ok := sc.Endpoints.Load(p.key())
if ok {
ep := eps.(*Endpoints)
processEndpoints(cfg, sc, ep, wr.Action)
}
}
}, func(bytes []byte) (string, error) {
pods, err := parsePodList(bytes)
if err != nil {
return "", err
}
for _, pod := range pods.Items {
updatePodCache(sc.Pods, &pod, "ADDED")
}
return pods.Metadata.ResourceVersion, nil
})
startWatchForObject(ctx, cfg, "services", func(wr *watchResponse) {
var svc Service
if err := json.Unmarshal(wr.Object, &svc); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
updateServiceCache(sc.Services, &svc, wr.Action)
if wr.Action == "MODIFIED" {
linkedEps, ok := sc.Endpoints.Load(svc.key())
if ok {
ep := linkedEps.(*Endpoints)
processEndpoints(cfg, sc, ep, wr.Action)
}
}
}, func(bytes []byte) (string, error) {
svcs, err := parseServiceList(bytes)
if err != nil {
return "", err
}
for _, svc := range svcs.Items {
updateServiceCache(sc.Services, &svc, "ADDED")
}
return svcs.Metadata.ResourceVersion, nil
})
startWatchForObject(ctx, cfg, "endpoints", func(wr *watchResponse) {
var eps Endpoints
if err := json.Unmarshal(wr.Object, &eps); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
processEndpoints(cfg, sc, &eps, wr.Action)
updateEndpointsCache(sc.Endpoints, &eps, wr.Action)
}, func(bytes []byte) (string, error) {
eps, err := parseEndpointsList(bytes)
if err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return "", err
}
for _, ep := range eps.Items {
ms = ep.appendTargetLabels(ms, sc.Pods, sc.Services)
processEndpoints(cfg, sc, &ep, "ADDED")
updateEndpointsCache(sc.Endpoints, &ep, "ADDED")
}
return eps.Metadata.ResourceVersion, nil
})
case "service":
startWatchForObject(ctx, cfg, "services", func(wr *watchResponse) {
var svc Service
if err := json.Unmarshal(wr.Object, &svc); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
processService(cfg, &svc, wr.Action)
}, func(bytes []byte) (string, error) {
svcs, err := parseServiceList(bytes)
if err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return "", err
}
for _, svc := range svcs.Items {
processService(cfg, &svc, "ADDED")
ms = svc.appendTargetLabels(ms)
}
return svcs.Metadata.ResourceVersion, nil
})
case "ingress":
startWatchForObject(ctx, cfg, "ingresses", func(wr *watchResponse) {
var ig Ingress
if err := json.Unmarshal(wr.Object, &ig); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
processIngress(cfg, &ig, wr.Action)
}, func(bytes []byte) (string, error) {
igs, err := parseIngressList(bytes)
if err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return "", err
}
for _, ig := range igs.Items {
processIngress(cfg, &ig, "ADDED")
ms = ig.appendTargetLabels(ms)
}
return igs.Metadata.ResourceVersion, nil
})
case "endpointslices":
startWatchForObject(ctx, cfg, "pods", func(wr *watchResponse) {
var p Pod
if err := json.Unmarshal(wr.Object, &p); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
updatePodCache(sc.Pods, &p, wr.Action)
if wr.Action == "MODIFIED" {
eps, ok := sc.EndpointsSlices.Load(p.key())
if ok {
ep := eps.(*EndpointSlice)
processEndpointSlices(cfg, sc, ep, wr.Action)
}
}
}, func(bytes []byte) (string, error) {
pods, err := parsePodList(bytes)
if err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return "", err
}
for _, pod := range pods.Items {
updatePodCache(sc.Pods, &pod, "ADDED")
}
return pods.Metadata.ResourceVersion, nil
})
startWatchForObject(ctx, cfg, "services", func(wr *watchResponse) {
var svc Service
if err := json.Unmarshal(wr.Object, &svc); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
updateServiceCache(sc.Services, &svc, wr.Action)
if wr.Action == "MODIFIED" {
linkedEps, ok := sc.EndpointsSlices.Load(svc.key())
if ok {
ep := linkedEps.(*EndpointSlice)
processEndpointSlices(cfg, sc, ep, wr.Action)
}
}
}, func(bytes []byte) (string, error) {
svcs, err := parseServiceList(bytes)
if err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return "", err
}
for _, svc := range svcs.Items {
updateServiceCache(sc.Services, &svc, "ADDED")
}
return svcs.Metadata.ResourceVersion, nil
})
startWatchForObject(ctx, cfg, "endpointslices", func(wr *watchResponse) {
var eps EndpointSlice
if err := json.Unmarshal(wr.Object, &eps); err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return
}
processEndpointSlices(cfg, sc, &eps, wr.Action)
updateEndpointsSliceCache(sc.EndpointsSlices, &eps, wr.Action)
}, func(bytes []byte) (string, error) {
epss, err := parseEndpointSlicesList(bytes)
if err != nil {
logger.Errorf("failed to parse object, err: %v", err)
return "", err
}
for _, eps := range epss.Items {
ms = eps.appendTargetLabels(ms, sc.Pods, sc.Services)
processEndpointSlices(cfg, sc, &eps, "ADDED")
}
return epss.Metadata.ResourceVersion, nil
})
default:
logger.Errorf("unexpected role: %s", role)
}
return ms
}
func startWatchForObject(ctx context.Context, cfg *apiConfig, objectName string, wh func(wr *watchResponse), getSync func([]byte) (string, error)) {
if len(cfg.namespaces) > 0 {
for _, ns := range cfg.namespaces {
path := fmt.Sprintf("/api/v1/namespaces/%s/%s", ns, objectName)
// special case.
if objectName == "endpointslices" {
path = fmt.Sprintf("/apis/discovery.k8s.io/v1beta1/namespaces/%s/%s", ns, objectName)
}
query := joinSelectors(objectName, nil, cfg.selectors)
if len(query) > 0 {
path += "?" + query
}
data, err := cfg.wc.getBlockingAPIResponse(path)
if err != nil {
logger.Errorf("cannot get latest resource version: %v", err)
}
version, err := getSync(data)
if err != nil {
logger.Errorf("cannot get latest resource version: %v", err)
}
cfg.wc.wg.Add(1)
go func(path, version string) {
cfg.wc.startWatchForResource(ctx, path, wh, version)
}(path, version)
}
} else {
path := "/api/v1/" + objectName
if objectName == "endpointslices" {
// special case.
path = fmt.Sprintf("/apis/discovery.k8s.io/v1beta1/%s", objectName)
}
query := joinSelectors(objectName, nil, cfg.selectors)
if len(query) > 0 {
path += "?" + query
}
data, err := cfg.wc.getBlockingAPIResponse(path)
if err != nil {
logger.Errorf("cannot get latest resource version: %v", err)
}
version, err := getSync(data)
if err != nil {
logger.Errorf("cannot get latest resource version: %v", err)
}
cfg.wc.wg.Add(1)
go func() {
cfg.wc.startWatchForResource(ctx, path, wh, version)
}()
}
}
type watchClient struct {
c *http.Client
ac *promauth.Config
apiServer string
wg *sync.WaitGroup
}
func (wc *watchClient) startWatchForResource(ctx context.Context, path string, wh func(wr *watchResponse), initResourceVersion string) {
defer wc.wg.Done()
path += "?watch=1"
maxBackOff := time.Second * 30
backoff := time.Second
for {
err := wc.getStreamAPIResponse(ctx, path, initResourceVersion, wh)
if errors.Is(err, context.Canceled) {
return
}
if !errors.Is(err, io.EOF) {
logger.Errorf("got unexpected error : %v", err)
}
// reset version.
initResourceVersion = ""
if backoff < maxBackOff {
backoff += time.Second * 5
}
time.Sleep(backoff)
}
}
func (wc *watchClient) getBlockingAPIResponse(path string) ([]byte, error) {
req, err := http.NewRequest("GET", wc.apiServer+path, nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept-Encoding", "gzip")
if wc.ac != nil && wc.ac.Authorization != "" {
req.Header.Set("Authorization", wc.ac.Authorization)
}
resp, err := wc.c.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("get unexpected code: %d, at blocking api request path: %q", resp.StatusCode, path)
}
if ce := resp.Header.Get("Content-Encoding"); ce == "gzip" {
gr, err := gzip.NewReader(resp.Body)
if err != nil {
return nil, fmt.Errorf("cannot create gzip reader: %w", err)
}
return ioutil.ReadAll(gr)
}
return ioutil.ReadAll(resp.Body)
}
func (wc *watchClient) getStreamAPIResponse(ctx context.Context, path, resouceVersion string, wh func(wr *watchResponse)) error {
if resouceVersion != "" {
path += "&resourceVersion=" + resouceVersion
}
req, err := http.NewRequestWithContext(ctx, "GET", wc.apiServer+path, nil)
if err != nil {
return err
}
req.Header.Set("Accept-Encoding", "gzip")
if wc.ac != nil && wc.ac.Authorization != "" {
req.Header.Set("Authorization", wc.ac.Authorization)
}
resp, err := wc.c.Do(req)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
br := resp.Body
if ce := resp.Header.Get("Content-Encoding"); ce == "gzip" {
br, err = gzip.NewReader(resp.Body)
if err != nil {
return fmt.Errorf("cannot create gzip reader: %w", err)
}
}
r := newJSONFramedReader(br)
for {
b := make([]byte, 1024)
b, err := readJSONObject(r, b)
if err != nil {
return err
}
var rObject watchResponse
err = json.Unmarshal(b, &rObject)
if err != nil {
logger.Errorf("failed to parse watch api response as json, err %v, response: %v", err, string(b))
continue
}
wh(&rObject)
}
}
func readJSONObject(r io.Reader, b []byte) ([]byte, error) {
offset := 0
for {
n, err := r.Read(b[offset:])
if err == io.ErrShortBuffer {
if n == 0 {
return nil, fmt.Errorf("got short buffer with n=0, cap=%d", cap(b))
}
// double buffer..
b = bytesutil.Resize(b, len(b)*2)
offset += n
continue
}
if err != nil {
return nil, err
}
offset += n
break
}
return b[:offset], nil
}
func newWatchClient(wg *sync.WaitGroup, sdc *SDConfig, baseDir string) (*watchClient, error) {
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil {
return nil, fmt.Errorf("cannot parse auth config: %w", err)
}
apiServer := sdc.APIServer
if len(apiServer) == 0 {
// Assume we run at k8s pod.
// Discover apiServer and auth config according to k8s docs.
// See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller
host := os.Getenv("KUBERNETES_SERVICE_HOST")
port := os.Getenv("KUBERNETES_SERVICE_PORT")
if len(host) == 0 {
return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " +
"probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?")
}
if len(port) == 0 {
return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_PORT env var; it must be defined when running in k8s; "+
"KUBERNETES_SERVICE_HOST=%q", host)
}
apiServer = "https://" + net.JoinHostPort(host, port)
tlsConfig := promauth.TLSConfig{
CAFile: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
}
acNew, err := promauth.NewConfig(".", nil, "", "/var/run/secrets/kubernetes.io/serviceaccount/token", &tlsConfig)
if err != nil {
return nil, fmt.Errorf("cannot initialize service account auth: %w; probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?", err)
}
ac = acNew
}
var proxy func(*http.Request) (*url.URL, error)
if proxyURL := sdc.ProxyURL.URL(); proxyURL != nil {
proxy = http.ProxyURL(proxyURL)
}
c := &http.Client{
Transport: &http.Transport{
TLSClientConfig: ac.NewTLSConfig(),
Proxy: proxy,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 2 * time.Minute,
},
}
wc := watchClient{
c: c,
apiServer: apiServer,
ac: ac,
wg: wg,
}
return &wc, nil
}

View File

@ -23,7 +23,6 @@ var (
kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+
"This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details")
openstackSDCheckInterval = flag.Duration("promscrape.openstackSDCheckInterval", 30*time.Second, "Interval for checking for changes in openstack API server. "+
"This works only if `openstack_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config for details")
@ -98,9 +97,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
scs := newScrapeConfigs(pushData)
scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork {
return getKubernetesSDScrapeWorkStream(cfg, swsPrev)
})
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) })

View File

@ -1,83 +0,0 @@
package promscrape
import (
"context"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
)
type kubernetesWatchHandler struct {
ctx context.Context
cancel context.CancelFunc
startOnce sync.Once
watchCfg *kubernetes.WatchConfig
// guards cache and set
mu sync.Mutex
lastAccessTime time.Time
swCache map[string][]*ScrapeWork
sdcSet map[string]*scrapeWorkConfig
}
func newKubernetesWatchHandler() *kubernetesWatchHandler {
ctx, cancel := context.WithCancel(context.Background())
kwh := &kubernetesWatchHandler{
ctx: ctx,
cancel: cancel,
swCache: map[string][]*ScrapeWork{},
sdcSet: map[string]*scrapeWorkConfig{},
watchCfg: kubernetes.NewWatchConfig(ctx),
}
go kwh.waitForStop()
return kwh
}
func (ksc *kubernetesWatchHandler) waitForStop() {
t := time.NewTicker(time.Second * 5)
for range t.C {
ksc.mu.Lock()
lastTime := time.Since(ksc.lastAccessTime)
ksc.mu.Unlock()
if lastTime > *kubernetesSDCheckInterval*30 {
t1 := time.Now()
ksc.cancel()
ksc.watchCfg.WG.Wait()
close(ksc.watchCfg.WatchChan)
logger.Infof("stopped kubernetes api watcher handler, after: %.3f seconds", time.Since(t1).Seconds())
ksc.watchCfg.SC = nil
t.Stop()
return
}
}
}
func processKubernetesSyncEvents(cfg *Config) {
for {
select {
case <-cfg.kwh.ctx.Done():
return
case se, ok := <-cfg.kwh.watchCfg.WatchChan:
if !ok {
return
}
if se.Labels == nil {
cfg.kwh.mu.Lock()
delete(cfg.kwh.swCache, se.Key)
cfg.kwh.mu.Unlock()
continue
}
cfg.kwh.mu.Lock()
swc, ok := cfg.kwh.sdcSet[se.ConfigSectionSet]
cfg.kwh.mu.Unlock()
if !ok {
logger.Fatalf("bug config section not found: %v", se.ConfigSectionSet)
}
ms := appendScrapeWorkForTargetLabels(nil, swc, se.Labels, "kubernetes_sd_config")
cfg.kwh.mu.Lock()
cfg.kwh.swCache[se.Key] = ms
cfg.kwh.mu.Unlock()
}
}
}