vmalert: cleanup and restructure of code to improve maintainability (#471)

The change introduces new entity `manager` which replaces
`watchdog`, decouples requestHandler and groups. Manager
supposed to control life cycle of groups, rules and
config reloads.

Groups export an ID method which returns a hash
from filename and group name. ID supposed to be unique
identifier across all loaded groups.

Some tests were added to improve coverage.

Bug with wrong annotation value if $value is used in
 templates after metrics being restored fixed.

Notifier interface was extended to accept context.

New set of metrics was introduced for config reload.
This commit is contained in:
Roman Khavronenko 2020-05-10 17:58:17 +01:00 committed by GitHub
parent 9e8733ff65
commit 8c8ff5d0cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 876 additions and 593 deletions

View File

@ -48,6 +48,7 @@ Rules in group evaluated one-by-one sequentially.
* `http://<vmalert-addr>/api/v1/<groupName>/<alertID>/status" ` - get alert status by ID.
Used as alert source in AlertManager.
* `http://<vmalert-addr>/metrics` - application metrics.
* `http://<vmalert-addr>/-/reload` - hot configuration reload.
`vmalert` may be configured with `-remotewrite` flag to write alerts state in form of timeseries
via remote write protocol. Alerts state will be written as `ALERTS` timeseries. These timeseries
@ -109,6 +110,9 @@ Usage of vmalert:
Pass `-help` to `vmalert` in order to see the full list of supported
command-line flags with their descriptions.
To reload configuration without `vmalert` restart send SIGHUP signal
or send GET request to `/-/reload` endpoint.
### Contributing
`vmalert` is mostly designed and built by VictoriaMetrics community.

View File

@ -27,29 +27,32 @@ func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) {
if err != nil {
return nil, fmt.Errorf("file %s: %w", file, err)
}
for _, group := range gr {
if _, ok := groupsNames[group.Name]; ok {
return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", file, group.Name)
for _, g := range gr {
if _, ok := groupsNames[g.Name]; ok {
return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", g.Name, file)
}
groupsNames[group.Name] = struct{}{}
for _, rule := range group.Rules {
g.File = file
g.done = make(chan struct{})
g.finished = make(chan struct{})
groupsNames[g.Name] = struct{}{}
for _, rule := range g.Rules {
if err = rule.Validate(); err != nil {
return nil, fmt.Errorf("invalid rule filepath:%s, group %s:%w", file, group.Name, err)
return nil, fmt.Errorf("invalid rule filepath: %s, group %s: %w", file, g.Name, err)
}
// TODO: this init looks weird here
rule.alerts = make(map[uint64]*notifier.Alert)
if validateAnnotations {
if err = notifier.ValidateTemplates(rule.Annotations); err != nil {
return nil, fmt.Errorf("invalid annotations filepath:%s, group %s:%w", file, group.Name, err)
return nil, fmt.Errorf("invalid annotations filepath: %s, group %s: %w", file, g.Name, err)
}
if err = notifier.ValidateTemplates(rule.Labels); err != nil {
return nil, fmt.Errorf("invalid labels filepath:%s, group %s:%w", file, group.Name, err)
return nil, fmt.Errorf("invalid labels filepath: %s, group %s: %w", file, g.Name, err)
}
}
rule.group = group
rule.group = g
rule.alerts = make(map[uint64]*notifier.Alert)
}
groups = append(groups, g)
}
groups = append(groups, gr...)
}
if len(groups) < 1 {
return nil, fmt.Errorf("no groups found in %s", strings.Join(pathPatterns, ";"))

165
app/vmalert/group.go Normal file
View File

@ -0,0 +1,165 @@
package main
import (
"context"
"fmt"
"hash/fnv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// Group is an entity for grouping rules
type Group struct {
Name string
File string
Rules []*Rule
done chan struct{}
finished chan struct{}
}
// ID return unique group ID that consists of
// rules file and group name
func (g Group) ID() uint64 {
hash := fnv.New64a()
hash.Write([]byte(g.File))
hash.Write([]byte("\xff"))
hash.Write([]byte(g.Name))
return hash.Sum64()
}
// Restore restores alerts state for all group rules with For > 0
func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
for _, rule := range g.Rules {
if rule.For == 0 {
return nil
}
if err := rule.Restore(ctx, q, lookback); err != nil {
return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err)
}
}
return nil
}
// updateWith updates existing group with
// passed group object.
func (g *Group) updateWith(newGroup Group) {
rulesRegistry := make(map[string]*Rule)
for _, nr := range newGroup.Rules {
rulesRegistry[nr.id()] = nr
}
for i, or := range g.Rules {
nr, ok := rulesRegistry[or.id()]
if !ok {
// old rule is not present in the new list
// and must be removed
or = nil
g.Rules = append(g.Rules[:i], g.Rules[i+1:]...)
continue
}
// copy all significant fields.
// alerts state isn't copied since
// it should be updated in next 2 Evals
or.For = nr.For
or.Expr = nr.Expr
or.Labels = nr.Labels
or.Annotations = nr.Annotations
delete(rulesRegistry, nr.id())
}
for _, nr := range rulesRegistry {
g.Rules = append(g.Rules, nr)
}
}
var (
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
execTotal = metrics.NewCounter(`vmalert_execution_total`)
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
)
func (g *Group) close() {
if g.done == nil {
return
}
close(g.done)
<-g.finished
}
func (g *Group) start(ctx context.Context, interval time.Duration,
querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
logger.Infof("group %q started", g.Name)
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
logger.Infof("group %q: context cancelled", g.Name)
close(g.finished)
return
case <-g.done:
logger.Infof("group %q: received stop signal", g.Name)
close(g.finished)
return
case <-t.C:
iterationTotal.Inc()
iterationStart := time.Now()
for _, rule := range g.Rules {
execTotal.Inc()
execStart := time.Now()
err := rule.Exec(ctx, querier)
execDuration.UpdateDuration(execStart)
if err != nil {
execErrors.Inc()
logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule.Name, err)
continue
}
var alertsToSend []notifier.Alert
for _, a := range rule.alerts {
if a.State != notifier.StatePending {
alertsToSend = append(alertsToSend, *a)
}
if a.State == notifier.StateInactive || rw == nil {
continue
}
tss := rule.AlertToTimeSeries(a, execStart)
for _, ts := range tss {
remoteWriteSent.Inc()
if err := rw.Push(ts); err != nil {
remoteWriteErrors.Inc()
logger.Errorf("failed to push timeseries to remotewrite: %s", err)
}
}
}
if len(alertsToSend) > 0 {
alertsSent.Add(len(alertsToSend))
if err := nr.Send(ctx, alertsToSend); err != nil {
alertsSendErrors.Inc()
logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule.Name, err)
}
}
}
iterationDuration.UpdateDuration(iterationStart)
}
}
}

