mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-04 22:02:12 +01:00
567 lines
16 KiB
Go
567 lines
16 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/base64"
|
|
"flag"
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"gopkg.in/yaml.v2"
|
|
)
|
|
|
|
var (
|
|
authConfigPath = flag.String("auth.config", "", "Path to auth config. It can point either to local file or to http url. "+
|
|
"See https://docs.victoriametrics.com/vmauth.html for details on the format of this auth config")
|
|
configCheckInterval = flag.Duration("configCheckInterval", 0, "interval for config file re-read. "+
|
|
"Zero value disables config re-reading. By default, refreshing is disabled, send SIGHUP for config refresh.")
|
|
)
|
|
|
|
// AuthConfig represents auth config.
|
|
type AuthConfig struct {
|
|
Users []UserInfo `yaml:"users,omitempty"`
|
|
UnauthorizedUser *UserInfo `yaml:"unauthorized_user,omitempty"`
|
|
}
|
|
|
|
// UserInfo is user information read from authConfigPath
|
|
type UserInfo struct {
|
|
Name string `yaml:"name,omitempty"`
|
|
BearerToken string `yaml:"bearer_token,omitempty"`
|
|
Username string `yaml:"username,omitempty"`
|
|
Password string `yaml:"password,omitempty"`
|
|
URLPrefix *URLPrefix `yaml:"url_prefix,omitempty"`
|
|
URLMaps []URLMap `yaml:"url_map,omitempty"`
|
|
HeadersConf HeadersConf `yaml:",inline"`
|
|
MaxConcurrentRequests int `yaml:"max_concurrent_requests,omitempty"`
|
|
DefaultURL *URLPrefix `yaml:"default_url,omitempty"`
|
|
RetryStatusCodes []int `yaml:"retry_status_codes,omitempty"`
|
|
|
|
concurrencyLimitCh chan struct{}
|
|
concurrencyLimitReached *metrics.Counter
|
|
|
|
requests *metrics.Counter
|
|
requestsDuration *metrics.Summary
|
|
}
|
|
|
|
// HeadersConf represents config for request and response headers.
|
|
type HeadersConf struct {
|
|
RequestHeaders []Header `yaml:"headers,omitempty"`
|
|
ResponseHeaders []Header `yaml:"response_headers,omitempty"`
|
|
}
|
|
|
|
func (ui *UserInfo) beginConcurrencyLimit() error {
|
|
select {
|
|
case ui.concurrencyLimitCh <- struct{}{}:
|
|
return nil
|
|
default:
|
|
ui.concurrencyLimitReached.Inc()
|
|
return fmt.Errorf("cannot handle more than %d concurrent requests from user %s", ui.getMaxConcurrentRequests(), ui.name())
|
|
}
|
|
}
|
|
|
|
func (ui *UserInfo) endConcurrencyLimit() {
|
|
<-ui.concurrencyLimitCh
|
|
}
|
|
|
|
func (ui *UserInfo) getMaxConcurrentRequests() int {
|
|
mcr := ui.MaxConcurrentRequests
|
|
if mcr <= 0 {
|
|
mcr = *maxConcurrentPerUserRequests
|
|
}
|
|
return mcr
|
|
}
|
|
|
|
// Header is `Name: Value` http header, which must be added to the proxied request.
|
|
type Header struct {
|
|
Name string
|
|
Value string
|
|
}
|
|
|
|
// UnmarshalYAML unmarshals h from f.
|
|
func (h *Header) UnmarshalYAML(f func(interface{}) error) error {
|
|
var s string
|
|
if err := f(&s); err != nil {
|
|
return err
|
|
}
|
|
n := strings.IndexByte(s, ':')
|
|
if n < 0 {
|
|
return fmt.Errorf("missing speparator char ':' between Name and Value in the header %q; expected format - 'Name: Value'", s)
|
|
}
|
|
h.Name = strings.TrimSpace(s[:n])
|
|
h.Value = strings.TrimSpace(s[n+1:])
|
|
return nil
|
|
}
|
|
|
|
// MarshalYAML marshals h to yaml.
|
|
func (h *Header) MarshalYAML() (interface{}, error) {
|
|
s := fmt.Sprintf("%s: %s", h.Name, h.Value)
|
|
return s, nil
|
|
}
|
|
|
|
// URLMap is a mapping from source paths to target urls.
|
|
type URLMap struct {
|
|
SrcPaths []*SrcPath `yaml:"src_paths,omitempty"`
|
|
URLPrefix *URLPrefix `yaml:"url_prefix,omitempty"`
|
|
HeadersConf HeadersConf `yaml:",inline"`
|
|
RetryStatusCodes []int `yaml:"retry_status_codes,omitempty"`
|
|
}
|
|
|
|
// SrcPath represents an src path
|
|
type SrcPath struct {
|
|
sOriginal string
|
|
re *regexp.Regexp
|
|
}
|
|
|
|
// URLPrefix represents passed `url_prefix`
|
|
type URLPrefix struct {
|
|
n uint32
|
|
bus []*backendURL
|
|
}
|
|
|
|
type backendURL struct {
|
|
brokenDeadline uint64
|
|
concurrentRequests int32
|
|
url *url.URL
|
|
}
|
|
|
|
func (bu *backendURL) isBroken() bool {
|
|
ct := fasttime.UnixTimestamp()
|
|
return ct < atomic.LoadUint64(&bu.brokenDeadline)
|
|
}
|
|
|
|
func (bu *backendURL) setBroken() {
|
|
deadline := fasttime.UnixTimestamp() + uint64((*failTimeout).Seconds())
|
|
atomic.StoreUint64(&bu.brokenDeadline, deadline)
|
|
}
|
|
|
|
func (bu *backendURL) put() {
|
|
atomic.AddInt32(&bu.concurrentRequests, -1)
|
|
}
|
|
|
|
func (up *URLPrefix) getBackendsCount() int {
|
|
return len(up.bus)
|
|
}
|
|
|
|
// getLeastLoadedBackendURL returns the backendURL with the minimum number of concurrent requests.
|
|
//
|
|
// backendURL.put() must be called on the returned backendURL after the request is complete.
|
|
func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL {
|
|
bus := up.bus
|
|
if len(bus) == 1 {
|
|
// Fast path - return the only backend url.
|
|
bu := bus[0]
|
|
atomic.AddInt32(&bu.concurrentRequests, 1)
|
|
return bu
|
|
}
|
|
|
|
// Slow path - select other backend urls.
|
|
n := atomic.AddUint32(&up.n, 1)
|
|
|
|
for i := uint32(0); i < uint32(len(bus)); i++ {
|
|
idx := (n + i) % uint32(len(bus))
|
|
bu := bus[idx]
|
|
if bu.isBroken() {
|
|
continue
|
|
}
|
|
if atomic.CompareAndSwapInt32(&bu.concurrentRequests, 0, 1) {
|
|
// Fast path - return the backend with zero concurrently executed requests.
|
|
return bu
|
|
}
|
|
}
|
|
|
|
// Slow path - return the backend with the minimum number of concurrently executed requests.
|
|
buMin := bus[n%uint32(len(bus))]
|
|
minRequests := atomic.LoadInt32(&buMin.concurrentRequests)
|
|
for _, bu := range bus {
|
|
if bu.isBroken() {
|
|
continue
|
|
}
|
|
if n := atomic.LoadInt32(&bu.concurrentRequests); n < minRequests {
|
|
buMin = bu
|
|
minRequests = n
|
|
}
|
|
}
|
|
atomic.AddInt32(&buMin.concurrentRequests, 1)
|
|
return buMin
|
|
}
|
|
|
|
// UnmarshalYAML unmarshals up from yaml.
|
|
func (up *URLPrefix) UnmarshalYAML(f func(interface{}) error) error {
|
|
var v interface{}
|
|
if err := f(&v); err != nil {
|
|
return err
|
|
}
|
|
var urls []string
|
|
switch x := v.(type) {
|
|
case string:
|
|
urls = []string{x}
|
|
case []interface{}:
|
|
if len(x) == 0 {
|
|
return fmt.Errorf("`url_prefix` must contain at least a single url")
|
|
}
|
|
us := make([]string, len(x))
|
|
for i, xx := range x {
|
|
s, ok := xx.(string)
|
|
if !ok {
|
|
return fmt.Errorf("`url_prefix` must contain array of strings; got %T", xx)
|
|
}
|
|
us[i] = s
|
|
}
|
|
urls = us
|
|
default:
|
|
return fmt.Errorf("unexpected type for `url_prefix`: %T; want string or []string", v)
|
|
}
|
|
bus := make([]*backendURL, len(urls))
|
|
for i, u := range urls {
|
|
pu, err := url.Parse(u)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal %q into url: %w", u, err)
|
|
}
|
|
bus[i] = &backendURL{
|
|
url: pu,
|
|
}
|
|
}
|
|
up.bus = bus
|
|
return nil
|
|
}
|
|
|
|
// MarshalYAML marshals up to yaml.
|
|
func (up *URLPrefix) MarshalYAML() (interface{}, error) {
|
|
var b []byte
|
|
if len(up.bus) == 1 {
|
|
u := up.bus[0].url.String()
|
|
b = strconv.AppendQuote(b, u)
|
|
return string(b), nil
|
|
}
|
|
b = append(b, '[')
|
|
for i, bu := range up.bus {
|
|
u := bu.url.String()
|
|
b = strconv.AppendQuote(b, u)
|
|
if i+1 < len(up.bus) {
|
|
b = append(b, ',')
|
|
}
|
|
}
|
|
b = append(b, ']')
|
|
return string(b), nil
|
|
}
|
|
|
|
func (sp *SrcPath) match(s string) bool {
|
|
prefix, ok := sp.re.LiteralPrefix()
|
|
if ok {
|
|
// Fast path - literal match
|
|
return s == prefix
|
|
}
|
|
if !strings.HasPrefix(s, prefix) {
|
|
return false
|
|
}
|
|
return sp.re.MatchString(s)
|
|
}
|
|
|
|
// UnmarshalYAML implements yaml.Unmarshaler
|
|
func (sp *SrcPath) UnmarshalYAML(f func(interface{}) error) error {
|
|
var s string
|
|
if err := f(&s); err != nil {
|
|
return err
|
|
}
|
|
sAnchored := "^(?:" + s + ")$"
|
|
re, err := regexp.Compile(sAnchored)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot build regexp from %q: %w", s, err)
|
|
}
|
|
sp.sOriginal = s
|
|
sp.re = re
|
|
return nil
|
|
}
|
|
|
|
// MarshalYAML implements yaml.Marshaler.
|
|
func (sp *SrcPath) MarshalYAML() (interface{}, error) {
|
|
return sp.sOriginal, nil
|
|
}
|
|
|
|
var (
|
|
configReloads = metrics.NewCounter(`vmauth_config_last_reload_total`)
|
|
configReloadErrors = metrics.NewCounter(`vmauth_config_last_reload_errors_total`)
|
|
configSuccess = metrics.NewCounter(`vmauth_config_last_reload_successful`)
|
|
configTimestamp = metrics.NewCounter(`vmauth_config_last_reload_success_timestamp_seconds`)
|
|
)
|
|
|
|
func initAuthConfig() {
|
|
if len(*authConfigPath) == 0 {
|
|
logger.Fatalf("missing required `-auth.config` command-line flag")
|
|
}
|
|
|
|
// Register SIGHUP handler for config re-read just before readAuthConfig call.
|
|
// This guarantees that the config will be re-read if the signal arrives during readAuthConfig call.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
|
sighupCh := procutil.NewSighupChan()
|
|
|
|
_, err := loadAuthConfig()
|
|
if err != nil {
|
|
logger.Fatalf("cannot load auth config: %s", err)
|
|
}
|
|
|
|
configSuccess.Set(1)
|
|
configTimestamp.Set(fasttime.UnixTimestamp())
|
|
|
|
stopCh = make(chan struct{})
|
|
authConfigWG.Add(1)
|
|
go func() {
|
|
defer authConfigWG.Done()
|
|
authConfigReloader(sighupCh)
|
|
}()
|
|
}
|
|
|
|
func stopAuthConfig() {
|
|
close(stopCh)
|
|
authConfigWG.Wait()
|
|
}
|
|
|
|
func authConfigReloader(sighupCh <-chan os.Signal) {
|
|
var refreshCh <-chan time.Time
|
|
// initialize auth refresh interval
|
|
if *configCheckInterval > 0 {
|
|
ticker := time.NewTicker(*configCheckInterval)
|
|
defer ticker.Stop()
|
|
refreshCh = ticker.C
|
|
}
|
|
|
|
updateFn := func() {
|
|
configReloads.Inc()
|
|
updated, err := loadAuthConfig()
|
|
if err != nil {
|
|
logger.Errorf("failed to load auth config; using the last successfully loaded config; error: %s", err)
|
|
configSuccess.Set(0)
|
|
configReloadErrors.Inc()
|
|
return
|
|
}
|
|
configSuccess.Set(1)
|
|
if updated {
|
|
configTimestamp.Set(fasttime.UnixTimestamp())
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case <-refreshCh:
|
|
updateFn()
|
|
case <-sighupCh:
|
|
logger.Infof("SIGHUP received; loading -auth.config=%q", *authConfigPath)
|
|
updateFn()
|
|
}
|
|
}
|
|
}
|
|
|
|
// authConfigData stores the yaml definition for this config.
|
|
// authConfigData needs to be updated each time authConfig is updated.
|
|
var authConfigData atomic.Pointer[[]byte]
|
|
|
|
var authConfig atomic.Pointer[AuthConfig]
|
|
var authUsers atomic.Pointer[map[string]*UserInfo]
|
|
var authConfigWG sync.WaitGroup
|
|
var stopCh chan struct{}
|
|
|
|
// loadAuthConfig loads and applies the config from *authConfigPath.
|
|
// It returns bool value to identify if new config was applied.
|
|
// The config can be not applied if there is a parsing error
|
|
// or if there are no changes to the current authConfig.
|
|
func loadAuthConfig() (bool, error) {
|
|
data, err := fs.ReadFileOrHTTP(*authConfigPath)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to read -auth.config=%q: %w", *authConfigPath, err)
|
|
}
|
|
|
|
oldData := authConfigData.Load()
|
|
if oldData != nil && bytes.Equal(data, *oldData) {
|
|
// there are no updates in the config - skip reloading.
|
|
return false, nil
|
|
}
|
|
|
|
ac, err := parseAuthConfig(data)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to parse -auth.config=%q: %w", *authConfigPath, err)
|
|
}
|
|
|
|
m, err := parseAuthConfigUsers(ac)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to parse users from -auth.config=%q: %w", *authConfigPath, err)
|
|
}
|
|
logger.Infof("loaded information about %d users from -auth.config=%q", len(m), *authConfigPath)
|
|
|
|
authConfig.Store(ac)
|
|
authConfigData.Store(&data)
|
|
authUsers.Store(&m)
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func parseAuthConfig(data []byte) (*AuthConfig, error) {
|
|
data, err := envtemplate.ReplaceBytes(data)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot expand environment vars: %w", err)
|
|
}
|
|
var ac AuthConfig
|
|
if err = yaml.UnmarshalStrict(data, &ac); err != nil {
|
|
return nil, fmt.Errorf("cannot unmarshal AuthConfig data: %w", err)
|
|
}
|
|
ui := ac.UnauthorizedUser
|
|
if ui != nil {
|
|
ui.requests = metrics.GetOrCreateCounter(`vmauth_unauthorized_user_requests_total`)
|
|
ui.requestsDuration = metrics.GetOrCreateSummary(`vmauth_unauthorized_user_request_duration_seconds`)
|
|
ui.concurrencyLimitCh = make(chan struct{}, ui.getMaxConcurrentRequests())
|
|
ui.concurrencyLimitReached = metrics.GetOrCreateCounter(`vmauth_unauthorized_user_concurrent_requests_limit_reached_total`)
|
|
_ = metrics.GetOrCreateGauge(`vmauth_unauthorized_user_concurrent_requests_capacity`, func() float64 {
|
|
return float64(cap(ui.concurrencyLimitCh))
|
|
})
|
|
_ = metrics.GetOrCreateGauge(`vmauth_unauthorized_user_concurrent_requests_current`, func() float64 {
|
|
return float64(len(ui.concurrencyLimitCh))
|
|
})
|
|
}
|
|
return &ac, nil
|
|
}
|
|
|
|
func parseAuthConfigUsers(ac *AuthConfig) (map[string]*UserInfo, error) {
|
|
uis := ac.Users
|
|
if len(uis) == 0 && ac.UnauthorizedUser == nil {
|
|
return nil, fmt.Errorf("Missing `users` or `unauthorized_user` sections")
|
|
}
|
|
byAuthToken := make(map[string]*UserInfo, len(uis))
|
|
for i := range uis {
|
|
ui := &uis[i]
|
|
if ui.BearerToken == "" && ui.Username == "" {
|
|
return nil, fmt.Errorf("either bearer_token or username must be set")
|
|
}
|
|
if ui.BearerToken != "" && ui.Username != "" {
|
|
return nil, fmt.Errorf("bearer_token=%q and username=%q cannot be set simultaneously", ui.BearerToken, ui.Username)
|
|
}
|
|
at1, at2 := getAuthTokens(ui.BearerToken, ui.Username, ui.Password)
|
|
if byAuthToken[at1] != nil {
|
|
return nil, fmt.Errorf("duplicate auth token found for bearer_token=%q, username=%q: %q", ui.BearerToken, ui.Username, at1)
|
|
}
|
|
if byAuthToken[at2] != nil {
|
|
return nil, fmt.Errorf("duplicate auth token found for bearer_token=%q, username=%q: %q", ui.BearerToken, ui.Username, at2)
|
|
}
|
|
if ui.URLPrefix != nil {
|
|
if err := ui.URLPrefix.sanitize(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if ui.DefaultURL != nil {
|
|
if err := ui.DefaultURL.sanitize(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
for _, e := range ui.URLMaps {
|
|
if len(e.SrcPaths) == 0 {
|
|
return nil, fmt.Errorf("missing `src_paths` in `url_map`")
|
|
}
|
|
if e.URLPrefix == nil {
|
|
return nil, fmt.Errorf("missing `url_prefix` in `url_map`")
|
|
}
|
|
if err := e.URLPrefix.sanitize(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if len(ui.URLMaps) == 0 && ui.URLPrefix == nil {
|
|
return nil, fmt.Errorf("missing `url_prefix`")
|
|
}
|
|
name := ui.name()
|
|
if ui.BearerToken != "" {
|
|
if ui.Password != "" {
|
|
return nil, fmt.Errorf("password shouldn't be set for bearer_token %q", ui.BearerToken)
|
|
}
|
|
ui.requests = metrics.GetOrCreateCounter(fmt.Sprintf(`vmauth_user_requests_total{username=%q}`, name))
|
|
ui.requestsDuration = metrics.GetOrCreateSummary(fmt.Sprintf(`vmauth_user_request_duration_seconds{username=%q}`, name))
|
|
}
|
|
if ui.Username != "" {
|
|
ui.requests = metrics.GetOrCreateCounter(fmt.Sprintf(`vmauth_user_requests_total{username=%q}`, name))
|
|
ui.requestsDuration = metrics.GetOrCreateSummary(fmt.Sprintf(`vmauth_user_request_duration_seconds{username=%q}`, name))
|
|
}
|
|
mcr := ui.getMaxConcurrentRequests()
|
|
ui.concurrencyLimitCh = make(chan struct{}, mcr)
|
|
ui.concurrencyLimitReached = metrics.GetOrCreateCounter(fmt.Sprintf(`vmauth_user_concurrent_requests_limit_reached_total{username=%q}`, name))
|
|
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmauth_user_concurrent_requests_capacity{username=%q}`, name), func() float64 {
|
|
return float64(cap(ui.concurrencyLimitCh))
|
|
})
|
|
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmauth_user_concurrent_requests_current{username=%q}`, name), func() float64 {
|
|
return float64(len(ui.concurrencyLimitCh))
|
|
})
|
|
byAuthToken[at1] = ui
|
|
byAuthToken[at2] = ui
|
|
}
|
|
return byAuthToken, nil
|
|
}
|
|
|
|
func (ui *UserInfo) name() string {
|
|
if ui.Name != "" {
|
|
return ui.Name
|
|
}
|
|
if ui.Username != "" {
|
|
return ui.Username
|
|
}
|
|
if ui.BearerToken != "" {
|
|
return "bearer_token"
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func getAuthTokens(bearerToken, username, password string) (string, string) {
|
|
if bearerToken != "" {
|
|
// Accept the bearerToken as Basic Auth username with empty password
|
|
at1 := getAuthToken(bearerToken, "", "")
|
|
at2 := getAuthToken("", bearerToken, "")
|
|
return at1, at2
|
|
}
|
|
at := getAuthToken("", username, password)
|
|
return at, at
|
|
}
|
|
|
|
func getAuthToken(bearerToken, username, password string) string {
|
|
if bearerToken != "" {
|
|
return "Bearer " + bearerToken
|
|
}
|
|
token := username + ":" + password
|
|
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
|
return "Basic " + token64
|
|
}
|
|
|
|
func (up *URLPrefix) sanitize() error {
|
|
for _, bu := range up.bus {
|
|
puNew, err := sanitizeURLPrefix(bu.url)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
bu.url = puNew
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func sanitizeURLPrefix(urlPrefix *url.URL) (*url.URL, error) {
|
|
// Remove trailing '/' from urlPrefix
|
|
for strings.HasSuffix(urlPrefix.Path, "/") {
|
|
urlPrefix.Path = urlPrefix.Path[:len(urlPrefix.Path)-1]
|
|
}
|
|
// Validate urlPrefix
|
|
if urlPrefix.Scheme != "http" && urlPrefix.Scheme != "https" {
|
|
return nil, fmt.Errorf("unsupported scheme for `url_prefix: %q`: %q; must be `http` or `https`", urlPrefix, urlPrefix.Scheme)
|
|
}
|
|
if urlPrefix.Host == "" {
|
|
return nil, fmt.Errorf("missing hostname in `url_prefix %q`", urlPrefix.Host)
|
|
}
|
|
return urlPrefix, nil
|
|
}
|