2021-03-05 11:36:05 +01:00
package kubernetes
import (
"encoding/json"
"errors"
2021-03-11 15:41:09 +01:00
"flag"
2021-03-05 11:36:05 +01:00
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2021-03-11 15:41:09 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2021-03-05 11:36:05 +01:00
"github.com/VictoriaMetrics/metrics"
)
2021-03-11 15:41:09 +01:00
// apiServerTimeout holds the value of the -promscrape.kubernetes.apiServerTimeout flag.
//
// The value is used as both the overall request timeout and the idle connection timeout
// of the HTTP client created in newGroupWatcher.
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server")
2021-03-05 11:36:05 +01:00
// WatchEvent describes a single change notification streamed by API server endpoints
// when the `watch=1` query arg is set.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
type WatchEvent struct {
	// Type is the event kind, e.g. ADDED, MODIFIED, DELETED, BOOKMARK or ERROR.
	Type string

	// Object is the raw JSON payload of the event; it is decoded later depending on Type.
	Object json.RawMessage
}
// object is any Kubernetes object.
type object interface {
2021-04-02 13:45:08 +02:00
key ( ) string
2021-03-14 20:10:35 +01:00
getTargetLabels ( gw * groupWatcher ) [ ] map [ string ] string
2021-03-05 11:36:05 +01:00
}
// parseObjectFunc decodes a single object from the given data.
//
// It returns a non-nil error if data cannot be parsed.
type parseObjectFunc func(data []byte) (object, error)
2021-03-11 15:41:09 +01:00
// parseObjectListFunc decodes a list of objects read from r.
//
// It returns the decoded objects keyed by object key together with the list metadata.
type parseObjectListFunc func(r io.Reader) (map[string]object, ListMeta, error)
2021-03-05 11:36:05 +01:00
// apiWatcher is used for watching for Kuberntes object changes and caching their latest states.
type apiWatcher struct {
2021-03-14 20:10:35 +01:00
role string
2021-03-05 11:36:05 +01:00
2021-03-11 15:41:09 +01:00
// Constructor for creating ScrapeWork objects from labels
2021-03-05 11:36:05 +01:00
swcFunc ScrapeWorkConstructorFunc
2021-03-14 20:10:35 +01:00
gw * groupWatcher
2021-04-02 13:17:53 +02:00
// swos contains per-namepsace maps of ScrapeWork objects for the given apiWatcher
swosByNamespace map [ string ] map [ string ] [ ] interface { }
swosByNamespaceLock sync . Mutex
2021-03-05 11:36:05 +01:00
2021-03-14 20:10:35 +01:00
swosCount * metrics . Counter
2021-03-05 11:36:05 +01:00
}
2021-03-11 15:41:09 +01:00
func newAPIWatcher ( apiServer string , ac * promauth . Config , sdc * SDConfig , swcFunc ScrapeWorkConstructorFunc ) * apiWatcher {
2021-03-14 20:10:35 +01:00
namespaces := sdc . Namespaces . Names
selectors := sdc . Selectors
proxyURL := sdc . ProxyURL . URL ( )
gw := getGroupWatcher ( apiServer , ac , namespaces , selectors , proxyURL )
2021-03-05 11:36:05 +01:00
return & apiWatcher {
2021-04-02 13:17:53 +02:00
role : sdc . Role ,
swcFunc : swcFunc ,
gw : gw ,
swosByNamespace : make ( map [ string ] map [ string ] [ ] interface { } ) ,
swosCount : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_scrape_works { role=%q} ` , sdc . Role ) ) ,
2021-03-05 11:36:05 +01:00
}
}
2021-04-05 21:02:09 +02:00
// mustStart subscribes aw to the url watchers serving aw.role,
// starting the url watchers that aren't running yet.
func (aw *apiWatcher) mustStart() {
	gw := aw.gw
	gw.startWatchersForRole(aw.role, aw)
}
2021-03-11 15:41:09 +01:00
func ( aw * apiWatcher ) mustStop ( ) {
2021-03-14 20:10:35 +01:00
aw . gw . unsubscribeAPIWatcher ( aw )
2021-04-02 13:17:53 +02:00
aw . swosByNamespaceLock . Lock ( )
aw . swosByNamespace = make ( map [ string ] map [ string ] [ ] interface { } )
aw . swosByNamespaceLock . Unlock ( )
2021-03-11 15:41:09 +01:00
}
2021-04-02 13:45:08 +02:00
func ( aw * apiWatcher ) reloadScrapeWorks ( namespace string , swosByKey map [ string ] [ ] interface { } ) {
2021-04-02 13:17:53 +02:00
aw . swosByNamespaceLock . Lock ( )
2021-04-02 13:45:08 +02:00
aw . swosCount . Add ( len ( swosByKey ) - len ( aw . swosByNamespace [ namespace ] ) )
aw . swosByNamespace [ namespace ] = swosByKey
2021-04-02 13:17:53 +02:00
aw . swosByNamespaceLock . Unlock ( )
2021-03-11 15:41:09 +01:00
}
2021-04-02 13:45:08 +02:00
func ( aw * apiWatcher ) setScrapeWorks ( namespace , key string , labels [ ] map [ string ] string ) {
2021-04-05 12:53:26 +02:00
swos := aw . getScrapeWorkObjectsForLabels ( labels )
2021-04-02 13:17:53 +02:00
aw . swosByNamespaceLock . Lock ( )
2021-04-02 13:45:08 +02:00
swosByKey := aw . swosByNamespace [ namespace ]
if swosByKey == nil {
swosByKey = make ( map [ string ] [ ] interface { } )
aw . swosByNamespace [ namespace ] = swosByKey
2021-04-02 13:17:53 +02:00
}
2021-03-11 15:41:09 +01:00
if len ( swos ) > 0 {
2021-04-02 13:45:08 +02:00
aw . swosCount . Add ( len ( swos ) - len ( swosByKey [ key ] ) )
swosByKey [ key ] = swos
2021-03-11 15:41:09 +01:00
} else {
2021-04-02 13:45:08 +02:00
aw . swosCount . Add ( - len ( swosByKey [ key ] ) )
delete ( swosByKey , key )
2021-03-11 15:41:09 +01:00
}
2021-04-02 13:17:53 +02:00
aw . swosByNamespaceLock . Unlock ( )
2021-03-11 15:41:09 +01:00
}
2021-04-02 13:45:08 +02:00
func ( aw * apiWatcher ) removeScrapeWorks ( namespace , key string ) {
2021-04-02 13:17:53 +02:00
aw . swosByNamespaceLock . Lock ( )
2021-04-02 13:45:08 +02:00
swosByKey := aw . swosByNamespace [ namespace ]
if len ( swosByKey ) > 0 {
aw . swosCount . Add ( - len ( swosByKey [ key ] ) )
delete ( swosByKey , key )
2021-04-02 13:17:53 +02:00
}
aw . swosByNamespaceLock . Unlock ( )
2021-03-11 15:41:09 +01:00
}
2021-04-05 12:53:26 +02:00
func ( aw * apiWatcher ) getScrapeWorkObjectsForLabels ( labelss [ ] map [ string ] string ) [ ] interface { } {
2021-03-11 15:41:09 +01:00
swos := make ( [ ] interface { } , 0 , len ( labelss ) )
for _ , labels := range labelss {
2021-04-05 12:53:26 +02:00
swo := aw . swcFunc ( labels )
2021-03-11 15:41:09 +01:00
// The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
if swo != nil && ! reflect . ValueOf ( swo ) . IsNil ( ) {
swos = append ( swos , swo )
2021-03-05 11:36:05 +01:00
}
}
2021-03-11 15:41:09 +01:00
return swos
}
// getScrapeWorkObjects returns all the ScrapeWork objects for the given aw.
func ( aw * apiWatcher ) getScrapeWorkObjects ( ) [ ] interface { } {
2021-04-05 21:02:09 +02:00
aw . gw . registerPendingAPIWatchers ( )
2021-04-02 13:17:53 +02:00
aw . swosByNamespaceLock . Lock ( )
defer aw . swosByNamespaceLock . Unlock ( )
2021-03-11 15:41:09 +01:00
size := 0
2021-04-02 13:45:08 +02:00
for _ , swosByKey := range aw . swosByNamespace {
for _ , swosLocal := range swosByKey {
2021-04-02 13:17:53 +02:00
size += len ( swosLocal )
}
2021-03-11 15:41:09 +01:00
}
swos := make ( [ ] interface { } , 0 , size )
2021-04-02 13:45:08 +02:00
for _ , swosByKey := range aw . swosByNamespace {
for _ , swosLocal := range swosByKey {
2021-04-02 13:17:53 +02:00
swos = append ( swos , swosLocal ... )
}
2021-03-11 15:41:09 +01:00
}
2021-03-05 11:36:05 +01:00
return swos
}
2021-03-14 20:10:35 +01:00
// groupWatcher watches for Kubernetes objects on the given apiServer with the given namespaces,
// selectors and authorization using the given client.
type groupWatcher struct {
	// apiServer is the base address, which is prepended to API paths.
	apiServer string

	// namespaces and selectors restrict the set of watched objects.
	namespaces []string
	selectors  []Selector

	// authorization is sent in the Authorization request header if it is non-empty.
	authorization string

	// client is used for performing requests to apiServer.
	client *http.Client

	// mu protects m.
	mu sync.Mutex

	// m maps API urls to the corresponding url watchers.
	m map[string]*urlWatcher
}
// newGroupWatcher creates a groupWatcher for the given apiServer.
//
// Requests are sent via proxyURL if it is non-nil.
// The -promscrape.kubernetes.apiServerTimeout flag value is used
// as both the request timeout and the idle connection timeout.
func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher {
	var proxyFunc func(*http.Request) (*url.URL, error)
	if proxyURL != nil {
		proxyFunc = http.ProxyURL(proxyURL)
	}
	transport := &http.Transport{
		TLSClientConfig:     ac.NewTLSConfig(),
		Proxy:               proxyFunc,
		TLSHandshakeTimeout: 10 * time.Second,
		IdleConnTimeout:     *apiServerTimeout,
	}
	gw := &groupWatcher{
		apiServer:     apiServer,
		authorization: ac.Authorization,
		namespaces:    namespaces,
		selectors:     selectors,
		m:             make(map[string]*urlWatcher),
	}
	gw.client = &http.Client{
		Transport: transport,
		Timeout:   *apiServerTimeout,
	}
	return gw
}
func getGroupWatcher ( apiServer string , ac * promauth . Config , namespaces [ ] string , selectors [ ] Selector , proxyURL * url . URL ) * groupWatcher {
2021-04-05 21:25:31 +02:00
proxyURLStr := "<nil>"
if proxyURL != nil {
proxyURLStr = proxyURL . String ( )
}
key := fmt . Sprintf ( "apiServer=%s, namespaces=%s, selectors=%s, proxyURL=%s, authConfig=%s" ,
apiServer , namespaces , selectorsKey ( selectors ) , proxyURLStr , ac . String ( ) )
2021-03-14 20:10:35 +01:00
groupWatchersLock . Lock ( )
gw := groupWatchers [ key ]
if gw == nil {
gw = newGroupWatcher ( apiServer , ac , namespaces , selectors , proxyURL )
groupWatchers [ key ] = gw
}
groupWatchersLock . Unlock ( )
return gw
}
// selectorsKey returns a string representation of selectors,
// which is used as a part of the groupWatcher key in getGroupWatcher.
//
// An empty string is returned for empty selectors.
func selectorsKey(selectors []Selector) string {
	var b strings.Builder
	for i := range selectors {
		sel := &selectors[i]
		b.WriteString(fmt.Sprintf("{role=%q, label=%q, field=%q}", sel.Role, sel.Label, sel.Field))
	}
	return b.String()
}
var (
	// groupWatchersLock protects groupWatchers.
	groupWatchersLock sync.Mutex

	// groupWatchers contains all the groupWatcher objects created by getGroupWatcher.
	groupWatchers = make(map[string]*groupWatcher)

	// The gauge exposes the number of registered groupWatcher objects.
	// The metric name must not contain whitespace, otherwise it isn't a valid metric identifier.
	_ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 {
		groupWatchersLock.Lock()
		defer groupWatchersLock.Unlock()
		return float64(len(groupWatchers))
	})
)
2021-03-05 11:36:05 +01:00
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
2021-03-14 20:10:35 +01:00
func ( gw * groupWatcher ) getObjectByRole ( role , namespace , name string ) object {
if gw == nil {
2021-03-11 15:41:09 +01:00
// this is needed for testing
2021-03-05 11:36:05 +01:00
return nil
}
2021-04-05 19:27:23 +02:00
o := gw . getCachedObjectByRole ( role , namespace , name )
if o != nil {
// Fast path: the object has been found in the cache.
return o
}
// The object wasn't found in the cache. Try querying it directly from API server.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details.
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_direct_object_loads_total { role=%q} ` , role ) ) . Inc ( )
objectType := getObjectTypeByRole ( role )
path := getAPIPath ( objectType , namespace , "" )
path += "/" + name
requestURL := gw . apiServer + path
resp , err := gw . doRequest ( requestURL )
if err != nil {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_direct_object_load_errors_total { role=%q} ` , role ) ) . Inc ( )
logger . Errorf ( "cannot obtain data for object %s (namespace=%q, name=%q): %s" , role , namespace , name , err )
return nil
}
data , err := ioutil . ReadAll ( resp . Body )
_ = resp . Body . Close ( )
if err != nil {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_direct_object_load_errors_total { role=%q} ` , role ) ) . Inc ( )
logger . Errorf ( "cannot read response from %q: %s" , requestURL , err )
return nil
}
if resp . StatusCode != http . StatusOK {
if resp . StatusCode == http . StatusNotFound {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_direct_object_load_misses_total { role=%q} ` , role ) ) . Inc ( )
return nil
}
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_direct_object_load_errors_total { role=%q} ` , role ) ) . Inc ( )
logger . Errorf ( "unexpected status code when reading response from %q; got %d; want %d; response body: %q" , requestURL , resp . StatusCode , http . StatusOK , data )
return nil
}
parseObject , _ := getObjectParsersForRole ( role )
o , err = parseObject ( data )
if err != nil {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_direct_object_load_errors_total { role=%q} ` , role ) ) . Inc ( )
logger . Errorf ( "cannot parse object obtained from %q: %s; response body: %q" , requestURL , err , data )
return nil
}
// There is no need in storing the object in urlWatcher cache, since it should be eventually populated there by urlWatcher itself.
return o
}
func ( gw * groupWatcher ) getCachedObjectByRole ( role , namespace , name string ) object {
2021-04-02 13:45:08 +02:00
key := namespace + "/" + name
2021-03-14 20:10:35 +01:00
gw . startWatchersForRole ( role , nil )
2021-04-05 21:02:09 +02:00
uws := gw . getURLWatchers ( )
for _ , uw := range uws {
2021-03-05 11:36:05 +01:00
if uw . role != role {
2021-04-02 13:17:53 +02:00
// Role mismatch
continue
}
if uw . namespace != "" && uw . namespace != namespace {
// Namespace mismatch
2021-03-05 11:36:05 +01:00
continue
}
2021-03-11 15:41:09 +01:00
uw . mu . Lock ( )
2021-04-02 13:45:08 +02:00
o := uw . objectsByKey [ key ]
2021-03-11 15:41:09 +01:00
uw . mu . Unlock ( )
2021-03-05 11:36:05 +01:00
if o != nil {
2021-03-11 15:41:09 +01:00
return o
2021-03-05 11:36:05 +01:00
}
}
2021-03-11 15:41:09 +01:00
return nil
2021-03-05 11:36:05 +01:00
}
2021-04-23 18:01:00 +02:00
// refreshObjectLabels re-generates target labels for the cached object with the given role and key
// and passes them to all the apiWatcher objects registered at the matching url watchers.
//
// Nothing is done if the object is missing in the cache.
func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) {
	// There is no need in starting url watcher for the given role,
	// since there is no (namespace, key) object yet for this role.
	// gw.startWatchersForRole(role, nil)
	for _, uw := range gw.getURLWatchers() {
		roleMatches := uw.role == role
		// The url watcher with empty namespace serves all the namespaces.
		namespaceMatches := uw.namespace == "" || uw.namespace == namespace
		if !roleMatches || !namespaceMatches {
			continue
		}

		uw.mu.Lock()
		o := uw.objectsByKey[key]
		var subscribers []*apiWatcher
		if o != nil {
			subscribers = uw.getAPIWatchersLocked()
		}
		uw.mu.Unlock()

		if len(subscribers) == 0 {
			continue
		}
		// Labels are generated once and shared among all the subscribers.
		labels := o.getTargetLabels(gw)
		for _, aw := range subscribers {
			aw.setScrapeWorks(namespace, key, labels)
		}
	}
}
2021-03-14 20:10:35 +01:00
func ( gw * groupWatcher ) startWatchersForRole ( role string , aw * apiWatcher ) {
2021-04-02 13:17:53 +02:00
paths , namespaces := getAPIPathsWithNamespaces ( role , gw . namespaces , gw . selectors )
for i , path := range paths {
2021-03-14 20:10:35 +01:00
apiURL := gw . apiServer + path
gw . mu . Lock ( )
uw := gw . m [ apiURL ]
2021-04-05 12:53:26 +02:00
needStart := uw == nil
if needStart {
2021-04-02 13:17:53 +02:00
uw = newURLWatcher ( role , namespaces [ i ] , apiURL , gw )
2021-03-14 20:10:35 +01:00
gw . m [ apiURL ] = uw
}
gw . mu . Unlock ( )
2021-04-05 12:53:26 +02:00
if needStart {
uw . reloadObjects ( )
go uw . watchForUpdates ( )
2021-03-13 14:18:47 +01:00
}
2021-04-05 21:02:09 +02:00
if aw != nil {
uw . subscribeAPIWatcher ( aw )
}
2021-03-14 20:10:35 +01:00
}
2021-03-05 11:36:05 +01:00
}
2021-03-14 20:10:35 +01:00
// doRequest performs http request to the given requestURL.
func ( gw * groupWatcher ) doRequest ( requestURL string ) ( * http . Response , error ) {
req , err := http . NewRequest ( "GET" , requestURL , nil )
if err != nil {
logger . Fatalf ( "cannot create a request for %q: %s" , requestURL , err )
}
if gw . authorization != "" {
req . Header . Set ( "Authorization" , gw . authorization )
}
return gw . client . Do ( req )
2021-03-05 11:36:05 +01:00
}
2021-04-05 21:02:09 +02:00
// registerPendingAPIWatchers registers pending apiWatcher objects at every url watcher of gw.
func (gw *groupWatcher) registerPendingAPIWatchers() {
	watchers := gw.getURLWatchers()
	for i := range watchers {
		watchers[i].registerPendingAPIWatchers()
	}
}
func ( gw * groupWatcher ) getURLWatchers ( ) [ ] * urlWatcher {
2021-03-14 20:10:35 +01:00
gw . mu . Lock ( )
2021-04-05 21:02:09 +02:00
uws := make ( [ ] * urlWatcher , 0 , len ( gw . m ) )
2021-03-14 20:10:35 +01:00
for _ , uw := range gw . m {
2021-04-05 21:02:09 +02:00
uws = append ( uws , uw )
2021-03-14 20:10:35 +01:00
}
gw . mu . Unlock ( )
2021-04-05 21:02:09 +02:00
return uws
}
func ( gw * groupWatcher ) unsubscribeAPIWatcher ( aw * apiWatcher ) {
uws := gw . getURLWatchers ( )
for _ , uw := range uws {
uw . unsubscribeAPIWatcher ( aw )
}
2021-03-14 20:10:35 +01:00
}
2021-03-05 11:36:05 +01:00
2021-04-02 13:45:08 +02:00
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
2021-03-05 11:36:05 +01:00
type urlWatcher struct {
2021-04-02 13:17:53 +02:00
role string
namespace string
apiURL string
gw * groupWatcher
2021-03-05 11:36:05 +01:00
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
2021-04-05 21:02:09 +02:00
// mu protects aws, awsPending, objectsByKey and resourceVersion
2021-03-11 15:41:09 +01:00
mu sync . Mutex
2021-04-05 21:02:09 +02:00
// awsPending contains pending apiWatcher objects, which are registered in a batch.
// Batch registering saves CPU time needed for registering big number of Kubernetes objects
// shared among big number of scrape jobs, since per-object labels are generated only once
// for all the scrape jobs (each scrape job is associated with a single apiWatcher).
// See reloadScrapeWorksForAPIWatchers for details.
awsPending map [ * apiWatcher ] struct { }
2021-03-11 15:41:09 +01:00
// aws contains registered apiWatcher objects
aws map [ * apiWatcher ] struct { }
2021-04-02 13:45:08 +02:00
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map [ string ] object
2021-03-05 16:29:54 +01:00
2021-03-07 18:50:01 +01:00
resourceVersion string
2021-03-05 11:36:05 +01:00
2021-03-26 11:28:10 +01:00
objectsCount * metrics . Counter
objectsAdded * metrics . Counter
objectsRemoved * metrics . Counter
objectsUpdated * metrics . Counter
staleResourceVersions * metrics . Counter
2021-03-05 11:36:05 +01:00
}
2021-04-02 13:17:53 +02:00
func newURLWatcher ( role , namespace , apiURL string , gw * groupWatcher ) * urlWatcher {
2021-03-11 15:41:09 +01:00
parseObject , parseObjectList := getObjectParsersForRole ( role )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_url_watchers { role=%q} ` , role ) ) . Inc ( )
uw := & urlWatcher {
2021-04-02 13:17:53 +02:00
role : role ,
namespace : namespace ,
apiURL : apiURL ,
gw : gw ,
2021-03-05 11:36:05 +01:00
parseObject : parseObject ,
parseObjectList : parseObjectList ,
2021-04-05 21:02:09 +02:00
awsPending : make ( map [ * apiWatcher ] struct { } ) ,
2021-04-02 13:45:08 +02:00
aws : make ( map [ * apiWatcher ] struct { } ) ,
objectsByKey : make ( map [ string ] object ) ,
2021-03-05 11:36:05 +01:00
2021-03-26 11:28:10 +01:00
objectsCount : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_objects { role=%q} ` , role ) ) ,
objectsAdded : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_objects_added_total { role=%q} ` , role ) ) ,
objectsRemoved : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_objects_removed_total { role=%q} ` , role ) ) ,
objectsUpdated : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_objects_updated_total { role=%q} ` , role ) ) ,
staleResourceVersions : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_stale_resource_versions_total { role=%q} ` , role ) ) ,
2021-03-11 15:41:09 +01:00
}
2021-03-14 20:10:35 +01:00
logger . Infof ( "started %s watcher for %q" , uw . role , uw . apiURL )
2021-03-11 15:41:09 +01:00
return uw
}
2021-03-05 11:36:05 +01:00
2021-03-14 20:10:35 +01:00
func ( uw * urlWatcher ) subscribeAPIWatcher ( aw * apiWatcher ) {
2021-03-11 15:41:09 +01:00
uw . mu . Lock ( )
2021-03-14 20:10:35 +01:00
if _ , ok := uw . aws [ aw ] ; ! ok {
2021-04-05 21:02:09 +02:00
if _ , ok := uw . awsPending [ aw ] ; ! ok {
uw . awsPending [ aw ] = struct { } { }
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="pending"} ` , uw . role ) ) . Inc ( )
2021-03-14 20:10:35 +01:00
}
2021-03-05 11:36:05 +01:00
}
2021-03-11 15:41:09 +01:00
uw . mu . Unlock ( )
2021-03-05 11:36:05 +01:00
}
2021-04-05 21:02:09 +02:00
func ( uw * urlWatcher ) registerPendingAPIWatchers ( ) {
uw . mu . Lock ( )
awsPending := make ( [ ] * apiWatcher , 0 , len ( uw . awsPending ) )
for aw := range uw . awsPending {
awsPending = append ( awsPending , aw )
delete ( uw . awsPending , aw )
2021-04-07 12:07:36 +02:00
uw . aws [ aw ] = struct { } { }
2021-04-05 21:02:09 +02:00
}
uw . reloadScrapeWorksForAPIWatchers ( awsPending , uw . objectsByKey )
uw . mu . Unlock ( )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="working"} ` , uw . role ) ) . Add ( len ( awsPending ) )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="pending"} ` , uw . role ) ) . Add ( - len ( awsPending ) )
}
2021-03-14 20:10:35 +01:00
func ( uw * urlWatcher ) unsubscribeAPIWatcher ( aw * apiWatcher ) {
2021-03-11 15:41:09 +01:00
uw . mu . Lock ( )
2021-04-05 21:02:09 +02:00
if _ , ok := uw . awsPending [ aw ] ; ok {
delete ( uw . awsPending , aw )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="pending"} ` , uw . role ) ) . Dec ( )
}
2021-03-14 20:10:35 +01:00
if _ , ok := uw . aws [ aw ] ; ok {
delete ( uw . aws , aw )
2021-04-05 21:02:09 +02:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="working"} ` , uw . role ) ) . Dec ( )
2021-03-11 15:41:09 +01:00
}
uw . mu . Unlock ( )
}
2021-03-10 14:06:33 +01:00
func ( uw * urlWatcher ) setResourceVersion ( resourceVersion string ) {
2021-03-07 18:50:01 +01:00
uw . mu . Lock ( )
2021-03-10 14:06:33 +01:00
uw . resourceVersion = resourceVersion
2021-03-07 18:50:01 +01:00
uw . mu . Unlock ( )
}
2021-03-05 11:36:05 +01:00
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
func ( uw * urlWatcher ) reloadObjects ( ) string {
2021-03-07 18:50:01 +01:00
uw . mu . Lock ( )
resourceVersion := uw . resourceVersion
uw . mu . Unlock ( )
if resourceVersion != "" {
2021-03-11 15:41:09 +01:00
// Fast path - there is no need in reloading the objects.
2021-03-07 18:50:01 +01:00
return resourceVersion
}
2021-03-05 11:36:05 +01:00
requestURL := uw . apiURL
2021-03-14 20:10:35 +01:00
resp , err := uw . gw . doRequest ( requestURL )
2021-03-05 11:36:05 +01:00
if err != nil {
2021-03-11 15:41:09 +01:00
logger . Errorf ( "cannot perform request to %q: %s" , requestURL , err )
2021-03-05 11:36:05 +01:00
return ""
}
if resp . StatusCode != http . StatusOK {
2021-03-11 15:41:09 +01:00
body , _ := ioutil . ReadAll ( resp . Body )
_ = resp . Body . Close ( )
2021-03-05 11:36:05 +01:00
logger . Errorf ( "unexpected status code for request to %q: %d; want %d; response: %q" , requestURL , resp . StatusCode , http . StatusOK , body )
return ""
}
2021-04-02 13:45:08 +02:00
objectsByKey , metadata , err := uw . parseObjectList ( resp . Body )
2021-03-11 15:41:09 +01:00
_ = resp . Body . Close ( )
2021-03-05 11:36:05 +01:00
if err != nil {
2021-03-11 15:41:09 +01:00
logger . Errorf ( "cannot parse objects from %q: %s" , requestURL , err )
2021-03-05 11:36:05 +01:00
return ""
}
2021-03-11 15:41:09 +01:00
uw . mu . Lock ( )
var updated , removed , added int
2021-04-02 13:45:08 +02:00
for key := range uw . objectsByKey {
if o , ok := objectsByKey [ key ] ; ok {
uw . objectsByKey [ key ] = o
2021-03-11 15:41:09 +01:00
updated ++
} else {
2021-04-02 13:45:08 +02:00
delete ( uw . objectsByKey , key )
2021-03-11 15:41:09 +01:00
removed ++
2021-03-05 11:36:05 +01:00
}
}
2021-04-02 13:45:08 +02:00
for key , o := range objectsByKey {
if _ , ok := uw . objectsByKey [ key ] ; ! ok {
uw . objectsByKey [ key ] = o
2021-03-11 15:41:09 +01:00
added ++
}
}
uw . objectsUpdated . Add ( updated )
uw . objectsRemoved . Add ( removed )
uw . objectsAdded . Add ( added )
uw . objectsCount . Add ( added - removed )
2021-03-07 18:50:01 +01:00
uw . resourceVersion = metadata . ResourceVersion
2021-04-23 18:01:00 +02:00
aws := uw . getAPIWatchersLocked ( )
2021-03-05 11:36:05 +01:00
uw . mu . Unlock ( )
2021-03-07 18:50:01 +01:00
2021-04-05 12:53:26 +02:00
uw . reloadScrapeWorksForAPIWatchers ( aws , objectsByKey )
2021-04-23 18:01:00 +02:00
if uw . role == "service" {
// Update endpoints and endpointslices for the corresponding service as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
gw := uw . gw
namespace := uw . namespace
for key := range objectsByKey {
gw . refreshObjectLabels ( "endpoints" , namespace , key )
gw . refreshObjectLabels ( "endpointslices" , namespace , key )
}
}
2021-04-02 13:45:08 +02:00
logger . Infof ( "reloaded %d objects from %q" , len ( objectsByKey ) , requestURL )
2021-04-05 12:53:26 +02:00
return uw . resourceVersion
}
func ( uw * urlWatcher ) reloadScrapeWorksForAPIWatchers ( aws [ ] * apiWatcher , objectsByKey map [ string ] object ) {
if len ( aws ) == 0 {
return
}
swosByKey := make ( [ ] map [ string ] [ ] interface { } , len ( aws ) )
for i := range aws {
swosByKey [ i ] = make ( map [ string ] [ ] interface { } )
}
for key , o := range objectsByKey {
labels := o . getTargetLabels ( uw . gw )
for i , aw := range aws {
swos := aw . getScrapeWorkObjectsForLabels ( labels )
if len ( swos ) > 0 {
swosByKey [ i ] [ key ] = swos
}
}
}
for i , aw := range aws {
aw . reloadScrapeWorks ( uw . namespace , swosByKey [ i ] )
}
2021-03-05 11:36:05 +01:00
}
2021-04-23 18:01:00 +02:00
func ( uw * urlWatcher ) getAPIWatchersLocked ( ) [ ] * apiWatcher {
awsMap := uw . aws
2021-03-14 20:10:35 +01:00
aws := make ( [ ] * apiWatcher , 0 , len ( awsMap ) )
for aw := range awsMap {
2021-03-11 15:41:09 +01:00
aws = append ( aws , aw )
2021-03-05 11:36:05 +01:00
}
2021-03-11 15:41:09 +01:00
return aws
2021-03-05 11:36:05 +01:00
}
2021-03-07 18:50:01 +01:00
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
2021-03-05 11:36:05 +01:00
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
2021-03-07 18:50:01 +01:00
func ( uw * urlWatcher ) watchForUpdates ( ) {
2021-03-05 11:36:05 +01:00
backoffDelay := time . Second
maxBackoffDelay := 30 * time . Second
backoffSleep := func ( ) {
time . Sleep ( backoffDelay )
backoffDelay *= 2
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
}
apiURL := uw . apiURL
delimiter := "?"
if strings . Contains ( apiURL , "?" ) {
delimiter = "&"
}
2021-03-14 20:10:35 +01:00
timeoutSeconds := time . Duration ( 0.9 * float64 ( uw . gw . client . Timeout ) ) . Seconds ( )
2021-03-10 14:06:33 +01:00
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv . Itoa ( int ( timeoutSeconds ) )
2021-03-05 11:36:05 +01:00
for {
2021-03-07 18:50:01 +01:00
resourceVersion := uw . reloadObjects ( )
2021-03-14 20:55:00 +01:00
if resourceVersion == "" {
backoffSleep ( )
continue
2021-03-05 11:36:05 +01:00
}
2021-03-14 20:55:00 +01:00
requestURL := apiURL + "&resourceVersion=" + url . QueryEscape ( resourceVersion )
2021-03-14 20:10:35 +01:00
resp , err := uw . gw . doRequest ( requestURL )
2021-03-05 11:36:05 +01:00
if err != nil {
2021-03-13 14:18:47 +01:00
logger . Errorf ( "cannot perform request to %q: %s" , requestURL , err )
2021-03-05 11:36:05 +01:00
backoffSleep ( )
continue
}
if resp . StatusCode != http . StatusOK {
if resp . StatusCode == 410 {
// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
backoffDelay = time . Second
2021-03-26 11:28:10 +01:00
uw . staleResourceVersions . Inc ( )
2021-03-10 14:06:33 +01:00
uw . setResourceVersion ( "" )
2021-03-05 11:36:05 +01:00
} else {
2021-03-26 11:28:10 +01:00
body , _ := ioutil . ReadAll ( resp . Body )
_ = resp . Body . Close ( )
logger . Errorf ( "unexpected status code for request to %q: %d; want %d; response: %q" , requestURL , resp . StatusCode , http . StatusOK , body )
2021-03-05 11:36:05 +01:00
backoffSleep ( )
}
continue
}
backoffDelay = time . Second
err = uw . readObjectUpdateStream ( resp . Body )
_ = resp . Body . Close ( )
if err != nil {
if ! errors . Is ( err , io . EOF ) {
logger . Errorf ( "error when reading WatchEvent stream from %q: %s" , requestURL , err )
}
backoffSleep ( )
continue
}
}
}
// readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events.
func ( uw * urlWatcher ) readObjectUpdateStream ( r io . Reader ) error {
d := json . NewDecoder ( r )
var we WatchEvent
for {
if err := d . Decode ( & we ) ; err != nil {
return err
}
switch we . Type {
case "ADDED" , "MODIFIED" :
2021-03-11 12:06:40 +01:00
o , err := uw . parseObject ( we . Object )
if err != nil {
return err
}
2021-04-02 13:45:08 +02:00
key := o . key ( )
2021-03-05 11:36:05 +01:00
uw . mu . Lock ( )
2021-04-02 13:45:08 +02:00
if _ , ok := uw . objectsByKey [ key ] ; ! ok {
2021-03-11 15:41:09 +01:00
uw . objectsCount . Inc ( )
uw . objectsAdded . Inc ( )
2021-03-05 11:36:05 +01:00
} else {
2021-03-11 15:41:09 +01:00
uw . objectsUpdated . Inc ( )
2021-03-05 11:36:05 +01:00
}
2021-04-02 13:45:08 +02:00
uw . objectsByKey [ key ] = o
2021-04-23 18:01:00 +02:00
aws := uw . getAPIWatchersLocked ( )
2021-03-05 11:36:05 +01:00
uw . mu . Unlock ( )
2021-04-23 18:01:00 +02:00
if len ( aws ) > 0 {
labels := o . getTargetLabels ( uw . gw )
for _ , aw := range aws {
aw . setScrapeWorks ( uw . namespace , key , labels )
}
}
if uw . role == "service" {
// Update endpoints and endpointslices for the corresponding service as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
uw . gw . refreshObjectLabels ( "endpoints" , uw . namespace , key )
uw . gw . refreshObjectLabels ( "endpointslices" , uw . namespace , key )
2021-03-11 15:41:09 +01:00
}
2021-03-05 11:36:05 +01:00
case "DELETED" :
2021-03-11 12:06:40 +01:00
o , err := uw . parseObject ( we . Object )
if err != nil {
return err
}
2021-04-02 13:45:08 +02:00
key := o . key ( )
2021-03-05 11:36:05 +01:00
uw . mu . Lock ( )
2021-04-02 13:45:08 +02:00
if _ , ok := uw . objectsByKey [ key ] ; ok {
2021-03-11 15:41:09 +01:00
uw . objectsCount . Dec ( )
uw . objectsRemoved . Inc ( )
2021-04-02 13:45:08 +02:00
delete ( uw . objectsByKey , key )
2021-03-11 15:41:09 +01:00
}
2021-04-23 18:01:00 +02:00
aws := uw . getAPIWatchersLocked ( )
2021-03-05 11:36:05 +01:00
uw . mu . Unlock ( )
2021-03-14 20:10:35 +01:00
for _ , aw := range aws {
2021-04-02 13:45:08 +02:00
aw . removeScrapeWorks ( uw . namespace , key )
2021-03-11 15:41:09 +01:00
}
2021-03-10 14:06:33 +01:00
case "BOOKMARK" :
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
2021-03-11 12:06:40 +01:00
bm , err := parseBookmark ( we . Object )
if err != nil {
return fmt . Errorf ( "cannot parse bookmark from %q: %w" , we . Object , err )
}
uw . setResourceVersion ( bm . Metadata . ResourceVersion )
2021-03-26 11:28:10 +01:00
case "ERROR" :
em , err := parseError ( we . Object )
if err != nil {
2021-03-26 11:45:59 +01:00
return fmt . Errorf ( "cannot parse error message from %q: %w" , we . Object , err )
2021-03-26 11:28:10 +01:00
}
if em . Code == 410 {
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
uw . staleResourceVersions . Inc ( )
uw . setResourceVersion ( "" )
return nil
}
return fmt . Errorf ( "unexpected error message: %q" , we . Object )
2021-03-05 11:36:05 +01:00
default :
2021-03-26 11:28:10 +01:00
return fmt . Errorf ( "unexpected WatchEvent type %q: %q" , we . Type , we . Object )
2021-03-05 11:36:05 +01:00
}
}
}
2021-03-26 11:28:10 +01:00
// Bookmark is a bookmark message from Kubernetes Watch API.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
type Bookmark struct {
	// Metadata contains the only part of the bookmark object used by the watcher.
	Metadata struct {
		// ResourceVersion is the resource version to continue watching from.
		ResourceVersion string
	}
}