203
app/vmalert/group_test.go Normal file
View File

@ -0,0 +1,203 @@
package main
import (
"context"
"reflect"
"sort"
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
)
func TestUpdateWith(t *testing.T) {
testCases := []struct {
name string
currentRules []*Rule
newRules []*Rule
}{
{
"new rule",
[]*Rule{},
[]*Rule{{Name: "bar"}},
},
{
"update rule",
[]*Rule{{
Name: "foo",
Expr: "up > 0",
For: time.Second,
Labels: map[string]string{
"bar": "baz",
},
Annotations: map[string]string{
"summary": "{{ $value|humanize }}",
"description": "{{$labels}}",
},
}},
[]*Rule{{
Name: "bar",
Expr: "up > 10",
For: time.Second,
Labels: map[string]string{
"baz": "bar",
},
Annotations: map[string]string{
"summary": "none",
},
}},
},
{
"empty rule",
[]*Rule{{Name: "foo"}},
[]*Rule{},
},
{
"multiple rules",
[]*Rule{{Name: "foo"}, {Name: "bar"}, {Name: "baz"}},
[]*Rule{{Name: "foo"}, {Name: "baz"}},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
g := &Group{Rules: tc.currentRules}
g.updateWith(Group{Rules: tc.newRules})
if len(g.Rules) != len(tc.newRules) {
t.Fatalf("expected to have %d rules; got: %d",
len(g.Rules), len(tc.newRules))
}
for i, r := range g.Rules {
got, want := r, tc.newRules[i]
if got.Name != want.Name {
t.Fatalf("expected to have rule %q; got %q", want.Name, got.Name)
}
if got.Expr != want.Expr {
t.Fatalf("expected to have expression %q; got %q", want.Expr, got.Expr)
}
if got.For != want.For {
t.Fatalf("expected to have for %q; got %q", want.For, got.For)
}
if !reflect.DeepEqual(got.Annotations, want.Annotations) {
t.Fatalf("expected to have annotations %#v; got %#v", want.Annotations, got.Annotations)
}
if !reflect.DeepEqual(got.Labels, want.Labels) {
t.Fatalf("expected to have labels %#v; got %#v", want.Labels, got.Labels)
}
}
})
}
}
func TestGroupStart(t *testing.T) {
// TODO: make parsing from string instead of file
groups, err := Parse([]string{"testdata/rules1-good.rules"}, true)
if err != nil {
t.Fatalf("failed to parse rules: %s", err)
}
g := groups[0]
fn := &fakeNotifier{}
fs := &fakeQuerier{}
const inst1, inst2, job = "foo", "bar", "baz"
m1 := metricWithLabels(t, "instance", inst1, "job", job)
m2 := metricWithLabels(t, "instance", inst2, "job", job)
r := g.Rules[0]
alert1, err := r.newAlert(m1)
if err != nil {
t.Fatalf("faield to create alert: %s", err)
}
alert1.State = notifier.StateFiring
alert1.ID = hash(m1)
alert2, err := r.newAlert(m2)
if err != nil {
t.Fatalf("faield to create alert: %s", err)
}
alert2.State = notifier.StateFiring
alert2.ID = hash(m2)
const evalInterval = time.Millisecond
finished := make(chan struct{})
fs.add(m1)
fs.add(m2)
go func() {
g.start(context.Background(), evalInterval, fs, fn, nil)
close(finished)
}()
// wait for multiple evals
time.Sleep(20 * evalInterval)
gotAlerts := fn.getAlerts()
expectedAlerts := []notifier.Alert{*alert1, *alert2}
compareAlerts(t, expectedAlerts, gotAlerts)
// reset previous data
fs.reset()
// and set only one datapoint for response
fs.add(m1)
// wait for multiple evals
time.Sleep(20 * evalInterval)
gotAlerts = fn.getAlerts()
expectedAlerts = []notifier.Alert{*alert1}
compareAlerts(t, expectedAlerts, gotAlerts)
g.close()
<-finished
}
func compareAlerts(t *testing.T, as, bs []notifier.Alert) {
t.Helper()
if len(as) != len(bs) {
t.Fatalf("expected to have length %d; got %d", len(as), len(bs))
}
sort.Slice(as, func(i, j int) bool {
return as[i].ID < as[j].ID
})
sort.Slice(bs, func(i, j int) bool {
return bs[i].ID < bs[j].ID
})
for i := range as {
a, b := as[i], bs[i]
if a.Name != b.Name {
t.Fatalf("expected t have Name %q; got %q", a.Name, b.Name)
}
if a.State != b.State {
t.Fatalf("expected t have State %q; got %q", a.State, b.State)
}
if a.Value != b.Value {
t.Fatalf("expected t have Value %f; got %f", a.Value, b.Value)
}
if !reflect.DeepEqual(a.Annotations, b.Annotations) {
t.Fatalf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations)
}
if !reflect.DeepEqual(a.Labels, b.Labels) {
t.Fatalf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
}
}
}
type fakeNotifier struct {
sync.Mutex
alerts []notifier.Alert
}
func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error {
fn.Lock()
defer fn.Unlock()
fn.alerts = alerts
return nil
}
func (fn *fakeNotifier) getAlerts() []notifier.Alert {
fn.Lock()
defer fn.Unlock()
return fn.alerts
}

View File

