vmalert: allow configuring custom notifier headers per group (#4088)

vmalert: allow configuring custom notifier headers per group
This commit is contained in:
Haleygo 2023-04-27 18:17:26 +08:00 committed by Aliaksandr Valialkin
parent 52021713ec
commit 4b0db17bec
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
8 changed files with 55 additions and 35 deletions

View File

@ -36,7 +36,8 @@ type Group struct {
Params url.Values `yaml:"params"`
// Headers contains optional HTTP headers added to each rule request
Headers []Header `yaml:"headers,omitempty"`
// NotifierHeaders contains optional HTTP headers added to each alert request which will send to notifier
NotifierHeaders []Header `yaml:"notifier_headers,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}

View File

@ -39,6 +39,7 @@ type Group struct {
Labels map[string]string
Params url.Values
Headers map[string]string
NotifierHeaders map[string]string
doneCh chan struct{}
finishedCh chan struct{}
@ -102,6 +103,7 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
Checksum: cfg.Checksum,
Params: cfg.Params,
Headers: make(map[string]string),
NotifierHeaders: make(map[string]string),
Labels: cfg.Labels,
doneCh: make(chan struct{}),
@ -117,6 +119,9 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
for _, h := range cfg.Headers {
g.Headers[h.Key] = h.Value
}
for _, h := range cfg.NotifierHeaders {
g.NotifierHeaders[h.Key] = h.Value
}
g.metrics = newGroupMetrics(g)
rules := make([]Rule, len(cfg.Rules))
for i, r := range cfg.Rules {
@ -230,6 +235,7 @@ func (g *Group) updateWith(newGroup *Group) error {
g.Concurrency = newGroup.Concurrency
g.Params = newGroup.Params
g.Headers = newGroup.Headers
g.NotifierHeaders = newGroup.NotifierHeaders
g.Labels = newGroup.Labels
g.Limit = newGroup.Limit
g.Checksum = newGroup.Checksum
@ -294,7 +300,10 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
e := &executor{
rw: rw,
notifiers: nts,
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label)}
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
notifierHeaders: g.NotifierHeaders,
}
evalTS := time.Now()
@ -412,6 +421,8 @@ type executor struct {
// where `ruleID` is ID of the Rule within a Group
// and `ruleLabels` is []prompb.Label marshalled to a string
previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
notifierHeaders map[string]string
}
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration, limit int) chan error {
@ -504,7 +515,7 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur
for _, nt := range e.notifiers() {
wg.Add(1)
go func(nt notifier.Notifier) {
if err := nt.Send(ctx, alerts); err != nil {
if err := nt.Send(ctx, alerts, e.notifierHeaders); err != nil {
errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
}
wg.Done()

View File

@ -131,7 +131,7 @@ type fakeNotifier struct {
func (*fakeNotifier) Close() {}
func (*fakeNotifier) Addr() string { return "" }
func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error {
func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert, _ map[string]string) error {
fn.Lock()
defer fn.Unlock()
fn.counter += len(alerts)
@ -155,7 +155,7 @@ type faultyNotifier struct {
fakeNotifier
}
func (fn *faultyNotifier) Send(ctx context.Context, _ []notifier.Alert) error {
func (fn *faultyNotifier) Send(ctx context.Context, _ []notifier.Alert, _ map[string]string) error {
d, ok := ctx.Deadline()
if ok {
time.Sleep(time.Until(d))

View File

@ -184,6 +184,8 @@ func (g *Group) toAPI() APIGroup {
Concurrency: g.Concurrency,
Params: urlValuesToStrings(g.Params),
Headers: headersToStrings(g.Headers),
NotifierHeaders: headersToStrings(g.NotifierHeaders),
Labels: g.Labels,
}
for _, r := range g.Rules {

View File

@ -51,16 +51,16 @@ func (am *AlertManager) Close() {
func (am AlertManager) Addr() string { return am.addr }
// Send an alert or resolve message
func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error {
func (am *AlertManager) Send(ctx context.Context, alerts []Alert, notifierHeaders map[string]string) error {
am.metrics.alertsSent.Add(len(alerts))
err := am.send(ctx, alerts)
err := am.send(ctx, alerts, notifierHeaders)
if err != nil {
am.metrics.alertsSendErrors.Add(len(alerts))
}
return err
}
func (am *AlertManager) send(ctx context.Context, alerts []Alert) error {
func (am *AlertManager) send(ctx context.Context, alerts []Alert, notifierHeaders map[string]string) error {
b := &bytes.Buffer{}
writeamRequest(b, alerts, am.argFunc, am.relabelConfigs)
@ -69,6 +69,9 @@ func (am *AlertManager) send(ctx context.Context, alerts []Alert) error {
return err
}
req.Header.Set("Content-Type", "application/json")
for key, value := range notifierHeaders {
req.Header.Set(key, value)
}
if am.timeout > 0 {
var cancel context.CancelFunc
@ -105,7 +108,8 @@ const alertManagerPath = "/api/v2/alerts"
// NewAlertManager is a constructor for AlertManager
func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, authCfg promauth.HTTPClientConfig,
relabelCfg *promrelabel.ParsedConfigs, timeout time.Duration) (*AlertManager, error) {
relabelCfg *promrelabel.ParsedConfigs, timeout time.Duration,
) (*AlertManager, error) {
tls := &promauth.TLSConfig{}
if authCfg.TLSConfig != nil {
tls = authCfg.TLSConfig

View File

@ -90,10 +90,10 @@ func TestAlertManager_Send(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %s", err)
}
if err := am.Send(context.Background(), []Alert{{}, {}}); err == nil {
if err := am.Send(context.Background(), []Alert{{}, {}}, nil); err == nil {
t.Error("expected connection error got nil")
}
if err := am.Send(context.Background(), []Alert{}); err == nil {
if err := am.Send(context.Background(), []Alert{}, nil); err == nil {
t.Error("expected wrong http code error got nil")
}
if err := am.Send(context.Background(), []Alert{{
@ -102,7 +102,7 @@ func TestAlertManager_Send(t *testing.T) {
Start: time.Now().UTC(),
End: time.Now().UTC(),
Annotations: map[string]string{"a": "b", "c": "d", "e": "f"},
}}); err != nil {
}}, nil); err != nil {
t.Errorf("unexpected error %s", err)
}
if c != 2 {

View File

@ -7,7 +7,7 @@ type Notifier interface {
// Send sends the given list of alerts.
// Returns an error if fails to send the alerts.
// Must unblock if the given ctx is cancelled.
Send(ctx context.Context, alerts []Alert) error
Send(ctx context.Context, alerts []Alert, notifierHeaders map[string]string) error
// Addr returns address where alerts are sent.
Addr() string
// Close is a destructor for the Notifier

View File

@ -72,6 +72,8 @@ type APIGroup struct {
Params []string `json:"params,omitempty"`
// Headers contains HTTP headers added to each Rule's request
Headers []string `json:"headers,omitempty"`
// NotifierHeaders contains HTTP headers added to each alert request which will send to notifier
NotifierHeaders []string `json:"notifier_headers,omitempty"`
// Labels is a set of label value pairs, that will be added to every rule.
Labels map[string]string `json:"labels,omitempty"`
}