// parseBookmark decodes a Bookmark from JSON-encoded data.
//
// It returns a nil Bookmark and a non-nil error if data cannot be decoded.
func parseBookmark(data []byte) (*Bookmark, error) {
	bm := &Bookmark{}
	err := json.Unmarshal(data, bm)
	if err != nil {
		return nil, err
	}
	return bm, nil
}
2021-03-26 11:28:10 +01:00
// Error is an error message from Kubernetes Watch API.
type Error struct {
	// Code is the code reported in the error message. 410 means that the requested resource version is gone.
	Code int
}

// parseError decodes an Error from JSON-encoded data.
//
// It returns a nil Error and a non-nil error if data cannot be decoded.
func parseError(data []byte) (*Error, error) {
	em := &Error{}
	err := json.Unmarshal(data, em)
	if err != nil {
		return nil, err
	}
	return em, nil
}
2021-04-02 13:17:53 +02:00
func getAPIPathsWithNamespaces ( role string , namespaces [ ] string , selectors [ ] Selector ) ( [ ] string , [ ] string ) {
2021-04-05 19:27:23 +02:00
objectType := getObjectTypeByRole ( role )
if objectType == "nodes" || len ( namespaces ) == 0 {
2021-03-05 11:36:05 +01:00
query := joinSelectors ( role , selectors )
2021-04-05 19:27:23 +02:00
path := getAPIPath ( objectType , "" , query )
2021-04-02 13:17:53 +02:00
return [ ] string { path } , [ ] string { "" }
2021-03-05 11:36:05 +01:00
}
query := joinSelectors ( role , selectors )
paths := make ( [ ] string , len ( namespaces ) )
for i , namespace := range namespaces {
2021-04-05 19:27:23 +02:00
paths [ i ] = getAPIPath ( objectType , namespace , query )
2021-03-05 11:36:05 +01:00
}
2021-04-02 13:17:53 +02:00
return paths , namespaces
2021-03-05 11:36:05 +01:00
}
2021-04-05 19:27:23 +02:00
// getAPIPath returns API path for the given objectType.
//
// The path is bound to the given namespace if it is non-empty.
// Non-empty query is appended to the path after the question mark.
func getAPIPath(objectType, namespace, query string) string {
	var path string
	switch objectType {
	case "ingresses":
		path = "/apis/networking.k8s.io/v1beta1/"
	case "endpointslices":
		path = "/apis/discovery.k8s.io/v1beta1/"
	default:
		path = "/api/v1/"
	}
	if namespace != "" {
		path += "namespaces/" + namespace + "/"
	}
	path += objectType
	if query != "" {
		path += "?" + query
	}
	return path
}
// joinSelectors returns url-encoded query args built from the selectors matching the given role.
//
// Label selectors are joined with commas into the labelSelector arg,
// while field selectors are joined with commas into the fieldSelector arg.
// An empty string is returned if there are no matching selectors.
func joinSelectors(role string, selectors []Selector) string {
	var labels, fields []string
	for i := range selectors {
		sel := &selectors[i]
		if sel.Role != role {
			continue
		}
		if sel.Label != "" {
			labels = append(labels, sel.Label)
		}
		if sel.Field != "" {
			fields = append(fields, sel.Field)
		}
	}
	args := make([]string, 0, 2)
	if len(labels) > 0 {
		args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labels, ",")))
	}
	if len(fields) > 0 {
		args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fields, ",")))
	}
	return strings.Join(args, "&")
}
2021-04-05 19:27:23 +02:00
func getObjectTypeByRole ( role string ) string {
2021-03-05 11:36:05 +01:00
switch role {
case "node" :
return "nodes"
case "pod" :
return "pods"
case "service" :
return "services"
case "endpoints" :
return "endpoints"
case "endpointslices" :
return "endpointslices"
case "ingress" :
return "ingresses"
default :
logger . Panicf ( "BUG: unknonw role=%q" , role )
return ""
}
}
// getObjectParsersForRole returns parsers for a single object and for a list of objects with the given role.
//
// The function panics on unsupported role, since this indicates a bug in the caller.
func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) {
	var (
		parseOne  parseObjectFunc
		parseList parseObjectListFunc
	)
	switch role {
	case "node":
		parseOne, parseList = parseNode, parseNodeList
	case "pod":
		parseOne, parseList = parsePod, parsePodList
	case "service":
		parseOne, parseList = parseService, parseServiceList
	case "endpoints":
		parseOne, parseList = parseEndpoints, parseEndpointsList
	case "endpointslices":
		parseOne, parseList = parseEndpointSlice, parseEndpointSliceList
	case "ingress":
		parseOne, parseList = parseIngress, parseIngressList
	default:
		logger.Panicf("BUG: unsupported role=%q", role)
	}
	return parseOne, parseList
}