2021-03-05 11:36:05 +01:00
package kubernetes
import (
2023-09-15 19:40:13 +02:00
"context"
2021-03-05 11:36:05 +01:00
"encoding/json"
"errors"
2021-03-11 15:41:09 +01:00
"flag"
2021-03-05 11:36:05 +01:00
"fmt"
"io"
"net/http"
"net/url"
2022-08-21 22:51:13 +02:00
"os"
2021-03-05 11:36:05 +01:00
"reflect"
"strconv"
"strings"
"sync"
2021-08-29 10:16:40 +02:00
"sync/atomic"
2021-03-05 11:36:05 +01:00
"time"
2023-09-15 19:40:13 +02:00
"github.com/VictoriaMetrics/metrics"
2022-04-20 15:09:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2021-03-05 11:36:05 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2021-03-11 15:41:09 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2022-11-30 06:22:12 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
2023-09-18 23:23:41 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2021-03-05 11:36:05 +01:00
)
2024-01-21 02:12:52 +01:00
var (
apiServerTimeout = flag . Duration ( "promscrape.kubernetes.apiServerTimeout" , 30 * time . Minute , "How frequently to reload the full state from Kubernetes API server" )
attachNodeMetadataAll = flag . Bool ( "promscrape.kubernetes.attachNodeMetadataAll" , false , "Whether to set attach_metadata.node=true for all the kubernetes_sd_configs at -promscrape.config . " +
"It is possible to set attach_metadata.node=false individually per each kubernetes_sd_configs . See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs" )
)
2021-03-11 15:41:09 +01:00
2021-03-05 11:36:05 +01:00
// WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
type WatchEvent struct {
Type string
Object json . RawMessage
}
// object is any Kubernetes object.
type object interface {
2021-04-02 13:45:08 +02:00
key ( ) string
2021-04-29 09:14:24 +02:00
// getTargetLabels must be called under gw.mu lock.
2022-11-30 06:22:12 +01:00
getTargetLabels ( gw * groupWatcher ) [ ] * promutils . Labels
2021-03-05 11:36:05 +01:00
}
// parseObjectFunc must parse object from the given data.
type parseObjectFunc func ( data [ ] byte ) ( object , error )
2021-03-11 15:41:09 +01:00
// parseObjectListFunc must parse objectList from the given r.
type parseObjectListFunc func ( r io . Reader ) ( map [ string ] object , ListMeta , error )
2021-03-05 11:36:05 +01:00
2022-04-20 09:50:36 +02:00
// apiWatcher is used for watching for Kubernetes object changes and caching their latest states.
2021-03-05 11:36:05 +01:00
type apiWatcher struct {
2021-03-14 20:10:35 +01:00
role string
2021-03-05 11:36:05 +01:00
2021-03-11 15:41:09 +01:00
// Constructor for creating ScrapeWork objects from labels
2021-03-05 11:36:05 +01:00
swcFunc ScrapeWorkConstructorFunc
2021-03-14 20:10:35 +01:00
gw * groupWatcher
2022-05-21 00:01:37 +02:00
// swosByURLWatcher contains per-urlWatcher maps of ScrapeWork objects for the given apiWatcher
2021-05-17 22:45:20 +02:00
swosByURLWatcher map [ * urlWatcher ] map [ string ] [ ] interface { }
swosByURLWatcherLock sync . Mutex
2021-03-05 11:36:05 +01:00
2021-03-14 20:10:35 +01:00
swosCount * metrics . Counter
2021-03-05 11:36:05 +01:00
}
2023-10-27 20:22:44 +02:00
func newAPIWatcher ( apiServer string , ac * promauth . Config , sdc * SDConfig , swcFunc ScrapeWorkConstructorFunc ) ( * apiWatcher , error ) {
2021-03-14 20:10:35 +01:00
namespaces := sdc . Namespaces . Names
2022-01-13 21:44:14 +01:00
if len ( namespaces ) == 0 {
if sdc . Namespaces . OwnNamespace {
2022-08-21 22:51:13 +02:00
namespace , err := os . ReadFile ( "/var/run/secrets/kubernetes.io/serviceaccount/namespace" )
2022-01-13 21:44:14 +01:00
if err != nil {
2023-10-27 20:22:44 +02:00
return nil , fmt . Errorf ( "cannot determine namespace for the current pod according to `own_namespace: true` option in kubernetes_sd_config: %w" , err )
2022-01-13 21:44:14 +01:00
}
namespaces = [ ] string { string ( namespace ) }
}
}
2021-03-14 20:10:35 +01:00
selectors := sdc . Selectors
2024-01-21 02:12:52 +01:00
attachNodeMetadata := * attachNodeMetadataAll
if sdc . AttachMetadata != nil {
attachNodeMetadata = sdc . AttachMetadata . Node
}
2022-05-06 23:02:54 +02:00
proxyURL := sdc . ProxyURL . GetURL ( )
2023-10-27 20:22:44 +02:00
gw , err := getGroupWatcher ( apiServer , ac , namespaces , selectors , attachNodeMetadata , proxyURL )
if err != nil {
return nil , err
}
2021-08-29 11:34:55 +02:00
role := sdc . role ( )
2023-10-27 20:22:44 +02:00
aw := & apiWatcher {
2021-08-29 11:34:55 +02:00
role : role ,
2021-05-17 22:45:20 +02:00
swcFunc : swcFunc ,
gw : gw ,
swosByURLWatcher : make ( map [ * urlWatcher ] map [ string ] [ ] interface { } ) ,
2021-08-29 11:34:55 +02:00
swosCount : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_scrape_works { role=%q} ` , role ) ) ,
2021-03-05 11:36:05 +01:00
}
2023-10-27 20:22:44 +02:00
return aw , nil
2021-03-05 11:36:05 +01:00
}
2021-04-05 21:02:09 +02:00
func ( aw * apiWatcher ) mustStart ( ) {
2024-01-21 22:13:15 +01:00
atomic . AddInt32 ( & aw . gw . apiWatcherInflightStartCalls , 1 )
2021-05-05 11:21:39 +02:00
aw . gw . startWatchersForRole ( aw . role , aw )
2024-01-21 22:13:15 +01:00
atomic . AddInt32 ( & aw . gw . apiWatcherInflightStartCalls , - 1 )
2021-04-05 21:02:09 +02:00
}
2022-05-21 00:01:37 +02:00
func ( aw * apiWatcher ) updateSwosCount ( multiplier int , swosByKey map [ string ] [ ] interface { } ) {
n := 0
for _ , swos := range swosByKey {
n += len ( swos )
}
n *= multiplier
aw . swosCount . Add ( n )
}
2021-03-11 15:41:09 +01:00
func ( aw * apiWatcher ) mustStop ( ) {
2021-03-14 20:10:35 +01:00
aw . gw . unsubscribeAPIWatcher ( aw )
2021-05-17 22:45:20 +02:00
aw . swosByURLWatcherLock . Lock ( )
2021-05-18 14:29:32 +02:00
for _ , swosByKey := range aw . swosByURLWatcher {
2022-05-21 00:01:37 +02:00
aw . updateSwosCount ( - 1 , swosByKey )
2021-05-18 14:29:32 +02:00
}
2021-05-17 22:45:20 +02:00
aw . swosByURLWatcher = make ( map [ * urlWatcher ] map [ string ] [ ] interface { } )
aw . swosByURLWatcherLock . Unlock ( )
2021-03-11 15:41:09 +01:00
}
2022-05-21 00:01:37 +02:00
func ( aw * apiWatcher ) replaceScrapeWorks ( uw * urlWatcher , swosByKey map [ string ] [ ] interface { } ) {
2021-05-17 22:45:20 +02:00
aw . swosByURLWatcherLock . Lock ( )
2022-05-21 00:01:37 +02:00
aw . updateSwosCount ( - 1 , aw . swosByURLWatcher [ uw ] )
aw . updateSwosCount ( 1 , swosByKey )
2021-05-17 22:45:20 +02:00
aw . swosByURLWatcher [ uw ] = swosByKey
aw . swosByURLWatcherLock . Unlock ( )
2021-03-11 15:41:09 +01:00
}
2022-05-21 00:01:37 +02:00
func ( aw * apiWatcher ) updateScrapeWorks ( uw * urlWatcher , swosByKey map [ string ] [ ] interface { } ) {
aw . swosByURLWatcherLock . Lock ( )
dst := aw . swosByURLWatcher [ uw ]
if dst == nil {
dst = make ( map [ string ] [ ] interface { } )
aw . swosByURLWatcher [ uw ] = dst
}
for key , swos := range swosByKey {
aw . swosCount . Add ( len ( swos ) - len ( dst [ key ] ) )
if len ( swos ) == 0 {
delete ( dst , key )
} else {
dst [ key ] = swos
}
}
aw . swosByURLWatcherLock . Unlock ( )
}
2022-11-30 06:22:12 +01:00
func ( aw * apiWatcher ) setScrapeWorks ( uw * urlWatcher , key string , labelss [ ] * promutils . Labels ) {
swos := getScrapeWorkObjectsForLabels ( aw . swcFunc , labelss )
2021-05-17 22:45:20 +02:00
aw . swosByURLWatcherLock . Lock ( )
swosByKey := aw . swosByURLWatcher [ uw ]
2021-04-02 13:45:08 +02:00
if swosByKey == nil {
swosByKey = make ( map [ string ] [ ] interface { } )
2021-05-17 22:45:20 +02:00
aw . swosByURLWatcher [ uw ] = swosByKey
2021-04-02 13:17:53 +02:00
}
2021-05-18 14:29:32 +02:00
aw . swosCount . Add ( len ( swos ) - len ( swosByKey [ key ] ) )
2022-05-21 00:01:37 +02:00
if len ( swos ) == 0 {
2021-04-02 13:45:08 +02:00
delete ( swosByKey , key )
2022-05-21 00:01:37 +02:00
} else {
swosByKey [ key ] = swos
2021-03-11 15:41:09 +01:00
}
2021-05-17 22:45:20 +02:00
aw . swosByURLWatcherLock . Unlock ( )
2021-03-11 15:41:09 +01:00
}
2021-05-17 22:45:20 +02:00
func ( aw * apiWatcher ) removeScrapeWorks ( uw * urlWatcher , key string ) {
aw . swosByURLWatcherLock . Lock ( )
swosByKey := aw . swosByURLWatcher [ uw ]
2021-04-02 13:45:08 +02:00
if len ( swosByKey ) > 0 {
aw . swosCount . Add ( - len ( swosByKey [ key ] ) )
delete ( swosByKey , key )
2021-04-02 13:17:53 +02:00
}
2021-05-17 22:45:20 +02:00
aw . swosByURLWatcherLock . Unlock ( )
2021-03-11 15:41:09 +01:00
}
2022-11-30 06:22:12 +01:00
func getScrapeWorkObjectsForLabels ( swcFunc ScrapeWorkConstructorFunc , labelss [ ] * promutils . Labels ) [ ] interface { } {
2022-04-20 15:40:17 +02:00
// Do not pre-allocate swos, since it is likely the swos will be empty because of relabeling
var swos [ ] interface { }
2021-03-11 15:41:09 +01:00
for _ , labels := range labelss {
2022-04-20 15:11:37 +02:00
swo := swcFunc ( labels )
2021-03-11 15:41:09 +01:00
// The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
if swo != nil && ! reflect . ValueOf ( swo ) . IsNil ( ) {
swos = append ( swos , swo )
2021-03-05 11:36:05 +01:00
}
}
2021-03-11 15:41:09 +01:00
return swos
}
// getScrapeWorkObjects returns all the ScrapeWork objects for the given aw.
func ( aw * apiWatcher ) getScrapeWorkObjects ( ) [ ] interface { } {
2021-04-05 21:02:09 +02:00
aw . gw . registerPendingAPIWatchers ( )
2022-04-20 15:11:37 +02:00
swos := make ( [ ] interface { } , 0 , aw . swosCount . Get ( ) )
2021-05-17 22:45:20 +02:00
aw . swosByURLWatcherLock . Lock ( )
for _ , swosByKey := range aw . swosByURLWatcher {
2021-04-02 13:45:08 +02:00
for _ , swosLocal := range swosByKey {
2021-04-02 13:17:53 +02:00
swos = append ( swos , swosLocal ... )
}
2021-03-11 15:41:09 +01:00
}
2022-04-20 15:11:37 +02:00
aw . swosByURLWatcherLock . Unlock ( )
2021-03-05 11:36:05 +01:00
return swos
}
2021-03-14 20:10:35 +01:00
// groupWatcher watches for Kubernetes objects on the given apiServer with the given namespaces,
2022-04-22 18:39:34 +02:00
// selectors and attachNodeMetadata using the given client.
2021-03-14 20:10:35 +01:00
type groupWatcher struct {
2024-01-21 22:13:15 +01:00
// The number of in-flight apiWatcher.mustStart() calls for the given groupWatcher.
// This field is used by groupWatchersCleaner() in order to determine when the given groupWatcher can be stopped.
apiWatcherInflightStartCalls int32
2021-08-29 10:16:40 +02:00
// Old Kubernetes doesn't support /apis/networking.k8s.io/v1/, so /apis/networking.k8s.io/v1beta1/ must be used instead.
// This flag is used for automatic substitution of v1 API path with v1beta1 API path during requests to apiServer.
useNetworkingV1Beta1 uint32
// Old Kubernetes doesn't support /apis/discovery.k8s.io/v1/, so discovery.k8s.io/v1beta1/ must be used instead.
// This flag is used for automatic substitution of v1 API path with v1beta1 API path during requests to apiServer.
useDiscoveryV1Beta1 uint32
2022-04-22 18:39:34 +02:00
apiServer string
namespaces [ ] string
selectors [ ] Selector
attachNodeMetadata bool
2023-10-17 11:58:19 +02:00
setHeaders func ( req * http . Request ) error
2022-06-22 19:38:43 +02:00
client * http . Client
2021-03-14 20:10:35 +01:00
mu sync . Mutex
m map [ string ] * urlWatcher
2023-10-27 13:35:27 +02:00
// cancel is used for stopping all the urlWatcher instances inside m,
// which must check for ctx.Done() when performing their background watch work.
ctx context . Context
cancel context . CancelFunc
// noAPIWatchers is set to true when there are no API watchers for the given groupWatcher.
// This field is used for determining when it is safe to stop all the urlWatcher instances
// for the given groupWatcher.
noAPIWatchers bool
2021-03-14 20:10:35 +01:00
}
2023-10-27 20:22:44 +02:00
func newGroupWatcher ( apiServer string , ac * promauth . Config , namespaces [ ] string , selectors [ ] Selector , attachNodeMetadata bool , proxyURL * url . URL ) ( * groupWatcher , error ) {
2021-03-14 20:10:35 +01:00
var proxy func ( * http . Request ) ( * url . URL , error )
if proxyURL != nil {
proxy = http . ProxyURL ( proxyURL )
}
2023-10-25 23:19:33 +02:00
tlsConfig , err := ac . NewTLSConfig ( )
if err != nil {
2023-10-27 20:22:44 +02:00
return nil , fmt . Errorf ( "cannot initialize tls config: %w" , err )
2023-10-25 23:19:33 +02:00
}
2021-03-14 20:10:35 +01:00
client := & http . Client {
Transport : & http . Transport {
2023-10-25 23:19:33 +02:00
TLSClientConfig : tlsConfig ,
2021-03-14 20:10:35 +01:00
Proxy : proxy ,
TLSHandshakeTimeout : 10 * time . Second ,
IdleConnTimeout : * apiServerTimeout ,
2021-05-14 17:10:19 +02:00
MaxIdleConnsPerHost : 100 ,
2021-03-14 20:10:35 +01:00
} ,
Timeout : * apiServerTimeout ,
}
2023-10-27 13:35:27 +02:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
2023-10-27 20:22:44 +02:00
gw := & groupWatcher {
2022-04-22 18:39:34 +02:00
apiServer : apiServer ,
namespaces : namespaces ,
selectors : selectors ,
attachNodeMetadata : attachNodeMetadata ,
2023-10-25 23:19:33 +02:00
setHeaders : func ( req * http . Request ) error {
return ac . SetHeaders ( req , true )
} ,
client : client ,
m : make ( map [ string ] * urlWatcher ) ,
2023-10-27 13:35:27 +02:00
ctx : ctx ,
cancel : cancel ,
2021-03-14 20:10:35 +01:00
}
2023-10-27 20:22:44 +02:00
return gw , nil
2021-03-14 20:10:35 +01:00
}
2023-10-27 20:22:44 +02:00
func getGroupWatcher ( apiServer string , ac * promauth . Config , namespaces [ ] string , selectors [ ] Selector , attachNodeMetadata bool , proxyURL * url . URL ) ( * groupWatcher , error ) {
2021-04-05 21:25:31 +02:00
proxyURLStr := "<nil>"
if proxyURL != nil {
proxyURLStr = proxyURL . String ( )
}
2022-04-22 18:39:34 +02:00
key := fmt . Sprintf ( "apiServer=%s, namespaces=%s, selectors=%s, attachNodeMetadata=%v, proxyURL=%s, authConfig=%s" ,
apiServer , namespaces , selectorsKey ( selectors ) , attachNodeMetadata , proxyURLStr , ac . String ( ) )
2021-03-14 20:10:35 +01:00
groupWatchersLock . Lock ( )
gw := groupWatchers [ key ]
2023-10-27 20:22:44 +02:00
var err error
2021-03-14 20:10:35 +01:00
if gw == nil {
2023-10-27 20:22:44 +02:00
gw , err = newGroupWatcher ( apiServer , ac , namespaces , selectors , attachNodeMetadata , proxyURL )
if err != nil {
err = fmt . Errorf ( "cannot initialize watcher for key={%s}: %w" , key , err )
} else {
groupWatchers [ key ] = gw
}
2021-03-14 20:10:35 +01:00
}
groupWatchersLock . Unlock ( )
2023-10-27 20:22:44 +02:00
return gw , err
2021-03-14 20:10:35 +01:00
}
func selectorsKey ( selectors [ ] Selector ) string {
var sb strings . Builder
for _ , s := range selectors {
fmt . Fprintf ( & sb , "{role=%q, label=%q, field=%q}" , s . Role , s . Label , s . Field )
}
return sb . String ( )
}
var (
groupWatchersLock sync . Mutex
2024-01-21 22:13:15 +01:00
groupWatchers map [ string ] * groupWatcher
2021-03-14 20:10:35 +01:00
_ = metrics . NewGauge ( ` vm_promscrape_discovery_kubernetes_group_watchers ` , func ( ) float64 {
groupWatchersLock . Lock ( )
n := len ( groupWatchers )
groupWatchersLock . Unlock ( )
return float64 ( n )
} )
)
2024-01-21 22:13:15 +01:00
func init ( ) {
groupWatchers = make ( map [ string ] * groupWatcher )
go groupWatchersCleaner ( )
}
func groupWatchersCleaner ( ) {
2023-09-18 23:23:41 +02:00
for {
time . Sleep ( 7 * time . Second )
groupWatchersLock . Lock ( )
2024-01-21 22:13:15 +01:00
for key , gw := range groupWatchers {
2023-09-18 23:23:41 +02:00
gw . mu . Lock ( )
2023-10-27 13:35:27 +02:00
// Calculate the number of apiWatcher instances subscribed to gw.
awsTotal := 0
for _ , uw := range gw . m {
awsTotal += len ( uw . aws ) + len ( uw . awsPending )
}
2024-01-21 22:13:15 +01:00
if awsTotal == 0 && atomic . LoadInt32 ( & gw . apiWatcherInflightStartCalls ) == 0 {
// There are no API watchers subscribed to gw and there are no in-flight apiWatcher.mustStart() calls.
// Stop all the urlWatcher instances at gw and drop gw from groupWatchers in this case,
2023-10-27 13:35:27 +02:00
// but do it only on the second iteration in order to reduce urlWatcher churn
// during scrape config reloads.
if gw . noAPIWatchers {
gw . cancel ( )
2024-01-21 22:13:15 +01:00
delete ( groupWatchers , key )
2023-10-27 13:35:27 +02:00
} else {
gw . noAPIWatchers = true
}
} else {
gw . noAPIWatchers = false
2023-09-18 23:23:41 +02:00
}
gw . mu . Unlock ( )
}
groupWatchersLock . Unlock ( )
}
}
2022-05-21 00:01:37 +02:00
type swosByKeyWithLock struct {
mu sync . Mutex
swosByKey map [ string ] [ ] interface { }
}
func ( gw * groupWatcher ) getScrapeWorkObjectsByAPIWatcherLocked ( objectsByKey map [ string ] object , awsMap map [ * apiWatcher ] struct { } ) map [ * apiWatcher ] * swosByKeyWithLock {
if len ( awsMap ) == 0 {
return nil
}
swosByAPIWatcher := make ( map [ * apiWatcher ] * swosByKeyWithLock , len ( awsMap ) )
for aw := range awsMap {
swosByAPIWatcher [ aw ] = & swosByKeyWithLock {
swosByKey : make ( map [ string ] [ ] interface { } ) ,
}
}
// Generate ScrapeWork objects in parallel on available CPU cores.
// This should reduce the time needed for their generation on systems with many CPU cores.
var wg sync . WaitGroup
limiterCh := make ( chan struct { } , cgroup . AvailableCPUs ( ) )
for key , o := range objectsByKey {
2022-11-30 06:22:12 +01:00
labelss := o . getTargetLabels ( gw )
2022-05-21 00:01:37 +02:00
wg . Add ( 1 )
limiterCh <- struct { } { }
2022-11-30 06:22:12 +01:00
go func ( key string , labelss [ ] * promutils . Labels ) {
2022-05-21 00:01:37 +02:00
for aw , e := range swosByAPIWatcher {
2022-11-30 06:22:12 +01:00
swos := getScrapeWorkObjectsForLabels ( aw . swcFunc , labelss )
2022-05-21 00:01:37 +02:00
e . mu . Lock ( )
e . swosByKey [ key ] = swos
e . mu . Unlock ( )
}
2022-11-30 06:22:12 +01:00
putLabelssToPool ( labelss )
2022-05-21 00:01:37 +02:00
wg . Done ( )
<- limiterCh
2022-11-30 06:22:12 +01:00
} ( key , labelss )
2022-05-21 00:01:37 +02:00
}
wg . Wait ( )
return swosByAPIWatcher
}
2022-11-30 06:22:12 +01:00
func putLabelssToPool ( labelss [ ] * promutils . Labels ) {
for _ , labels := range labelss {
promutils . PutLabels ( labels )
}
}
2021-04-29 09:14:24 +02:00
func ( gw * groupWatcher ) getObjectByRoleLocked ( role , namespace , name string ) object {
2022-04-22 18:39:34 +02:00
if role == "node" {
// Node objects have no namespace
namespace = ""
2021-03-05 11:36:05 +01:00
}
2021-04-02 13:45:08 +02:00
key := namespace + "/" + name
2021-04-29 09:14:24 +02:00
for _ , uw := range gw . m {
2021-03-05 11:36:05 +01:00
if uw . role != role {
2021-04-02 13:17:53 +02:00
// Role mismatch
continue
}
2022-04-22 18:39:34 +02:00
if namespace != "" && uw . namespace != "" && uw . namespace != namespace {
2021-04-02 13:17:53 +02:00
// Namespace mismatch
2021-03-05 11:36:05 +01:00
continue
}
2021-04-29 09:14:24 +02:00
if o := uw . objectsByKey [ key ] ; o != nil {
2021-03-11 15:41:09 +01:00
return o
2021-03-05 11:36:05 +01:00
}
}
2021-03-11 15:41:09 +01:00
return nil
2021-03-05 11:36:05 +01:00
}
2021-03-14 20:10:35 +01:00
func ( gw * groupWatcher ) startWatchersForRole ( role string , aw * apiWatcher ) {
2021-08-29 10:23:06 +02:00
if role == "endpoints" || role == "endpointslice" {
// endpoints and endpointslice watchers query pod and service objects. So start watchers for these roles as well.
2021-05-12 13:10:10 +02:00
gw . startWatchersForRole ( "pod" , nil )
gw . startWatchersForRole ( "service" , nil )
}
2022-07-06 22:18:55 +02:00
if gw . attachNodeMetadata && ( role == "pod" || role == "endpoints" || role == "endpointslice" ) {
2022-04-22 18:39:34 +02:00
gw . startWatchersForRole ( "node" , nil )
}
2024-01-21 22:13:15 +01:00
2021-05-17 22:45:20 +02:00
paths := getAPIPathsWithNamespaces ( role , gw . namespaces , gw . selectors )
for _ , path := range paths {
2021-03-14 20:10:35 +01:00
apiURL := gw . apiServer + path
gw . mu . Lock ( )
uw := gw . m [ apiURL ]
2021-04-05 12:53:26 +02:00
needStart := uw == nil
if needStart {
2021-05-17 22:45:20 +02:00
uw = newURLWatcher ( role , apiURL , gw )
2021-03-14 20:10:35 +01:00
gw . m [ apiURL ] = uw
}
2021-04-29 09:14:24 +02:00
if aw != nil {
uw . subscribeAPIWatcherLocked ( aw )
}
2021-03-14 20:10:35 +01:00
gw . mu . Unlock ( )
2021-04-05 12:53:26 +02:00
if needStart {
uw . reloadObjects ( )
go uw . watchForUpdates ( )
2022-04-22 18:39:34 +02:00
if role == "endpoints" || role == "endpointslice" || ( gw . attachNodeMetadata && role == "pod" ) {
// Refresh targets in background, since they depend on other object types such as pod, service or node.
// This should guarantee that the ScrapeWork objects for these objects are properly updated
// as soon as the objects they depend on are updated.
2022-04-21 11:57:44 +02:00
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
2023-09-18 23:23:41 +02:00
go uw . recreateScrapeWorks ( )
2022-04-21 11:57:44 +02:00
}
2021-03-13 14:18:47 +01:00
}
2021-03-14 20:10:35 +01:00
}
2021-03-05 11:36:05 +01:00
}
2021-03-14 20:10:35 +01:00
// doRequest performs http request to the given requestURL.
2023-09-15 19:40:13 +02:00
func ( gw * groupWatcher ) doRequest ( ctx context . Context , requestURL string ) ( * http . Response , error ) {
2021-08-29 10:16:40 +02:00
if strings . Contains ( requestURL , "/apis/networking.k8s.io/v1/" ) && atomic . LoadUint32 ( & gw . useNetworkingV1Beta1 ) == 1 {
// Update networking URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings . Replace ( requestURL , "/apis/networking.k8s.io/v1/" , "/apis/networking.k8s.io/v1beta1/" , 1 )
}
if strings . Contains ( requestURL , "/apis/discovery.k8s.io/v1/" ) && atomic . LoadUint32 ( & gw . useDiscoveryV1Beta1 ) == 1 {
2023-02-13 13:27:13 +01:00
// Update discovery URL for old Kubernetes API, which supports only v1beta1 path.
2021-08-29 10:16:40 +02:00
requestURL = strings . Replace ( requestURL , "/apis/discovery.k8s.io/v1/" , "/apis/discovery.k8s.io/v1beta1/" , 1 )
}
2023-09-15 19:40:13 +02:00
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , requestURL , nil )
2021-03-14 20:10:35 +01:00
if err != nil {
2023-10-25 23:19:33 +02:00
logger . Panicf ( "FATAL: cannot create a request for %q: %s" , requestURL , err )
2021-03-14 20:10:35 +01:00
}
2023-10-25 23:19:33 +02:00
if err := gw . setHeaders ( req ) ; err != nil {
return nil , fmt . Errorf ( "cannot set request headers: %w" , err )
2023-10-17 11:58:19 +02:00
}
2021-08-29 10:16:40 +02:00
resp , err := gw . client . Do ( req )
if err != nil {
return nil , err
}
if resp . StatusCode == http . StatusNotFound {
if strings . Contains ( requestURL , "/apis/networking.k8s.io/v1/" ) && atomic . LoadUint32 ( & gw . useNetworkingV1Beta1 ) == 0 {
atomic . StoreUint32 ( & gw . useNetworkingV1Beta1 , 1 )
2023-09-15 19:40:13 +02:00
return gw . doRequest ( ctx , requestURL )
2021-08-29 10:16:40 +02:00
}
if strings . Contains ( requestURL , "/apis/discovery.k8s.io/v1/" ) && atomic . LoadUint32 ( & gw . useDiscoveryV1Beta1 ) == 0 {
atomic . StoreUint32 ( & gw . useDiscoveryV1Beta1 , 1 )
2023-09-15 19:40:13 +02:00
return gw . doRequest ( ctx , requestURL )
2021-08-29 10:16:40 +02:00
}
}
return resp , nil
2021-03-05 11:36:05 +01:00
}
2021-04-05 21:02:09 +02:00
func ( gw * groupWatcher ) registerPendingAPIWatchers ( ) {
2021-03-14 20:10:35 +01:00
gw . mu . Lock ( )
for _ , uw := range gw . m {
2021-04-29 09:14:24 +02:00
uw . registerPendingAPIWatchersLocked ( )
2021-03-14 20:10:35 +01:00
}
2023-10-27 13:35:27 +02:00
gw . mu . Unlock ( )
2021-04-05 21:02:09 +02:00
}
func ( gw * groupWatcher ) unsubscribeAPIWatcher ( aw * apiWatcher ) {
2021-04-29 09:14:24 +02:00
gw . mu . Lock ( )
2023-09-18 17:45:10 +02:00
for _ , uw := range gw . m {
2021-04-29 09:14:24 +02:00
uw . unsubscribeAPIWatcherLocked ( aw )
2021-04-05 21:02:09 +02:00
}
2023-10-27 13:35:27 +02:00
gw . mu . Unlock ( )
2021-03-14 20:10:35 +01:00
}
2021-03-05 11:36:05 +01:00
2021-04-02 13:45:08 +02:00
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
2021-04-29 09:14:24 +02:00
//
// urlWatcher fields must be accessed under gw.mu lock.
2021-03-05 11:36:05 +01:00
type urlWatcher struct {
2021-04-02 13:17:53 +02:00
role string
namespace string
apiURL string
gw * groupWatcher
2021-03-05 11:36:05 +01:00
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
2021-04-05 21:02:09 +02:00
// awsPending contains pending apiWatcher objects, which are registered in a batch.
// Batch registering saves CPU time needed for registering big number of Kubernetes objects
// shared among big number of scrape jobs, since per-object labels are generated only once
// for all the scrape jobs (each scrape job is associated with a single apiWatcher).
2022-04-20 15:11:37 +02:00
// See registerPendingAPIWatchersLocked for details.
2021-04-05 21:02:09 +02:00
awsPending map [ * apiWatcher ] struct { }
2021-03-11 15:41:09 +01:00
// aws contains registered apiWatcher objects
aws map [ * apiWatcher ] struct { }
2021-04-02 13:45:08 +02:00
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map [ string ] object
2021-03-05 16:29:54 +01:00
2022-05-21 00:01:37 +02:00
needRecreateScrapeWorks bool
2022-04-21 11:57:44 +02:00
2021-03-07 18:50:01 +01:00
resourceVersion string
2021-03-05 11:36:05 +01:00
2021-03-26 11:28:10 +01:00
objectsCount * metrics . Counter
objectsAdded * metrics . Counter
objectsRemoved * metrics . Counter
objectsUpdated * metrics . Counter
staleResourceVersions * metrics . Counter
2021-03-05 11:36:05 +01:00
}
2021-05-17 22:45:20 +02:00
func newURLWatcher ( role , apiURL string , gw * groupWatcher ) * urlWatcher {
2021-03-11 15:41:09 +01:00
parseObject , parseObjectList := getObjectParsersForRole ( role )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_url_watchers { role=%q} ` , role ) ) . Inc ( )
2023-09-15 19:40:13 +02:00
2021-03-11 15:41:09 +01:00
uw := & urlWatcher {
2021-05-17 22:45:20 +02:00
role : role ,
apiURL : apiURL ,
gw : gw ,
2021-03-05 11:36:05 +01:00
parseObject : parseObject ,
parseObjectList : parseObjectList ,
2021-04-05 21:02:09 +02:00
awsPending : make ( map [ * apiWatcher ] struct { } ) ,
2021-04-02 13:45:08 +02:00
aws : make ( map [ * apiWatcher ] struct { } ) ,
objectsByKey : make ( map [ string ] object ) ,
2021-03-05 11:36:05 +01:00
2021-03-26 11:28:10 +01:00
objectsCount : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_objects { role=%q} ` , role ) ) ,
objectsAdded : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_objects_added_total { role=%q} ` , role ) ) ,
objectsRemoved : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_objects_removed_total { role=%q} ` , role ) ) ,
objectsUpdated : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_objects_updated_total { role=%q} ` , role ) ) ,
staleResourceVersions : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_stale_resource_versions_total { role=%q} ` , role ) ) ,
2021-03-11 15:41:09 +01:00
}
2021-03-14 20:10:35 +01:00
logger . Infof ( "started %s watcher for %q" , uw . role , uw . apiURL )
2021-03-11 15:41:09 +01:00
return uw
}
2021-03-05 11:36:05 +01:00
2023-09-18 23:23:41 +02:00
func ( uw * urlWatcher ) recreateScrapeWorks ( ) {
const minSleepTime = 5 * time . Second
sleepTime := minSleepTime
gw := uw . gw
2023-10-27 13:35:27 +02:00
stopCh := gw . ctx . Done ( )
2023-09-18 23:23:41 +02:00
for {
t := timerpool . Get ( sleepTime )
select {
case <- stopCh :
timerpool . Put ( t )
return
case <- t . C :
timerpool . Put ( t )
}
startTime := time . Now ( )
gw . mu . Lock ( )
if uw . needRecreateScrapeWorks {
uw . needRecreateScrapeWorks = false
uw . recreateScrapeWorksLocked ( uw . objectsByKey , uw . aws )
sleepTime = time . Since ( startTime )
if sleepTime < minSleepTime {
sleepTime = minSleepTime
}
}
gw . mu . Unlock ( )
}
}
2021-04-29 09:14:24 +02:00
func ( uw * urlWatcher ) subscribeAPIWatcherLocked ( aw * apiWatcher ) {
2021-03-14 20:10:35 +01:00
if _ , ok := uw . aws [ aw ] ; ! ok {
2021-04-05 21:02:09 +02:00
if _ , ok := uw . awsPending [ aw ] ; ! ok {
uw . awsPending [ aw ] = struct { } { }
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="pending"} ` , uw . role ) ) . Inc ( )
2021-03-14 20:10:35 +01:00
}
2021-03-05 11:36:05 +01:00
}
}
2021-04-29 09:14:24 +02:00
func ( uw * urlWatcher ) registerPendingAPIWatchersLocked ( ) {
2021-04-27 13:57:48 +02:00
if len ( uw . awsPending ) == 0 {
return
}
2022-05-21 00:01:37 +02:00
uw . recreateScrapeWorksLocked ( uw . objectsByKey , uw . awsPending )
2021-04-05 21:02:09 +02:00
for aw := range uw . awsPending {
2021-04-07 12:07:36 +02:00
uw . aws [ aw ] = struct { } { }
2022-04-21 11:57:44 +02:00
}
awsPendingLen := len ( uw . awsPending )
uw . awsPending = make ( map [ * apiWatcher ] struct { } )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="working"} ` , uw . role ) ) . Add ( awsPendingLen )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="pending"} ` , uw . role ) ) . Add ( - awsPendingLen )
}
2022-05-21 00:01:37 +02:00
func ( uw * urlWatcher ) recreateScrapeWorksLocked ( objectsByKey map [ string ] object , awsMap map [ * apiWatcher ] struct { } ) {
es := uw . gw . getScrapeWorkObjectsByAPIWatcherLocked ( objectsByKey , awsMap )
for aw , e := range es {
swosByKey := e . swosByKey
for key , swos := range swosByKey {
if len ( swos ) == 0 {
delete ( swosByKey , key )
2022-04-20 15:11:37 +02:00
}
2022-05-21 00:01:37 +02:00
}
aw . replaceScrapeWorks ( uw , swosByKey )
2022-04-20 15:11:37 +02:00
}
2022-05-21 00:01:37 +02:00
}
func ( uw * urlWatcher ) updateScrapeWorksLocked ( objectsByKey map [ string ] object , awsMap map [ * apiWatcher ] struct { } ) {
es := uw . gw . getScrapeWorkObjectsByAPIWatcherLocked ( objectsByKey , awsMap )
for aw , e := range es {
aw . updateScrapeWorks ( uw , e . swosByKey )
2021-04-05 21:02:09 +02:00
}
}
2022-04-22 12:21:58 +02:00
func ( uw * urlWatcher ) removeScrapeWorksLocked ( keys [ ] string ) {
for _ , key := range keys {
for aw := range uw . aws {
aw . removeScrapeWorks ( uw , key )
}
}
}
2021-04-29 09:14:24 +02:00
func ( uw * urlWatcher ) unsubscribeAPIWatcherLocked ( aw * apiWatcher ) {
2021-04-05 21:02:09 +02:00
if _ , ok := uw . awsPending [ aw ] ; ok {
delete ( uw . awsPending , aw )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="pending"} ` , uw . role ) ) . Dec ( )
}
2021-03-14 20:10:35 +01:00
if _ , ok := uw . aws [ aw ] ; ok {
delete ( uw . aws , aw )
2021-04-05 21:02:09 +02:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_subscribers { role=%q,status="working"} ` , uw . role ) ) . Dec ( )
2021-03-11 15:41:09 +01:00
}
}
2021-03-05 11:36:05 +01:00
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
func ( uw * urlWatcher ) reloadObjects ( ) string {
2021-05-18 14:10:45 +02:00
if uw . resourceVersion != "" {
2021-03-11 15:41:09 +01:00
// Fast path - there is no need in reloading the objects.
2021-05-18 14:10:45 +02:00
return uw . resourceVersion
2021-03-07 18:50:01 +01:00
}
2023-10-27 13:35:27 +02:00
gw := uw . gw
2022-04-21 11:57:44 +02:00
startTime := time . Now ( )
2023-08-30 16:03:41 +02:00
apiURL := uw . apiURL
// Set resourceVersion to 0 in order to reduce load on Kubernetes control plane.
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 .
delimiter := getQueryArgsDelimiter ( apiURL )
requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan"
2023-10-27 13:35:27 +02:00
resp , err := gw . doRequest ( gw . ctx , requestURL )
2021-03-05 11:36:05 +01:00
if err != nil {
2023-09-18 17:06:37 +02:00
if ! errors . Is ( err , context . Canceled ) {
logger . Errorf ( "cannot perform request to %q: %s" , requestURL , err )
}
2021-03-05 11:36:05 +01:00
return ""
}
if resp . StatusCode != http . StatusOK {
2022-08-21 23:13:44 +02:00
body , _ := io . ReadAll ( resp . Body )
2021-03-11 15:41:09 +01:00
_ = resp . Body . Close ( )
2021-03-05 11:36:05 +01:00
logger . Errorf ( "unexpected status code for request to %q: %d; want %d; response: %q" , requestURL , resp . StatusCode , http . StatusOK , body )
return ""
}
2021-04-02 13:45:08 +02:00
objectsByKey , metadata , err := uw . parseObjectList ( resp . Body )
2021-03-11 15:41:09 +01:00
_ = resp . Body . Close ( )
2021-03-05 11:36:05 +01:00
if err != nil {
2021-03-11 15:41:09 +01:00
logger . Errorf ( "cannot parse objects from %q: %s" , requestURL , err )
2021-03-05 11:36:05 +01:00
return ""
}
2021-03-11 15:41:09 +01:00
2023-10-27 13:35:27 +02:00
gw . mu . Lock ( )
2022-04-22 12:21:58 +02:00
objectsAdded := make ( map [ string ] object )
objectsUpdated := make ( map [ string ] object )
var objectsRemoved [ ] string
for key , oPrev := range uw . objectsByKey {
2022-04-20 15:11:37 +02:00
o , ok := objectsByKey [ key ]
if ok {
2022-04-22 12:21:58 +02:00
if ! reflect . DeepEqual ( oPrev , o ) {
objectsUpdated [ key ] = o
}
// Overwrite oPrev with o even if these objects are equal.
// This should free up memory associated with oPrev.
uw . objectsByKey [ key ] = o
2021-03-11 15:41:09 +01:00
} else {
2022-04-22 12:21:58 +02:00
objectsRemoved = append ( objectsRemoved , key )
delete ( uw . objectsByKey , key )
2021-03-05 11:36:05 +01:00
}
}
2022-04-20 15:11:37 +02:00
for key , o := range objectsByKey {
2021-04-02 13:45:08 +02:00
if _ , ok := uw . objectsByKey [ key ] ; ! ok {
2022-04-22 12:21:58 +02:00
objectsAdded [ key ] = o
uw . objectsByKey [ key ] = o
2021-03-11 15:41:09 +01:00
}
}
2022-04-22 12:21:58 +02:00
uw . removeScrapeWorksLocked ( objectsRemoved )
uw . updateScrapeWorksLocked ( objectsUpdated , uw . aws )
uw . updateScrapeWorksLocked ( objectsAdded , uw . aws )
2022-05-21 00:01:37 +02:00
uw . needRecreateScrapeWorks = false
2022-04-22 12:21:58 +02:00
if len ( objectsRemoved ) > 0 || len ( objectsUpdated ) > 0 || len ( objectsAdded ) > 0 {
2022-04-22 18:39:34 +02:00
uw . maybeUpdateDependedScrapeWorksLocked ( )
2022-04-22 12:21:58 +02:00
}
2023-10-27 13:35:27 +02:00
gw . mu . Unlock ( )
2021-05-18 14:10:45 +02:00
2022-04-22 12:21:58 +02:00
uw . objectsUpdated . Add ( len ( objectsUpdated ) )
uw . objectsRemoved . Add ( len ( objectsRemoved ) )
uw . objectsAdded . Add ( len ( objectsAdded ) )
uw . objectsCount . Add ( len ( objectsAdded ) - len ( objectsRemoved ) )
2021-03-07 18:50:01 +01:00
uw . resourceVersion = metadata . ResourceVersion
2021-04-29 09:14:24 +02:00
2022-04-21 11:57:44 +02:00
logger . Infof ( "reloaded %d objects from %q in %.3fs; updated=%d, removed=%d, added=%d, resourceVersion=%q" ,
2022-04-22 12:21:58 +02:00
len ( objectsByKey ) , requestURL , time . Since ( startTime ) . Seconds ( ) , len ( objectsUpdated ) , len ( objectsRemoved ) , len ( objectsAdded ) , uw . resourceVersion )
2021-04-05 12:53:26 +02:00
return uw . resourceVersion
}
2021-03-07 18:50:01 +01:00
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
2021-03-05 11:36:05 +01:00
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
2021-03-07 18:50:01 +01:00
func ( uw * urlWatcher ) watchForUpdates ( ) {
2023-10-27 13:35:27 +02:00
gw := uw . gw
stopCh := gw . ctx . Done ( )
2021-03-05 11:36:05 +01:00
backoffDelay := time . Second
maxBackoffDelay := 30 * time . Second
backoffSleep := func ( ) {
2023-09-18 23:23:41 +02:00
t := timerpool . Get ( backoffDelay )
select {
case <- stopCh :
timerpool . Put ( t )
return
case <- t . C :
timerpool . Put ( t )
}
2021-03-05 11:36:05 +01:00
backoffDelay *= 2
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
}
apiURL := uw . apiURL
2023-08-30 16:03:41 +02:00
delimiter := getQueryArgsDelimiter ( apiURL )
2023-10-27 13:35:27 +02:00
timeoutSeconds := time . Duration ( 0.9 * float64 ( gw . client . Timeout ) ) . Seconds ( )
2021-03-10 14:06:33 +01:00
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv . Itoa ( int ( timeoutSeconds ) )
2021-03-05 11:36:05 +01:00
for {
2023-09-15 19:40:13 +02:00
select {
2023-09-18 17:06:37 +02:00
case <- stopCh :
2023-09-18 23:23:41 +02:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_discovery_kubernetes_url_watchers { role=%q} ` , uw . role ) ) . Dec ( )
logger . Infof ( "stopped %s watcher for %q" , uw . role , uw . apiURL )
2023-09-15 19:40:13 +02:00
return
default :
}
2021-03-07 18:50:01 +01:00
resourceVersion := uw . reloadObjects ( )
2021-03-14 20:55:00 +01:00
if resourceVersion == "" {
backoffSleep ( )
continue
2021-03-05 11:36:05 +01:00
}
2021-03-14 20:55:00 +01:00
requestURL := apiURL + "&resourceVersion=" + url . QueryEscape ( resourceVersion )
2023-10-27 13:35:27 +02:00
resp , err := gw . doRequest ( gw . ctx , requestURL )
2021-03-05 11:36:05 +01:00
if err != nil {
2023-09-18 17:06:37 +02:00
if ! errors . Is ( err , context . Canceled ) {
logger . Errorf ( "cannot perform request to %q: %s" , requestURL , err )
backoffSleep ( )
}
2021-03-05 11:36:05 +01:00
continue
}
if resp . StatusCode != http . StatusOK {
if resp . StatusCode == 410 {
// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
backoffDelay = time . Second
2021-03-26 11:28:10 +01:00
uw . staleResourceVersions . Inc ( )
2021-05-18 14:10:45 +02:00
uw . resourceVersion = ""
2021-03-05 11:36:05 +01:00
} else {
2022-08-21 23:13:44 +02:00
body , _ := io . ReadAll ( resp . Body )
2021-03-26 11:28:10 +01:00
_ = resp . Body . Close ( )
logger . Errorf ( "unexpected status code for request to %q: %d; want %d; response: %q" , requestURL , resp . StatusCode , http . StatusOK , body )
2021-03-05 11:36:05 +01:00
backoffSleep ( )
}
continue
}
backoffDelay = time . Second
err = uw . readObjectUpdateStream ( resp . Body )
_ = resp . Body . Close ( )
if err != nil {
2023-10-27 13:35:27 +02:00
if ! errors . Is ( err , io . EOF ) && ! errors . Is ( err , context . Canceled ) {
2021-03-05 11:36:05 +01:00
logger . Errorf ( "error when reading WatchEvent stream from %q: %s" , requestURL , err )
2021-05-18 22:25:42 +02:00
uw . resourceVersion = ""
2021-03-05 11:36:05 +01:00
}
backoffSleep ( )
continue
}
}
}
2022-04-20 09:50:36 +02:00
// readObjectUpdateStream reads Kubernetes watch events from r and updates locally cached objects according to the received events.
2021-03-05 11:36:05 +01:00
func ( uw * urlWatcher ) readObjectUpdateStream ( r io . Reader ) error {
2023-10-27 13:35:27 +02:00
gw := uw . gw
2021-03-05 11:36:05 +01:00
d := json . NewDecoder ( r )
var we WatchEvent
for {
if err := d . Decode ( & we ) ; err != nil {
2022-10-18 19:37:42 +02:00
return fmt . Errorf ( "cannot parse WatchEvent json response: %w" , err )
2021-03-05 11:36:05 +01:00
}
switch we . Type {
case "ADDED" , "MODIFIED" :
2021-03-11 12:06:40 +01:00
o , err := uw . parseObject ( we . Object )
if err != nil {
2021-05-18 22:25:42 +02:00
return fmt . Errorf ( "cannot parse %s object: %w" , we . Type , err )
2021-03-11 12:06:40 +01:00
}
2021-04-02 13:45:08 +02:00
key := o . key ( )
2023-10-27 13:35:27 +02:00
gw . mu . Lock ( )
2022-04-20 15:11:37 +02:00
uw . updateObjectLocked ( key , o )
2023-10-27 13:35:27 +02:00
gw . mu . Unlock ( )
2021-03-05 11:36:05 +01:00
case "DELETED" :
2021-03-11 12:06:40 +01:00
o , err := uw . parseObject ( we . Object )
if err != nil {
2021-05-18 22:25:42 +02:00
return fmt . Errorf ( "cannot parse %s object: %w" , we . Type , err )
2021-03-11 12:06:40 +01:00
}
2021-04-02 13:45:08 +02:00
key := o . key ( )
2023-10-27 13:35:27 +02:00
gw . mu . Lock ( )
2022-04-20 15:11:37 +02:00
uw . removeObjectLocked ( key )
2023-10-27 13:35:27 +02:00
gw . mu . Unlock ( )
2021-03-10 14:06:33 +01:00
case "BOOKMARK" :
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
2021-03-11 12:06:40 +01:00
bm , err := parseBookmark ( we . Object )
if err != nil {
return fmt . Errorf ( "cannot parse bookmark from %q: %w" , we . Object , err )
}
2021-05-18 14:10:45 +02:00
uw . resourceVersion = bm . Metadata . ResourceVersion
2021-03-26 11:28:10 +01:00
case "ERROR" :
em , err := parseError ( we . Object )
if err != nil {
2021-03-26 11:45:59 +01:00
return fmt . Errorf ( "cannot parse error message from %q: %w" , we . Object , err )
2021-03-26 11:28:10 +01:00
}
if em . Code == 410 {
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
uw . staleResourceVersions . Inc ( )
2021-05-18 14:10:45 +02:00
uw . resourceVersion = ""
2021-03-26 11:28:10 +01:00
return nil
}
return fmt . Errorf ( "unexpected error message: %q" , we . Object )
2021-03-05 11:36:05 +01:00
default :
2021-03-26 11:28:10 +01:00
return fmt . Errorf ( "unexpected WatchEvent type %q: %q" , we . Type , we . Object )
2021-03-05 11:36:05 +01:00
}
}
}
2022-04-20 15:11:37 +02:00
func ( uw * urlWatcher ) updateObjectLocked ( key string , o object ) {
oPrev , ok := uw . objectsByKey [ key ]
2022-04-22 12:21:58 +02:00
// Overwrite oPrev with o even if these objects are equal.
// This should free up memory associated with oPrev.
2022-04-20 15:11:37 +02:00
uw . objectsByKey [ key ] = o
2022-04-22 12:21:58 +02:00
if ! ok {
uw . objectsCount . Inc ( )
uw . objectsAdded . Inc ( )
} else {
if reflect . DeepEqual ( oPrev , o ) {
// Nothing to do, since the new object is equal to the previous one.
return
}
uw . objectsUpdated . Inc ( )
}
2022-04-21 11:57:44 +02:00
if len ( uw . aws ) > 0 {
2022-11-30 06:22:12 +01:00
labelss := o . getTargetLabels ( uw . gw )
2022-04-21 11:57:44 +02:00
for aw := range uw . aws {
2022-11-30 06:22:12 +01:00
aw . setScrapeWorks ( uw , key , labelss )
2022-04-21 11:57:44 +02:00
}
2022-11-30 06:22:12 +01:00
putLabelssToPool ( labelss )
2022-04-20 15:11:37 +02:00
}
2022-04-22 18:39:34 +02:00
uw . maybeUpdateDependedScrapeWorksLocked ( )
2022-04-20 15:11:37 +02:00
}
func ( uw * urlWatcher ) removeObjectLocked ( key string ) {
2022-04-22 12:21:58 +02:00
if _ , ok := uw . objectsByKey [ key ] ; ! ok {
return
}
uw . objectsCount . Dec ( )
uw . objectsRemoved . Inc ( )
2022-04-20 15:11:37 +02:00
delete ( uw . objectsByKey , key )
for aw := range uw . aws {
aw . removeScrapeWorks ( uw , key )
}
2022-04-22 18:39:34 +02:00
uw . maybeUpdateDependedScrapeWorksLocked ( )
2022-04-21 11:57:44 +02:00
}
2022-04-22 18:39:34 +02:00
func ( uw * urlWatcher ) maybeUpdateDependedScrapeWorksLocked ( ) {
role := uw . role
attachNodeMetadata := uw . gw . attachNodeMetadata
if ! ( role == "pod" || role == "service" || ( attachNodeMetadata && role == "node" ) ) {
2022-04-21 11:57:44 +02:00
// Nothing to update
return
}
namespace := uw . namespace
for _ , uwx := range uw . gw . m {
2022-04-22 18:39:34 +02:00
if namespace != "" && uwx . namespace != "" && uwx . namespace != namespace {
// Namespace mismatch
continue
}
if ( role == "pod" || role == "service" ) && ( uwx . role == "endpoints" || uwx . role == "endpointslice" ) {
// endpoints and endpointslice objects depend on pods and service objects
2022-05-21 00:01:37 +02:00
uwx . needRecreateScrapeWorks = true
2022-04-21 11:57:44 +02:00
continue
}
2022-07-06 22:18:55 +02:00
if attachNodeMetadata && role == "node" && ( uwx . role == "pod" || uwx . role == "endpoints" || uwx . role == "endpointslice" ) {
// pod, endpoints and enpointslices objects depend on node objects if attachNodeMetadata is set
2022-05-21 00:01:37 +02:00
uwx . needRecreateScrapeWorks = true
2022-04-21 11:57:44 +02:00
continue
}
}
2022-04-20 15:11:37 +02:00
}
2021-03-26 11:28:10 +01:00
// Bookmark is a bookmark message from Kubernetes Watch API.
2021-03-11 12:06:40 +01:00
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
type Bookmark struct {
Metadata struct {
ResourceVersion string
}
}
func parseBookmark ( data [ ] byte ) ( * Bookmark , error ) {
var bm Bookmark
if err := json . Unmarshal ( data , & bm ) ; err != nil {
return nil , err
}
return & bm , nil
}
2021-03-26 11:28:10 +01:00
// Error is an error message from Kubernetes Watch API.
type Error struct {
Code int
}
func parseError ( data [ ] byte ) ( * Error , error ) {
var em Error
if err := json . Unmarshal ( data , & em ) ; err != nil {
return nil , err
}
return & em , nil
}
2021-05-17 22:45:20 +02:00
func getAPIPathsWithNamespaces ( role string , namespaces [ ] string , selectors [ ] Selector ) [ ] string {
2021-04-05 19:27:23 +02:00
objectType := getObjectTypeByRole ( role )
if objectType == "nodes" || len ( namespaces ) == 0 {
2021-03-05 11:36:05 +01:00
query := joinSelectors ( role , selectors )
2021-04-05 19:27:23 +02:00
path := getAPIPath ( objectType , "" , query )
2021-05-17 22:45:20 +02:00
return [ ] string { path }
2021-03-05 11:36:05 +01:00
}
query := joinSelectors ( role , selectors )
paths := make ( [ ] string , len ( namespaces ) )
for i , namespace := range namespaces {
2021-04-05 19:27:23 +02:00
paths [ i ] = getAPIPath ( objectType , namespace , query )
2021-03-05 11:36:05 +01:00
}
2021-05-17 22:45:20 +02:00
return paths
2021-03-05 11:36:05 +01:00
}
2021-04-05 19:27:23 +02:00
func getAPIPath ( objectType , namespace , query string ) string {
suffix := objectType
2021-03-05 11:36:05 +01:00
if namespace != "" {
2021-04-05 19:27:23 +02:00
suffix = "namespaces/" + namespace + "/" + objectType
2021-03-05 11:36:05 +01:00
}
if len ( query ) > 0 {
suffix += "?" + query
}
2021-04-05 19:27:23 +02:00
if objectType == "ingresses" {
2021-08-29 10:16:40 +02:00
return "/apis/networking.k8s.io/v1/" + suffix
2021-03-05 11:36:05 +01:00
}
2021-04-05 19:27:23 +02:00
if objectType == "endpointslices" {
2021-08-29 10:16:40 +02:00
return "/apis/discovery.k8s.io/v1/" + suffix
2021-03-05 11:36:05 +01:00
}
return "/api/v1/" + suffix
}
func joinSelectors ( role string , selectors [ ] Selector ) string {
var labelSelectors , fieldSelectors [ ] string
for _ , s := range selectors {
if s . Role != role {
continue
}
if s . Label != "" {
labelSelectors = append ( labelSelectors , s . Label )
}
if s . Field != "" {
fieldSelectors = append ( fieldSelectors , s . Field )
}
}
var args [ ] string
if len ( labelSelectors ) > 0 {
args = append ( args , "labelSelector=" + url . QueryEscape ( strings . Join ( labelSelectors , "," ) ) )
}
if len ( fieldSelectors ) > 0 {
args = append ( args , "fieldSelector=" + url . QueryEscape ( strings . Join ( fieldSelectors , "," ) ) )
}
return strings . Join ( args , "&" )
}
2021-04-05 19:27:23 +02:00
func getObjectTypeByRole ( role string ) string {
2021-03-05 11:36:05 +01:00
switch role {
case "node" :
return "nodes"
case "pod" :
return "pods"
case "service" :
return "services"
case "endpoints" :
return "endpoints"
2021-08-29 10:23:06 +02:00
case "endpointslice" :
2021-03-05 11:36:05 +01:00
return "endpointslices"
case "ingress" :
return "ingresses"
default :
logger . Panicf ( "BUG: unknonw role=%q" , role )
return ""
}
}
func getObjectParsersForRole ( role string ) ( parseObjectFunc , parseObjectListFunc ) {
switch role {
case "node" :
return parseNode , parseNodeList
case "pod" :
return parsePod , parsePodList
case "service" :
return parseService , parseServiceList
case "endpoints" :
return parseEndpoints , parseEndpointsList
2021-08-29 10:23:06 +02:00
case "endpointslice" :
2021-03-05 11:36:05 +01:00
return parseEndpointSlice , parseEndpointSliceList
case "ingress" :
return parseIngress , parseIngressList
default :
logger . Panicf ( "BUG: unsupported role=%q" , role )
return nil , nil
}
}
2023-08-30 16:03:41 +02:00
func getQueryArgsDelimiter ( apiURL string ) string {
if strings . Contains ( apiURL , "?" ) {
return "&"
}
return "?"
}