@ -8,7 +8,6 @@ import (
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
@ -64,23 +63,17 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
eu, err := getExternalURL(*externalURL, *httpListenAddr, httpserver.IsTLS())
if err != nil {
logger.Fatalf("can not get external url:%s ", err)
logger.Fatalf("can not get external url: %s ", err)
}
notifier.InitTemplateFunc(eu)
logger.Infof("reading alert rules configuration file from %s", strings.Join(*rulePath, ";"))
groups, err := readRules()
if err != nil {
logger.Fatalf("cannot parse configuration file: %s", err)
}
w := &watchdog{
manager := &manager{
groups: make(map[uint64]*Group),
storage: datasource.NewVMStorage(*datasourceURL, *basicAuthUsername, *basicAuthPassword, &http.Client{}),
alertProvider: notifier.NewAlertManager(*notifierURL, func(group, name string) string {
return fmt.Sprintf("%s/api/v1/%s/%s/status", eu, group, name)
notifier: notifier.NewAlertManager(*notifierURL, func(group, alert string) string {
return fmt.Sprintf("%s/api/v1/%s/%s/status", eu, group, alert)
}, &http.Client{}),
}
if *remoteWriteURL != "" {
c, err := remotewrite.NewClient(ctx, remotewrite.Config{
Addr: *remoteWriteURL,
@ -91,26 +84,38 @@ func main() {
if err != nil {
logger.Fatalf("failed to init remotewrite client: %s", err)
}
w.rw = c
manager.rw = c
}
var restoreDS *datasource.VMStorage
if *remoteReadURL != "" {
restoreDS = datasource.NewVMStorage(*remoteReadURL, *remoteReadUsername, *remoteReadPassword, &http.Client{})
manager.rr = datasource.NewVMStorage(*remoteReadURL, *remoteReadUsername, *remoteReadPassword, &http.Client{})
}
wg := sync.WaitGroup{}
if err := manager.start(ctx, *rulePath, *validateTemplates); err != nil {
logger.Fatalf("failed to start: %s", err)
}
groupUpdateStorage := startInitGroups(ctx, w, restoreDS, groups, &wg)
rh := &requestHandler{groups: groups, mu: sync.RWMutex{}}
//run config updater
wg.Add(1)
sigHup := procutil.NewSighupChan()
go rh.runConfigUpdater(ctx, sigHup, groupUpdateStorage, w, &wg)
go func() {
// init reload metrics with positive values to improve alerting conditions
configSuccess.Set(1)
configTimestamp.Set(uint64(time.Now().UnixNano()) / 1e9)
sigHup := procutil.NewSighupChan()
for {
<-sigHup
configReloads.Inc()
logger.Infof("SIGHUP received. Going to reload rules %q ...", *rulePath)
if err := manager.update(ctx, *rulePath, *validateTemplates, false); err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("error while reloading rules: %s", err)
continue
}
configSuccess.Set(1)
configTimestamp.Set(uint64(time.Now().UnixNano()) / 1e9)
logger.Infof("Rules reloaded successfully from %q", *rulePath)
}
}()
rh := &requestHandler{m: manager}
go httpserver.Serve(*httpListenAddr, (rh).handler)
sig := procutil.WaitForSigterm()
@ -119,106 +124,16 @@ func main() {
logger.Fatalf("cannot stop the webservice: %s", err)
}
cancel()
if w.rw != nil {
err := w.rw.Close()
if err != nil {
logger.Fatalf("cannot stop the remotewrite: %s", err)
}
}
wg.Wait()
}
type watchdog struct {
storage *datasource.VMStorage
alertProvider notifier.Notifier
rw *remotewrite.Client
manager.close()
}
var (
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
execTotal = metrics.NewCounter(`vmalert_execution_total`)
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
configReloadTotal = metrics.NewCounter(`vmalert_config_reload_total`)
configReloadOkTotal = metrics.NewCounter(`vmalert_config_reload_ok_total`)
configReloadErrorTotal = metrics.NewCounter(`vmalert_config_reload_error_total`)
configReloads = metrics.NewCounter(`vmalert_config_last_reload_total`)
configReloadErrors = metrics.NewCounter(`vmalert_config_last_reload_errors_total`)
configSuccess = metrics.NewCounter(`vmalert_config_last_reload_successful`)
configTimestamp = metrics.NewCounter(`vmalert_config_last_reload_success_timestamp_seconds`)
)
func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration, groupUpdate chan Group) {
logger.Infof("watchdog for %s has been started", group.Name)
t := time.NewTicker(evaluationInterval)
defer t.Stop()
for {
select {
case newGroup := <-groupUpdate:
if newGroup.Rules == nil || len(newGroup.Rules) == 0 {
//empty rules for group
//need to exit
logger.Infof("stopping group: %s, it contains 0 rules now", group.Name)
return
}
logger.Infof("new group update received, group: %s", group.Name)
group.Update(newGroup)
logger.Infof("group was reconciled, group: %s", group.Name)
case <-t.C:
iterationTotal.Inc()
iterationStart := time.Now()
for _, rule := range group.Rules {
execTotal.Inc()
execStart := time.Now()
err := rule.Exec(ctx, w.storage)
execDuration.UpdateDuration(execStart)
if err != nil {
execErrors.Inc()
logger.Errorf("failed to execute rule %q.%q: %s", group.Name, rule.Name, err)
continue
}
var alertsToSend []notifier.Alert
for _, a := range rule.alerts {
if a.State != notifier.StatePending {
alertsToSend = append(alertsToSend, *a)
}
if a.State == notifier.StateInactive || w.rw == nil {
continue
}
tss := rule.AlertToTimeSeries(a, execStart)
for _, ts := range tss {
remoteWriteSent.Inc()
if err := w.rw.Push(ts); err != nil {
remoteWriteErrors.Inc()
logger.Errorf("failed to push timeseries to remotewrite: %s", err)
}
}
}
alertsSent.Add(len(alertsToSend))
if err := w.alertProvider.Send(alertsToSend); err != nil {
alertsSendErrors.Inc()
logger.Errorf("failed to send alert for rule %q.%q: %s", group.Name, rule.Name, err)
}
}
iterationDuration.UpdateDuration(iterationStart)
case <-ctx.Done():
logger.Infof("%s received stop signal", group.Name)
return
}
}
}
func getExternalURL(externalURL, httpListenAddr string, isSecure bool) (*url.URL, error) {
if externalURL != "" {
return url.Parse(externalURL)
@ -248,31 +163,3 @@ func checkFlags() {
logger.Fatalf("datasource.url is empty")
}
}
func startInitGroups(ctx context.Context, w *watchdog, restoreDS *datasource.VMStorage, groups []Group, wg *sync.WaitGroup) map[string]chan Group {
groupUpdateStorage := map[string]chan Group{}
for _, g := range groups {
if restoreDS != nil {
err := g.Restore(ctx, restoreDS, *remoteReadLookBack)
if err != nil {
logger.Errorf("error while restoring state for group %q: %s", g.Name, err)
}
}
groupUpdateChan := make(chan Group, 1)
groupUpdateStorage[g.Name] = groupUpdateChan
wg.Add(1)
go func(group Group) {
w.run(ctx, group, *evaluationInterval, groupUpdateChan)
wg.Done()
}(g)
}
return groupUpdateStorage
}
//wrapper
func readRules() ([]Group, error) {
return Parse(*rulePath, *validateTemplates)
}

109
app/vmalert/manager.go Normal file
View File

@ -0,0 +1,109 @@
package main
import (
"context"
"fmt"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type manager struct {
storage datasource.Querier
notifier notifier.Notifier
rw *remotewrite.Client
rr datasource.Querier
wg sync.WaitGroup
groupsMu sync.RWMutex
groups map[uint64]*Group
}
// AlertAPI generates APIAlert object from alert by its id(hash)
func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
m.groupsMu.RLock()
defer m.groupsMu.RUnlock()
g, ok := m.groups[gID]
if !ok {
return nil, fmt.Errorf("can't find group with id %q", gID)
}
for _, rule := range g.Rules {
if apiAlert := rule.AlertAPI(aID); apiAlert != nil {
return apiAlert, nil
}
}
return nil, fmt.Errorf("can't func alert with id %q in group %q", aID, g.Name)
}
func (m *manager) start(ctx context.Context, path []string, validate bool) error {
return m.update(ctx, path, validate, true)
}
func (m *manager) close() {
if m.rw != nil {
err := m.rw.Close()
if err != nil {
logger.Fatalf("cannot stop the remotewrite: %s", err)
}
}
m.wg.Wait()
}
func (m *manager) startGroup(ctx context.Context, group Group, restore bool) {
if restore {
err := group.Restore(ctx, m.rr, *remoteReadLookBack)
if err != nil {
logger.Errorf("error while restoring state for group %q: %s", group.Name, err)
}
}
m.wg.Add(1)
id := group.ID()
go func() {
group.start(ctx, *evaluationInterval, m.storage, m.notifier, m.rw)
m.wg.Done()
}()
m.groups[id] = &group
}
func (m *manager) update(ctx context.Context, path []string, validate, restore bool) error {
logger.Infof("reading alert rules configuration file from %q", strings.Join(path, ";"))
newGroups, err := Parse(path, validate)
if err != nil {
return fmt.Errorf("cannot parse configuration file: %s", err)
}
groupsRegistry := make(map[uint64]Group)
for _, ng := range newGroups {
groupsRegistry[ng.ID()] = ng
}
m.groupsMu.Lock()
for _, og := range m.groups {
id := og.ID()
ng, ok := groupsRegistry[id]
if !ok {
// old group is not present in new list
// and must be stopped and deleted
og.close()
delete(m.groups, og.ID())
og = nil
continue
}
og.updateWith(ng)
delete(groupsRegistry, ng.ID())
}
for _, ng := range groupsRegistry {
m.startGroup(ctx, ng, restore)
}
m.groupsMu.Unlock()
return nil
}

152
app/vmalert/manager_test.go Normal file
View File

@ -0,0 +1,152 @@
package main
import (
"context"
"math/rand"
"strings"
"sync"
"testing"
"time"
)
func TestManagerUpdateError(t *testing.T) {
m := &manager{groups: make(map[uint64]*Group)}
path := []string{"foo/bar"}
err := m.update(context.Background(), path, true, false)
if err == nil {
t.Fatalf("expected to have err; got nil instead")
}
expErr := "no groups found"
if !strings.Contains(err.Error(), expErr) {
t.Fatalf("expected to got err %s; got %s", expErr, err)
}
}
// TestManagerUpdateConcurrent supposed to test concurrent
// execution of configuration update.
// Should be executed with -race flag
func TestManagerUpdateConcurrent(t *testing.T) {
m := &manager{groups: make(map[uint64]*Group)}
paths := []string{
"testdata/dir/rules0-good.rules",
"testdata/dir/rules1-good.rules",
"testdata/rules0-good.rules",
}
const n = 500
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
rnd := rand.Intn(len(paths))
path := []string{paths[rnd]}
err := m.update(context.Background(), path, true, false)
if err != nil {
t.Errorf("update error: %s", err)
}
}()
}
wg.Wait()
}
// TestManagerUpdate tests sequential configuration
// updates.
func TestManagerUpdate(t *testing.T) {
testCases := []struct {
name string
initPath string
updatePath string
want []*Group
}{
{
name: "update good rules",
initPath: "testdata/rules0-good.rules",
updatePath: "testdata/dir/rules1-good.rules",
want: []*Group{
{
File: "testdata/dir/rules1-good.rules",
Name: "duplicatedGroupDiffFiles",
Rules: []*Rule{newTestRule("VMRows", time.Second*10)},
},
},
},
{
name: "update good rules from 1 to 2 groups",
initPath: "testdata/dir/rules1-good.rules",
updatePath: "testdata/rules0-good.rules",
want: []*Group{
{
File: "testdata/rules0-good.rules",
Name: "groupGorSingleAlert", Rules: []*Rule{
newTestRule("VMRows", time.Second*10),
}},
{
File: "testdata/rules0-good.rules",
Name: "TestGroup", Rules: []*Rule{
newTestRule("Conns", time.Duration(0)),
newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)),
}},
},
},
{
name: "update with one bad rule file",
initPath: "testdata/rules0-good.rules",
updatePath: "testdata/dir/rules2-bad.rules",
want: []*Group{
{
File: "testdata/rules0-good.rules",
Name: "groupGorSingleAlert", Rules: []*Rule{
newTestRule("VMRows", time.Second*10),
}},
{
File: "testdata/rules0-good.rules",
Name: "TestGroup", Rules: []*Rule{
newTestRule("Conns", time.Duration(0)),
newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)),
}},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
m := &manager{groups: make(map[uint64]*Group)}
path := []string{tc.initPath}
if err := m.update(ctx, path, true, false); err != nil {
t.Fatalf("failed to complete initial rules update: %s", err)
}
path = []string{tc.updatePath}
_ = m.update(ctx, path, true, false)
if len(tc.want) != len(m.groups) {
t.Fatalf("\nwant number of groups: %d;\ngot: %d ", len(tc.want), len(m.groups))
}
for _, wantG := range tc.want {
gotG, ok := m.groups[wantG.ID()]
if !ok {
t.Fatalf("expected to have group %q", wantG.Name)
}
compareGroups(t, gotG, wantG)
}
cancel()
m.close()
})
}
}
func compareGroups(t *testing.T, a, b *Group) {
t.Helper()
if len(a.Rules) != len(b.Rules) {
t.Fatalf("expected group %s to have %d rules; got: %d",
a.Name, len(a.Rules), len(b.Rules))
}
for i, r := range a.Rules {
got, want := r, b.Rules[i]
if got.Name != want.Name {
t.Fatalf("expected to have rule %q; got %q", want.Name, got.Name)
}
}
}

