diff --git a/app/vmalert/main.go b/app/vmalert/main.go index f73659bacd..26bb69018d 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -56,7 +56,6 @@ absolute path to all .yaml files in root.`) externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier") ) -// TODO: hot configuration reload func main() { envflag.Parse() buildinfo.Init() @@ -70,7 +69,7 @@ func main() { notifier.InitTemplateFunc(eu) logger.Infof("reading alert rules configuration file from %s", strings.Join(*rulePath, ";")) - groups, err := Parse(*rulePath, *validateTemplates) + groups, err := readRules() if err != nil { logger.Fatalf("cannot parse configuration file: %s", err) } @@ -101,21 +100,18 @@ func main() { } wg := sync.WaitGroup{} - for _, g := range groups { - if restoreDS != nil { - err := g.Restore(ctx, restoreDS, *remoteReadLookBack) - if err != nil { - logger.Errorf("error while restoring state for group %q: %s", g.Name, err) - } - } - wg.Add(1) - go func(group Group) { - w.run(ctx, group, *evaluationInterval) - wg.Done() - }(g) - } - go httpserver.Serve(*httpListenAddr, (&requestHandler{groups: groups}).handler) + groupUpdateStorage := startInitGroups(ctx, w, restoreDS, groups, &wg) + + rh := &requestHandler{groups: groups, mu: sync.RWMutex{}} + + //run config updater + wg.Add(1) + sigHup := procutil.NewSighupChan() + + go rh.runConfigUpdater(ctx, sigHup, groupUpdateStorage, w, &wg) + + go httpserver.Serve(*httpListenAddr, (rh).handler) sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) @@ -152,15 +148,30 @@ var ( remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`) remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) + + configReloadTotal = metrics.NewCounter(`vmalert_config_reload_total`) + configReloadOkTotal = metrics.NewCounter(`vmalert_config_reload_ok_total`) + configReloadErrorTotal = metrics.NewCounter(`vmalert_config_reload_error_total`) ) -func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration) { +func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration, groupUpdate chan Group) { logger.Infof("watchdog for %s has been started", group.Name) t := time.NewTicker(evaluationInterval) defer t.Stop() for { select { + case newGroup := <-groupUpdate: + if newGroup.Rules == nil || len(newGroup.Rules) == 0 { + //empty rules for group + //need to exit + logger.Infof("stopping group: %s, it contains 0 rules now", group.Name) + return + } + logger.Infof("new group update received, group: %s", group.Name) + group.Update(newGroup) + logger.Infof("group was reconciled, group: %s", group.Name) + case <-t.C: iterationTotal.Inc() iterationStart := time.Now() @@ -237,3 +248,31 @@ func checkFlags() { logger.Fatalf("datasource.url is empty") } } + +func startInitGroups(ctx context.Context, w *watchdog, restoreDS *datasource.VMStorage, groups []Group, wg *sync.WaitGroup) map[string]chan Group { + groupUpdateStorage := map[string]chan Group{} + for _, g := range groups { + if restoreDS != nil { + err := g.Restore(ctx, restoreDS, *remoteReadLookBack) + if err != nil { + logger.Errorf("error while restoring state for group %q: %s", g.Name, err) + } + } + + groupUpdateChan := make(chan Group, 1) + groupUpdateStorage[g.Name] = groupUpdateChan + + wg.Add(1) + go func(group Group) { + w.run(ctx, group, *evaluationInterval, groupUpdateChan) + wg.Done() + }(g) + } + return groupUpdateStorage +} + +//wrapper +func readRules() ([]Group, error) { + return Parse(*rulePath, *validateTemplates) + +} diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go index c8bfb79c19..666b6cf0f7 100644 --- a/app/vmalert/rule.go +++ b/app/vmalert/rule.go @@ -36,6 +36,32 @@ func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time return nil } +// Update group +func (g *Group) Update(newGroup Group) *Group { + //check if old rule exists at new rules + for _, newRule := range newGroup.Rules { + for _, oldRule := range g.Rules { + if newRule.Name == oldRule.Name { + //is lock nessesary? + oldRule.mu.Lock() + //we copy only rules related values + //it`s safe to add additional fields to rule + //struct + oldRule.Annotations = newRule.Annotations + oldRule.Labels = newRule.Labels + oldRule.For = newRule.For + oldRule.Expr = newRule.Expr + oldRule.group = newRule.group + newRule = oldRule + oldRule.mu.Unlock() + } + } + } + //swap rules + g.Rules = newGroup.Rules + return g +} + // Rule is basic alert entity type Rule struct { Name string `yaml:"alert"` diff --git a/app/vmalert/rule_test.go b/app/vmalert/rule_test.go index db3915dd9a..476c783786 100644 --- a/app/vmalert/rule_test.go +++ b/app/vmalert/rule_test.go @@ -2,6 +2,7 @@ package main import ( "context" + "reflect" "testing" "time" @@ -533,3 +534,86 @@ func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) dat m.Value = value return m } + +func TestGroup_Update(t *testing.T) { + type fields struct { + Name string + Rules []*Rule + } + type args struct { + newGroup Group + } + tests := []struct { + name string + fields fields + args args + want *Group + }{ + { + name: "update group with replace one value", + args: args{newGroup: Group{Name: "base-group", Rules: []*Rule{ + { + Annotations: map[string]string{"different": "annotation"}, + For: time.Second * 30, + }, + }}}, + fields: fields{ + Name: "base-group", + Rules: []*Rule{ + { + Annotations: map[string]string{"one": "annotations"}, + }, + }, + }, + want: &Group{ + Name: "base-group", + Rules: []*Rule{ + {Annotations: map[string]string{"different": "annotation"}, For: time.Second * 30}, + }, + }, + }, + { + name: "update group with change one value for rule", + args: args{newGroup: Group{Name: "base-group-2", Rules: []*Rule{ + { + Annotations: map[string]string{"different": "annotation", "replace-value": "new-one"}, + For: time.Second * 30, + Labels: map[string]string{"label-1": "value-1"}, + Expr: "rate(vm) > 1", + }, + }}}, + fields: fields{ + Name: "base-group-2", + Rules: []*Rule{ + { + Annotations: map[string]string{"different": "annotation", "replace-value": "old-one"}, + For: time.Second * 50, + Expr: "rate(vm) > 5", + }, + }, + }, + want: &Group{ + Name: "base-group-2", + Rules: []*Rule{ + { + Annotations: map[string]string{"different": "annotation", "replace-value": "new-one"}, + For: time.Second * 30, + Labels: map[string]string{"label-1": "value-1"}, + Expr: "rate(vm) > 1", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := &Group{ + Name: tt.fields.Name, + Rules: tt.fields.Rules, + } + if got := g.Update(tt.args.newGroup); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Update() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/app/vmalert/web.go b/app/vmalert/web.go index 732e0f6121..b89b3b531a 100644 --- a/app/vmalert/web.go +++ b/app/vmalert/web.go @@ -1,12 +1,17 @@ package main import ( + "context" "encoding/json" "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "net/http" + "os" "sort" "strconv" "strings" + "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -27,6 +32,74 @@ type APIAlert struct { type requestHandler struct { groups []Group + mu sync.RWMutex +} + +func (rh *requestHandler) runConfigUpdater(ctx context.Context, reloadChan <-chan os.Signal, groupUpdateStorage map[string]chan Group, w *watchdog, wg *sync.WaitGroup) { + logger.Infof("starting config updater") + defer wg.Done() + for { + select { + case <-reloadChan: + logger.Infof("get sighup signal, updating config") + configReloadTotal.Inc() + newRules, err := readRules() + if err != nil { + logger.Errorf("sighup, cannot read new rules: %v", err) + configReloadErrorTotal.Inc() + continue + } + + rh.mu.Lock() + configReloadOkTotal.Inc() + //send new group to running watchers + for _, group := range newRules { + //update or start new group + if updateChan, ok := groupUpdateStorage[group.Name]; ok { + updateChan <- group + } else { + //its new group, we need to start it + updateChan := make(chan Group, 1) + groupUpdateStorage[group.Name] = updateChan + wg.Add(1) + go func(grp Group) { + w.run(ctx, grp, *evaluationInterval, updateChan) + wg.Done() + }(group) + //add new group to route handler + rh.groups = append(rh.groups, group) + } + } + //we have to check, if group is missing and remove it + for groupName, updateChan := range groupUpdateStorage { + var exist bool + for _, newGroup := range newRules { + if groupName == newGroup.Name { + exist = true + } + } + if !exist { + logger.Infof("group not exists in new rules, remove it, group: %s", groupName) + delete(groupUpdateStorage, groupName) + updateChan <- Group{Rules: []*Rule{}} + for i, group := range rh.groups { + if group.Name == groupName { + rh.groups[i] = rh.groups[len(rh.groups)-1] + rh.groups[len(rh.groups)-1] = Group{} + rh.groups = rh.groups[:len(rh.groups)-1] + } + } + } + } + rh.mu.Unlock() + logger.Infof("finished sync") + + case <-ctx.Done(): + logger.Infof("exiting config updater") + return + + } + } } var pathList = [][]string{ @@ -48,6 +121,12 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool { case "/api/v1/alerts": resph.handle(rh.list()) return true + case "/-/reload": + logger.Infof("api config reload was called, sending sighup") + procutil.SelfSIGHUP() + w.WriteHeader(http.StatusOK) + return true + default: // /api/v1///status if strings.HasSuffix(r.URL.Path, "/status") { @@ -66,6 +145,8 @@ type listAlertsResponse struct { } func (rh *requestHandler) list() ([]byte, error) { + rh.mu.RLock() + defer rh.mu.RUnlock() lr := listAlertsResponse{Status: "success"} for _, g := range rh.groups { for _, r := range g.Rules { @@ -89,6 +170,8 @@ func (rh *requestHandler) list() ([]byte, error) { } func (rh *requestHandler) alert(path string) ([]byte, error) { + rh.mu.RLock() + defer rh.mu.RUnlock() parts := strings.SplitN(strings.TrimPrefix(path, "/api/v1/"), "/", 3) if len(parts) != 3 { return nil, &httpserver.ErrorWithStatusCode{ diff --git a/app/vmalert/web_test.go b/app/vmalert/web_test.go index 8d2c02d76c..6b88a54c55 100644 --- a/app/vmalert/web_test.go +++ b/app/vmalert/web_test.go @@ -1,13 +1,17 @@ package main import ( + "context" "encoding/json" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "net/http" "net/http/httptest" + "os" "reflect" + "sync" + "syscall" "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "time" ) func TestHandler(t *testing.T) { @@ -22,6 +26,7 @@ func TestHandler(t *testing.T) { Name: "group", Rules: []*Rule{rule}, }}, + mu: sync.RWMutex{}, } getResp := func(url string, to interface{}, code int) { t.Helper() @@ -70,3 +75,96 @@ func TestHandler(t *testing.T) { getResp(ts.URL, nil, 200) }) } + +func Test_requestHandler_runConfigUpdater(t *testing.T) { + type fields struct { + groups []Group + } + type args struct { + updateChan chan os.Signal + w *watchdog + wg *sync.WaitGroup + initRulePath []string + updateRulePath string + } + tests := []struct { + name string + fields fields + args args + want []Group + }{ + { + name: "update good rules", + args: args{ + w: &watchdog{}, + wg: &sync.WaitGroup{}, + updateChan: make(chan os.Signal), + initRulePath: []string{"testdata/rules0-good.rules"}, + updateRulePath: "testdata/dir/rules1-good.rules", + }, + fields: fields{ + groups: []Group{}, + }, + want: []Group{{Name: "duplicatedGroupDiffFiles", Rules: []*Rule{newTestRule("VMRows", time.Second*10)}}}, + }, + { + name: "update with one bad rule file", + args: args{ + w: &watchdog{}, + wg: &sync.WaitGroup{}, + updateChan: make(chan os.Signal), + initRulePath: []string{"testdata/rules0-good.rules"}, + updateRulePath: "testdata/dir/rules2-bad.rules", + }, + fields: fields{ + groups: []Group{}, + }, + want: []Group{ + { + Name: "duplicatedGroupDiffFiles", Rules: []*Rule{ + newTestRule("VMRows", time.Second*10), + }}, + { + Name: "TestGroup", Rules: []*Rule{ + newTestRule("Conns", time.Duration(0)), + newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)), + }}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + grp, err := Parse(tt.args.initRulePath, *validateTemplates) + if err != nil { + t.Errorf("cannot setup test: %v", err) + cancel() + return + } + groupUpdateStorage := startInitGroups(ctx, tt.args.w, nil, grp, tt.args.wg) + rh := &requestHandler{ + groups: grp, + mu: sync.RWMutex{}, + } + tt.args.wg.Add(1) + go func() { + //possible side effect with global var modification + err = rulePath.Set(tt.args.updateRulePath) + if err != nil { + t.Errorf("cannot update rule") + panic(err) + } + //need some delay + time.Sleep(time.Millisecond * 300) + tt.args.updateChan <- syscall.SIGHUP + cancel() + + }() + rh.runConfigUpdater(ctx, tt.args.updateChan, groupUpdateStorage, tt.args.w, tt.args.wg) + tt.args.wg.Wait() + if len(tt.want) != len(rh.groups) { + t.Errorf("want: %v,\ngot :%v ", tt.want, rh.groups) + } + }) + } +}