diff --git a/app/vmalert/Makefile b/app/vmalert/Makefile index 8f378e67a0..3f8ad175bf 100644 --- a/app/vmalert/Makefile +++ b/app/vmalert/Makefile @@ -61,6 +61,7 @@ run-vmalert: vmalert ./bin/vmalert -rule=app/vmalert/config/testdata/rules2-good.rules \ -datasource.url=http://localhost:8428 \ -notifier.url=http://localhost:9093 \ + -notifier.url=http://127.0.0.1:9093 \ -remoteWrite.url=http://localhost:8428 \ -remoteRead.url=http://localhost:8428 \ -evaluationInterval=3s diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index 8ec74193d4..295cf6ac9e 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -49,9 +49,10 @@ const queryPath = "/api/v1/query?query=" // VMStorage represents vmstorage entity with ability to read and write metrics type VMStorage struct { - c *http.Client - queryURL string - basicAuthUser, basicAuthPass string + c *http.Client + queryURL string + basicAuthUser string + basicAuthPass string } // NewVMStorage is a constructor for VMStorage diff --git a/app/vmalert/group.go b/app/vmalert/group.go index 0de9b10cef..2ae35c0871 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metrics" ) @@ -155,10 +156,10 @@ func (g *Group) close() { <-g.finishedCh } -func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) { +func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) { defer func() { close(g.finishedCh) }() logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) - e := &executor{querier, nr, rw} + e := &executor{querier, nts, rw} t := time.NewTicker(g.Interval) defer t.Stop() for { @@ -201,9 +202,9 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifi } type executor struct { - querier datasource.Querier - notifier notifier.Notifier - rw *remotewrite.Client + querier datasource.Querier + notifiers []notifier.Notifier + rw *remotewrite.Client } func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, interval time.Duration) chan error { @@ -286,10 +287,14 @@ func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, inter if len(alerts) < 1 { return nil } + alertsSent.Add(len(alerts)) - if err := e.notifier.Send(ctx, alerts); err != nil { - alertsSendErrors.Inc() - return fmt.Errorf("rule %q: failed to send alerts: %s", rule, err) + errGr := new(utils.ErrGroup) + for _, nt := range e.notifiers { + if err := nt.Send(ctx, alerts); err != nil { + alertsSendErrors.Inc() + errGr.Add(fmt.Errorf("rule %q: failed to send alerts: %s", rule, err)) + } } - return nil + return errGr.Err() } diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index ee69c9fdc8..5439ebae27 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -179,7 +179,7 @@ func TestGroupStart(t *testing.T) { fs.add(m1) fs.add(m2) go func() { - g.start(context.Background(), fs, fn, nil) + g.start(context.Background(), fs, []notifier.Notifier{fn}, nil) close(finished) }() diff --git a/app/vmalert/main.go b/app/vmalert/main.go index 22ef3fa1c4..3737c72807 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -116,15 +116,15 @@ func newManager(ctx context.Context) (*manager, error) { if err != nil { return nil, fmt.Errorf("failed to init `external.alert.source`: %s", err) } - nt, err := notifier.Init(aug) + nts, err := notifier.Init(aug) if err != nil { return nil, fmt.Errorf("failed to init notifier: %s", err) } manager := &manager{ - groups: make(map[uint64]*Group), - querier: q, - notifier: nt, + groups: make(map[uint64]*Group), + querier: q, + notifiers: nts, } rw, err := remotewrite.Init(ctx) if err != nil { diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index b312cddc84..bce4026bf5 100644 --- a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -15,8 +15,8 @@ import ( // manager controls group states type manager struct { - querier datasource.Querier - notifier notifier.Notifier + querier datasource.Querier + notifiers []notifier.Notifier rw *remotewrite.Client rr datasource.Querier @@ -73,7 +73,7 @@ func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) { m.wg.Add(1) id := group.ID() go func() { - group.start(ctx, m.querier, m.notifier, m.rw) + group.start(ctx, m.querier, m.notifiers, m.rw) m.wg.Done() }() m.groups[id] = group diff --git a/app/vmalert/manager_test.go b/app/vmalert/manager_test.go index 11fcc6d232..bc6f308560 100644 --- a/app/vmalert/manager_test.go +++ b/app/vmalert/manager_test.go @@ -37,9 +37,9 @@ func TestManagerUpdateError(t *testing.T) { // Should be executed with -race flag func TestManagerUpdateConcurrent(t *testing.T) { m := &manager{ - groups: make(map[uint64]*Group), - querier: &fakeQuerier{}, - notifier: &fakeNotifier{}, + groups: make(map[uint64]*Group), + querier: &fakeQuerier{}, + notifiers: []notifier.Notifier{&fakeNotifier{}}, } paths := []string{ "config/testdata/dir/rules0-good.rules", diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go index 985bd47d59..67302ca175 100644 --- a/app/vmalert/notifier/alert.go +++ b/app/vmalert/notifier/alert.go @@ -7,6 +7,8 @@ import ( "strings" "text/template" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" ) // Alert the triggered alert @@ -77,7 +79,7 @@ func ValidateTemplates(annotations map[string]string) error { func templateAnnotations(annotations map[string]string, header string, data alertTplData) (map[string]string, error) { var builder strings.Builder var buf bytes.Buffer - eg := errGroup{} + eg := new(utils.ErrGroup) r := make(map[string]string, len(annotations)) for key, text := range annotations { r[key] = text @@ -87,12 +89,12 @@ func templateAnnotations(annotations map[string]string, header string, data aler builder.WriteString(header) builder.WriteString(text) if err := templateAnnotation(&buf, builder.String(), data); err != nil { - eg.errs = append(eg.errs, fmt.Sprintf("key %q, template %q: %s", key, text, err)) + eg.Add(fmt.Errorf("key %q, template %q: %s", key, text, err)) continue } r[key] = buf.String() } - return r, eg.err() + return r, eg.Err() } func templateAnnotation(dst io.Writer, text string, data alertTplData) error { diff --git a/app/vmalert/notifier/alertmanager.go b/app/vmalert/notifier/alertmanager.go index 6e06949ce2..78fb453bcd 100644 --- a/app/vmalert/notifier/alertmanager.go +++ b/app/vmalert/notifier/alertmanager.go @@ -12,9 +12,11 @@ import ( // AlertManager represents integration provider with Prometheus alert manager // https://github.com/prometheus/alertmanager type AlertManager struct { - alertURL string - argFunc AlertURLGenerator - client *http.Client + alertURL string + basicAuthUser string + basicAuthPass string + argFunc AlertURLGenerator + client *http.Client } // Send an alert or resolve message @@ -28,6 +30,9 @@ func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error { } req.Header.Set("Content-Type", "application/json") req = req.WithContext(ctx) + if am.basicAuthPass != "" { + req.SetBasicAuth(am.basicAuthUser, am.basicAuthPass) + } resp, err := am.client.Do(req) if err != nil { return err @@ -51,10 +56,13 @@ type AlertURLGenerator func(Alert) string const alertManagerPath = "/api/v2/alerts" // NewAlertManager is a constructor for AlertManager -func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager { +func NewAlertManager(alertManagerURL, user, pass string, fn AlertURLGenerator, c *http.Client) *AlertManager { + addr := strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath return &AlertManager{ - alertURL: strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath, - argFunc: fn, - client: c, + alertURL: addr, + argFunc: fn, + client: c, + basicAuthUser: user, + basicAuthPass: pass, } } diff --git a/app/vmalert/notifier/alertmanager_test.go b/app/vmalert/notifier/alertmanager_test.go index cfdcc334c3..18f2efcf4b 100644 --- a/app/vmalert/notifier/alertmanager_test.go +++ b/app/vmalert/notifier/alertmanager_test.go @@ -11,12 +11,21 @@ import ( ) func TestAlertManager_Send(t *testing.T) { + const baUser, baPass = "foo", "bar" mux := http.NewServeMux() mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) { t.Errorf("should not be called") }) c := -1 mux.HandleFunc(alertManagerPath, func(w http.ResponseWriter, r *http.Request) { + user, pass, ok := r.BasicAuth() + if !ok { + t.Errorf("unauthorized request") + } + if user != baUser || pass != baPass { + t.Errorf("wrong creds %q:%q; expected %q:%q", + user, pass, baUser, baPass) + } c++ if r.Method != http.MethodPost { t.Errorf("expected POST method got %s", r.Method) @@ -58,7 +67,7 @@ func TestAlertManager_Send(t *testing.T) { }) srv := httptest.NewServer(mux) defer srv.Close() - am := NewAlertManager(srv.URL, func(alert Alert) string { + am := NewAlertManager(srv.URL, baUser, baPass, func(alert Alert) string { return strconv.FormatUint(alert.GroupID, 10) + "/" + strconv.FormatUint(alert.ID, 10) }, srv.Client()) if err := am.Send(context.Background(), []Alert{{}, {}}); err == nil { diff --git a/app/vmalert/notifier/init.go b/app/vmalert/notifier/init.go index 6871cb010c..b84c42c26a 100644 --- a/app/vmalert/notifier/init.go +++ b/app/vmalert/notifier/init.go @@ -6,28 +6,42 @@ import ( "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" ) var ( - addr = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093") + addrs = flagutil.NewArray("notifier.url", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093") + basicAuthUsername = flagutil.NewArray("notifier.basicAuth.username", "Optional basic auth username for -datasource.url") + basicAuthPassword = flagutil.NewArray("notifier.basicAuth.password", "Optional basic auth password for -datasource.url") + tlsInsecureSkipVerify = flag.Bool("notifier.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -notifier.url") - tlsCertFile = flag.String("notifier.tlsCertFile", "", "Optional path to client-side TLS certificate file to use when connecting to -notifier.url") - tlsKeyFile = flag.String("notifier.tlsKeyFile", "", "Optional path to client-side TLS certificate key to use when connecting to -notifier.url") - tlsCAFile = flag.String("notifier.tlsCAFile", "", "Optional path to TLS CA file to use for verifying connections to -notifier.url. "+ + tlsCertFile = flagutil.NewArray("notifier.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -notifier.url") + tlsKeyFile = flagutil.NewArray("notifier.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to -notifier.url") + tlsCAFile = flagutil.NewArray("notifier.tlsCAFile", "Optional path to TLS CA file to use for verifying connections to -notifier.url. "+ "By default system CA is used") - tlsServerName = flag.String("notifier.tlsServerName", "", "Optional TLS server name to use for connections to -notifier.url. "+ + tlsServerName = flagutil.NewArray("notifier.tlsServerName", "Optional TLS server name to use for connections to -notifier.url. "+ "By default the server name from -notifier.url is used") ) // Init creates a Notifier object based on provided flags. -func Init(gen AlertURLGenerator) (Notifier, error) { - if *addr == "" { +func Init(gen AlertURLGenerator) ([]Notifier, error) { + if len(*addrs) == 0 { flag.PrintDefaults() - return nil, fmt.Errorf("notifier.url is empty") + return nil, fmt.Errorf("at least one `-notifier.url` must be set") } - tr, err := utils.Transport(*addr, *tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify) - if err != nil { - return nil, fmt.Errorf("failed to create transport: %s", err) + + var notifiers []Notifier + for i, addr := range *addrs { + cert, key := tlsCertFile.GetOptionalArg(i), tlsKeyFile.GetOptionalArg(i) + ca, serverName := tlsCAFile.GetOptionalArg(i), tlsServerName.GetOptionalArg(i) + tr, err := utils.Transport(addr, cert, key, ca, serverName, *tlsInsecureSkipVerify) + if err != nil { + return nil, fmt.Errorf("failed to create transport: %s", err) + } + user, pass := basicAuthUsername.GetOptionalArg(i), basicAuthPassword.GetOptionalArg(i) + am := NewAlertManager(addr, user, pass, gen, &http.Client{Transport: tr}) + notifiers = append(notifiers, am) } - return NewAlertManager(*addr, gen, &http.Client{Transport: tr}), nil + + return notifiers, nil } diff --git a/app/vmalert/notifier/utils.go b/app/vmalert/notifier/utils.go deleted file mode 100644 index 4bce9485a7..0000000000 --- a/app/vmalert/notifier/utils.go +++ /dev/null @@ -1,21 +0,0 @@ -package notifier - -import ( - "fmt" - "strings" -) - -type errGroup struct { - errs []string -} - -func (eg *errGroup) err() error { - if eg == nil || len(eg.errs) == 0 { - return nil - } - return eg -} - -func (eg *errGroup) Error() string { - return fmt.Sprintf("errors: %s", strings.Join(eg.errs, "\n")) -} diff --git a/app/vmalert/utils.go b/app/vmalert/utils.go index dcf46f8eca..7a824a096e 100644 --- a/app/vmalert/utils.go +++ b/app/vmalert/utils.go @@ -1,9 +1,10 @@ package main import ( - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "sort" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries { diff --git a/app/vmalert/utils/err_group.go b/app/vmalert/utils/err_group.go new file mode 100644 index 0000000000..a2f5750e84 --- /dev/null +++ b/app/vmalert/utils/err_group.go @@ -0,0 +1,43 @@ +package utils + +import ( + "fmt" + "strings" +) + +// ErrGroup accumulates multiple errors +// and produces single error message. +type ErrGroup struct { + errs []error +} + +// Add adds a new error to group. +// Isn't thread-safe. +func (eg *ErrGroup) Add(err error) { + eg.errs = append(eg.errs, err) +} + +// Err checks if group contains at least +// one error. +func (eg *ErrGroup) Err() error { + if eg == nil || len(eg.errs) == 0 { + return nil + } + return eg +} + +// Error satisfies Error interface +func (eg *ErrGroup) Error() string { + if len(eg.errs) == 0 { + return "" + } + var b strings.Builder + fmt.Fprintf(&b, "errors(%d): ", len(eg.errs)) + for i, err := range eg.errs { + b.WriteString(err.Error()) + if i != len(eg.errs)-1 { + b.WriteString("\n") + } + } + return b.String() +} diff --git a/app/vmalert/utils/err_group_test.go b/app/vmalert/utils/err_group_test.go new file mode 100644 index 0000000000..ae409bcc4a --- /dev/null +++ b/app/vmalert/utils/err_group_test.go @@ -0,0 +1,38 @@ +package utils + +import ( + "errors" + "testing" +) + +func TestErrGroup(t *testing.T) { + testCases := []struct { + errs []error + exp string + }{ + {nil, ""}, + {[]error{errors.New("timeout")}, "errors(1): timeout"}, + { + []error{errors.New("timeout"), errors.New("deadline")}, + "errors(2): timeout\ndeadline", + }, + } + for _, tc := range testCases { + eg := new(ErrGroup) + for _, err := range tc.errs { + eg.Add(err) + } + if len(tc.errs) == 0 { + if eg.Err() != nil { + t.Fatalf("expected to get nil error") + } + continue + } + if eg.Err() == nil { + t.Fatalf("expected to get non-nil error") + } + if eg.Error() != tc.exp { + t.Fatalf("expected to have: \n%q\ngot:\n%q", tc.exp, eg.Error()) + } + } +}