VictoriaMetrics/app/vmalert/rule.go
Roman Khavronenko c7f3e58032 vmalert: avoid sending resolves for pending alerts (#498)
Before this change we were sending notifications to the notifier
if either of the following conditions was met:
* alert is in Fire state
* alert is in Inactive state

We were sending Inactive notifications to resolve alerts ASAP.
Unfortunately, we were also sending resolves for Pending alerts that became
Inactive, which is wrong.

In this change we delete an alert from the active list if
it was Pending and became Inactive. This way, alerts are
Inactive only if they were in the Fire state before.
See test change for example.
2020-05-19 11:55:00 +03:00

340 lines
8.8 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"hash/fnv"
"sort"
"strconv"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metricsql"
)
// Rule is basic alert entity.
// Exported fields carry yaml tags and describe the rule definition;
// unexported fields hold the runtime state maintained by Exec and Restore.
type Rule struct {
// Name is the alert name (yaml key `alert`); it is also used as the rule id.
Name string `yaml:"alert"`
// Expr is the query expression passed to the Querier in Exec.
Expr string `yaml:"expr"`
// For is the minimal time an alert must stay Pending
// before Exec switches it to Firing.
For time.Duration `yaml:"for"`
// Labels are rule-level labels; they are templated and merged
// over the labels of the metric an alert is created from.
Labels map[string]string `yaml:"labels"`
// Annotations are templated for every alert of the rule.
Annotations map[string]string `yaml:"annotations"`
// group is the Group the rule belongs to; its ID is copied into every alert.
group Group
// guard status fields
mu sync.RWMutex
// stores list of active alerts keyed by the hash of the metric labels
alerts map[uint64]*notifier.Alert
// stores last moment of time Exec was called
lastExecTime time.Time
// stores last error that happened in Exec func
// resets on every successful Exec
// may be used as Health state
lastExecError error
}
// id returns the identifier of the rule, which is its Name.
func (r *Rule) id() string {
return r.Name
}
// Validate checks that the rule is well-formed: it must have a non-empty
// name and a non-empty expression that metricsql is able to parse.
// It returns nil for a valid rule and a descriptive error otherwise.
func (r *Rule) Validate() error {
	switch {
	case r.Name == "":
		return errors.New("rule name can not be empty")
	case r.Expr == "":
		return fmt.Errorf("expression for rule %q can't be empty", r.Name)
	}

	// only the parse outcome matters here, the parsed expression is discarded
	_, parseErr := metricsql.Parse(r.Expr)
	if parseErr != nil {
		return fmt.Errorf("invalid expression for rule %q: %w", r.Name, parseErr)
	}
	return nil
}
// Exec executes Rule expression via the given Querier.
// Based on the Querier results Rule maintains notifier.Alerts:
//   - alerts that became Inactive during the previous Exec are dropped;
//   - every returned metric either refreshes the existing alert with the
//     same label hash or creates a new alert in Pending state;
//   - alerts absent from the result are deleted if they were Pending
//     and marked Inactive otherwise;
//   - Pending alerts that lasted for at least r.For become Firing.
//
// The moment of the call and its outcome are stored in lastExecTime and
// lastExecError. A non-nil error is returned if the query, the creation
// of an alert or the re-templating of an alert fails.
func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
	// the query is executed before the lock is taken
	qMetrics, err := q.Query(ctx, r.Expr)

	r.mu.Lock()
	defer r.mu.Unlock()

	r.lastExecError = err
	r.lastExecTime = time.Now()
	if err != nil {
		return fmt.Errorf("failed to execute query %q: %w", r.Expr, err)
	}

	// writing to a nil map panics, so make sure the map exists
	// even if the Rule was created without one
	if r.alerts == nil {
		r.alerts = make(map[uint64]*notifier.Alert)
	}

	for h, a := range r.alerts {
		// cleanup inactive alerts from previous Exec
		if a.State == notifier.StateInactive {
			delete(r.alerts, h)
		}
	}

	updated := make(map[uint64]struct{}, len(qMetrics))
	// update list of active alerts
	for _, m := range qMetrics {
		h := hash(m)
		updated[h] = struct{}{}
		if a, ok := r.alerts[h]; ok {
			if a.Value != m.Value {
				// update Value field with latest value
				a.Value = m.Value
				// and re-exec template since Value can be used
				// in templates
				if err := r.template(a); err != nil {
					// keep the health state in sync with the returned error
					r.lastExecError = err
					return err
				}
			}
			continue
		}
		a, err := r.newAlert(m)
		if err != nil {
			r.lastExecError = err
			return fmt.Errorf("failed to create alert: %w", err)
		}
		a.ID = h
		a.State = notifier.StatePending
		r.alerts[h] = a
	}

	for h, a := range r.alerts {
		// if alert wasn't updated in this iteration
		// means it is resolved already
		if _, ok := updated[h]; !ok {
			if a.State == notifier.StatePending {
				// alert was in Pending state - it is not
				// active anymore
				delete(r.alerts, h)
				continue
			}
			a.State = notifier.StateInactive
			continue
		}
		if a.State == notifier.StatePending && time.Since(a.Start) >= r.For {
			a.State = notifier.StateFiring
			alertsFired.Inc()
		}
	}
	return nil
}
// hash returns a 64-bit FNV-1a hash built from the label names and values
// of m. Labels are visited in name order and `__name__` is skipped.
// NOTE: the sort is performed in place on m.Labels, so the order of the
// slice shared with the caller may change.
// TODO: consider hashing algorithm in VM
func hash(m datasource.Metric) uint64 {
	lbls := m.Labels
	sort.Slice(lbls, func(a, b int) bool {
		return lbls[a].Name < lbls[b].Name
	})

	hasher := fnv.New64a()
	for i := range lbls {
		name := lbls[i].Name
		// drop __name__ to be consistent with Prometheus alerting
		if name == "__name__" {
			continue
		}
		hasher.Write([]byte(name))
		hasher.Write([]byte(lbls[i].Value))
		hasher.Write([]byte("\xff"))
	}
	return hasher.Sum64()
}
// newAlert builds an alert for the given metric: it copies the group ID,
// the rule name and the metric value, sets Start to the current time,
// copies all metric labels except `__name__` and finally applies the rule
// templates. The alert is returned together with the templating error, if any.
func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) {
	alert := &notifier.Alert{
		GroupID: r.group.ID(),
		Name:    r.Name,
		Value:   m.Value,
		Start:   time.Now(),
		// TODO: support End time
	}

	alert.Labels = make(map[string]string)
	for i := range m.Labels {
		name := m.Labels[i].Name
		// drop __name__ to be consistent with Prometheus alerting
		if name != "__name__" {
			alert.Labels[name] = m.Labels[i].Value
		}
	}

	err := r.template(alert)
	return alert, err
}
// template fills Labels and Annotations of the given alert:
// rule labels are templated and written over the alert labels, the merged
// set is templated once more, and then rule annotations are templated.
// The first templating error is returned.
func (r *Rule) template(a *notifier.Alert) error {
	// rule labels are templated against the current alert data
	ruleLabels, err := a.ExecTemplate(r.Labels)
	if err != nil {
		return err
	}

	// rule labels win over metric labels with the same name
	for name, value := range ruleLabels {
		a.Labels[name] = value
	}

	// the merged set goes through templating again;
	// the result is stored regardless of the outcome
	merged, err := a.ExecTemplate(a.Labels)
	a.Labels = merged
	if err != nil {
		return err
	}

	annotations, err := a.ExecTemplate(r.Annotations)
	a.Annotations = annotations
	return err
}
// AlertAPI looks up the alert with the given id (label hash) under the
// read lock and converts it to APIAlert. It returns nil when there is
// no alert with such id.
func (r *Rule) AlertAPI(id uint64) *APIAlert {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if alert, found := r.alerts[id]; found {
		return r.newAlertAPI(*alert)
	}
	return nil
}
// AlertsAPI converts every alert currently tracked by the rule to APIAlert.
// The alerts are read under the read lock; the result is nil when the rule
// has no alerts.
func (r *Rule) AlertsAPI() []*APIAlert {
	r.mu.RLock()
	defer r.mu.RUnlock()

	var result []*APIAlert
	for _, alert := range r.alerts {
		apiAlert := r.newAlertAPI(*alert)
		result = append(result, apiAlert)
	}
	return result
}
// newAlertAPI converts the given alert to its API representation.
// Labels and Annotations are copied rather than shared: the alert's Labels
// map is modified in place by template under the write lock, while the
// returned APIAlert is read by callers after the lock has been released.
func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert {
	// clone returns an independent copy of src, keeping nil as nil
	clone := func(src map[string]string) map[string]string {
		if src == nil {
			return nil
		}
		dst := make(map[string]string, len(src))
		for k, v := range src {
			dst[k] = v
		}
		return dst
	}

	return &APIAlert{
		// encode as strings to avoid rounding
		ID:      fmt.Sprintf("%d", a.ID),
		GroupID: fmt.Sprintf("%d", a.GroupID),

		Name:        a.Name,
		Expression:  r.Expr,
		Labels:      clone(a.Labels),
		Annotations: clone(a.Annotations),
		State:       a.State.String(),
		ActiveAt:    a.Start,
		Value:       strconv.FormatFloat(a.Value, 'e', -1, 64),
	}
}
// Names of the synthetic metrics and labels used to expose alert state
// as timeseries.
const (
// alertMetricName is the metric name for synthetic alert timeseries.
alertMetricName = "ALERTS"
// alertForStateMetricName is the metric name for 'for' state of alert.
alertForStateMetricName = "ALERTS_FOR_STATE"
// alertNameLabel is the label name indicating the name of an alert.
alertNameLabel = "alertname"
// alertStateLabel is the label name indicating the state of an alert.
alertStateLabel = "alertstate"
)
// AlertToTimeSeries converts the given alert with the given timestamp to
// timeseries. The result always contains the ALERTS series; the
// ALERTS_FOR_STATE series is added only for rules with a positive For.
func (r *Rule) AlertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries {
	series := []prompbmarshal.TimeSeries{
		alertToTimeSeries(r.Name, a, timestamp),
	}
	if r.For <= 0 {
		return series
	}
	return append(series, alertForToTimeSeries(r.Name, a, timestamp))
}
// alertToTimeSeries returns the ALERTS timeseries for the given alert:
// a single sample with value 1 at the given timestamp, labelled with the
// alert labels plus the metric name, the alert name and the alert state.
func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
	// alert labels are copied so the alert itself stays untouched
	seriesLabels := make(map[string]string, len(a.Labels)+3)
	for labelName, labelValue := range a.Labels {
		seriesLabels[labelName] = labelValue
	}
	// service labels are written last and override alert labels with the same name
	seriesLabels["__name__"] = alertMetricName
	seriesLabels[alertNameLabel] = name
	seriesLabels[alertStateLabel] = a.State.String()

	return newTimeSeries(1, seriesLabels, timestamp)
}
// alertForToTimeSeries returns the ALERTS_FOR_STATE timeseries for the
// given alert. The sample value is the Unix time (in seconds) at which the
// alert became active; labels are the alert labels plus the metric name
// and the alert name.
func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
	// alert labels are copied so the alert itself stays untouched
	seriesLabels := make(map[string]string, len(a.Labels)+2)
	for labelName, labelValue := range a.Labels {
		seriesLabels[labelName] = labelValue
	}
	// service labels are written last and override alert labels with the same name
	seriesLabels["__name__"] = alertForStateMetricName
	seriesLabels[alertNameLabel] = name

	activeSince := float64(a.Start.Unix())
	return newTimeSeries(activeSince, seriesLabels, timestamp)
}
// newTimeSeries builds a timeseries with a single sample holding the given
// value at the given timestamp (converted to milliseconds) and with the
// given labels sorted by name.
func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries {
	// map iteration order is random, so labels are collected first
	// and put in name order afterwards
	var sorted []prompbmarshal.Label
	for name, val := range labels {
		sorted = append(sorted, prompbmarshal.Label{
			Name:  name,
			Value: val,
		})
	}
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].Name < sorted[j].Name
	})

	var ts prompbmarshal.TimeSeries
	ts.Samples = []prompbmarshal.Sample{{
		Value:     value,
		Timestamp: timestamp.UnixNano() / int64(time.Millisecond),
	}}
	ts.Labels = sorted
	return ts
}
// Restore restores the state of active alerts based on previously written
// ALERTS_FOR_STATE timeseries found within the given lookback window.
// Restore restores only Start field. Field State will be always Pending and supposed
// to be updated on next Exec, as well as Value field.
//
// It returns an error if q is nil, if the query fails or if an alert
// can't be built from a returned metric.
func (r *Rule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
	if q == nil {
		return fmt.Errorf("querier is nil")
	}

	// Get the last datapoint in range via MetricsQL `last_over_time`.
	// We don't use plain PromQL since Prometheus doesn't support
	// remote write protocol which is used for state persistence in vmalert.
	expr := fmt.Sprintf("last_over_time(%s{alertname=%q}[%ds])",
		alertForStateMetricName, r.Name, int(lookback.Seconds()))
	qMetrics, err := q.Query(ctx, expr)
	if err != nil {
		return err
	}

	// r.alerts is guarded by r.mu everywhere else, so take the lock
	// for the whole update; the query above is done outside of it
	r.mu.Lock()
	defer r.mu.Unlock()

	// writing to a nil map panics, so make sure the map exists
	if r.alerts == nil {
		r.alerts = make(map[uint64]*notifier.Alert)
	}

	for _, m := range qMetrics {
		labels := m.Labels
		m.Labels = make([]datasource.Label, 0, len(labels))
		// drop all extra labels, so hash key will
		// be identical to timeseries received in Exec
		for _, l := range labels {
			if l.Name == alertNameLabel {
				continue
			}
			// drop all overridden labels
			if _, ok := r.Labels[l.Name]; ok {
				continue
			}
			m.Labels = append(m.Labels, l)
		}

		a, err := r.newAlert(m)
		if err != nil {
			return fmt.Errorf("failed to create alert: %w", err)
		}
		a.ID = hash(m)
		a.State = notifier.StatePending
		// the sample value of ALERTS_FOR_STATE is the Unix time
		// at which the alert became active
		a.Start = time.Unix(int64(m.Value), 0)
		r.alerts[a.ID] = a
		logger.Infof("alert %q(%d) restored to state at %v", a.Name, a.ID, a.Start)
	}
	return nil
}