VictoriaMetrics/app/vmalert/group.go
Roman Khavronenko c7f3e58032 vmalert: avoid sending resolves for pending alerts (#498)
Before this change we were sending notifications to the notifier
if either of the following conditions was met:
* alert is in Firing state
* alert is in Inactive state

We were sending Inactive notifications to resolve alerts ASAP.
Unfortunately, we were also sending resolves for Pending alerts that became
Inactive, which is wrong.

In this change we delete an alert from the active list if
it was Pending and became Inactive. This way we now
have Inactive alerts only if they were in the Firing state before.
See the test change for an example.
2020-05-19 11:55:00 +03:00

198 lines
5.2 KiB
Go

package main
import (
"context"
"fmt"
"hash/fnv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// Group is an entity for grouping rules.
// A group is identified by the rules file it was loaded from together with
// its name (see ID), and its rules are evaluated together by start.
type Group struct {
// Name is the group name; it is part of the group ID and is used in log messages.
Name string
// File is the rules file of the group; it is part of the group ID.
File string
// Rules is the list of rules that start executes on every tick.
Rules []*Rule
// doneCh signals the loop in start to stop; close() closes it.
// A nil doneCh makes close() a no-op.
doneCh chan struct{}
// finishedCh is closed by start right before it returns,
// close() blocks on it after signalling doneCh.
finishedCh chan struct{}
// updateCh accepts a new Group object
// which is supposed to update the current group;
// start applies it via updateWith.
updateCh chan Group
}
// ID returns the group identifier: a 64-bit FNV-1a hash computed over
// the rules file and the group name, joined by a single 0xff byte so that
// the boundary between the two parts always contributes to the hash.
func (g *Group) ID() uint64 {
	// Assemble the whole key first and feed it to the hasher in one call.
	key := make([]byte, 0, len(g.File)+1+len(g.Name))
	key = append(key, g.File...)
	key = append(key, 0xff)
	key = append(key, g.Name...)

	hasher := fnv.New64a()
	_, _ = hasher.Write(key)
	return hasher.Sum64()
}
// Restore restores alerts state for all group rules with For > 0.
//
// Rules with For == 0 are skipped. For every other rule, rule.Restore is
// called with the given context, querier and lookback window. Restore stops
// at the first failing rule and returns its error annotated with the rule
// name; it returns nil when every eligible rule was restored.
func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
	for _, rule := range g.Rules {
		if rule.For == 0 {
			// Skip only this rule. Returning here would abort the loop and
			// silently leave every following rule with For > 0 unrestored.
			continue
		}
		if err := rule.Restore(ctx, q, lookback); err != nil {
			return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err)
		}
	}
	return nil
}
// updateWith brings the group's rule list in line with newGroup.
//
// Rules are matched by their id():
//   - a rule present in both groups keeps its existing object and
//     only gets its configuration overwritten;
//   - a rule missing from newGroup is dropped;
//   - a rule that exists only in newGroup is appended after the kept ones.
//
// Not thread-safe.
func (g *Group) updateWith(newGroup Group) {
	// Incoming rules that have not been matched with an existing rule yet.
	pending := make(map[string]*Rule)
	for _, incoming := range newGroup.Rules {
		pending[incoming.id()] = incoming
	}

	var merged []*Rule
	for idx, current := range g.Rules {
		incoming, found := pending[current.id()]
		if !found {
			// The rule is gone from the new list: clear its slot
			// in the old slice and leave it out of the result.
			g.Rules[idx] = nil
			continue
		}
		// Overwrite configuration fields only. The alerts state is left
		// untouched on purpose, it is expected to catch up during
		// the following executions of the rule.
		current.For = incoming.For
		current.Expr = incoming.Expr
		current.Labels = incoming.Labels
		current.Annotations = incoming.Annotations
		delete(pending, incoming.id())
		merged = append(merged, current)
	}

	// Whatever is still pending is brand new for this group.
	for _, added := range pending {
		merged = append(merged, added)
	}
	g.Rules = merged
}
// Metrics describing the group evaluation loop, rule execution,
// alert delivery and remote write activity.
var (
// iterationTotal is incremented on every ticker tick handled by Group.start.
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
// iterationDuration tracks how long one tick takes to process all rules of a group.
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
// execTotal is incremented before every rule execution.
execTotal = metrics.NewCounter(`vmalert_execution_total`)
// execErrors is incremented when rule.Exec returns an error.
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
// execDuration tracks the duration of a single rule.Exec call.
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
// alertsFired is not updated in this part of the file;
// NOTE(review): presumably incremented where an alert switches to firing - confirm.
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
// alertsSent is increased by the number of alerts handed to the notifier,
// before the send is attempted (so failed sends are counted too).
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
// alertsSendErrors is incremented once per failed notifier Send call.
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
// remoteWriteSent is increased by the number of time series passed to pushToRW's
// push loop, before pushing (so failed pushes are counted too).
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
// remoteWriteErrors is incremented once per failed remote write Push call.
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
)
// close asks the group's evaluation loop to stop and waits until it has
// finished. The stop request is made by closing doneCh; completion is
// observed on finishedCh. It is a no-op for a group whose doneCh is nil.
func (g *Group) close() {
	if g.doneCh != nil {
		close(g.doneCh)
		// Block until the loop confirms that it has exited.
		<-g.finishedCh
	}
}
// start runs the evaluation loop of the group and blocks until ctx is
// cancelled or doneCh is signalled; finishedCh is closed right before
// returning in both cases.
//
// On every tick of interval each rule of the group is executed via querier.
// Firing and just-resolved (inactive) alerts of the rule are sent to nr,
// firing and pending alerts are additionally written via rw
// (pushToRW skips the write when rw is nil).
// Group updates received on updateCh are applied between ticks via updateWith.
//
// NOTE(review): finishedCh is closed unconditionally, so it is assumed to be
// non-nil when start is called - confirm against the caller.
func (g *Group) start(ctx context.Context, interval time.Duration,
querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
logger.Infof("group %q started", g.Name)
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
logger.Infof("group %q: context cancelled", g.Name)
// unblock close() waiting for the loop to finish
close(g.finishedCh)
return
case <-g.doneCh:
logger.Infof("group %q: received stop signal", g.Name)
// unblock close() waiting for the loop to finish
close(g.finishedCh)
return
case ng := <-g.updateCh:
// updates are applied in this goroutine, so they never
// overlap with a rules evaluation below
g.updateWith(ng)
case <-t.C:
iterationTotal.Inc()
iterationStart := time.Now()
for _, rule := range g.Rules {
execTotal.Inc()
execStart := time.Now()
err := rule.Exec(ctx, querier)
execDuration.UpdateDuration(execStart)
if err != nil {
// a failed rule doesn't stop the iteration:
// log the error and proceed with the next rule
execErrors.Inc()
logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule.Name, err)
continue
}
// alerts of this rule to be delivered to the notifier in one batch
var alertsToSend []notifier.Alert
for _, a := range rule.alerts {
switch a.State {
case notifier.StateFiring:
// set End to execStart + 3 intervals
// so notifier can resolve it automatically if `vmalert`
// won't be able to send resolve for some reason
a.End = execStart.Add(3 * interval)
alertsToSend = append(alertsToSend, *a)
pushToRW(rw, rule, a, execStart)
case notifier.StatePending:
// pending alerts are only written to remote write,
// they are not sent to the notifier
pushToRW(rw, rule, a, execStart)
case notifier.StateInactive:
// set End to execStart to notify
// that it was just resolved
a.End = execStart
alertsToSend = append(alertsToSend, *a)
}
}
if len(alertsToSend) == 0 {
continue
}
// counted before sending, so it includes alerts from failed sends
alertsSent.Add(len(alertsToSend))
if err := nr.Send(ctx, alertsToSend); err != nil {
alertsSendErrors.Inc()
logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule.Name, err)
}
}
iterationDuration.UpdateDuration(iterationStart)
}
}
}
// pushToRW converts alert a of the given rule into time series stamped with
// timestamp and pushes them one by one to the remote write client.
// It does nothing when rw is nil. A failed push is counted and logged,
// the remaining series are still pushed.
func pushToRW(rw *remotewrite.Client, rule *Rule, a *notifier.Alert, timestamp time.Time) {
	if rw == nil {
		return
	}

	series := rule.AlertToTimeSeries(a, timestamp)
	// The counter reflects push attempts, so it is bumped up front.
	remoteWriteSent.Add(len(series))
	for i := range series {
		err := rw.Push(series[i])
		if err == nil {
			continue
		}
		remoteWriteErrors.Inc()
		logger.Errorf("failed to push timeseries to remotewrite: %s", err)
	}
}