View File

@ -12,7 +12,7 @@ import (
// Alert the triggered alert
// TODO: Looks like alert name isn't unique
type Alert struct {
Group string
GroupID uint64
Name string
Labels map[string]string
Annotations map[string]string

View File

@ -2,6 +2,7 @@ package notifier
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
@ -17,13 +18,21 @@ type AlertManager struct {
}
// Send an alert or resolve message
func (am *AlertManager) Send(alerts []Alert) error {
func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error {
b := &bytes.Buffer{}
writeamRequest(b, alerts, am.argFunc)
resp, err := am.client.Post(am.alertURL, "application/json", b)
req, err := http.NewRequest("POST", am.alertURL, b)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx)
resp, err := am.client.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
@ -37,7 +46,7 @@ func (am *AlertManager) Send(alerts []Alert) error {
}
// AlertURLGenerator returns URL to single alert by given name
type AlertURLGenerator func(group, id string) string
type AlertURLGenerator func(group, alert string) string
const alertManagerPath = "/api/v2/alerts"

View File

@ -9,7 +9,7 @@
{% for i, alert := range alerts %}
{
"startsAt":{%q= alert.Start.Format(time.RFC3339Nano) %},
"generatorURL": {%q= generatorURL(alert.Group, strconv.FormatUint(alert.ID, 10)) %},
"generatorURL": {%q= generatorURL(strconv.FormatUint(alert.GroupID, 10), strconv.FormatUint(alert.ID, 10)) %},
{% if !alert.End.IsZero() %}
"endsAt":{%q= alert.End.Format(time.RFC3339Nano) %},
{% endif %}

View File

@ -1,131 +1,131 @@
// Code generated by qtc from "alertmanager_request.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmalert/notifier/alertmanager_request.qtpl:1
//line notifier/alertmanager_request.qtpl:1
package notifier
//line app/vmalert/notifier/alertmanager_request.qtpl:1
//line notifier/alertmanager_request.qtpl:1
import (
"strconv"
"time"
)
//line app/vmalert/notifier/alertmanager_request.qtpl:7
//line notifier/alertmanager_request.qtpl:7
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmalert/notifier/alertmanager_request.qtpl:7
//line notifier/alertmanager_request.qtpl:7
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmalert/notifier/alertmanager_request.qtpl:7
//line notifier/alertmanager_request.qtpl:7
func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
//line app/vmalert/notifier/alertmanager_request.qtpl:7
//line notifier/alertmanager_request.qtpl:7
qw422016.N().S(`[`)
//line app/vmalert/notifier/alertmanager_request.qtpl:9
//line notifier/alertmanager_request.qtpl:9
for i, alert := range alerts {
//line app/vmalert/notifier/alertmanager_request.qtpl:9
//line notifier/alertmanager_request.qtpl:9
qw422016.N().S(`{"startsAt":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:11
//line notifier/alertmanager_request.qtpl:11
qw422016.N().Q(alert.Start.Format(time.RFC3339Nano))
//line app/vmalert/notifier/alertmanager_request.qtpl:11
//line notifier/alertmanager_request.qtpl:11
qw422016.N().S(`,"generatorURL":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:12
qw422016.N().Q(generatorURL(alert.Group, strconv.FormatUint(alert.ID, 10)))
//line app/vmalert/notifier/alertmanager_request.qtpl:12
//line notifier/alertmanager_request.qtpl:12
qw422016.N().Q(generatorURL(strconv.FormatUint(alert.GroupID, 10), strconv.FormatUint(alert.ID, 10)))
//line notifier/alertmanager_request.qtpl:12
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:13
//line notifier/alertmanager_request.qtpl:13
if !alert.End.IsZero() {
//line app/vmalert/notifier/alertmanager_request.qtpl:13
//line notifier/alertmanager_request.qtpl:13
qw422016.N().S(`"endsAt":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:14
//line notifier/alertmanager_request.qtpl:14
qw422016.N().Q(alert.End.Format(time.RFC3339Nano))
//line app/vmalert/notifier/alertmanager_request.qtpl:14
//line notifier/alertmanager_request.qtpl:14
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:15
//line notifier/alertmanager_request.qtpl:15
}
//line app/vmalert/notifier/alertmanager_request.qtpl:15
//line notifier/alertmanager_request.qtpl:15
qw422016.N().S(`"labels": {"alertname":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:17
//line notifier/alertmanager_request.qtpl:17
qw422016.N().Q(alert.Name)
//line app/vmalert/notifier/alertmanager_request.qtpl:18
//line notifier/alertmanager_request.qtpl:18
for k, v := range alert.Labels {
//line app/vmalert/notifier/alertmanager_request.qtpl:18
//line notifier/alertmanager_request.qtpl:18
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:19
//line notifier/alertmanager_request.qtpl:19
qw422016.N().Q(k)
//line app/vmalert/notifier/alertmanager_request.qtpl:19
//line notifier/alertmanager_request.qtpl:19
qw422016.N().S(`:`)
//line app/vmalert/notifier/alertmanager_request.qtpl:19
//line notifier/alertmanager_request.qtpl:19
qw422016.N().Q(v)
//line app/vmalert/notifier/alertmanager_request.qtpl:20
//line notifier/alertmanager_request.qtpl:20
}
//line app/vmalert/notifier/alertmanager_request.qtpl:20
//line notifier/alertmanager_request.qtpl:20
qw422016.N().S(`},"annotations": {`)
//line app/vmalert/notifier/alertmanager_request.qtpl:23
//line notifier/alertmanager_request.qtpl:23
c := len(alert.Annotations)
//line app/vmalert/notifier/alertmanager_request.qtpl:24
//line notifier/alertmanager_request.qtpl:24
for k, v := range alert.Annotations {
//line app/vmalert/notifier/alertmanager_request.qtpl:25
//line notifier/alertmanager_request.qtpl:25
c = c - 1
//line app/vmalert/notifier/alertmanager_request.qtpl:26
//line notifier/alertmanager_request.qtpl:26
qw422016.N().Q(k)
//line app/vmalert/notifier/alertmanager_request.qtpl:26
//line notifier/alertmanager_request.qtpl:26
qw422016.N().S(`:`)
//line app/vmalert/notifier/alertmanager_request.qtpl:26
//line notifier/alertmanager_request.qtpl:26
qw422016.N().Q(v)
//line app/vmalert/notifier/alertmanager_request.qtpl:26
//line notifier/alertmanager_request.qtpl:26
if c > 0 {
//line app/vmalert/notifier/alertmanager_request.qtpl:26
//line notifier/alertmanager_request.qtpl:26
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:26
//line notifier/alertmanager_request.qtpl:26
}
//line app/vmalert/notifier/alertmanager_request.qtpl:27
//line notifier/alertmanager_request.qtpl:27
}
//line app/vmalert/notifier/alertmanager_request.qtpl:27
//line notifier/alertmanager_request.qtpl:27
qw422016.N().S(`}}`)
//line app/vmalert/notifier/alertmanager_request.qtpl:30
//line notifier/alertmanager_request.qtpl:30
if i != len(alerts)-1 {
//line app/vmalert/notifier/alertmanager_request.qtpl:30
//line notifier/alertmanager_request.qtpl:30
qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:30
//line notifier/alertmanager_request.qtpl:30
}
//line app/vmalert/notifier/alertmanager_request.qtpl:31
//line notifier/alertmanager_request.qtpl:31
}
//line app/vmalert/notifier/alertmanager_request.qtpl:31
//line notifier/alertmanager_request.qtpl:31
qw422016.N().S(`]`)
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
}
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
streamamRequest(qw422016, alerts, generatorURL)
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
}
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
func amRequest(alerts []Alert, generatorURL func(string, string) string) string {
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
writeamRequest(qb422016, alerts, generatorURL)
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
qs422016 := string(qb422016.B)
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
return qs422016
//line app/vmalert/notifier/alertmanager_request.qtpl:33
//line notifier/alertmanager_request.qtpl:33
}

View File

@ -1,6 +1,7 @@
package notifier
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
@ -40,8 +41,8 @@ func TestAlertManager_Send(t *testing.T) {
if len(a) != 1 {
t.Errorf("expected 1 alert in array got %d", len(a))
}
if a[0].GeneratorURL != "group0" {
t.Errorf("exptected alert0 as generatorURL got %s", a[0].GeneratorURL)
if a[0].GeneratorURL != "0/0" {
t.Errorf("exptected 0/0 as generatorURL got %s", a[0].GeneratorURL)
}
if a[0].Labels["alertname"] != "alert0" {
t.Errorf("exptected alert0 as alert name got %s", a[0].Labels["alertname"])
@ -57,16 +58,16 @@ func TestAlertManager_Send(t *testing.T) {
srv := httptest.NewServer(mux)
defer srv.Close()
am := NewAlertManager(srv.URL, func(group, name string) string {
return group + name
return group + "/" + name
}, srv.Client())
if err := am.Send([]Alert{{}, {}}); err == nil {
if err := am.Send(context.Background(), []Alert{{}, {}}); err == nil {
t.Error("expected connection error got nil")
}
if err := am.Send([]Alert{}); err == nil {
if err := am.Send(context.Background(), []Alert{}); err == nil {
t.Error("expected wrong http code error got nil")
}
if err := am.Send([]Alert{{
Group: "group",
if err := am.Send(context.Background(), []Alert{{
GroupID: 0,
Name: "alert0",
Start: time.Now().UTC(),
End: time.Now().UTC(),

View File

@ -1,6 +1,8 @@
package notifier
import "context"
// Notifier is common interface for alert manager provider
type Notifier interface {
Send(alerts []Alert) error
Send(ctx context.Context, alerts []Alert) error
}

View File

@ -17,51 +17,6 @@ import (
"github.com/VictoriaMetrics/metricsql"
)
// Group grouping array of alert
type Group struct {
Name string
Rules []*Rule
}
// Restore restores alerts state for all group rules with For > 0
func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
for _, rule := range g.Rules {
if rule.For == 0 {
return nil
}
if err := rule.Restore(ctx, q, lookback); err != nil {
return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err)
}
}
return nil
}
// Update group
func (g *Group) Update(newGroup Group) *Group {
//check if old rule exists at new rules
for _, newRule := range newGroup.Rules {
for _, oldRule := range g.Rules {
if newRule.Name == oldRule.Name {
//is lock nessesary?
oldRule.mu.Lock()
//we copy only rules related values
//it`s safe to add additional fields to rule
//struct
oldRule.Annotations = newRule.Annotations
oldRule.Labels = newRule.Labels
oldRule.For = newRule.For
oldRule.Expr = newRule.Expr
oldRule.group = newRule.group
newRule = oldRule
oldRule.mu.Unlock()
}
}
}
//swap rules
g.Rules = newGroup.Rules
return g
}
// Rule is basic alert entity
type Rule struct {
Name string `yaml:"alert"`
@ -84,6 +39,10 @@ type Rule struct {
lastExecError error
}
func (r *Rule) id() string {
return r.Name
}
// Validate validates rule
func (r *Rule) Validate() error {
if r.Name == "" {
@ -124,8 +83,16 @@ func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
h := hash(m)
updated[h] = struct{}{}
if a, ok := r.alerts[h]; ok {
// update Value field with latest value
a.Value = m.Value
if a.Value != m.Value {
// update Value field with latest value
a.Value = m.Value
// and re-exec template since Value can be used
// in templates
err = r.template(a)
if err != nil {
return err
}
}
continue
}
a, err := r.newAlert(m)
@ -180,15 +147,13 @@ func hash(m datasource.Metric) uint64 {
func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) {
a := &notifier.Alert{
Group: r.group.Name,
Name: r.Name,
Labels: map[string]string{},
Value: m.Value,
Start: time.Now(),
GroupID: r.group.ID(),
Name: r.Name,
Labels: map[string]string{},
Value: m.Value,
Start: time.Now(),
// TODO: support End time
}
// 1. use data labels
for _, l := range m.Labels {
// drop __name__ to be consistent with Prometheus alerting
if l.Name == "__name__" {
@ -196,28 +161,31 @@ func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) {
}
a.Labels[l.Name] = l.Value
}
return a, r.template(a)
}
// 2. template rule labels with data labels
func (r *Rule) template(a *notifier.Alert) error {
// 1. template rule labels with data labels
rLabels, err := a.ExecTemplate(r.Labels)
if err != nil {
return a, err
return err
}
// 3. merge data labels and rule labels
// 2. merge data labels and rule labels
// metric labels may be overridden by
// rule labels
for k, v := range rLabels {
a.Labels[k] = v
}
// 4. template merged labels
// 3. template merged labels
a.Labels, err = a.ExecTemplate(a.Labels)
if err != nil {
return a, err
return err
}
a.Annotations, err = a.ExecTemplate(r.Annotations)
return a, err
return err
}
// AlertAPI generates APIAlert object from alert by its id(hash)
@ -244,10 +212,11 @@ func (r *Rule) AlertsAPI() []*APIAlert {
func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert {
return &APIAlert{
// encode as string to avoid rounding
ID: fmt.Sprintf("%d", a.ID),
// encode as strings to avoid rounding
ID: fmt.Sprintf("%d", a.ID),
GroupID: fmt.Sprintf("%d", a.GroupID),
Name: a.Name,
Group: a.Group,
Expression: r.Expr,
Labels: a.Labels,
Annotations: a.Annotations,
@ -360,7 +329,7 @@ func (r *Rule) Restore(ctx context.Context, q datasource.Querier, lookback time.
a.State = notifier.StatePending
a.Start = time.Unix(int64(m.Value), 0)
r.alerts[a.ID] = a
logger.Infof("alert %q.%q restored to state at %v", a.Group, a.Name, a.Start)
logger.Infof("alert %q(%d) restored to state at %v", a.Name, a.ID, a.Start)
}
return nil
}

View File

@ -2,7 +2,7 @@ package main
import (
"context"
"reflect"
"sync"
"testing"
"time"
@ -393,19 +393,28 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
}
type fakeQuerier struct {
sync.Mutex
metrics []datasource.Metric
}
func (fq *fakeQuerier) reset() {
fq.Lock()
fq.metrics = fq.metrics[:0]
fq.Unlock()
}
func (fq *fakeQuerier) add(metrics ...datasource.Metric) {
fq.Lock()
fq.metrics = append(fq.metrics, metrics...)
fq.Unlock()
}
func (fq fakeQuerier) Query(ctx context.Context, query string) ([]datasource.Metric, error) {
return fq.metrics, nil
func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) {
fq.Lock()
cpy := make([]datasource.Metric, len(fq.metrics))
copy(cpy, fq.metrics)
fq.Unlock()
return cpy, nil
}
func TestRule_Restore(t *testing.T) {
@ -534,86 +543,3 @@ func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) dat
m.Value = value
return m
}
func TestGroup_Update(t *testing.T) {
type fields struct {
Name string
Rules []*Rule
}
type args struct {
newGroup Group
}
tests := []struct {
name string
fields fields
args args
want *Group
}{
{
name: "update group with replace one value",
args: args{newGroup: Group{Name: "base-group", Rules: []*Rule{
{
Annotations: map[string]string{"different": "annotation"},
For: time.Second * 30,
},
}}},
fields: fields{
Name: "base-group",
Rules: []*Rule{
{
Annotations: map[string]string{"one": "annotations"},
},
},
},
want: &Group{
Name: "base-group",
Rules: []*Rule{
{Annotations: map[string]string{"different": "annotation"}, For: time.Second * 30},
},
},
},
{
name: "update group with change one value for rule",
args: args{newGroup: Group{Name: "base-group-2", Rules: []*Rule{
{
Annotations: map[string]string{"different": "annotation", "replace-value": "new-one"},
For: time.Second * 30,
Labels: map[string]string{"label-1": "value-1"},
Expr: "rate(vm) > 1",
},
}}},
fields: fields{
Name: "base-group-2",
Rules: []*Rule{
{
Annotations: map[string]string{"different": "annotation", "replace-value": "old-one"},
For: time.Second * 50,
Expr: "rate(vm) > 5",
},
},
},
want: &Group{
Name: "base-group-2",
Rules: []*Rule{
{
Annotations: map[string]string{"different": "annotation", "replace-value": "new-one"},
For: time.Second * 30,
Labels: map[string]string{"label-1": "value-1"},
Expr: "rate(vm) > 1",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := &Group{
Name: tt.fields.Name,
Rules: tt.fields.Rules,
}
if got := g.Update(tt.args.newGroup); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Update() = %v, want %v", got, tt.want)
}
})
}
}

11
app/vmalert/testdata/rules1-good.rules vendored Normal file
View File

@ -0,0 +1,11 @@
groups:
- name: groupTest
rules:
- alert: VMRows
for: 1ms
expr: vm_rows > 0
labels:
label: bar
host: "{{ $labels.instance }}"
annotations:
summary: "{{ $value }}"

View File

@ -1,27 +1,25 @@
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
)
// APIAlert has info for an alert.
// APIAlert represents an notifier.Alert state
// for WEB view
type APIAlert struct {
ID string `json:"id"`
Name string `json:"name"`
Group string `json:"group"`
GroupID string `json:"group_id"`
Expression string `json:"expression"`
State string `json:"state"`
Value string `json:"value"`
@ -31,82 +29,15 @@ type APIAlert struct {
}
type requestHandler struct {
groups []Group
mu sync.RWMutex
}
func (rh *requestHandler) runConfigUpdater(ctx context.Context, reloadChan <-chan os.Signal, groupUpdateStorage map[string]chan Group, w *watchdog, wg *sync.WaitGroup) {
logger.Infof("starting config updater")
defer wg.Done()
for {
select {
case <-reloadChan:
logger.Infof("get sighup signal, updating config")
configReloadTotal.Inc()
newRules, err := readRules()
if err != nil {
logger.Errorf("sighup, cannot read new rules: %v", err)
configReloadErrorTotal.Inc()
continue
}
rh.mu.Lock()
configReloadOkTotal.Inc()
//send new group to running watchers
for _, group := range newRules {
//update or start new group
if updateChan, ok := groupUpdateStorage[group.Name]; ok {
updateChan <- group
} else {
//its new group, we need to start it
updateChan := make(chan Group, 1)
groupUpdateStorage[group.Name] = updateChan
wg.Add(1)
go func(grp Group) {
w.run(ctx, grp, *evaluationInterval, updateChan)
wg.Done()
}(group)
//add new group to route handler
rh.groups = append(rh.groups, group)
}
}
//we have to check, if group is missing and remove it
for groupName, updateChan := range groupUpdateStorage {
var exist bool
for _, newGroup := range newRules {
if groupName == newGroup.Name {
exist = true
}
}
if !exist {
logger.Infof("group not exists in new rules, remove it, group: %s", groupName)
delete(groupUpdateStorage, groupName)
updateChan <- Group{Rules: []*Rule{}}
for i, group := range rh.groups {
if group.Name == groupName {
rh.groups[i] = rh.groups[len(rh.groups)-1]
rh.groups[len(rh.groups)-1] = Group{}
rh.groups = rh.groups[:len(rh.groups)-1]
}
}
}
}
rh.mu.Unlock()
logger.Infof("finished sync")
case <-ctx.Done():
logger.Infof("exiting config updater")
return
}
}
m *manager
}
var pathList = [][]string{
{"/api/v1/alerts", "list all active alerts"},
{"/api/v1/groupName/alertID/status", "get alert status by ID"},
{"/api/v1/groupID/alertID/status", "get alert status by ID"},
// /metrics is served by httpserver by default
{"/metrics", "list of application metrics"},
{"/-/reload", "reload configuration"},
}
func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
@ -126,7 +57,6 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
procutil.SelfSIGHUP()
w.WriteHeader(http.StatusOK)
return true
default:
// /api/v1/<groupName>/<alertID>/status
if strings.HasSuffix(r.URL.Path, "/status") {
@ -145,10 +75,10 @@ type listAlertsResponse struct {
}
func (rh *requestHandler) list() ([]byte, error) {
rh.mu.RLock()
defer rh.mu.RUnlock()
rh.m.groupsMu.RLock()
defer rh.m.groupsMu.RUnlock()
lr := listAlertsResponse{Status: "success"}
for _, g := range rh.groups {
for _, g := range rh.m.groups {
for _, r := range g.Rules {
lr.Data.Alerts = append(lr.Data.Alerts, r.AlertsAPI()...)
}
@ -170,8 +100,9 @@ func (rh *requestHandler) list() ([]byte, error) {
}
func (rh *requestHandler) alert(path string) ([]byte, error) {
rh.mu.RLock()
defer rh.mu.RUnlock()
rh.m.groupsMu.RLock()
defer rh.m.groupsMu.RUnlock()
parts := strings.SplitN(strings.TrimPrefix(path, "/api/v1/"), "/", 3)
if len(parts) != 3 {
return nil, &httpserver.ErrorWithStatusCode{
@ -179,29 +110,20 @@ func (rh *requestHandler) alert(path string) ([]byte, error) {
StatusCode: http.StatusBadRequest,
}
}
group := strings.TrimRight(parts[0], "/")
idStr := strings.TrimRight(parts[1], "/")
id, err := strconv.ParseUint(idStr, 10, 0)
groupID, err := uint64FromPath(parts[0])
if err != nil {
return nil, &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf(`cannot parse int from %q`, idStr),
StatusCode: http.StatusBadRequest,
}
return nil, badRequest(fmt.Errorf(`cannot parse groupID: %s`, err))
}
for _, g := range rh.groups {
if g.Name != group {
continue
}
for _, rule := range g.Rules {
if apiAlert := rule.AlertAPI(id); apiAlert != nil {
return json.Marshal(apiAlert)
}
}
alertID, err := uint64FromPath(parts[1])
if err != nil {
return nil, badRequest(fmt.Errorf(`cannot parse alertID: %s`, err))
}
return nil, &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf(`cannot find alert %s in %q`, idStr, group),
StatusCode: http.StatusNotFound,
resp, err := rh.m.AlertAPI(groupID, alertID)
if err != nil {
return nil, errResponse(err, http.StatusNotFound)
}
return json.Marshal(resp)
}
// responseHandler wrapper on http.ResponseWriter with sugar
@ -215,3 +137,19 @@ func (w responseHandler) handle(b []byte, err error) {
w.Header().Set("Content-Type", "application/json")
w.Write(b)
}
func uint64FromPath(path string) (uint64, error) {
s := strings.TrimRight(path, "/")
return strconv.ParseUint(s, 10, 0)
}
func badRequest(err error) *httpserver.ErrorWithStatusCode {
return errResponse(err, http.StatusBadRequest)
}
func errResponse(err error, sc int) *httpserver.ErrorWithStatusCode {
return &httpserver.ErrorWithStatusCode{
Err: err,
StatusCode: sc,
}
}

View File

@ -1,17 +1,13 @@
package main
import (
"context"
"encoding/json"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"net/http"
"net/http/httptest"
"os"
"reflect"
"sync"
"syscall"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
)
func TestHandler(t *testing.T) {
@ -21,13 +17,14 @@ func TestHandler(t *testing.T) {
0: {},
},
}
rh := &requestHandler{
groups: []Group{{
Name: "group",
Rules: []*Rule{rule},
}},
mu: sync.RWMutex{},
g := &Group{
Name: "group",
Rules: []*Rule{rule},
}
m := &manager{groups: make(map[uint64]*Group)}
m.groups[0] = g
rh := &requestHandler{m: m}
getResp := func(url string, to interface{}, code int) {
t.Helper()
resp, err := http.Get(url)
@ -57,114 +54,21 @@ func TestHandler(t *testing.T) {
t.Errorf("expected 1 alert got %d", length)
}
})
t.Run("/api/v1/group/0/status", func(t *testing.T) {
t.Run("/api/v1/0/0/status", func(t *testing.T) {
alert := &APIAlert{}
getResp(ts.URL+"/api/v1/group/0/status", alert, 200)
getResp(ts.URL+"/api/v1/0/0/status", alert, 200)
expAlert := rule.newAlertAPI(*rule.alerts[0])
if !reflect.DeepEqual(alert, expAlert) {
t.Errorf("expected %v is equal to %v", alert, expAlert)
}
})
t.Run("/api/v1/group/1/status", func(t *testing.T) {
getResp(ts.URL+"/api/v1/group/1/status", nil, 404)
t.Run("/api/v1/0/1/status", func(t *testing.T) {
getResp(ts.URL+"/api/v1/0/1/status", nil, 404)
})
t.Run("/api/v1/unknown-group/0/status", func(t *testing.T) {
getResp(ts.URL+"/api/v1/unknown-group/0/status", nil, 404)
t.Run("/api/v1/1/0/status", func(t *testing.T) {
getResp(ts.URL+"/api/v1/1/0/status", nil, 404)
})
t.Run("/", func(t *testing.T) {
getResp(ts.URL, nil, 200)
})
}
func Test_requestHandler_runConfigUpdater(t *testing.T) {
type fields struct {
groups []Group
}
type args struct {
updateChan chan os.Signal
w *watchdog
wg *sync.WaitGroup
initRulePath []string
updateRulePath string
}
tests := []struct {
name string
fields fields
args args
want []Group
}{
{
name: "update good rules",
args: args{
w: &watchdog{},
wg: &sync.WaitGroup{},
updateChan: make(chan os.Signal),
initRulePath: []string{"testdata/rules0-good.rules"},
updateRulePath: "testdata/dir/rules1-good.rules",
},
fields: fields{
groups: []Group{},
},
want: []Group{{Name: "duplicatedGroupDiffFiles", Rules: []*Rule{newTestRule("VMRows", time.Second*10)}}},
},
{
name: "update with one bad rule file",
args: args{
w: &watchdog{},
wg: &sync.WaitGroup{},
updateChan: make(chan os.Signal),
initRulePath: []string{"testdata/rules0-good.rules"},
updateRulePath: "testdata/dir/rules2-bad.rules",
},
fields: fields{
groups: []Group{},
},
want: []Group{
{
Name: "duplicatedGroupDiffFiles", Rules: []*Rule{
newTestRule("VMRows", time.Second*10),
}},
{
Name: "TestGroup", Rules: []*Rule{
newTestRule("Conns", time.Duration(0)),
newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)),
}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
grp, err := Parse(tt.args.initRulePath, *validateTemplates)
if err != nil {
t.Errorf("cannot setup test: %v", err)
cancel()
return
}
groupUpdateStorage := startInitGroups(ctx, tt.args.w, nil, grp, tt.args.wg)
rh := &requestHandler{
groups: grp,
mu: sync.RWMutex{},
}
tt.args.wg.Add(1)
go func() {
//possible side effect with global var modification
err = rulePath.Set(tt.args.updateRulePath)
if err != nil {
t.Errorf("cannot update rule")
panic(err)
}
//need some delay
time.Sleep(time.Millisecond * 300)
tt.args.updateChan <- syscall.SIGHUP
cancel()
}()
rh.runConfigUpdater(ctx, tt.args.updateChan, groupUpdateStorage, tt.args.w, tt.args.wg)
tt.args.wg.Wait()
if len(tt.want) != len(rh.groups) {
t.Errorf("want: %v,\ngot :%v ", tt.want, rh.groups)
}
})
}
}