mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-12 12:46:23 +01:00
lib/promauth: follow-up for e16d3f5639
- Make sure that invalid/missing TLS CA file or TLS client certificate files at vmagent startup don't prevent from processing the corresponding scrape targets after the file becomes correct, without the need to restart vmagent. Previously scrape targets with invalid TLS CA file or TLS client certificate files were permanently dropped after the first attempt to initialize them, and they didn't appear until the next vmagent reload or the next change in other places of the loaded scrape configs. - Make sure that TLS CA is properly re-loaded from file after it changes without the need to restart vmagent. Previously the old TLS CA was used until vmagent restart. - Properly handle errors during http request creation for the second attempt to send data to remote system at vmagent and vmalert. Previously failed request creation could result in nil pointer dereferencing, since the returned request is nil on error. - Add more context to the logged error during AWS sigv4 request signing before sending the data to -remoteWrite.url at vmagent. Previously it could miss details on the source of the request. - Do not create a new HTTP client per second when generating OAuth2 token needed to put in Authorization header of every http request issued by vmagent during service discovery or target scraping. Re-use the HTTP client instead until the corresponding scrape config changes. - Cache error at lib/promauth.Config.GetAuthHeader() in the same way as the auth header is cached, e.g. the error is cached for a second now. This should reduce load on CPU and OAuth2 server when auth header cannot be obtained because of temporary error. - Share tls.Config.GetClientCertificate function among multiple scrape targets with the same tls_config. Cache the loaded certificate and the error for one second. This should significantly reduce CPU load when scraping big number of targets with the same tls_config. - Allow loading TLS certificates from HTTP and HTTPs urls by specifying these urls at `tls_config->cert_file` and `tls_config->key_file`. - Improve test coverage at lib/promauth - Skip unreachable or invalid files specified at `scrape_config_files` during vmagent startup, since these files may become valid later. Previously vmagent was exitting in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959
This commit is contained in:
parent
8c9e3b7b50
commit
f03e81c693
@ -106,12 +106,15 @@ type client struct {
|
||||
func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client {
|
||||
authCfg, err := getAuthConfig(argIdx)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot initialize auth config for remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||
logger.Fatalf("cannot initialize auth config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||
}
|
||||
tlsCfg, err := authCfg.NewTLSConfig()
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot initialize tls config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||
}
|
||||
tlsCfg := authCfg.NewTLSConfig()
|
||||
awsCfg, err := getAWSAPIConfig(argIdx)
|
||||
if err != nil {
|
||||
logger.Fatalf("FATAL: cannot initialize AWS Config for remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||
logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||
}
|
||||
tr := &http.Transport{
|
||||
DialContext: statDial,
|
||||
@ -328,15 +331,25 @@ func (c *client) doRequest(url string, body []byte) (*http.Response, error) {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := c.hc.Do(req)
|
||||
if err != nil && errors.Is(err, io.EOF) {
|
||||
// it is likely connection became stale.
|
||||
// So we do one more attempt in hope request will succeed.
|
||||
// If not, the error should be handled by the caller as usual.
|
||||
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139
|
||||
req, _ = c.newRequest(url, body)
|
||||
resp, err = c.hc.Do(req)
|
||||
if err == nil {
|
||||
return resp, nil
|
||||
}
|
||||
return resp, err
|
||||
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
return nil, err
|
||||
}
|
||||
// It is likely connection became stale or timed out during the first request.
|
||||
// Make another attempt in hope request will succeed.
|
||||
// If not, the error should be handled by the caller as usual.
|
||||
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139
|
||||
req, err = c.newRequest(url, body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("second attempt: %w", err)
|
||||
}
|
||||
resp, err = c.hc.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("second attempt: %w", err)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
|
||||
@ -362,8 +375,7 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
|
||||
if c.awsCfg != nil {
|
||||
sigv4Hash := awsapi.HashHex(body)
|
||||
if err := c.awsCfg.SignRequest(req, sigv4Hash); err != nil {
|
||||
// there is no need in retry, request will be rejected by client.Do and retried by code below
|
||||
logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err)
|
||||
return nil, fmt.Errorf("cannot sign remoteWrite request with AWS sigv4: %w", err)
|
||||
}
|
||||
}
|
||||
return req, nil
|
||||
|
@ -142,24 +142,30 @@ func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Resu
|
||||
return Result{}, nil, err
|
||||
}
|
||||
resp, err := s.do(ctx, req)
|
||||
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
// something in the middle between client and datasource might be closing
|
||||
// the connection. So we do a one more attempt in hope request will succeed.
|
||||
req, _ = s.newQueryRequest(query, ts)
|
||||
resp, err = s.do(ctx, req)
|
||||
}
|
||||
if err != nil {
|
||||
return Result{}, req, err
|
||||
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
// Return unexpected error to the caller.
|
||||
return Result{}, nil, err
|
||||
}
|
||||
// Something in the middle between client and datasource might be closing
|
||||
// the connection. So we do a one more attempt in hope request will succeed.
|
||||
req, err = s.newQueryRequest(query, ts)
|
||||
if err != nil {
|
||||
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
||||
}
|
||||
resp, err = s.do(ctx, req)
|
||||
if err != nil {
|
||||
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
||||
}
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
|
||||
// Process the received response.
|
||||
parseFn := parsePrometheusResponse
|
||||
if s.dataSourceType != datasourcePrometheus {
|
||||
parseFn = parseGraphiteResponse
|
||||
}
|
||||
result, err := parseFn(req, resp)
|
||||
_ = resp.Body.Close()
|
||||
return result, req, err
|
||||
}
|
||||
|
||||
@ -177,23 +183,31 @@ func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end tim
|
||||
return res, fmt.Errorf("end param is missing")
|
||||
}
|
||||
req, err := s.newQueryRangeRequest(query, start, end)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
resp, err := s.do(ctx, req)
|
||||
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
// something in the middle between client and datasource might be closing
|
||||
// the connection. So we do a one more attempt in hope request will succeed.
|
||||
req, _ = s.newQueryRangeRequest(query, start, end)
|
||||
resp, err = s.do(ctx, req)
|
||||
}
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
return parsePrometheusResponse(req, resp)
|
||||
resp, err := s.do(ctx, req)
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
// Return unexpected error to the caller.
|
||||
return res, err
|
||||
}
|
||||
// Something in the middle between client and datasource might be closing
|
||||
// the connection. So we do a one more attempt in hope request will succeed.
|
||||
req, err = s.newQueryRangeRequest(query, start, end)
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("second attempt: %w", err)
|
||||
}
|
||||
resp, err = s.do(ctx, req)
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("second attempt: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Process the received response.
|
||||
res, err = parsePrometheusResponse(req, resp)
|
||||
_ = resp.Body.Close()
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, error) {
|
||||
|
@ -282,7 +282,9 @@ func (c *Client) send(ctx context.Context, data []byte) error {
|
||||
if c.authCfg != nil {
|
||||
err = c.authCfg.SetHeaders(req, true)
|
||||
if err != nil {
|
||||
return &nonRetriableError{err: err}
|
||||
return &nonRetriableError{
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
if !*disablePathAppend {
|
||||
@ -306,8 +308,9 @@ func (c *Client) send(ctx context.Context, data []byte) error {
|
||||
case 4:
|
||||
if resp.StatusCode != http.StatusTooManyRequests {
|
||||
// MUST NOT retry write requests on HTTP 4xx responses other than 429
|
||||
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||
resp.StatusCode, req.URL.Redacted(), body)}
|
||||
return &nonRetriableError{
|
||||
err: fmt.Errorf("unexpected response code %d for %s. Response body %q", resp.StatusCode, req.URL.Redacted(), body),
|
||||
}
|
||||
}
|
||||
fallthrough
|
||||
default:
|
||||
|
@ -41,7 +41,9 @@ The sandbox cluster installation is running under the constant load generated by
|
||||
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049).
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-rule.evalDelay` flag and `eval_delay` attribute for [Groups](https://docs.victoriametrics.com/vmalert.html#groups). The new flag and param can be used to adjust the `time` parameter for rule evaluation requests to match [intentional query delay](https://docs.victoriametrics.com/keyConcepts.html#query-latency) from the datasource. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5155).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support data ingestion from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-newrelic-agent), [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): skip job with error logs if there is incorrect syntax under `scrape_configs`, previously will exit. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not exit on startup when [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) refer to non-existing or invalid files with auth configs, since these files may appear / updated later. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow loading TLS certificates from HTTP and HTTPS urls by specifying these urls at `cert_file` and `key_file` options inside `tls_config` and `proxy_tls_config` sections at [http client settings](https://docs.victoriametrics.com/sd_configs.html#http-api-client-options).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce CPU load when big number of targets are scraped over HTTPS with the same custom TLS certificate configured via `tls_config->cert_file` and `tls_config->key_file` at [scrape_config](https://docs.victoriametrics.com/sd_configs.html#scrape_configs).
|
||||
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-filestream.disableFadvise` command-line flag, which can be used for disabling `fadvise` syscall during backup upload to the remote storage. By default `vmbackup` uses `fadvise` syscall in order to prevent from eviction of recently accessed data from the [OS page cache](https://en.wikipedia.org/wiki/Page_cache) when backing up large files. Sometimes the `fadvise` syscall may take significant amounts of CPU when the backup is performed with large value of `-concurrency` command-line flag on systems with big number of CPU cores. In this case it is better to manually disable `fadvise` syscall by passing `-filestream.disableFadvise` command-line flag to `vmbackup`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120) for details.
|
||||
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-deleteAllObjectVersions` command-line flag, which can be used for forcing removal of all object versions in remote object storage. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5121) issue and [these docs](https://docs.victoriametrics.com/vmbackup.html#permanent-deletion-of-objects-in-s3-compatible-storages) for the details.
|
||||
* FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): account for `vmauth` component for alerts `ServiceDown` and `TooManyRestarts`.
|
||||
@ -55,7 +57,7 @@ The sandbox cluster installation is running under the constant load generated by
|
||||
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044).
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix vmalert web UI when running on 32-bit architectures machine.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): don't send requests if there is wrong auth config in `datasource`, `remoteWrite`, `remoteRead` and `notifier` section, previously will send requests without auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): do not send requests to configured remote systems when `-datasource.*`, `-remoteWrite.*`, `-remoteRead.*` or `-notifier.*` command-line flags refer files with invalid auth configs. Previously such requests were sent without properly set auth headers. Now the requests are sent only after the files are updated with valid auth configs. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
|
||||
* BUGFIX: `vmselect`: improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087).
|
||||
* BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component.
|
||||
* BUGFIX: dashboards/vmalert: apply `desc` sorting in tooltips for vmalert dashboard in order to improve visibility of the outliers on graph.
|
||||
@ -63,7 +65,7 @@ The sandbox cluster installation is running under the constant load generated by
|
||||
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): bump hard-coded limit for search query size at `vmstorage` from 1MB to 5MB. The change should be more suitable for real-world scenarios and protect vmstorage from excessive memory usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154) for details
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix error when creating an incremental backup with the `-origin` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5144) for details.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix vmagent ignoring configuration reload for streaming aggregation if it was started with empty streaming aggregation config. Thanks to @aluode99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5178).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): don't send requests if there is wrong auth config in `scrape_configs` and `remoteWrite` section, previously will send requests without auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not scrape targets if the corresponding [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) refer to files with invalid auth configs. Previously the targets were scraped without properly set auth headers in this case. Now targets are scraped only after the files are updated with valid auth configs. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly parse `ca`, `cert` and `key` options at `tls_config` section inside [http client settings](https://docs.victoriametrics.com/sd_configs.html#http-api-client-options). Previously string values couldn't be parsed for these options, since the parser was mistakenly expecting a list of `uint8` values instead.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly drop samples if `-streamAggr.dropInput` command-line flag is set and `-remoteWrite.streamAggr.config` contains an empty file. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5207).
|
||||
* BUGFIX: [vmstorage](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): prevent deleted series to be searchable via `/api/v1/series` API if they were re-ingested with staleness markers. This situation could happen if user deletes the series from the target and from VM, and then vmagent sends stale markers for absent series. Thanks to @ilyatrefilov for the [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5069) and [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5174).
|
||||
|
@ -1,7 +1,6 @@
|
||||
package promauth
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
@ -83,18 +82,6 @@ type TLSConfig struct {
|
||||
// This can only result in lower security level if improperly set.
|
||||
}
|
||||
|
||||
// String returns human-readable representation of tc
|
||||
func (tc *TLSConfig) String() string {
|
||||
if tc == nil {
|
||||
return ""
|
||||
}
|
||||
caHash := xxhash.Sum64([]byte(tc.CA))
|
||||
certHash := xxhash.Sum64([]byte(tc.Cert))
|
||||
keyHash := xxhash.Sum64([]byte(tc.Key))
|
||||
return fmt.Sprintf("hash(ca)=%d, ca_file=%q, hash(cert)=%d, cert_file=%q, hash(key)=%d, key_file=%q, server_name=%q, insecure_skip_verify=%v, min_version=%q",
|
||||
caHash, tc.CAFile, certHash, tc.CertFile, keyHash, tc.KeyFile, tc.ServerName, tc.InsecureSkipVerify, tc.MinVersion)
|
||||
}
|
||||
|
||||
// Authorization represents generic authorization config.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/
|
||||
@ -162,12 +149,6 @@ type OAuth2Config struct {
|
||||
ProxyURL string `yaml:"proxy_url,omitempty"`
|
||||
}
|
||||
|
||||
// String returns string representation of o.
|
||||
func (o *OAuth2Config) String() string {
|
||||
return fmt.Sprintf("clientID=%q, clientSecret=%q, clientSecretFile=%q, Scopes=%q, tokenURL=%q, endpointParams=%q, tlsConfig={%s}, proxyURL=%q",
|
||||
o.ClientID, o.ClientSecret, o.ClientSecretFile, o.Scopes, o.TokenURL, o.EndpointParams, o.TLSConfig.String(), o.ProxyURL)
|
||||
}
|
||||
|
||||
func (o *OAuth2Config) validate() error {
|
||||
if o.ClientID == "" {
|
||||
return fmt.Errorf("client_id cannot be empty")
|
||||
@ -188,11 +169,26 @@ type oauth2ConfigInternal struct {
|
||||
mu sync.Mutex
|
||||
cfg *clientcredentials.Config
|
||||
clientSecretFile string
|
||||
ctx context.Context
|
||||
tokenSource oauth2.TokenSource
|
||||
|
||||
// ac contains auth config needed for initializing tls config
|
||||
ac *Config
|
||||
|
||||
proxyURL string
|
||||
proxyURLFunc func(*http.Request) (*url.URL, error)
|
||||
|
||||
ctx context.Context
|
||||
tokenSource oauth2.TokenSource
|
||||
}
|
||||
|
||||
func (oi *oauth2ConfigInternal) String() string {
|
||||
return fmt.Sprintf("clientID=%q, clientSecret=%q, clientSecretFile=%q, scopes=%q, endpointParams=%q, tokenURL=%q, proxyURL=%q, tlsConfig={%s}",
|
||||
oi.cfg.ClientID, oi.cfg.ClientSecret, oi.clientSecretFile, oi.cfg.Scopes, oi.cfg.EndpointParams, oi.cfg.TokenURL, oi.proxyURL, oi.ac.String())
|
||||
}
|
||||
|
||||
func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInternal, error) {
|
||||
if err := o.validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
oi := &oauth2ConfigInternal{
|
||||
cfg: &clientcredentials.Config{
|
||||
ClientID: o.ClientID,
|
||||
@ -204,11 +200,8 @@ func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInte
|
||||
}
|
||||
if o.ClientSecretFile != "" {
|
||||
oi.clientSecretFile = fs.GetFilepath(baseDir, o.ClientSecretFile)
|
||||
secret, err := readPasswordFromFile(oi.clientSecretFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read OAuth2 secret from %q: %w", oi.clientSecretFile, err)
|
||||
}
|
||||
oi.cfg.ClientSecret = secret
|
||||
// There is no need in reading oi.clientSecretFile now, since it may be missing right now.
|
||||
// It is read later before performing oauth2 request to server.
|
||||
}
|
||||
opts := &Options{
|
||||
BaseDir: baseDir,
|
||||
@ -216,25 +209,17 @@ func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInte
|
||||
}
|
||||
ac, err := opts.NewConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize TLS config for OAuth2: %w", err)
|
||||
return nil, fmt.Errorf("cannot parse TLS config for OAuth2: %w", err)
|
||||
}
|
||||
tlsCfg := ac.NewTLSConfig()
|
||||
var proxyURLFunc func(*http.Request) (*url.URL, error)
|
||||
oi.ac = ac
|
||||
if o.ProxyURL != "" {
|
||||
u, err := url.Parse(o.ProxyURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse proxy_url=%q: %w", o.ProxyURL, err)
|
||||
}
|
||||
proxyURLFunc = http.ProxyURL(u)
|
||||
oi.proxyURL = o.ProxyURL
|
||||
oi.proxyURLFunc = http.ProxyURL(u)
|
||||
}
|
||||
c := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
Proxy: proxyURLFunc,
|
||||
},
|
||||
}
|
||||
oi.ctx = context.WithValue(context.Background(), oauth2.HTTPClient, c)
|
||||
oi.tokenSource = oi.cfg.TokenSource(oi.ctx)
|
||||
return oi, nil
|
||||
}
|
||||
|
||||
@ -246,10 +231,32 @@ func urlValuesFromMap(m map[string]string) url.Values {
|
||||
return result
|
||||
}
|
||||
|
||||
func (oi *oauth2ConfigInternal) initTokenSource() error {
|
||||
tlsCfg, err := oi.ac.NewTLSConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot initialize TLS config for OAuth2: %w", err)
|
||||
}
|
||||
c := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
Proxy: oi.proxyURLFunc,
|
||||
},
|
||||
}
|
||||
oi.ctx = context.WithValue(context.Background(), oauth2.HTTPClient, c)
|
||||
oi.tokenSource = oi.cfg.TokenSource(oi.ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (oi *oauth2ConfigInternal) getTokenSource() (oauth2.TokenSource, error) {
|
||||
oi.mu.Lock()
|
||||
defer oi.mu.Unlock()
|
||||
|
||||
if oi.tokenSource == nil {
|
||||
if err := oi.initTokenSource(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if oi.clientSecretFile == "" {
|
||||
return oi.tokenSource, nil
|
||||
}
|
||||
@ -267,23 +274,21 @@ func (oi *oauth2ConfigInternal) getTokenSource() (oauth2.TokenSource, error) {
|
||||
|
||||
// Config is auth config.
|
||||
type Config struct {
|
||||
// Optional TLS config
|
||||
TLSRootCA *x509.CertPool
|
||||
TLSServerName string
|
||||
TLSInsecureSkipVerify bool
|
||||
TLSMinVersion uint16
|
||||
tlsServerName string
|
||||
tlsInsecureSkipVerify bool
|
||||
tlsMinVersion uint16
|
||||
|
||||
getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
|
||||
tlsCertDigest string
|
||||
getTLSRootCACached getTLSRootCAFunc
|
||||
tlsRootCADigest string
|
||||
|
||||
getAuthHeader func() (string, error)
|
||||
authHeaderLock sync.Mutex
|
||||
authHeader string
|
||||
authHeaderDeadline uint64
|
||||
getTLSCertCached getTLSCertFunc
|
||||
tlsCertDigest string
|
||||
|
||||
headers []keyValue
|
||||
getAuthHeaderCached getAuthHeaderFunc
|
||||
authHeaderDigest string
|
||||
|
||||
authDigest string
|
||||
headers []keyValue
|
||||
headersDigest string
|
||||
}
|
||||
|
||||
type keyValue struct {
|
||||
@ -329,7 +334,7 @@ func (ac *Config) SetHeaders(req *http.Request, setAuthHeader bool) error {
|
||||
if setAuthHeader {
|
||||
ah, err := ac.GetAuthHeader()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set request auth header: %w", err)
|
||||
return fmt.Errorf("failed to obtain Authorization request header: %w", err)
|
||||
}
|
||||
if ah != "" {
|
||||
reqHeaders.Set("Authorization", ah)
|
||||
@ -347,7 +352,7 @@ func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool)
|
||||
if setAuthHeader {
|
||||
ah, err := ac.GetAuthHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("failed to obtaine Authorization request header: %w", err)
|
||||
}
|
||||
if ah != "" {
|
||||
reqHeaders.Set("Authorization", ah)
|
||||
@ -358,21 +363,10 @@ func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool)
|
||||
|
||||
// GetAuthHeader returns optional `Authorization: ...` http header.
|
||||
func (ac *Config) GetAuthHeader() (string, error) {
|
||||
f := ac.getAuthHeader
|
||||
if f == nil {
|
||||
return "", nil
|
||||
if f := ac.getAuthHeaderCached; f != nil {
|
||||
return f()
|
||||
}
|
||||
ac.authHeaderLock.Lock()
|
||||
defer ac.authHeaderLock.Unlock()
|
||||
if fasttime.UnixTimestamp() > ac.authHeaderDeadline {
|
||||
var err error
|
||||
if ac.authHeader, err = f(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
// Cache the authHeader for a second.
|
||||
ac.authHeaderDeadline = fasttime.UnixTimestamp() + 1
|
||||
}
|
||||
return ac.authHeader, nil
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// String returns human-readable representation for ac.
|
||||
@ -380,53 +374,103 @@ func (ac *Config) GetAuthHeader() (string, error) {
|
||||
// It is also used for comparing Config objects for equality. If two Config
|
||||
// objects have the same string representation, then they are considered equal.
|
||||
func (ac *Config) String() string {
|
||||
return fmt.Sprintf("AuthDigest=%s, Headers=%s, TLSRootCA=%s, TLSCertificate=%s, TLSServerName=%s, TLSInsecureSkipVerify=%v, TLSMinVersion=%d",
|
||||
ac.authDigest, ac.headers, ac.tlsRootCAString(), ac.tlsCertDigest, ac.TLSServerName, ac.TLSInsecureSkipVerify, ac.TLSMinVersion)
|
||||
return fmt.Sprintf("AuthHeader=%s, Headers=%s, TLSRootCA=%s, TLSCert=%s, TLSServerName=%s, TLSInsecureSkipVerify=%v, TLSMinVersion=%d",
|
||||
ac.authHeaderDigest, ac.headersDigest, ac.tlsRootCADigest, ac.tlsCertDigest, ac.tlsServerName, ac.tlsInsecureSkipVerify, ac.tlsMinVersion)
|
||||
}
|
||||
|
||||
func (ac *Config) tlsRootCAString() string {
|
||||
if ac.TLSRootCA == nil {
|
||||
return ""
|
||||
// getAuthHeaderFunc must return <value> for 'Authorization: <value>' http request header
|
||||
type getAuthHeaderFunc func() (string, error)
|
||||
|
||||
func newGetAuthHeaderCached(getAuthHeader getAuthHeaderFunc) getAuthHeaderFunc {
|
||||
if getAuthHeader == nil {
|
||||
return nil
|
||||
}
|
||||
var mu sync.Mutex
|
||||
var deadline uint64
|
||||
var ah string
|
||||
var err error
|
||||
return func() (string, error) {
|
||||
// Cahe the auth header and the error for up to a second in order to save CPU time
|
||||
// on reading and parsing auth headers from files.
|
||||
// This also reduces load on OAuth2 server when oauth2 config is enabled.
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if fasttime.UnixTimestamp() > deadline {
|
||||
ah, err = getAuthHeader()
|
||||
deadline = fasttime.UnixTimestamp() + 1
|
||||
}
|
||||
return ah, err
|
||||
}
|
||||
}
|
||||
|
||||
type getTLSRootCAFunc func() (*x509.CertPool, error)
|
||||
|
||||
func newGetTLSRootCACached(getTLSRootCA getTLSRootCAFunc) getTLSRootCAFunc {
|
||||
if getTLSRootCA == nil {
|
||||
return nil
|
||||
}
|
||||
var mu sync.Mutex
|
||||
var deadline uint64
|
||||
var rootCA *x509.CertPool
|
||||
var err error
|
||||
return func() (*x509.CertPool, error) {
|
||||
// Cache the root CA and the error for up to a second in order to save CPU time
|
||||
// on reading and parsing the root CA from files.
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if fasttime.UnixTimestamp() > deadline {
|
||||
rootCA, err = getTLSRootCA()
|
||||
deadline = fasttime.UnixTimestamp() + 1
|
||||
}
|
||||
return rootCA, err
|
||||
}
|
||||
}
|
||||
|
||||
type getTLSCertFunc func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error)
|
||||
|
||||
func newGetTLSCertCached(getTLSCert getTLSCertFunc) getTLSCertFunc {
|
||||
if getTLSCert == nil {
|
||||
return nil
|
||||
}
|
||||
var mu sync.Mutex
|
||||
var deadline uint64
|
||||
var cert *tls.Certificate
|
||||
var err error
|
||||
return func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) {
|
||||
// Cache the certificate and the error for up to a second in order to save CPU time
|
||||
// on certificate parsing when TLS connections are frequently re-established.
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if fasttime.UnixTimestamp() > deadline {
|
||||
cert, err = getTLSCert(cri)
|
||||
deadline = fasttime.UnixTimestamp() + 1
|
||||
}
|
||||
return cert, err
|
||||
}
|
||||
data := ac.TLSRootCA.Subjects()
|
||||
return string(bytes.Join(data, []byte("\n")))
|
||||
}
|
||||
|
||||
// NewTLSConfig returns new TLS config for the given ac.
|
||||
func (ac *Config) NewTLSConfig() *tls.Config {
|
||||
func (ac *Config) NewTLSConfig() (*tls.Config, error) {
|
||||
tlsCfg := &tls.Config{
|
||||
ClientSessionCache: tls.NewLRUClientSessionCache(0),
|
||||
}
|
||||
if ac == nil {
|
||||
return tlsCfg
|
||||
return tlsCfg, nil
|
||||
}
|
||||
if ac.getTLSCert != nil {
|
||||
var certLock sync.Mutex
|
||||
var cert *tls.Certificate
|
||||
var certDeadline uint64
|
||||
tlsCfg.GetClientCertificate = func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) {
|
||||
// Cache the certificate for up to a second in order to save CPU time
|
||||
// on certificate parsing when TLS connection are frequently re-established.
|
||||
certLock.Lock()
|
||||
defer certLock.Unlock()
|
||||
if fasttime.UnixTimestamp() > certDeadline {
|
||||
c, err := ac.getTLSCert(cri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cert = c
|
||||
certDeadline = fasttime.UnixTimestamp() + 1
|
||||
}
|
||||
return cert, nil
|
||||
tlsCfg.GetClientCertificate = ac.getTLSCertCached
|
||||
if f := ac.getTLSRootCACached; f != nil {
|
||||
rootCA, err := f()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot load root CAs: %w", err)
|
||||
}
|
||||
tlsCfg.RootCAs = rootCA
|
||||
}
|
||||
tlsCfg.RootCAs = ac.TLSRootCA
|
||||
tlsCfg.ServerName = ac.TLSServerName
|
||||
tlsCfg.InsecureSkipVerify = ac.TLSInsecureSkipVerify
|
||||
tlsCfg.MinVersion = ac.TLSMinVersion
|
||||
tlsCfg.ServerName = ac.tlsServerName
|
||||
tlsCfg.InsecureSkipVerify = ac.tlsInsecureSkipVerify
|
||||
tlsCfg.MinVersion = ac.tlsMinVersion
|
||||
// Do not set tlsCfg.MaxVersion, since this has no sense from security PoV.
|
||||
// This can only result in lower security level if improperly set.
|
||||
return tlsCfg
|
||||
return tlsCfg, nil
|
||||
}
|
||||
|
||||
// NewConfig creates auth config for the given hcc.
|
||||
@ -525,17 +569,13 @@ func (opts *Options) NewConfig() (*Config, error) {
|
||||
if opts.BearerToken != "" {
|
||||
return nil, fmt.Errorf("both `bearer_token`=%q and `bearer_token_file`=%q are set", opts.BearerToken, opts.BearerTokenFile)
|
||||
}
|
||||
if err := actx.initFromBearerTokenFile(baseDir, opts.BearerTokenFile); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
actx.mustInitFromBearerTokenFile(baseDir, opts.BearerTokenFile)
|
||||
}
|
||||
if opts.BearerToken != "" {
|
||||
if actx.getAuthHeader != nil {
|
||||
return nil, fmt.Errorf("cannot simultaneously use `authorization`, `basic_auth` and `bearer_token`")
|
||||
}
|
||||
if err := actx.initFromBearerToken(opts.BearerToken); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
actx.mustInitFromBearerToken(opts.BearerToken)
|
||||
}
|
||||
if opts.OAuth2 != nil {
|
||||
if actx.getAuthHeader != nil {
|
||||
@ -555,29 +595,42 @@ func (opts *Options) NewConfig() (*Config, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hd := xxhash.New()
|
||||
for _, kv := range headers {
|
||||
hd.Sum([]byte(kv.key))
|
||||
hd.Sum([]byte("="))
|
||||
hd.Sum([]byte(kv.value))
|
||||
hd.Sum([]byte(","))
|
||||
}
|
||||
headersDigest := fmt.Sprintf("digest(headers)=%d", hd.Sum64())
|
||||
|
||||
ac := &Config{
|
||||
TLSRootCA: tctx.rootCA,
|
||||
TLSServerName: tctx.serverName,
|
||||
TLSInsecureSkipVerify: tctx.insecureSkipVerify,
|
||||
TLSMinVersion: tctx.minVersion,
|
||||
tlsServerName: tctx.serverName,
|
||||
tlsInsecureSkipVerify: tctx.insecureSkipVerify,
|
||||
tlsMinVersion: tctx.minVersion,
|
||||
|
||||
getTLSCert: tctx.getTLSCert,
|
||||
tlsCertDigest: tctx.tlsCertDigest,
|
||||
getTLSRootCACached: newGetTLSRootCACached(tctx.getTLSRootCA),
|
||||
tlsRootCADigest: tctx.tlsRootCADigest,
|
||||
|
||||
getTLSCertCached: newGetTLSCertCached(tctx.getTLSCert),
|
||||
tlsCertDigest: tctx.tlsCertDigest,
|
||||
|
||||
getAuthHeaderCached: newGetAuthHeaderCached(actx.getAuthHeader),
|
||||
authHeaderDigest: actx.authHeaderDigest,
|
||||
|
||||
getAuthHeader: actx.getAuthHeader,
|
||||
headers: headers,
|
||||
authDigest: actx.authDigest,
|
||||
headersDigest: headersDigest,
|
||||
}
|
||||
return ac, nil
|
||||
}
|
||||
|
||||
type authContext struct {
|
||||
// getAuthHeader must return <value> for 'Authorization: <value>' http request header
|
||||
getAuthHeader func() (string, error)
|
||||
getAuthHeader getAuthHeaderFunc
|
||||
|
||||
// authDigest must contain the digest for the used authorization
|
||||
// authHeaderDigest must contain the digest for the used authorization
|
||||
// The digest must be changed whenever the original config changes.
|
||||
authDigest string
|
||||
authHeaderDigest string
|
||||
}
|
||||
|
||||
func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization) error {
|
||||
@ -586,10 +639,11 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization
|
||||
azType = az.Type
|
||||
}
|
||||
if az.CredentialsFile == "" {
|
||||
ah := azType + " " + az.Credentials.String()
|
||||
actx.getAuthHeader = func() (string, error) {
|
||||
return azType + " " + az.Credentials.String(), nil
|
||||
return ah, nil
|
||||
}
|
||||
actx.authDigest = fmt.Sprintf("custom(type=%q, creds=%q)", az.Type, az.Credentials)
|
||||
actx.authHeaderDigest = fmt.Sprintf("custom(type=%q, creds=%q)", az.Type, az.Credentials)
|
||||
return nil
|
||||
}
|
||||
if az.Credentials != nil {
|
||||
@ -603,7 +657,7 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization
|
||||
}
|
||||
return azType + " " + token, nil
|
||||
}
|
||||
actx.authDigest = fmt.Sprintf("custom(type=%q, credsFile=%q)", az.Type, filePath)
|
||||
actx.authHeaderDigest = fmt.Sprintf("custom(type=%q, credsFile=%q)", az.Type, filePath)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -612,13 +666,14 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
|
||||
return fmt.Errorf("missing `username` in `basic_auth` section")
|
||||
}
|
||||
if ba.PasswordFile == "" {
|
||||
// See https://en.wikipedia.org/wiki/Basic_access_authentication
|
||||
token := ba.Username + ":" + ba.Password.String()
|
||||
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
||||
ah := "Basic " + token64
|
||||
actx.getAuthHeader = func() (string, error) {
|
||||
// See https://en.wikipedia.org/wiki/Basic_access_authentication
|
||||
token := ba.Username + ":" + ba.Password.String()
|
||||
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
||||
return "Basic " + token64, nil
|
||||
return ah, nil
|
||||
}
|
||||
actx.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
|
||||
actx.authHeaderDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
|
||||
return nil
|
||||
}
|
||||
if ba.Password != nil {
|
||||
@ -635,11 +690,11 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
|
||||
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
||||
return "Basic " + token64, nil
|
||||
}
|
||||
actx.authDigest = fmt.Sprintf("basic(username=%q, passwordFile=%q)", ba.Username, filePath)
|
||||
actx.authHeaderDigest = fmt.Sprintf("basic(username=%q, passwordFile=%q)", ba.Username, filePath)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (actx *authContext) initFromBearerTokenFile(baseDir string, bearerTokenFile string) error {
|
||||
func (actx *authContext) mustInitFromBearerTokenFile(baseDir string, bearerTokenFile string) {
|
||||
filePath := fs.GetFilepath(baseDir, bearerTokenFile)
|
||||
actx.getAuthHeader = func() (string, error) {
|
||||
token, err := readPasswordFromFile(filePath)
|
||||
@ -648,28 +703,23 @@ func (actx *authContext) initFromBearerTokenFile(baseDir string, bearerTokenFile
|
||||
}
|
||||
return "Bearer " + token, nil
|
||||
}
|
||||
actx.authDigest = fmt.Sprintf("bearer(tokenFile=%q)", filePath)
|
||||
return nil
|
||||
actx.authHeaderDigest = fmt.Sprintf("bearer(tokenFile=%q)", filePath)
|
||||
}
|
||||
|
||||
func (actx *authContext) initFromBearerToken(bearerToken string) error {
|
||||
func (actx *authContext) mustInitFromBearerToken(bearerToken string) {
|
||||
ah := "Bearer " + bearerToken
|
||||
actx.getAuthHeader = func() (string, error) {
|
||||
return "Bearer " + bearerToken, nil
|
||||
return ah, nil
|
||||
}
|
||||
actx.authDigest = fmt.Sprintf("bearer(token=%q)", bearerToken)
|
||||
return nil
|
||||
actx.authHeaderDigest = fmt.Sprintf("bearer(token=%q)", bearerToken)
|
||||
}
|
||||
|
||||
func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) error {
|
||||
if err := o.validate(); err != nil {
|
||||
oi, err := newOAuth2ConfigInternal(baseDir, o)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
actx.getAuthHeader = func() (string, error) {
|
||||
oi, err := newOAuth2ConfigInternal(baseDir, o)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
ts, err := oi.getTokenSource()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cannot get OAuth2 tokenSource: %w", err)
|
||||
@ -680,15 +730,17 @@ func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) e
|
||||
}
|
||||
return t.Type() + " " + t.AccessToken, nil
|
||||
}
|
||||
actx.authDigest = fmt.Sprintf("oauth2(%s)", o.String())
|
||||
actx.authHeaderDigest = fmt.Sprintf("oauth2(%s)", oi.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
type tlsContext struct {
|
||||
getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
|
||||
getTLSCert getTLSCertFunc
|
||||
tlsCertDigest string
|
||||
|
||||
rootCA *x509.CertPool
|
||||
getTLSRootCA getTLSRootCAFunc
|
||||
tlsRootCADigest string
|
||||
|
||||
serverName string
|
||||
insecureSkipVerify bool
|
||||
minVersion uint16
|
||||
@ -708,37 +760,60 @@ func (tctx *tlsContext) initFromTLSConfig(baseDir string, tc *TLSConfig) error {
|
||||
h := xxhash.Sum64([]byte(tc.Key)) ^ xxhash.Sum64([]byte(tc.Cert))
|
||||
tctx.tlsCertDigest = fmt.Sprintf("digest(key+cert)=%d", h)
|
||||
} else if tc.CertFile != "" || tc.KeyFile != "" {
|
||||
certPath := fs.GetFilepath(baseDir, tc.CertFile)
|
||||
keyPath := fs.GetFilepath(baseDir, tc.KeyFile)
|
||||
tctx.getTLSCert = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
|
||||
// Re-read TLS certificate from disk. This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1420
|
||||
certPath := fs.GetFilepath(baseDir, tc.CertFile)
|
||||
keyPath := fs.GetFilepath(baseDir, tc.KeyFile)
|
||||
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
|
||||
certData, err := fs.ReadFileOrHTTP(certPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read TLS certificate from %q: %w", certPath, err)
|
||||
}
|
||||
keyData, err := fs.ReadFileOrHTTP(keyPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read TLS key from %q: %w", keyPath, err)
|
||||
}
|
||||
cert, err := tls.X509KeyPair(certData, keyData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot load TLS certificate from `cert_file`=%q, `key_file`=%q: %w", tc.CertFile, tc.KeyFile, err)
|
||||
}
|
||||
return &cert, nil
|
||||
}
|
||||
// Check whether the configured TLS cert can be loaded.
|
||||
if _, err := tctx.getTLSCert(nil); err != nil {
|
||||
return err
|
||||
}
|
||||
tctx.tlsCertDigest = fmt.Sprintf("certFile=%q, keyFile=%q", tc.CertFile, tc.KeyFile)
|
||||
}
|
||||
if len(tc.CA) != 0 {
|
||||
tctx.rootCA = x509.NewCertPool()
|
||||
if !tctx.rootCA.AppendCertsFromPEM([]byte(tc.CA)) {
|
||||
rootCA := x509.NewCertPool()
|
||||
if !rootCA.AppendCertsFromPEM([]byte(tc.CA)) {
|
||||
return fmt.Errorf("cannot parse data from `ca` value")
|
||||
}
|
||||
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
|
||||
return rootCA, nil
|
||||
}
|
||||
h := xxhash.Sum64([]byte(tc.CA))
|
||||
tctx.tlsRootCADigest = fmt.Sprintf("digest(CA)=%d", h)
|
||||
} else if tc.CAFile != "" {
|
||||
path := fs.GetFilepath(baseDir, tc.CAFile)
|
||||
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
|
||||
data, err := fs.ReadFileOrHTTP(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read `ca_file`: %w", err)
|
||||
}
|
||||
rootCA := x509.NewCertPool()
|
||||
if !rootCA.AppendCertsFromPEM(data) {
|
||||
return nil, fmt.Errorf("cannot parse data read from `ca_file` %q", tc.CAFile)
|
||||
}
|
||||
return rootCA, nil
|
||||
}
|
||||
// The Config.NewTLSConfig() is called only once per each scrape target initialization.
|
||||
// So, the tlsRootCADigest must contain the hash of CAFile contents additionally to CAFile itself,
|
||||
// in order to properly reload scrape target configs when CAFile contents changes.
|
||||
data, err := fs.ReadFileOrHTTP(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read `ca_file` %q: %w", tc.CAFile, err)
|
||||
}
|
||||
tctx.rootCA = x509.NewCertPool()
|
||||
if !tctx.rootCA.AppendCertsFromPEM(data) {
|
||||
return fmt.Errorf("cannot parse data from `ca_file` %q", tc.CAFile)
|
||||
// Do not return the error to the caller, since this may result in fatal error.
|
||||
// The CAFile contents can become available on the next check of scrape configs.
|
||||
data = []byte("read error")
|
||||
}
|
||||
h := xxhash.Sum64(data)
|
||||
tctx.tlsRootCADigest = fmt.Sprintf("caFile=%q, digest(caFile)=%d", tc.CAFile, h)
|
||||
}
|
||||
v, err := netutil.ParseTLSVersion(tc.MinVersion)
|
||||
if err != nil {
|
||||
|
@ -6,175 +6,511 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/fasthttp"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
func TestNewConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
opts Options
|
||||
wantErr bool
|
||||
wantErrWhenSetHeader bool
|
||||
expectHeader string
|
||||
}{
|
||||
{
|
||||
name: "OAuth2 config",
|
||||
opts: Options{
|
||||
OAuth2: &OAuth2Config{
|
||||
ClientID: "some-id",
|
||||
ClientSecret: NewSecret("some-secret"),
|
||||
TokenURL: "http://localhost:8511",
|
||||
},
|
||||
},
|
||||
expectHeader: "Bearer some-token",
|
||||
},
|
||||
{
|
||||
name: "OAuth2 config with file",
|
||||
opts: Options{
|
||||
OAuth2: &OAuth2Config{
|
||||
ClientID: "some-id",
|
||||
ClientSecretFile: "testdata/test_secretfile.txt",
|
||||
TokenURL: "http://localhost:8511",
|
||||
},
|
||||
},
|
||||
expectHeader: "Bearer some-token",
|
||||
},
|
||||
{
|
||||
name: "OAuth2 want err",
|
||||
opts: Options{
|
||||
OAuth2: &OAuth2Config{
|
||||
ClientID: "some-id",
|
||||
ClientSecret: NewSecret("some-secret"),
|
||||
ClientSecretFile: "testdata/test_secretfile.txt",
|
||||
TokenURL: "http://localhost:8511",
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "OAuth2 with wrong tls",
|
||||
opts: Options{
|
||||
OAuth2: &OAuth2Config{
|
||||
ClientID: "some-id",
|
||||
ClientSecret: NewSecret("some-secret"),
|
||||
TokenURL: "http://localhost:8511",
|
||||
TLSConfig: &TLSConfig{
|
||||
InsecureSkipVerify: true,
|
||||
CAFile: "non-existing-file",
|
||||
},
|
||||
},
|
||||
},
|
||||
wantErrWhenSetHeader: true,
|
||||
},
|
||||
{
|
||||
name: "basic Auth config",
|
||||
opts: Options{
|
||||
BasicAuth: &BasicAuthConfig{
|
||||
Username: "user",
|
||||
Password: NewSecret("password"),
|
||||
},
|
||||
},
|
||||
expectHeader: "Basic dXNlcjpwYXNzd29yZA==",
|
||||
},
|
||||
{
|
||||
name: "basic Auth config with file",
|
||||
opts: Options{
|
||||
BasicAuth: &BasicAuthConfig{
|
||||
Username: "user",
|
||||
PasswordFile: "testdata/test_secretfile.txt",
|
||||
},
|
||||
},
|
||||
expectHeader: "Basic dXNlcjpzZWNyZXQtY29udGVudA==",
|
||||
},
|
||||
{
|
||||
name: "basic Auth config with non-existing file",
|
||||
opts: Options{
|
||||
BasicAuth: &BasicAuthConfig{
|
||||
Username: "user",
|
||||
PasswordFile: "testdata/non-existing-file",
|
||||
},
|
||||
},
|
||||
wantErrWhenSetHeader: true,
|
||||
},
|
||||
{
|
||||
name: "want Authorization",
|
||||
opts: Options{
|
||||
Authorization: &Authorization{
|
||||
Type: "Bearer",
|
||||
Credentials: NewSecret("Value"),
|
||||
},
|
||||
},
|
||||
expectHeader: "Bearer Value",
|
||||
},
|
||||
{
|
||||
name: "token file",
|
||||
opts: Options{
|
||||
BearerTokenFile: "testdata/test_secretfile.txt",
|
||||
},
|
||||
expectHeader: "Bearer secret-content",
|
||||
},
|
||||
{
|
||||
name: "token with tls",
|
||||
opts: Options{
|
||||
BearerToken: "some-token",
|
||||
TLSConfig: &TLSConfig{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
},
|
||||
expectHeader: "Bearer some-token",
|
||||
},
|
||||
{
|
||||
name: "tls with non-existing file",
|
||||
opts: Options{
|
||||
BearerToken: "some-token",
|
||||
TLSConfig: &TLSConfig{
|
||||
InsecureSkipVerify: true,
|
||||
CAFile: "non-existing-file",
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
func TestOptionsNewConfigFailure(t *testing.T) {
|
||||
f := func(yamlConfig string) {
|
||||
t.Helper()
|
||||
|
||||
var hcc HTTPClientConfig
|
||||
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &hcc); err != nil {
|
||||
t.Fatalf("cannot parse: %s", err)
|
||||
}
|
||||
cfg, err := hcc.NewConfig("")
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
if cfg != nil {
|
||||
t.Fatalf("expecting nil cfg; got %s", cfg.String())
|
||||
}
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if tt.opts.OAuth2 != nil {
|
||||
r := http.NewServeMux()
|
||||
r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(`{"access_token":"some-token","token_type": "Bearer"}`))
|
||||
})
|
||||
mock := httptest.NewServer(r)
|
||||
tt.opts.OAuth2.TokenURL = mock.URL
|
||||
}
|
||||
got, err := tt.opts.NewConfig()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if got != nil {
|
||||
req, err := http.NewRequest(http.MethodGet, "http://foo", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in http.NewRequest: %s", err)
|
||||
}
|
||||
err = got.SetHeaders(req, true)
|
||||
if (err != nil) != tt.wantErrWhenSetHeader {
|
||||
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader)
|
||||
}
|
||||
ah := req.Header.Get("Authorization")
|
||||
if ah != tt.expectHeader {
|
||||
t.Fatalf("unexpected auth header from net/http request; got %q; want %q", ah, tt.expectHeader)
|
||||
}
|
||||
var fhreq fasthttp.Request
|
||||
err = got.SetFasthttpHeaders(&fhreq, true)
|
||||
if (err != nil) != tt.wantErrWhenSetHeader {
|
||||
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader)
|
||||
}
|
||||
ahb := fhreq.Header.Peek("Authorization")
|
||||
if string(ahb) != tt.expectHeader {
|
||||
t.Fatalf("unexpected auth header from fasthttp request; got %q; want %q", ahb, tt.expectHeader)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// authorization: both credentials and credentials_file are set
|
||||
f(`
|
||||
authorization:
|
||||
credentials: foo-bar
|
||||
credentials_file: testdata/test_secretfile.txt
|
||||
`)
|
||||
|
||||
// basic_auth: both authorization and basic_auth are set
|
||||
f(`
|
||||
authorization:
|
||||
credentials: foo-bar
|
||||
basic_auth:
|
||||
username: user
|
||||
password: pass
|
||||
`)
|
||||
|
||||
// basic_auth: missing username
|
||||
f(`
|
||||
basic_auth:
|
||||
password: pass
|
||||
`)
|
||||
|
||||
// basic_auth: password and password_file are set
|
||||
f(`
|
||||
basic_auth:
|
||||
username: user
|
||||
password: pass
|
||||
password_file: testdata/test_secretfile.txt
|
||||
`)
|
||||
|
||||
// bearer_token: both authorization and bearer_token are set
|
||||
f(`
|
||||
authorization:
|
||||
credentials: foo-bar
|
||||
bearer_token: bearer-aaa
|
||||
`)
|
||||
|
||||
// bearer_token: both basic_auth and bearer_token are set
|
||||
f(`
|
||||
bearer_token: bearer-aaa
|
||||
basic_auth:
|
||||
username: user
|
||||
password: pass
|
||||
`)
|
||||
|
||||
// bearer_token_file: both authorization and bearer_token_file are set
|
||||
f(`
|
||||
authorization:
|
||||
credentials: foo-bar
|
||||
bearer_token_file: testdata/test_secretfile.txt
|
||||
`)
|
||||
|
||||
// bearer_token_file: both basic_auth and bearer_token_file are set
|
||||
f(`
|
||||
bearer_token_file: testdata/test_secretfile.txt
|
||||
basic_auth:
|
||||
username: user
|
||||
password: pass
|
||||
`)
|
||||
|
||||
// both bearer_token_file and bearer_token are set
|
||||
f(`
|
||||
bearer_token_file: testdata/test_secretfile.txt
|
||||
bearer_token: foo-bar
|
||||
`)
|
||||
|
||||
// oauth2: both oauth2 and authorization are set
|
||||
f(`
|
||||
authorization:
|
||||
credentials: foo-bar
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
`)
|
||||
|
||||
// oauth2: both oauth2 and basic_auth are set
|
||||
f(`
|
||||
basic_auth:
|
||||
username: user
|
||||
password: pass
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
`)
|
||||
|
||||
// oauth2: both oauth2 and bearer_token are set
|
||||
f(`
|
||||
bearer_token: foo-bar
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
`)
|
||||
|
||||
// oauth2: both oauth2 and bearer_token_file are set
|
||||
f(`
|
||||
bearer_token_file: testdata/test_secretfile.txt
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
`)
|
||||
|
||||
// oauth2: missing client_id
|
||||
f(`
|
||||
oauth2:
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
`)
|
||||
|
||||
// oauth2: invalid inline tls config
|
||||
f(`
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
tls_config:
|
||||
key: foobar
|
||||
cert: baz
|
||||
`)
|
||||
|
||||
// oauth2: invalid ca at tls_config
|
||||
f(`
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
tls_config:
|
||||
ca: foobar
|
||||
`)
|
||||
|
||||
// oauth2: invalid min_version at tls_config
|
||||
f(`
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
tls_config:
|
||||
min_version: foobar
|
||||
`)
|
||||
|
||||
// oauth2: invalid proxy_url
|
||||
f(`
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
proxy_url: ":invalid-proxy-url"
|
||||
`)
|
||||
|
||||
// tls_config: invalid ca
|
||||
f(`
|
||||
tls_config:
|
||||
ca: foobar
|
||||
`)
|
||||
|
||||
// invalid headers
|
||||
f(`
|
||||
headers:
|
||||
- foobar
|
||||
`)
|
||||
|
||||
}
|
||||
|
||||
func TestOauth2ConfigParseFailure(t *testing.T) {
|
||||
f := func(yamlConfig string) {
|
||||
t.Helper()
|
||||
|
||||
var cfg OAuth2Config
|
||||
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &cfg); err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
}
|
||||
|
||||
// invalid yaml
|
||||
f("afdsfds")
|
||||
|
||||
// unknown fields
|
||||
f("foobar: baz")
|
||||
}
|
||||
|
||||
func TestOauth2ConfigValidateFailure(t *testing.T) {
|
||||
f := func(yamlConfig string) {
|
||||
t.Helper()
|
||||
|
||||
var cfg OAuth2Config
|
||||
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &cfg); err != nil {
|
||||
t.Fatalf("cannot unmarshal config: %s", err)
|
||||
}
|
||||
if err := cfg.validate(); err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
}
|
||||
|
||||
// emtpy client_id
|
||||
f(`
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
`)
|
||||
|
||||
// missing client_secret and client_secret_file
|
||||
f(`
|
||||
client_id: some-id
|
||||
token_url: http://some-url/
|
||||
`)
|
||||
|
||||
// client_secret and client_secret_file are set simultaneously
|
||||
f(`
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
client_secret_file: testdata/test_secretfile.txt
|
||||
token_url: http://some-url/
|
||||
`)
|
||||
|
||||
// missing token_url
|
||||
f(`
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
`)
|
||||
}
|
||||
|
||||
func TestOauth2ConfigValidateSuccess(t *testing.T) {
|
||||
f := func(yamlConfig string) {
|
||||
t.Helper()
|
||||
|
||||
var cfg OAuth2Config
|
||||
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &cfg); err != nil {
|
||||
t.Fatalf("cannot parse: %s", err)
|
||||
}
|
||||
if err := cfg.validate(); err != nil {
|
||||
t.Fatalf("cannot validate: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
f(`
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url/
|
||||
proxy_url: http://some-proxy/abc
|
||||
scopes: [read, write, execute]
|
||||
endpoint_params:
|
||||
foo: bar
|
||||
abc: def
|
||||
tls_config:
|
||||
insecure_skip_verify: true
|
||||
`)
|
||||
}
|
||||
|
||||
func TestConfigGetAuthHeaderFailure(t *testing.T) {
|
||||
f := func(yamlConfig string) {
|
||||
t.Helper()
|
||||
|
||||
var hcc HTTPClientConfig
|
||||
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &hcc); err != nil {
|
||||
t.Fatalf("cannot parse: %s", err)
|
||||
}
|
||||
cfg, err := hcc.NewConfig("")
|
||||
if err != nil {
|
||||
t.Fatalf("cannot initialize config: %s", err)
|
||||
}
|
||||
|
||||
// Verify that GetAuthHeader() returns error
|
||||
ah, err := cfg.GetAuthHeader()
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error from GetAuthHeader()")
|
||||
}
|
||||
if ah != "" {
|
||||
t.Fatalf("expecting empty auth header; got %q", ah)
|
||||
}
|
||||
|
||||
// Verify that SetHeaders() returns error
|
||||
req, err := http.NewRequest(http.MethodGet, "http://foo", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in http.NewRequest: %s", err)
|
||||
}
|
||||
if err := cfg.SetHeaders(req, true); err == nil {
|
||||
t.Fatalf("expecting non-nil error from SetHeaders()")
|
||||
}
|
||||
|
||||
// Verify that cfg.SetFasthttpHeaders() returns error
|
||||
var fhreq fasthttp.Request
|
||||
if err := cfg.SetFasthttpHeaders(&fhreq, true); err == nil {
|
||||
t.Fatalf("expecting non-nil error from SetFasthttpHeaders()")
|
||||
}
|
||||
|
||||
// Verify that the tls cert cannot be loaded properly if it exists
|
||||
if f := cfg.getTLSCertCached; f != nil {
|
||||
cert, err := f(nil)
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error in getTLSCertCached()")
|
||||
}
|
||||
if cert != nil {
|
||||
t.Fatalf("expecting nil cert from getTLSCertCached()")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// oauth2 with invalid proxy_url
|
||||
f(`
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
proxy_url: invalid-proxy-url
|
||||
`)
|
||||
|
||||
// oauth2 with non-existing client_secret_file
|
||||
f(`
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret_file: non-existing-file
|
||||
token_url: http://some-url
|
||||
`)
|
||||
|
||||
// non-existing root ca file for oauth2
|
||||
f(`
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
token_url: http://some-url
|
||||
tls_config:
|
||||
ca_file: non-existing-file
|
||||
`)
|
||||
|
||||
// basic auth via non-existing file
|
||||
f(`
|
||||
basic_auth:
|
||||
username: user
|
||||
password_file: non-existing-file
|
||||
`)
|
||||
|
||||
// bearer token via non-existing file
|
||||
f(`
|
||||
bearer_token_file: non-existing-file
|
||||
`)
|
||||
|
||||
// authorization creds via non-existing file
|
||||
f(`
|
||||
authorization:
|
||||
type: foobar
|
||||
credentials_file: non-existing-file
|
||||
`)
|
||||
}
|
||||
|
||||
func TestConfigGetAuthHeaderSuccess(t *testing.T) {
|
||||
f := func(yamlConfig string, ahExpected string) {
|
||||
t.Helper()
|
||||
|
||||
var hcc HTTPClientConfig
|
||||
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &hcc); err != nil {
|
||||
t.Fatalf("cannot unmarshal config: %s", err)
|
||||
}
|
||||
if hcc.OAuth2 != nil {
|
||||
if hcc.OAuth2.TokenURL != "replace-with-mock-url" {
|
||||
t.Fatalf("unexpected token_url: %q; want `replace-with-mock-url`", hcc.OAuth2.TokenURL)
|
||||
}
|
||||
r := http.NewServeMux()
|
||||
r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(`{"access_token":"test-oauth2-token","token_type": "Bearer"}`))
|
||||
})
|
||||
mock := httptest.NewServer(r)
|
||||
hcc.OAuth2.TokenURL = mock.URL
|
||||
}
|
||||
cfg, err := hcc.NewConfig("")
|
||||
if err != nil {
|
||||
t.Fatalf("cannot initialize config: %s", err)
|
||||
}
|
||||
|
||||
// Verify that cfg.String() returns non-empty value
|
||||
cfgString := cfg.String()
|
||||
if cfgString == "" {
|
||||
t.Fatalf("unexpected empty result from Config.String")
|
||||
}
|
||||
|
||||
// Check that GetAuthHeader() returns the correct header
|
||||
ah, err := cfg.GetAuthHeader()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected auth header; got %q; want %q", ah, ahExpected)
|
||||
}
|
||||
|
||||
// Make sure that cfg.SetHeaders() properly set Authorization header
|
||||
req, err := http.NewRequest(http.MethodGet, "http://foo", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in http.NewRequest: %s", err)
|
||||
}
|
||||
if err := cfg.SetHeaders(req, true); err != nil {
|
||||
t.Fatalf("unexpected error in SetHeaders(): %s", err)
|
||||
}
|
||||
ah = req.Header.Get("Authorization")
|
||||
if ah != ahExpected {
|
||||
t.Fatalf("unexpected auth header from net/http request; got %q; want %q", ah, ahExpected)
|
||||
}
|
||||
|
||||
// Make sure that cfg.SetFasthttpHeaders() properly set Authorization header
|
||||
var fhreq fasthttp.Request
|
||||
if err := cfg.SetFasthttpHeaders(&fhreq, true); err != nil {
|
||||
t.Fatalf("unexpected error in SetFasthttpHeaders(): %s", err)
|
||||
}
|
||||
ahb := fhreq.Header.Peek("Authorization")
|
||||
if string(ahb) != ahExpected {
|
||||
t.Fatalf("unexpected auth header from fasthttp request; got %q; want %q", ahb, ahExpected)
|
||||
}
|
||||
}
|
||||
|
||||
// Zero config
|
||||
f(``, "")
|
||||
|
||||
// no auth config, non-zero tls config
|
||||
f(`
|
||||
tls_config:
|
||||
insecure_skip_verify: true
|
||||
`, "")
|
||||
|
||||
// no auth config, tls_config with non-existing files
|
||||
f(`
|
||||
tls_config:
|
||||
key_file: non-existing-file
|
||||
cert_file: non-existing-file
|
||||
`, "")
|
||||
|
||||
// no auth config, tls_config with non-existing ca file
|
||||
f(`
|
||||
tls_config:
|
||||
ca_file: non-existing-file
|
||||
`, "")
|
||||
|
||||
// inline oauth2 config
|
||||
f(`
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret: some-secret
|
||||
endpoint_params:
|
||||
foo: bar
|
||||
scopes: [foo, bar]
|
||||
token_url: replace-with-mock-url
|
||||
`, "Bearer test-oauth2-token")
|
||||
|
||||
// oauth2 config with secrets in the file
|
||||
f(`
|
||||
oauth2:
|
||||
client_id: some-id
|
||||
client_secret_file: testdata/test_secretfile.txt
|
||||
token_url: replace-with-mock-url
|
||||
`, "Bearer test-oauth2-token")
|
||||
|
||||
// inline basic auth
|
||||
f(`
|
||||
basic_auth:
|
||||
username: user
|
||||
password: password
|
||||
`, "Basic dXNlcjpwYXNzd29yZA==")
|
||||
|
||||
// basic auth via file
|
||||
f(`
|
||||
basic_auth:
|
||||
username: user
|
||||
password_file: testdata/test_secretfile.txt
|
||||
`, "Basic dXNlcjpzZWNyZXQtY29udGVudA==")
|
||||
|
||||
// inline authorization config
|
||||
f(`
|
||||
authorization:
|
||||
type: My-Super-Auth
|
||||
credentials: some-password
|
||||
`, "My-Super-Auth some-password")
|
||||
|
||||
// authorization config via file
|
||||
f(`
|
||||
authorization:
|
||||
type: Foo
|
||||
credentials_file: testdata/test_secretfile.txt
|
||||
`, "Foo secret-content")
|
||||
|
||||
// inline bearer token
|
||||
f(`
|
||||
bearer_token: some-token
|
||||
`, "Bearer some-token")
|
||||
|
||||
// bearer token via file
|
||||
f(`
|
||||
bearer_token_file: testdata/test_secretfile.txt
|
||||
`, "Bearer secret-content")
|
||||
}
|
||||
|
||||
func TestParseHeadersSuccess(t *testing.T) {
|
||||
@ -233,7 +569,9 @@ func TestConfigHeaders(t *testing.T) {
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result from HeadersNoAuthString; got\n%s\nwant\n%s", result, resultExpected)
|
||||
}
|
||||
_ = c.SetHeaders(req, false)
|
||||
if err := c.SetHeaders(req, false); err != nil {
|
||||
t.Fatalf("unexpected error in SetHeaders(): %s", err)
|
||||
}
|
||||
for _, h := range headersParsed {
|
||||
v := req.Header.Get(h.key)
|
||||
if v != h.value {
|
||||
@ -241,7 +579,9 @@ func TestConfigHeaders(t *testing.T) {
|
||||
}
|
||||
}
|
||||
var fhreq fasthttp.Request
|
||||
_ = c.SetFasthttpHeaders(&fhreq, false)
|
||||
if err := c.SetFasthttpHeaders(&fhreq, false); err != nil {
|
||||
t.Fatalf("unexpected error in SetFasthttpHeaders(): %s", err)
|
||||
}
|
||||
for _, h := range headersParsed {
|
||||
v := fhreq.Header.Peek(h.key)
|
||||
if string(v) != h.value {
|
||||
|
@ -90,7 +90,11 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
||||
isTLS := string(u.Scheme()) == "https"
|
||||
var tlsCfg *tls.Config
|
||||
if isTLS {
|
||||
tlsCfg = sw.AuthConfig.NewTLSConfig()
|
||||
var err error
|
||||
tlsCfg, err = sw.AuthConfig.NewTLSConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
|
||||
}
|
||||
}
|
||||
setProxyHeaders := func(req *http.Request) error { return nil }
|
||||
setFasthttpProxyHeaders := func(req *fasthttp.Request) error { return nil }
|
||||
@ -104,7 +108,11 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
||||
requestURI = sw.ScrapeURL
|
||||
isTLS = pu.Scheme == "https"
|
||||
if isTLS {
|
||||
tlsCfg = sw.ProxyAuthConfig.NewTLSConfig()
|
||||
var err error
|
||||
tlsCfg, err = sw.ProxyAuthConfig.NewTLSConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize proxy tls config: %w", err)
|
||||
}
|
||||
}
|
||||
proxyURLOrig := proxyURL
|
||||
setProxyHeaders = func(req *http.Request) error {
|
||||
@ -160,7 +168,7 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
||||
}
|
||||
}
|
||||
|
||||
return &client{
|
||||
c := &client{
|
||||
hc: hc,
|
||||
ctx: ctx,
|
||||
sc: sc,
|
||||
@ -168,14 +176,19 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
||||
scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()),
|
||||
hostPort: hostPort,
|
||||
requestURI: requestURI,
|
||||
setHeaders: func(req *http.Request) error { return sw.AuthConfig.SetHeaders(req, true) },
|
||||
setProxyHeaders: setProxyHeaders,
|
||||
setFasthttpHeaders: func(req *fasthttp.Request) error { return sw.AuthConfig.SetFasthttpHeaders(req, true) },
|
||||
setHeaders: func(req *http.Request) error {
|
||||
return sw.AuthConfig.SetHeaders(req, true)
|
||||
},
|
||||
setProxyHeaders: setProxyHeaders,
|
||||
setFasthttpHeaders: func(req *fasthttp.Request) error {
|
||||
return sw.AuthConfig.SetFasthttpHeaders(req, true)
|
||||
},
|
||||
setFasthttpProxyHeaders: setFasthttpProxyHeaders,
|
||||
denyRedirects: sw.DenyRedirects,
|
||||
disableCompression: sw.DisableCompression,
|
||||
disableKeepAlive: sw.DisableKeepAlive,
|
||||
}, nil
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *client) GetStreamReader() (*streamReader, error) {
|
||||
@ -196,15 +209,13 @@ func (c *client) GetStreamReader() (*streamReader, error) {
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
|
||||
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
|
||||
req.Header.Set("User-Agent", scrapeUserAgent)
|
||||
err = c.setHeaders(req)
|
||||
if err != nil {
|
||||
if err := c.setHeaders(req); err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("failed to create request to %q: %w", c.scrapeURL, err)
|
||||
return nil, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
|
||||
}
|
||||
err = c.setProxyHeaders(req)
|
||||
if err != nil {
|
||||
if err := c.setProxyHeaders(req); err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("failed to create request to %q: %w", c.scrapeURL, err)
|
||||
return nil, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
|
||||
}
|
||||
scrapeRequests.Inc()
|
||||
resp, err := c.sc.Do(req)
|
||||
@ -221,12 +232,13 @@ func (c *client) GetStreamReader() (*streamReader, error) {
|
||||
c.scrapeURL, resp.StatusCode, http.StatusOK, respBody)
|
||||
}
|
||||
scrapesOK.Inc()
|
||||
return &streamReader{
|
||||
sr := &streamReader{
|
||||
r: resp.Body,
|
||||
cancel: cancel,
|
||||
scrapeURL: c.scrapeURL,
|
||||
maxBodySize: int64(c.hc.MaxResponseBodySize),
|
||||
}, nil
|
||||
}
|
||||
return sr, nil
|
||||
}
|
||||
|
||||
// checks fasthttp status code for redirect as standard http/client does.
|
||||
@ -252,13 +264,11 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
|
||||
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
|
||||
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
|
||||
err := c.setFasthttpHeaders(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request to %q: %w", c.scrapeURL, err)
|
||||
if err := c.setFasthttpHeaders(req); err != nil {
|
||||
return nil, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
|
||||
}
|
||||
err = c.setFasthttpProxyHeaders(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request to %q: %w", c.scrapeURL, err)
|
||||
if err := c.setFasthttpProxyHeaders(req); err != nil {
|
||||
return nil, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
|
||||
}
|
||||
if !*disableCompression && !c.disableCompression {
|
||||
req.Header.Set("Accept-Encoding", "gzip")
|
||||
@ -277,7 +287,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
|
||||
ctx, cancel := context.WithDeadline(c.ctx, deadline)
|
||||
defer cancel()
|
||||
|
||||
err = doRequestWithPossibleRetry(ctx, c.hc, req, resp)
|
||||
err := doRequestWithPossibleRetry(ctx, c.hc, req, resp)
|
||||
statusCode := resp.StatusCode()
|
||||
redirectsCount := 0
|
||||
for err == nil && isStatusRedirect(statusCode) {
|
||||
|
@ -141,9 +141,10 @@ func (cfg *Config) mustStart() {
|
||||
}
|
||||
|
||||
// mustRestart restarts service discovery routines at cfg if they were changed comparing to prevCfg.
|
||||
func (cfg *Config) mustRestart(prevCfg *Config) {
|
||||
//
|
||||
// It returns true if at least a single scraper has been restarted.
|
||||
func (cfg *Config) mustRestart(prevCfg *Config) bool {
|
||||
startTime := time.Now()
|
||||
logger.Infof("restarting service discovery routines...")
|
||||
|
||||
prevScrapeCfgByName := make(map[string]*ScrapeConfig, len(prevCfg.ScrapeConfigs))
|
||||
for _, scPrev := range prevCfg.ScrapeConfigs {
|
||||
@ -186,7 +187,12 @@ func (cfg *Config) mustRestart(prevCfg *Config) {
|
||||
}
|
||||
jobNames := cfg.getJobNames()
|
||||
tsmGlobal.registerJobNames(jobNames)
|
||||
logger.Infof("restarted service discovery routines in %.3f seconds, stopped=%d, started=%d, restarted=%d", time.Since(startTime).Seconds(), stopped, started, restarted)
|
||||
hasChanges := started > 0 || stopped > 0 || restarted > 0
|
||||
if hasChanges {
|
||||
logger.Infof("updated %d service discovery routines in %.3f seconds, started=%d, stopped=%d, restarted=%d",
|
||||
len(cfg.ScrapeConfigs), time.Since(startTime).Seconds(), started, stopped, restarted)
|
||||
}
|
||||
return hasChanges
|
||||
}
|
||||
|
||||
func areEqualGlobalConfigs(a, b *GlobalConfig) bool {
|
||||
@ -198,7 +204,20 @@ func areEqualGlobalConfigs(a, b *GlobalConfig) bool {
|
||||
func areEqualScrapeConfigs(a, b *ScrapeConfig) bool {
|
||||
sa := a.marshalJSON()
|
||||
sb := b.marshalJSON()
|
||||
return string(sa) == string(sb)
|
||||
if string(sa) != string(sb) {
|
||||
return false
|
||||
}
|
||||
// Compare auth configs for a and b, since they may differ by TLS CA file contents,
|
||||
// which is missing in the marshaled JSON of a and b,
|
||||
// but it existis in the string representation of auth configs.
|
||||
if a.swc.authConfig.String() != b.swc.authConfig.String() {
|
||||
return false
|
||||
}
|
||||
if a.swc.proxyAuthConfig.String() != b.swc.proxyAuthConfig.String() {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
||||
}
|
||||
|
||||
func (sc *ScrapeConfig) unmarshalJSON(data []byte) error {
|
||||
@ -400,29 +419,28 @@ func loadStaticConfigs(path string) ([]StaticConfig, error) {
|
||||
}
|
||||
|
||||
// loadConfig loads Prometheus config from the given path.
|
||||
func loadConfig(path string) (*Config, []byte, error) {
|
||||
func loadConfig(path string) (*Config, error) {
|
||||
data, err := fs.ReadFileOrHTTP(path)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot read Prometheus config from %q: %w", path, err)
|
||||
return nil, fmt.Errorf("cannot read Prometheus config from %q: %w", path, err)
|
||||
}
|
||||
var c Config
|
||||
dataNew, err := c.parseData(data, path)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot parse Prometheus config from %q: %w", path, err)
|
||||
if err := c.parseData(data, path); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse Prometheus config from %q: %w", path, err)
|
||||
}
|
||||
return &c, dataNew, nil
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
func loadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) ([]*ScrapeConfig, []byte, error) {
|
||||
func mustLoadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) []*ScrapeConfig {
|
||||
var scrapeConfigs []*ScrapeConfig
|
||||
var scsData []byte
|
||||
for _, filePath := range scrapeConfigFiles {
|
||||
filePath := fs.GetFilepath(baseDir, filePath)
|
||||
paths := []string{filePath}
|
||||
if strings.Contains(filePath, "*") {
|
||||
ps, err := filepath.Glob(filePath)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid pattern %q: %w", filePath, err)
|
||||
logger.Errorf("skipping pattern %q at `scrape_config_files` because of error: %s", filePath, err)
|
||||
continue
|
||||
}
|
||||
sort.Strings(ps)
|
||||
paths = ps
|
||||
@ -430,22 +448,23 @@ func loadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) ([]*Scrap
|
||||
for _, path := range paths {
|
||||
data, err := fs.ReadFileOrHTTP(path)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot load %q: %w", path, err)
|
||||
logger.Errorf("skipping %q at `scrape_config_files` because of error: %s", path, err)
|
||||
continue
|
||||
}
|
||||
data, err = envtemplate.ReplaceBytes(data)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot expand environment vars in %q: %w", path, err)
|
||||
logger.Errorf("skipping %q at `scrape_config_files` because of failure to expand environment vars: %s", path, err)
|
||||
continue
|
||||
}
|
||||
var scs []*ScrapeConfig
|
||||
if err = yaml.UnmarshalStrict(data, &scs); err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot parse %q: %w", path, err)
|
||||
logger.Errorf("skipping %q at `scrape_config_files` because of failure to parse it: %s", path, err)
|
||||
continue
|
||||
}
|
||||
scrapeConfigs = append(scrapeConfigs, scs...)
|
||||
scsData = append(scsData, '\n')
|
||||
scsData = append(scsData, data...)
|
||||
}
|
||||
}
|
||||
return scrapeConfigs, scsData, nil
|
||||
return scrapeConfigs
|
||||
}
|
||||
|
||||
// IsDryRun returns true if -promscrape.config.dryRun command-line flag is set
|
||||
@ -453,54 +472,56 @@ func IsDryRun() bool {
|
||||
return *dryRun
|
||||
}
|
||||
|
||||
func (cfg *Config) parseData(data []byte, path string) ([]byte, error) {
|
||||
func (cfg *Config) parseData(data []byte, path string) error {
|
||||
if err := cfg.unmarshal(data, *strictParse); err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal data: %w", err)
|
||||
cfg.ScrapeConfigs = nil
|
||||
return fmt.Errorf("cannot unmarshal data: %w", err)
|
||||
}
|
||||
absPath, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain abs path for %q: %w", path, err)
|
||||
cfg.ScrapeConfigs = nil
|
||||
return fmt.Errorf("cannot obtain abs path for %q: %w", path, err)
|
||||
}
|
||||
cfg.baseDir = filepath.Dir(absPath)
|
||||
|
||||
// Load cfg.ScrapeConfigFiles into c.ScrapeConfigs
|
||||
scs, scsData, err := loadScrapeConfigFiles(cfg.baseDir, cfg.ScrapeConfigFiles)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot load `scrape_config_files` from %q: %w", path, err)
|
||||
}
|
||||
scs := mustLoadScrapeConfigFiles(cfg.baseDir, cfg.ScrapeConfigFiles)
|
||||
cfg.ScrapeConfigFiles = nil
|
||||
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scs...)
|
||||
dataNew := append(data, scsData...)
|
||||
|
||||
// Check that all the scrape configs have unique JobName
|
||||
m := make(map[string]struct{}, len(cfg.ScrapeConfigs))
|
||||
for _, sc := range cfg.ScrapeConfigs {
|
||||
jobName := sc.JobName
|
||||
if _, ok := m[jobName]; ok {
|
||||
return nil, fmt.Errorf("duplicate `job_name` in `scrape_configs` loaded from %q: %q", path, jobName)
|
||||
cfg.ScrapeConfigs = nil
|
||||
return fmt.Errorf("duplicate `job_name` in `scrape_configs` loaded from %q: %q", path, jobName)
|
||||
}
|
||||
m[jobName] = struct{}{}
|
||||
}
|
||||
|
||||
// Initialize cfg.ScrapeConfigs
|
||||
var validScrapeConfigs []*ScrapeConfig
|
||||
for i, sc := range cfg.ScrapeConfigs {
|
||||
validScrapeConfigs := cfg.ScrapeConfigs[:0]
|
||||
for _, sc := range cfg.ScrapeConfigs {
|
||||
// Make a copy of sc in order to remove references to `data` memory.
|
||||
// This should prevent from memory leaks on config reload.
|
||||
sc = sc.clone()
|
||||
cfg.ScrapeConfigs[i] = sc
|
||||
|
||||
swc, err := getScrapeWorkConfig(sc, cfg.baseDir, &cfg.Global)
|
||||
if err != nil {
|
||||
// print error and skip invalid scrape config
|
||||
logger.Errorf("cannot parse `scrape_config` for job %q, skip it: %w", sc.JobName, err)
|
||||
logger.Errorf("skipping `scrape_config` for job_name=%s because of error: %s", sc.JobName, err)
|
||||
continue
|
||||
}
|
||||
sc.swc = swc
|
||||
validScrapeConfigs = append(validScrapeConfigs, sc)
|
||||
}
|
||||
tailScrapeConfigs := cfg.ScrapeConfigs[len(validScrapeConfigs):]
|
||||
cfg.ScrapeConfigs = validScrapeConfigs
|
||||
return dataNew, nil
|
||||
for i := range tailScrapeConfigs {
|
||||
tailScrapeConfigs[i] = nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *ScrapeConfig) clone() *ScrapeConfig {
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -86,7 +86,7 @@ func getMXAddrLabels(ctx context.Context, sdc *SDConfig) []*promutils.Labels {
|
||||
for range sdc.Names {
|
||||
r := <-ch
|
||||
if r.err != nil {
|
||||
logger.Errorf("error in MX lookup for %q; skipping it; error: %s", r.name, r.err)
|
||||
logger.Errorf("dns_sd_config: skipping MX lookup for %q because of error: %s", r.name, r.err)
|
||||
continue
|
||||
}
|
||||
for _, mx := range r.mx {
|
||||
@ -121,7 +121,7 @@ func getSRVAddrLabels(ctx context.Context, sdc *SDConfig) []*promutils.Labels {
|
||||
for range sdc.Names {
|
||||
r := <-ch
|
||||
if r.err != nil {
|
||||
logger.Errorf("error in SRV lookup for %q; skipping it; error: %s", r.name, r.err)
|
||||
logger.Errorf("dns_sd_config: skipping SRV lookup for %q because of error: %s", r.name, r.err)
|
||||
continue
|
||||
}
|
||||
for _, a := range r.as {
|
||||
|
@ -50,7 +50,7 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||
case "nodes":
|
||||
return getNodesLabels(cfg)
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `tasks`, `services` or `nodes`; skipping it", sdc.Role)
|
||||
return nil, fmt.Errorf("skipping unexpected role=%q; must be one of `tasks`, `services` or `nodes`", sdc.Role)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -72,7 +72,7 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc
|
||||
if sdc.Namespaces.OwnNamespace {
|
||||
namespace, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot determine namespace for the current pod according to `own_namespace: true` option in kubernetes_sd_config: %s", err)
|
||||
logger.Panicf("FATAL: cannot determine namespace for the current pod according to `own_namespace: true` option in kubernetes_sd_config: %s", err)
|
||||
}
|
||||
namespaces = []string{string(namespace)}
|
||||
}
|
||||
@ -223,9 +223,13 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
|
||||
if proxyURL != nil {
|
||||
proxy = http.ProxyURL(proxyURL)
|
||||
}
|
||||
tlsConfig, err := ac.NewTLSConfig()
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot initialize tls config: %s", err)
|
||||
}
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: ac.NewTLSConfig(),
|
||||
TLSClientConfig: tlsConfig,
|
||||
Proxy: proxy,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
IdleConnTimeout: *apiServerTimeout,
|
||||
@ -239,9 +243,11 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
|
||||
selectors: selectors,
|
||||
attachNodeMetadata: attachNodeMetadata,
|
||||
|
||||
setHeaders: func(req *http.Request) error { return ac.SetHeaders(req, true) },
|
||||
client: client,
|
||||
m: make(map[string]*urlWatcher),
|
||||
setHeaders: func(req *http.Request) error {
|
||||
return ac.SetHeaders(req, true)
|
||||
},
|
||||
client: client,
|
||||
m: make(map[string]*urlWatcher),
|
||||
}
|
||||
}
|
||||
|
||||
@ -418,11 +424,10 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
|
||||
logger.Panicf("FATAL: cannot create a request for %q: %s", requestURL, err)
|
||||
}
|
||||
err = gw.setHeaders(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if err := gw.setHeaders(req); err != nil {
|
||||
return nil, fmt.Errorf("cannot set request headers: %w", err)
|
||||
}
|
||||
resp, err := gw.client.Do(req)
|
||||
if err != nil {
|
||||
|
@ -92,10 +92,15 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
ac, err := opts.NewConfig()
|
||||
if err != nil {
|
||||
cfg.client.CloseIdleConnections()
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("cannot parse TLS config: %w", err)
|
||||
}
|
||||
tlsConfig, err := ac.NewTLSConfig()
|
||||
if err != nil {
|
||||
cfg.client.CloseIdleConnections()
|
||||
return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
|
||||
}
|
||||
cfg.client.Transport = &http.Transport{
|
||||
TLSClientConfig: ac.NewTLSConfig(),
|
||||
TLSClientConfig: tlsConfig,
|
||||
MaxIdleConnsPerHost: 100,
|
||||
}
|
||||
}
|
||||
@ -121,7 +126,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
parsedURL, err := url.Parse(sdcAuth.IdentityEndpoint)
|
||||
if err != nil {
|
||||
cfg.client.CloseIdleConnections()
|
||||
return nil, fmt.Errorf("cannot parse identity_endpoint: %s as url, err: %w", sdcAuth.IdentityEndpoint, err)
|
||||
return nil, fmt.Errorf("cannot parse identity_endpoint %s as url: %w", sdcAuth.IdentityEndpoint, err)
|
||||
}
|
||||
cfg.endpoint = parsedURL
|
||||
tokenReq, err := buildAuthRequestBody(&sdcAuth)
|
||||
@ -144,7 +149,7 @@ func getCreds(cfg *apiConfig) (*apiCredentials, error) {
|
||||
|
||||
resp, err := cfg.client.Post(apiURL.String(), "application/json", bytes.NewBuffer(cfg.authTokenReq))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed query openstack identity api, url: %s, err: %w", apiURL.String(), err)
|
||||
return nil, fmt.Errorf("failed query openstack identity api at url %s: %w", apiURL.String(), err)
|
||||
}
|
||||
r, err := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
|
@ -51,7 +51,7 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||
case "instance":
|
||||
return getInstancesLabels(cfg)
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `instance` or `hypervisor`; skipping it", sdc.Role)
|
||||
return nil, fmt.Errorf("skipping unexpected role=%q; must be one of `instance` or `hypervisor`", sdc.Role)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -56,10 +56,14 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
TLSConfig: sdc.TLSConfig,
|
||||
}
|
||||
ac, err := opts.NewConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse TLS config: %w", err)
|
||||
}
|
||||
tlsConfig, err := ac.NewTLSConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
|
||||
}
|
||||
transport.TLSClientConfig = ac.NewTLSConfig()
|
||||
transport.TLSClientConfig = tlsConfig
|
||||
}
|
||||
cfg := &apiConfig{
|
||||
client: &http.Client{
|
||||
|
@ -34,7 +34,7 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||
case "compute":
|
||||
return getInstancesLabels(cfg)
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected `service`: %q; only `compute` supported yet; skipping it", sdc.Service)
|
||||
return nil, fmt.Errorf("skipping unexpected service=%q; only `compute` supported for now", sdc.Service)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -109,7 +109,11 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
|
||||
isTLS := u.Scheme == "https"
|
||||
var tlsCfg *tls.Config
|
||||
if isTLS {
|
||||
tlsCfg = ac.NewTLSConfig()
|
||||
var err error
|
||||
tlsCfg, err = ac.NewTLSConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var proxyURLFunc func(*http.Request) (*url.URL, error)
|
||||
@ -247,13 +251,11 @@ func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, clien
|
||||
return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err)
|
||||
}
|
||||
|
||||
err = c.setHTTPHeaders(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot set request http header for %q: %w", requestURL, err)
|
||||
if err := c.setHTTPHeaders(req); err != nil {
|
||||
return nil, fmt.Errorf("cannot set request headers for %q: %w", requestURL, err)
|
||||
}
|
||||
err = c.setHTTPProxyHeaders(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot set request http proxy header for %q: %w", requestURL, err)
|
||||
if err := c.setHTTPProxyHeaders(req); err != nil {
|
||||
return nil, fmt.Errorf("cannot set request proxy headers for %q: %w", requestURL, err)
|
||||
}
|
||||
if modifyRequest != nil {
|
||||
modifyRequest(req)
|
||||
|
@ -1,7 +1,6 @@
|
||||
package promscrape
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
@ -36,8 +35,8 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0, "Interval for checking for changes in '-promscrape.config' file. "+
|
||||
"By default, the checking is disabled. Send SIGHUP signal in order to force config check for changes")
|
||||
configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0, "Interval for checking for changes in -promscrape.config file. "+
|
||||
"By default, the checking is disabled. See how to reload -promscrape.config file at https://docs.victoriametrics.com/vmagent.html#configuration-update")
|
||||
suppressDuplicateScrapeTargetErrors = flag.Bool("promscrape.suppressDuplicateScrapeTargetErrors", false, "Whether to suppress 'duplicate scrape target' errors; "+
|
||||
"see https://docs.victoriametrics.com/vmagent.html#troubleshooting for details")
|
||||
promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
|
||||
@ -53,7 +52,7 @@ func CheckConfig() error {
|
||||
if *promscrapeConfigFile == "" {
|
||||
return nil
|
||||
}
|
||||
_, _, err := loadConfig(*promscrapeConfigFile)
|
||||
_, err := loadConfig(*promscrapeConfigFile)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -110,8 +109,8 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
||||
sighupCh := procutil.NewSighupChan()
|
||||
|
||||
logger.Infof("reading Prometheus configs from %q", configFile)
|
||||
cfg, data, err := loadConfig(configFile)
|
||||
logger.Infof("reading scrape configs from %q", configFile)
|
||||
cfg, err := loadConfig(configFile)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot read %q: %s", configFile, err)
|
||||
}
|
||||
@ -154,41 +153,40 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
|
||||
select {
|
||||
case <-sighupCh:
|
||||
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
|
||||
cfgNew, dataNew, err := loadConfig(configFile)
|
||||
cfgNew, err := loadConfig(configFile)
|
||||
if err != nil {
|
||||
configReloadErrors.Inc()
|
||||
configSuccess.Set(0)
|
||||
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
|
||||
goto waitForChans
|
||||
}
|
||||
if bytes.Equal(data, dataNew) {
|
||||
configSuccess.Set(1)
|
||||
configSuccess.Set(1)
|
||||
if !cfgNew.mustRestart(cfg) {
|
||||
logger.Infof("nothing changed in %q", configFile)
|
||||
goto waitForChans
|
||||
}
|
||||
cfgNew.mustRestart(cfg)
|
||||
cfg = cfgNew
|
||||
data = dataNew
|
||||
marshaledData = cfgNew.marshal()
|
||||
marshaledData = cfg.marshal()
|
||||
configData.Store(&marshaledData)
|
||||
configReloads.Inc()
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
case <-tickerCh:
|
||||
cfgNew, dataNew, err := loadConfig(configFile)
|
||||
cfgNew, err := loadConfig(configFile)
|
||||
if err != nil {
|
||||
configReloadErrors.Inc()
|
||||
configSuccess.Set(0)
|
||||
logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
|
||||
goto waitForChans
|
||||
}
|
||||
if bytes.Equal(data, dataNew) {
|
||||
configSuccess.Set(1)
|
||||
// Nothing changed since the previous loadConfig
|
||||
configSuccess.Set(1)
|
||||
if !cfgNew.mustRestart(cfg) {
|
||||
goto waitForChans
|
||||
}
|
||||
cfgNew.mustRestart(cfg)
|
||||
cfg = cfgNew
|
||||
data = dataNew
|
||||
marshaledData = cfgNew.marshal()
|
||||
marshaledData = cfg.marshal()
|
||||
configData.Store(&marshaledData)
|
||||
configReloads.Inc()
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
case <-globalStopCh:
|
||||
cfg.mustStop()
|
||||
logger.Infof("stopping Prometheus scrapers")
|
||||
@ -197,10 +195,6 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
|
||||
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
|
||||
return
|
||||
}
|
||||
logger.Infof("found changes in %q; applying these changes", configFile)
|
||||
configReloads.Inc()
|
||||
configSuccess.Set(1)
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
}
|
||||
}
|
||||
|
||||
@ -407,8 +401,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
|
||||
for _, sw := range swsToStart {
|
||||
sc, err := newScraper(sw, sg.name, sg.pushData)
|
||||
if err != nil {
|
||||
// print error and skip invalid scraper config
|
||||
logger.Errorf("cannot create scraper to %q in job %q, will skip it: %w", sw.ScrapeURL, sg.name, err)
|
||||
logger.Errorf("skipping scraper for url=%s, job=%s because of error: %s", sw.ScrapeURL, sg.name, err)
|
||||
continue
|
||||
}
|
||||
sg.activeScrapers.Inc()
|
||||
@ -455,7 +448,7 @@ func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *
|
||||
}
|
||||
c, err := newClient(ctx, sw)
|
||||
if err != nil {
|
||||
return &scraper{}, err
|
||||
return nil, err
|
||||
}
|
||||
sc.sw.Config = sw
|
||||
sc.sw.ScrapeGroup = group
|
||||
|
@ -213,7 +213,7 @@ func unmarshalRow(dst []Row, s string, tagsPool []Tag, fieldsPool []Field, noEsc
|
||||
tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool, noEscapeChars)
|
||||
if err != nil {
|
||||
dst = dst[:len(dst)-1]
|
||||
logger.Errorf("cannot unmarshal InfluxDB line %q: %s; skipping it", s, err)
|
||||
logger.Errorf("skipping InfluxDB line %q because of error: %s", s, err)
|
||||
invalidLines.Inc()
|
||||
}
|
||||
return dst, tagsPool, fieldsPool
|
||||
|
@ -236,7 +236,7 @@ func unmarshalRow(dst []Row, s string, tu *tagsUnmarshaler) []Row {
|
||||
r := &dst[len(dst)-1]
|
||||
if err := r.unmarshal(s, tu); err != nil {
|
||||
dst = dst[:len(dst)-1]
|
||||
logger.Errorf("cannot unmarshal json line %q: %s; skipping it", s, err)
|
||||
logger.Errorf("skipping json line %q because of error: %s", s, err)
|
||||
invalidLines.Inc()
|
||||
}
|
||||
return dst
|
||||
|
@ -76,7 +76,7 @@ func (u *URL) String() string {
|
||||
func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) error {
|
||||
ah, err := u.getAuthHeader(ac)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("cannot obtain Proxy-Authorization headers: %w", err)
|
||||
}
|
||||
if ah != "" {
|
||||
req.Header.Set("Proxy-Authorization", ah)
|
||||
@ -88,7 +88,7 @@ func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) error {
|
||||
func (u *URL) SetFasthttpHeaders(ac *promauth.Config, req *fasthttp.Request) error {
|
||||
ah, err := u.getAuthHeader(ac)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("cannot obtain Proxy-Authorization headers: %w", err)
|
||||
}
|
||||
if ah != "" {
|
||||
req.Header.Set("Proxy-Authorization", ah)
|
||||
@ -155,7 +155,11 @@ func (u *URL) NewDialFunc(ac *promauth.Config) (fasthttp.DialFunc, error) {
|
||||
proxyAddr := addMissingPort(pu.Host, isTLS)
|
||||
var tlsCfg *tls.Config
|
||||
if isTLS {
|
||||
tlsCfg = ac.NewTLSConfig()
|
||||
var err error
|
||||
tlsCfg, err = ac.NewTLSConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
|
||||
}
|
||||
if !tlsCfg.InsecureSkipVerify && tlsCfg.ServerName == "" {
|
||||
tlsCfg.ServerName = tlsServerName(proxyAddr)
|
||||
}
|
||||
@ -173,7 +177,7 @@ func (u *URL) NewDialFunc(ac *promauth.Config) (fasthttp.DialFunc, error) {
|
||||
}
|
||||
authHeader, err := u.getAuthHeader(ac)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get auth header: %w", err)
|
||||
return nil, fmt.Errorf("cannot obtain Proxy-Authorization header: %w", err)
|
||||
}
|
||||
if authHeader != "" {
|
||||
authHeader = "Proxy-Authorization: " + authHeader + "\r\n"
|
||||
|
Loading…
Reference in New Issue
Block a user