mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 23:39:48 +01:00
app/vmagent: allow setting independent auth configs per each configured -remoteWrite.url
This commit is contained in:
parent
12dbb9e22c
commit
20538a2a5d
@ -11,6 +11,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
||||||
@ -22,14 +23,19 @@ var (
|
|||||||
sendTimeout = flag.Duration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to -remoteWrite.url")
|
sendTimeout = flag.Duration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to -remoteWrite.url")
|
||||||
|
|
||||||
tlsInsecureSkipVerify = flag.Bool("remoteWrite.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -remoteWrite.url")
|
tlsInsecureSkipVerify = flag.Bool("remoteWrite.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -remoteWrite.url")
|
||||||
tlsCertFile = flag.String("remoteWrite.tlsCertFile", "", "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url")
|
tlsCertFile = flagutil.NewArray("remoteWrite.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url. "+
|
||||||
tlsKeyFile = flag.String("remoteWrite.tlsKeyFile", "", "Optional path to client-side TLS certificate key to use when connecting to -remoteWrite.url")
|
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
|
||||||
tlsCAFile = flag.String("remoteWrite.tlsCAFile", "", "Optional path to TLS CA file to use for verifying connections to -remoteWrite.url. "+
|
tlsKeyFile = flagutil.NewArray("remoteWrite.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to -remoteWrite.url. "+
|
||||||
"By default system CA is used")
|
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
|
||||||
|
tlsCAFile = flagutil.NewArray("remoteWrite.tlsCAFile", "Optional path to TLS CA file to use for verifying connections to -remoteWrite.url. "+
|
||||||
|
"By default system CA is used. If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
|
||||||
|
|
||||||
basicAuthUsername = flag.String("remoteWrite.basicAuth.username", "", "Optional basic auth username to use for -remoteWrite.url")
|
basicAuthUsername = flagutil.NewArray("remoteWrite.basicAuth.username", "Optional basic auth username to use for -remoteWrite.url. "+
|
||||||
basicAuthPassword = flag.String("remoteWrite.basicAuth.password", "", "Optional basic auth password to use for -remoteWrite.url")
|
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
|
||||||
bearerToken = flag.String("remoteWrite.bearerToken", "", "Optional bearer auth token to use for -remoteWrite.url")
|
basicAuthPassword = flagutil.NewArray("remoteWrite.basicAuth.password", "Optional basic auth password to use for -remoteWrite.url. "+
|
||||||
|
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
|
||||||
|
bearerToken = flagutil.NewArray("remoteWrite.bearerToken", "Optional bearer auth token to use for -remoteWrite.url. "+
|
||||||
|
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
|
||||||
)
|
)
|
||||||
|
|
||||||
type client struct {
|
type client struct {
|
||||||
@ -50,19 +56,22 @@ type client struct {
|
|||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQueue, concurrency int) *client {
|
func newClient(argIdx int, remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQueue, concurrency int) *client {
|
||||||
authHeader := ""
|
authHeader := ""
|
||||||
if len(*basicAuthUsername) > 0 || len(*basicAuthPassword) > 0 {
|
username := basicAuthUsername.GetOptionalArg(argIdx)
|
||||||
|
password := basicAuthPassword.GetOptionalArg(argIdx)
|
||||||
|
if len(username) > 0 || len(password) > 0 {
|
||||||
// See https://en.wikipedia.org/wiki/Basic_access_authentication
|
// See https://en.wikipedia.org/wiki/Basic_access_authentication
|
||||||
token := *basicAuthUsername + ":" + *basicAuthPassword
|
token := username + ":" + password
|
||||||
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
||||||
authHeader = "Basic " + token64
|
authHeader = "Basic " + token64
|
||||||
}
|
}
|
||||||
if len(*bearerToken) > 0 {
|
token := bearerToken.GetOptionalArg(argIdx)
|
||||||
|
if len(token) > 0 {
|
||||||
if authHeader != "" {
|
if authHeader != "" {
|
||||||
logger.Panicf("FATAL: `-remoteWrite.bearerToken`=%q cannot be set when `-remoteWrite.basicAuth.*` flags are set", *bearerToken)
|
logger.Panicf("FATAL: `-remoteWrite.bearerToken`=%q cannot be set when `-remoteWrite.basicAuth.*` flags are set", token)
|
||||||
}
|
}
|
||||||
authHeader = "Bearer " + *bearerToken
|
authHeader = "Bearer " + token
|
||||||
}
|
}
|
||||||
|
|
||||||
readTimeout := *sendTimeout
|
readTimeout := *sendTimeout
|
||||||
@ -87,7 +96,7 @@ func newClient(remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQue
|
|||||||
var tlsCfg *tls.Config
|
var tlsCfg *tls.Config
|
||||||
if isTLS {
|
if isTLS {
|
||||||
var err error
|
var err error
|
||||||
tlsCfg, err = getTLSConfig()
|
tlsCfg, err = getTLSConfig(argIdx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Panicf("FATAL: cannot initialize TLS config: %s", err)
|
logger.Panicf("FATAL: cannot initialize TLS config: %s", err)
|
||||||
}
|
}
|
||||||
@ -144,24 +153,26 @@ func (c *client) MustStop() {
|
|||||||
logger.Infof("stopped client for -remoteWrite.url=%q", c.remoteWriteURL)
|
logger.Infof("stopped client for -remoteWrite.url=%q", c.remoteWriteURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTLSConfig() (*tls.Config, error) {
|
func getTLSConfig(argIdx int) (*tls.Config, error) {
|
||||||
var tlsRootCA *x509.CertPool
|
var tlsRootCA *x509.CertPool
|
||||||
var tlsCertificate *tls.Certificate
|
var tlsCertificate *tls.Certificate
|
||||||
if *tlsCertFile != "" || *tlsKeyFile != "" {
|
certFile := tlsCertFile.GetOptionalArg(argIdx)
|
||||||
cert, err := tls.LoadX509KeyPair(*tlsCertFile, *tlsKeyFile)
|
keyFile := tlsKeyFile.GetOptionalArg(argIdx)
|
||||||
|
if certFile != "" || keyFile != "" {
|
||||||
|
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot load TLS certificate for -remoteWrite.tlsCertFile=%q and -remoteWrite.tlsKeyFile=%q: %s", *tlsCertFile, *tlsKeyFile, err)
|
return nil, fmt.Errorf("cannot load TLS certificate for -remoteWrite.tlsCertFile=%q and -remoteWrite.tlsKeyFile=%q: %s", certFile, keyFile, err)
|
||||||
}
|
}
|
||||||
tlsCertificate = &cert
|
tlsCertificate = &cert
|
||||||
}
|
}
|
||||||
if *tlsCAFile != "" {
|
if caFile := tlsCAFile.GetOptionalArg(argIdx); caFile != "" {
|
||||||
data, err := ioutil.ReadFile(*tlsCAFile)
|
data, err := ioutil.ReadFile(caFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot read -remoteWrite.tlsCAFile=%q: %s", *tlsCAFile, err)
|
return nil, fmt.Errorf("cannot read -remoteWrite.tlsCAFile=%q: %s", caFile, err)
|
||||||
}
|
}
|
||||||
tlsRootCA = x509.NewCertPool()
|
tlsRootCA = x509.NewCertPool()
|
||||||
if !tlsRootCA.AppendCertsFromPEM(data) {
|
if !tlsRootCA.AppendCertsFromPEM(data) {
|
||||||
return nil, fmt.Errorf("cannot parse data -remoteWrite.tlsCAFile=%q", *tlsCAFile)
|
return nil, fmt.Errorf("cannot parse data -remoteWrite.tlsCAFile=%q", caFile)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tlsCfg := &tls.Config{
|
tlsCfg := &tls.Config{
|
||||||
|
@ -69,7 +69,7 @@ func Init() {
|
|||||||
if *showRemoteWriteURL {
|
if *showRemoteWriteURL {
|
||||||
urlLabelValue = remoteWriteURL
|
urlLabelValue = remoteWriteURL
|
||||||
}
|
}
|
||||||
rwctx := newRemoteWriteCtx(remoteWriteURL, relabelConfigPath, maxInmemoryBlocks, urlLabelValue)
|
rwctx := newRemoteWriteCtx(i, remoteWriteURL, relabelConfigPath, maxInmemoryBlocks, urlLabelValue)
|
||||||
rwctxs = append(rwctxs, rwctx)
|
rwctxs = append(rwctxs, rwctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -131,7 +131,7 @@ type remoteWriteCtx struct {
|
|||||||
relabelMetricsDropped *metrics.Counter
|
relabelMetricsDropped *metrics.Counter
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRemoteWriteCtx(remoteWriteURL, relabelConfigPath string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx {
|
func newRemoteWriteCtx(argIdx int, remoteWriteURL, relabelConfigPath string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx {
|
||||||
h := xxhash.Sum64([]byte(remoteWriteURL))
|
h := xxhash.Sum64([]byte(remoteWriteURL))
|
||||||
path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h)
|
path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h)
|
||||||
fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, *maxPendingBytesPerURL)
|
fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, *maxPendingBytesPerURL)
|
||||||
@ -141,7 +141,7 @@ func newRemoteWriteCtx(remoteWriteURL, relabelConfigPath string, maxInmemoryBloc
|
|||||||
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, path, urlLabelValue), func() float64 {
|
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, path, urlLabelValue), func() float64 {
|
||||||
return float64(fq.GetInmemoryQueueLen())
|
return float64(fq.GetInmemoryQueueLen())
|
||||||
})
|
})
|
||||||
c := newClient(remoteWriteURL, urlLabelValue, fq, *queues)
|
c := newClient(argIdx, remoteWriteURL, urlLabelValue, fq, *queues)
|
||||||
var prcs []promrelabel.ParsedRelabelConfig
|
var prcs []promrelabel.ParsedRelabelConfig
|
||||||
if len(relabelConfigPath) > 0 {
|
if len(relabelConfigPath) > 0 {
|
||||||
var err error
|
var err error
|
||||||
|
@ -26,3 +26,15 @@ func (a *Array) Set(value string) error {
|
|||||||
*a = append(*a, values...)
|
*a = append(*a, values...)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetOptionalArg returns optional arg under the given argIdx.
|
||||||
|
func (a *Array) GetOptionalArg(argIdx int) string {
|
||||||
|
x := *a
|
||||||
|
if argIdx >= len(x) {
|
||||||
|
if len(x) == 1 {
|
||||||
|
return x[0]
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return x[argIdx]
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user