2020-05-10 18:58:17 +02:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2023-03-01 15:48:20 +01:00
|
|
|
"errors"
|
2020-05-10 18:58:17 +02:00
|
|
|
"fmt"
|
|
|
|
"hash/fnv"
|
2021-12-02 13:45:08 +01:00
|
|
|
"net/url"
|
2022-03-29 15:09:07 +02:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
2020-06-01 12:46:37 +02:00
|
|
|
"sync"
|
2020-05-10 18:58:17 +02:00
|
|
|
"time"
|
|
|
|
|
2022-06-09 08:21:30 +02:00
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
|
|
|
2020-06-01 12:46:37 +02:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
2020-05-10 18:58:17 +02:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
2020-06-29 21:21:03 +02:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
2022-03-29 15:09:07 +02:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
2020-05-10 18:58:17 +02:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
2022-03-29 15:09:07 +02:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
2020-05-10 18:58:17 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
// Group is an entity for grouping rules
|
|
|
|
type Group struct {
|
2022-03-15 12:54:53 +01:00
|
|
|
mu sync.RWMutex
|
|
|
|
Name string
|
|
|
|
File string
|
|
|
|
Rules []Rule
|
2022-07-22 10:44:55 +02:00
|
|
|
Type config.Type
|
2022-03-15 12:54:53 +01:00
|
|
|
Interval time.Duration
|
2023-09-06 16:29:59 +02:00
|
|
|
EvalOffset *time.Duration
|
2022-06-09 08:21:30 +02:00
|
|
|
Limit int
|
2022-03-15 12:54:53 +01:00
|
|
|
Concurrency int
|
|
|
|
Checksum string
|
|
|
|
LastEvaluation time.Time
|
2021-08-31 13:52:34 +02:00
|
|
|
|
2023-04-27 12:17:26 +02:00
|
|
|
Labels map[string]string
|
|
|
|
Params url.Values
|
|
|
|
Headers map[string]string
|
|
|
|
NotifierHeaders map[string]string
|
2020-05-10 18:58:17 +02:00
|
|
|
|
2020-05-17 16:12:09 +02:00
|
|
|
doneCh chan struct{}
|
|
|
|
finishedCh chan struct{}
|
|
|
|
// channel accepts new Group obj
|
|
|
|
// which supposed to update current group
|
2020-06-01 12:46:37 +02:00
|
|
|
updateCh chan *Group
|
2023-03-01 15:48:20 +01:00
|
|
|
// evalCancel stores the cancel fn for interrupting
|
|
|
|
// rules evaluation. Used on groups update() and close().
|
|
|
|
evalCancel context.CancelFunc
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
|
|
|
|
metrics *groupMetrics
|
|
|
|
}
|
|
|
|
|
|
|
|
type groupMetrics struct {
|
2022-02-02 13:11:41 +01:00
|
|
|
iterationTotal *utils.Counter
|
|
|
|
iterationDuration *utils.Summary
|
2022-03-29 15:09:07 +02:00
|
|
|
iterationMissed *utils.Counter
|
2022-05-20 17:31:16 +02:00
|
|
|
iterationInterval *utils.Gauge
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
}
|
|
|
|
|
2022-05-20 17:31:16 +02:00
|
|
|
func newGroupMetrics(g *Group) *groupMetrics {
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
m := &groupMetrics{}
|
2022-05-20 17:31:16 +02:00
|
|
|
labels := fmt.Sprintf(`group=%q, file=%q`, g.Name, g.File)
|
2022-02-02 13:11:41 +01:00
|
|
|
m.iterationTotal = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
|
|
|
|
m.iterationDuration = utils.GetOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
|
2022-03-29 15:09:07 +02:00
|
|
|
m.iterationMissed = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels))
|
2022-05-20 17:31:16 +02:00
|
|
|
m.iterationInterval = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_iteration_interval_seconds{%s}`, labels), func() float64 {
|
|
|
|
g.mu.RLock()
|
|
|
|
i := g.Interval.Seconds()
|
|
|
|
g.mu.RUnlock()
|
|
|
|
return i
|
|
|
|
})
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
return m
|
2020-06-01 12:46:37 +02:00
|
|
|
}
|
|
|
|
|
2021-08-31 13:52:34 +02:00
|
|
|
// merges group rule labels into result map
|
|
|
|
// set2 has priority over set1.
|
|
|
|
func mergeLabels(groupName, ruleName string, set1, set2 map[string]string) map[string]string {
|
|
|
|
r := map[string]string{}
|
|
|
|
for k, v := range set1 {
|
|
|
|
r[k] = v
|
|
|
|
}
|
|
|
|
for k, v := range set2 {
|
|
|
|
if prevV, ok := r[k]; ok {
|
|
|
|
logger.Infof("label %q=%q for rule %q.%q overwritten with external label %q=%q",
|
|
|
|
k, prevV, groupName, ruleName, k, v)
|
|
|
|
}
|
|
|
|
r[k] = v
|
|
|
|
}
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
2021-04-28 22:41:15 +02:00
|
|
|
func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval time.Duration, labels map[string]string) *Group {
|
2020-06-01 12:46:37 +02:00
|
|
|
g := &Group{
|
2023-04-27 12:17:26 +02:00
|
|
|
Type: cfg.Type,
|
|
|
|
Name: cfg.Name,
|
|
|
|
File: cfg.File,
|
|
|
|
Interval: cfg.Interval.Duration(),
|
|
|
|
Limit: cfg.Limit,
|
|
|
|
Concurrency: cfg.Concurrency,
|
|
|
|
Checksum: cfg.Checksum,
|
|
|
|
Params: cfg.Params,
|
|
|
|
Headers: make(map[string]string),
|
|
|
|
NotifierHeaders: make(map[string]string),
|
|
|
|
Labels: cfg.Labels,
|
2021-05-22 23:26:01 +02:00
|
|
|
|
|
|
|
doneCh: make(chan struct{}),
|
|
|
|
finishedCh: make(chan struct{}),
|
|
|
|
updateCh: make(chan *Group),
|
2020-06-01 12:46:37 +02:00
|
|
|
}
|
|
|
|
if g.Interval == 0 {
|
|
|
|
g.Interval = defaultInterval
|
|
|
|
}
|
2020-06-09 14:21:20 +02:00
|
|
|
if g.Concurrency < 1 {
|
|
|
|
g.Concurrency = 1
|
|
|
|
}
|
2023-09-06 16:29:59 +02:00
|
|
|
if cfg.EvalOffset != nil {
|
|
|
|
g.EvalOffset = &cfg.EvalOffset.D
|
|
|
|
}
|
2022-07-22 10:44:55 +02:00
|
|
|
for _, h := range cfg.Headers {
|
|
|
|
g.Headers[h.Key] = h.Value
|
|
|
|
}
|
2023-04-27 12:17:26 +02:00
|
|
|
for _, h := range cfg.NotifierHeaders {
|
|
|
|
g.NotifierHeaders[h.Key] = h.Value
|
|
|
|
}
|
2022-05-20 17:31:16 +02:00
|
|
|
g.metrics = newGroupMetrics(g)
|
2020-06-01 12:46:37 +02:00
|
|
|
rules := make([]Rule, len(cfg.Rules))
|
|
|
|
for i, r := range cfg.Rules {
|
2021-08-31 13:52:34 +02:00
|
|
|
var extraLabels map[string]string
|
|
|
|
// apply external labels
|
|
|
|
if len(labels) > 0 {
|
|
|
|
extraLabels = labels
|
|
|
|
}
|
|
|
|
// apply group labels, it has priority on external labels
|
|
|
|
if len(cfg.Labels) > 0 {
|
|
|
|
extraLabels = mergeLabels(g.Name, r.Name(), extraLabels, g.Labels)
|
2020-07-28 13:20:31 +02:00
|
|
|
}
|
2021-08-31 13:52:34 +02:00
|
|
|
// apply rules labels, it has priority on other labels
|
|
|
|
if len(extraLabels) > 0 {
|
|
|
|
r.Labels = mergeLabels(g.Name, r.Name(), extraLabels, r.Labels)
|
|
|
|
}
|
|
|
|
|
2021-04-28 22:41:15 +02:00
|
|
|
rules[i] = g.newRule(qb, r)
|
2020-06-01 12:46:37 +02:00
|
|
|
}
|
|
|
|
g.Rules = rules
|
|
|
|
return g
|
|
|
|
}
|
|
|
|
|
2021-04-28 22:41:15 +02:00
|
|
|
func (g *Group) newRule(qb datasource.QuerierBuilder, rule config.Rule) Rule {
|
2020-06-01 12:46:37 +02:00
|
|
|
if rule.Alert != "" {
|
2021-04-28 22:41:15 +02:00
|
|
|
return newAlertingRule(qb, g, rule)
|
2020-06-01 12:46:37 +02:00
|
|
|
}
|
2021-04-28 22:41:15 +02:00
|
|
|
return newRecordingRule(qb, g, rule)
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// ID return unique group ID that consists of
|
2022-02-02 13:11:41 +01:00
|
|
|
// rules file and group Name
|
2020-05-17 16:12:09 +02:00
|
|
|
func (g *Group) ID() uint64 {
|
2021-10-19 15:44:13 +02:00
|
|
|
g.mu.RLock()
|
|
|
|
defer g.mu.RUnlock()
|
|
|
|
|
2020-05-10 18:58:17 +02:00
|
|
|
hash := fnv.New64a()
|
|
|
|
hash.Write([]byte(g.File))
|
|
|
|
hash.Write([]byte("\xff"))
|
|
|
|
hash.Write([]byte(g.Name))
|
2021-02-01 14:02:44 +01:00
|
|
|
hash.Write([]byte(g.Type.Get()))
|
2023-09-06 16:29:59 +02:00
|
|
|
hash.Write([]byte(g.Interval.String()))
|
|
|
|
if g.EvalOffset != nil {
|
|
|
|
hash.Write([]byte(g.EvalOffset.String()))
|
|
|
|
}
|
2020-05-10 18:58:17 +02:00
|
|
|
return hash.Sum64()
|
|
|
|
}
|
|
|
|
|
2020-06-01 12:46:37 +02:00
|
|
|
// Restore restores alerts state for group rules
|
2023-02-04 04:46:13 +01:00
|
|
|
func (g *Group) Restore(ctx context.Context, qb datasource.QuerierBuilder, ts time.Time, lookback time.Duration) error {
|
2020-05-10 18:58:17 +02:00
|
|
|
for _, rule := range g.Rules {
|
2023-02-04 04:46:13 +01:00
|
|
|
ar, ok := rule.(*AlertingRule)
|
2020-06-01 12:46:37 +02:00
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
2023-02-04 04:46:13 +01:00
|
|
|
if ar.For < 1 {
|
2020-06-01 12:46:37 +02:00
|
|
|
continue
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
2022-12-01 09:27:39 +01:00
|
|
|
q := qb.BuildWithParams(datasource.QuerierParams{
|
2023-02-04 04:46:13 +01:00
|
|
|
DataSourceType: g.Type.String(),
|
|
|
|
EvaluationInterval: g.Interval,
|
|
|
|
QueryParams: g.Params,
|
|
|
|
Headers: g.Headers,
|
|
|
|
Debug: ar.Debug,
|
2022-12-01 09:27:39 +01:00
|
|
|
})
|
2023-02-04 04:46:13 +01:00
|
|
|
if err := ar.Restore(ctx, q, ts, lookback); err != nil {
|
2020-06-30 21:58:18 +02:00
|
|
|
return fmt.Errorf("error while restoring rule %q: %w", rule, err)
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// updateWith updates existing group with
|
2020-06-01 12:46:37 +02:00
|
|
|
// passed group object. This function ignores group
|
|
|
|
// evaluation interval change. It supposed to be updated
|
|
|
|
// in group.start function.
|
2020-05-17 16:12:09 +02:00
|
|
|
// Not thread-safe.
|
2020-06-01 12:46:37 +02:00
|
|
|
func (g *Group) updateWith(newGroup *Group) error {
|
|
|
|
rulesRegistry := make(map[uint64]Rule)
|
2020-05-10 18:58:17 +02:00
|
|
|
for _, nr := range newGroup.Rules {
|
2020-06-01 12:46:37 +02:00
|
|
|
rulesRegistry[nr.ID()] = nr
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
for i, or := range g.Rules {
|
2020-06-01 12:46:37 +02:00
|
|
|
nr, ok := rulesRegistry[or.ID()]
|
2020-05-10 18:58:17 +02:00
|
|
|
if !ok {
|
|
|
|
// old rule is not present in the new list
|
2020-05-15 08:55:22 +02:00
|
|
|
// so we mark it for removing
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
g.Rules[i].Close()
|
2020-05-15 08:55:22 +02:00
|
|
|
g.Rules[i] = nil
|
2020-05-10 18:58:17 +02:00
|
|
|
continue
|
|
|
|
}
|
2020-06-01 12:46:37 +02:00
|
|
|
if err := or.UpdateWith(nr); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
delete(rulesRegistry, nr.ID())
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
|
|
|
|
2020-06-01 12:46:37 +02:00
|
|
|
var newRules []Rule
|
2020-05-15 08:55:22 +02:00
|
|
|
for _, r := range g.Rules {
|
|
|
|
if r == nil {
|
|
|
|
// skip nil rules
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newRules = append(newRules, r)
|
|
|
|
}
|
|
|
|
// add the rest of rules from registry
|
2020-05-10 18:58:17 +02:00
|
|
|
for _, nr := range rulesRegistry {
|
2020-05-15 08:55:22 +02:00
|
|
|
newRules = append(newRules, nr)
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
2021-09-23 16:55:59 +02:00
|
|
|
// note that g.Interval is not updated here
|
|
|
|
// so the value can be compared later in
|
|
|
|
// group.Start function
|
2021-02-01 14:02:44 +01:00
|
|
|
g.Type = newGroup.Type
|
2020-06-09 14:21:20 +02:00
|
|
|
g.Concurrency = newGroup.Concurrency
|
2021-12-02 13:45:08 +01:00
|
|
|
g.Params = newGroup.Params
|
2022-07-21 15:59:55 +02:00
|
|
|
g.Headers = newGroup.Headers
|
2023-04-27 12:17:26 +02:00
|
|
|
g.NotifierHeaders = newGroup.NotifierHeaders
|
2021-08-31 13:52:34 +02:00
|
|
|
g.Labels = newGroup.Labels
|
2022-06-09 08:58:25 +02:00
|
|
|
g.Limit = newGroup.Limit
|
2020-09-11 21:14:30 +02:00
|
|
|
g.Checksum = newGroup.Checksum
|
2020-05-15 08:55:22 +02:00
|
|
|
g.Rules = newRules
|
2020-06-01 12:46:37 +02:00
|
|
|
return nil
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
|
|
|
|
2023-03-01 15:48:20 +01:00
|
|
|
// interruptEval interrupts in-flight rules evaluations
|
|
|
|
// within the group. It is expected that g.evalCancel
|
|
|
|
// will be repopulated after the call.
|
|
|
|
func (g *Group) interruptEval() {
|
|
|
|
g.mu.RLock()
|
|
|
|
defer g.mu.RUnlock()
|
|
|
|
|
|
|
|
if g.evalCancel != nil {
|
|
|
|
g.evalCancel()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-05-10 18:58:17 +02:00
|
|
|
func (g *Group) close() {
|
2020-05-17 16:12:09 +02:00
|
|
|
if g.doneCh == nil {
|
2020-05-10 18:58:17 +02:00
|
|
|
return
|
|
|
|
}
|
2020-05-17 16:12:09 +02:00
|
|
|
close(g.doneCh)
|
2023-03-01 15:48:20 +01:00
|
|
|
g.interruptEval()
|
2020-05-17 16:12:09 +02:00
|
|
|
<-g.finishedCh
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
|
2022-02-02 13:11:41 +01:00
|
|
|
g.metrics.iterationDuration.Unregister()
|
|
|
|
g.metrics.iterationTotal.Unregister()
|
2022-05-20 17:31:16 +02:00
|
|
|
g.metrics.iterationMissed.Unregister()
|
|
|
|
g.metrics.iterationInterval.Unregister()
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
for _, rule := range g.Rules {
|
|
|
|
rule.Close()
|
|
|
|
}
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
|
|
|
|
2020-09-03 00:00:55 +02:00
|
|
|
var skipRandSleepOnGroupStart bool
|
|
|
|
|
2023-07-28 10:42:02 +02:00
|
|
|
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client, rr datasource.QuerierBuilder) {
|
2020-06-09 14:21:20 +02:00
|
|
|
defer func() { close(g.finishedCh) }()
|
2020-09-03 00:00:55 +02:00
|
|
|
|
2023-09-06 16:29:59 +02:00
|
|
|
// sleep random duration to spread group rules evaluation
|
|
|
|
// over time in order to reduce load on datasource.
|
2023-03-06 14:04:43 +01:00
|
|
|
if !skipRandSleepOnGroupStart {
|
2023-09-06 16:29:59 +02:00
|
|
|
sleepBeforeStart := delayBeforeStart(time.Now(), g.ID(), g.Interval, g.EvalOffset)
|
|
|
|
g.infof("will start in %v", sleepBeforeStart)
|
|
|
|
|
|
|
|
sleepTimer := time.NewTimer(sleepBeforeStart)
|
2023-03-06 14:04:43 +01:00
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
sleepTimer.Stop()
|
|
|
|
return
|
|
|
|
case <-g.doneCh:
|
|
|
|
sleepTimer.Stop()
|
|
|
|
return
|
|
|
|
case <-sleepTimer.C:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-06 16:29:59 +02:00
|
|
|
evalTS := time.Now()
|
|
|
|
|
2022-03-29 15:09:07 +02:00
|
|
|
e := &executor{
|
|
|
|
rw: rw,
|
|
|
|
notifiers: nts,
|
2023-04-27 13:02:21 +02:00
|
|
|
notifierHeaders: g.NotifierHeaders,
|
2023-04-27 12:17:26 +02:00
|
|
|
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
|
|
|
}
|
2022-03-29 15:09:07 +02:00
|
|
|
|
2023-09-06 16:29:59 +02:00
|
|
|
g.infof("started")
|
2022-03-29 15:09:07 +02:00
|
|
|
|
2023-03-01 15:48:20 +01:00
|
|
|
eval := func(ctx context.Context, ts time.Time) {
|
2022-03-29 15:09:07 +02:00
|
|
|
g.metrics.iterationTotal.Inc()
|
|
|
|
|
|
|
|
start := time.Now()
|
|
|
|
|
|
|
|
if len(g.Rules) < 1 {
|
|
|
|
g.metrics.iterationDuration.UpdateDuration(start)
|
|
|
|
g.LastEvaluation = start
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
|
2022-06-09 08:21:30 +02:00
|
|
|
errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration, g.Limit)
|
2022-03-29 15:09:07 +02:00
|
|
|
for err := range errs {
|
|
|
|
if err != nil {
|
|
|
|
logger.Errorf("group %q: %s", g.Name, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
g.metrics.iterationDuration.UpdateDuration(start)
|
|
|
|
g.LastEvaluation = start
|
|
|
|
}
|
|
|
|
|
2023-03-01 15:48:20 +01:00
|
|
|
evalCtx, cancel := context.WithCancel(ctx)
|
|
|
|
g.mu.Lock()
|
|
|
|
g.evalCancel = cancel
|
|
|
|
g.mu.Unlock()
|
|
|
|
defer g.evalCancel()
|
|
|
|
|
|
|
|
eval(evalCtx, evalTS)
|
2022-03-29 15:09:07 +02:00
|
|
|
|
2020-06-01 12:46:37 +02:00
|
|
|
t := time.NewTicker(g.Interval)
|
2020-05-10 18:58:17 +02:00
|
|
|
defer t.Stop()
|
2023-02-04 04:46:13 +01:00
|
|
|
|
|
|
|
// restore the rules state after the first evaluation
|
|
|
|
// so only active alerts can be restored.
|
|
|
|
if rr != nil {
|
|
|
|
err := g.Restore(ctx, rr, evalTS, *remoteReadLookBack)
|
|
|
|
if err != nil {
|
|
|
|
logger.Errorf("error while restoring ruleState for group %q: %s", g.Name, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-05-10 18:58:17 +02:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
logger.Infof("group %q: context cancelled", g.Name)
|
|
|
|
return
|
2020-05-17 16:12:09 +02:00
|
|
|
case <-g.doneCh:
|
2020-05-10 18:58:17 +02:00
|
|
|
logger.Infof("group %q: received stop signal", g.Name)
|
|
|
|
return
|
2020-05-17 16:12:09 +02:00
|
|
|
case ng := <-g.updateCh:
|
2020-06-01 12:46:37 +02:00
|
|
|
g.mu.Lock()
|
2023-03-01 15:48:20 +01:00
|
|
|
|
|
|
|
// it is expected that g.evalCancel will be evoked
|
|
|
|
// somewhere else to unblock group from the rules evaluation.
|
|
|
|
// we recreate the evalCtx and g.evalCancel, so it can
|
|
|
|
// be called again.
|
|
|
|
evalCtx, cancel = context.WithCancel(ctx)
|
|
|
|
g.evalCancel = cancel
|
|
|
|
|
2020-06-01 12:46:37 +02:00
|
|
|
err := g.updateWith(ng)
|
|
|
|
if err != nil {
|
|
|
|
logger.Errorf("group %q: failed to update: %s", g.Name, err)
|
|
|
|
g.mu.Unlock()
|
|
|
|
continue
|
|
|
|
}
|
2022-05-13 10:04:49 +02:00
|
|
|
|
2023-09-06 16:29:59 +02:00
|
|
|
// ensure that staleness is tracked for existing rules only
|
2022-05-13 10:04:49 +02:00
|
|
|
e.purgeStaleSeries(g.Rules)
|
2023-04-27 13:02:21 +02:00
|
|
|
e.notifierHeaders = g.NotifierHeaders
|
2020-06-01 12:46:37 +02:00
|
|
|
g.mu.Unlock()
|
2023-09-06 16:29:59 +02:00
|
|
|
|
|
|
|
g.infof("re-started")
|
2020-05-10 18:58:17 +02:00
|
|
|
case <-t.C:
|
2022-03-29 15:09:07 +02:00
|
|
|
missed := (time.Since(evalTS) / g.Interval) - 1
|
2023-07-13 17:11:22 +02:00
|
|
|
if missed < 0 {
|
|
|
|
// missed can become < 0 due to irregular delays during evaluation
|
|
|
|
// which can result in time.Since(evalTS) < g.Interval
|
|
|
|
missed = 0
|
|
|
|
}
|
2022-03-29 15:09:07 +02:00
|
|
|
if missed > 0 {
|
|
|
|
g.metrics.iterationMissed.Inc()
|
2020-06-09 14:21:20 +02:00
|
|
|
}
|
2022-03-29 15:09:07 +02:00
|
|
|
evalTS = evalTS.Add((missed + 1) * g.Interval)
|
|
|
|
|
2023-03-01 15:48:20 +01:00
|
|
|
eval(evalCtx, evalTS)
|
2020-06-09 14:21:20 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-06-01 12:46:37 +02:00
|
|
|
|
2023-09-06 16:29:59 +02:00
|
|
|
// delayBeforeStart returns a duration on the interval between [ts..ts+interval].
|
|
|
|
// delayBeforeStart accounts for `offset`, so returned duration should be always
|
|
|
|
// bigger than the `offset`.
|
|
|
|
func delayBeforeStart(ts time.Time, key uint64, interval time.Duration, offset *time.Duration) time.Duration {
|
|
|
|
var randSleep time.Duration
|
|
|
|
randSleep = time.Duration(float64(interval) * (float64(key) / (1 << 64)))
|
|
|
|
sleepOffset := time.Duration(ts.UnixNano() % interval.Nanoseconds())
|
|
|
|
if randSleep < sleepOffset {
|
|
|
|
randSleep += interval
|
|
|
|
}
|
|
|
|
randSleep -= sleepOffset
|
|
|
|
// check if `ts` after randSleep is before `offset`,
|
|
|
|
// if it is, add extra eval_offset to randSleep.
|
|
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3409.
|
|
|
|
if offset != nil {
|
|
|
|
tmpEvalTS := ts.Add(randSleep)
|
|
|
|
if tmpEvalTS.Before(tmpEvalTS.Truncate(interval).Add(*offset)) {
|
|
|
|
randSleep += *offset
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return randSleep.Truncate(time.Second)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *Group) infof(format string, args ...interface{}) {
|
|
|
|
msg := fmt.Sprintf(format, args...)
|
|
|
|
logger.Infof("group %q %s; interval=%v; eval_offset=%v; concurrency=%d",
|
|
|
|
g.Name, msg, g.Interval, g.EvalOffset, g.Concurrency)
|
|
|
|
}
|
|
|
|
|
2022-03-16 16:26:33 +01:00
|
|
|
// getResolveDuration returns the duration after which firing alert
|
|
|
|
// can be considered as resolved.
|
2022-03-29 15:09:07 +02:00
|
|
|
func getResolveDuration(groupInterval, delta, maxDuration time.Duration) time.Duration {
|
2022-03-16 16:26:33 +01:00
|
|
|
if groupInterval > delta {
|
|
|
|
delta = groupInterval
|
2021-09-13 14:48:18 +02:00
|
|
|
}
|
2022-03-16 16:26:33 +01:00
|
|
|
resolveDuration := delta * 4
|
2022-03-29 15:09:07 +02:00
|
|
|
if maxDuration > 0 && resolveDuration > maxDuration {
|
|
|
|
resolveDuration = maxDuration
|
2022-03-16 16:26:33 +01:00
|
|
|
}
|
|
|
|
return resolveDuration
|
2021-09-13 14:48:18 +02:00
|
|
|
}
|
|
|
|
|
2020-06-09 14:21:20 +02:00
|
|
|
type executor struct {
|
2023-04-27 13:02:21 +02:00
|
|
|
notifiers func() []notifier.Notifier
|
|
|
|
notifierHeaders map[string]string
|
|
|
|
|
2023-07-28 10:42:02 +02:00
|
|
|
rw *remotewrite.Client
|
2022-03-29 15:09:07 +02:00
|
|
|
|
2022-03-30 12:37:27 +02:00
|
|
|
previouslySentSeriesToRWMu sync.Mutex
|
2022-03-29 15:09:07 +02:00
|
|
|
// previouslySentSeriesToRW stores series sent to RW on previous iteration
|
|
|
|
// map[ruleID]map[ruleLabels][]prompb.Label
|
|
|
|
// where `ruleID` is ID of the Rule within a Group
|
|
|
|
// and `ruleLabels` is []prompb.Label marshalled to a string
|
|
|
|
previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
|
2020-06-09 14:21:20 +02:00
|
|
|
}
|
|
|
|
|
2022-06-09 08:21:30 +02:00
|
|
|
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration, limit int) chan error {
|
2020-06-09 14:21:20 +02:00
|
|
|
res := make(chan error, len(rules))
|
|
|
|
if concurrency == 1 {
|
|
|
|
// fast path
|
|
|
|
for _, rule := range rules {
|
2022-06-09 08:21:30 +02:00
|
|
|
res <- e.exec(ctx, rule, ts, resolveDuration, limit)
|
2020-06-09 14:21:20 +02:00
|
|
|
}
|
|
|
|
close(res)
|
|
|
|
return res
|
|
|
|
}
|
|
|
|
|
|
|
|
sem := make(chan struct{}, concurrency)
|
|
|
|
go func() {
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
for _, rule := range rules {
|
|
|
|
sem <- struct{}{}
|
|
|
|
wg.Add(1)
|
|
|
|
go func(r Rule) {
|
2022-06-09 08:21:30 +02:00
|
|
|
res <- e.exec(ctx, r, ts, resolveDuration, limit)
|
2020-06-09 14:21:20 +02:00
|
|
|
<-sem
|
|
|
|
wg.Done()
|
|
|
|
}(rule)
|
|
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
close(res)
|
|
|
|
}()
|
|
|
|
return res
|
|
|
|
}
|
|
|
|
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
var (
|
2021-08-31 11:28:02 +02:00
|
|
|
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
|
|
|
|
|
|
|
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
|
|
|
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
|
|
|
|
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
2022-01-07 15:15:34 +01:00
|
|
|
remoteWriteTotal = metrics.NewCounter(`vmalert_remotewrite_total`)
|
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 08:41:29 +02:00
|
|
|
)
|
|
|
|
|
2022-06-09 08:21:30 +02:00
|
|
|
func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration, limit int) error {
|
2020-06-09 14:21:20 +02:00
|
|
|
execTotal.Inc()
|
|
|
|
|
2022-06-09 08:21:30 +02:00
|
|
|
tss, err := rule.Exec(ctx, ts, limit)
|
2020-06-09 14:21:20 +02:00
|
|
|
if err != nil {
|
2023-03-01 15:48:20 +01:00
|
|
|
if errors.Is(err, context.Canceled) {
|
|
|
|
// the context can be cancelled on graceful shutdown
|
|
|
|
// or on group update. So no need to handle the error as usual.
|
|
|
|
return nil
|
|
|
|
}
|
2020-06-09 14:21:20 +02:00
|
|
|
execErrors.Inc()
|
2020-06-30 21:58:18 +02:00
|
|
|
return fmt.Errorf("rule %q: failed to execute: %w", rule, err)
|
2020-06-09 14:21:20 +02:00
|
|
|
}
|
|
|
|
|
2022-03-29 15:09:07 +02:00
|
|
|
if e.rw != nil {
|
2022-12-06 15:36:46 +01:00
|
|
|
pushToRW := func(tss []prompbmarshal.TimeSeries) error {
|
|
|
|
var lastErr error
|
2022-03-29 15:09:07 +02:00
|
|
|
for _, ts := range tss {
|
|
|
|
remoteWriteTotal.Inc()
|
|
|
|
if err := e.rw.Push(ts); err != nil {
|
|
|
|
remoteWriteErrors.Inc()
|
2022-12-06 15:36:46 +01:00
|
|
|
lastErr = fmt.Errorf("rule %q: remote write failure: %w", rule, err)
|
2022-03-29 15:09:07 +02:00
|
|
|
}
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
2022-12-06 15:36:46 +01:00
|
|
|
return lastErr
|
|
|
|
}
|
|
|
|
if err := pushToRW(tss); err != nil {
|
|
|
|
return err
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
2022-12-06 15:36:46 +01:00
|
|
|
|
2022-03-29 15:09:07 +02:00
|
|
|
staleSeries := e.getStaleSeries(rule, tss, ts)
|
2022-12-06 15:36:46 +01:00
|
|
|
if err := pushToRW(staleSeries); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
2020-06-09 14:21:20 +02:00
|
|
|
|
|
|
|
ar, ok := rule.(*AlertingRule)
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
2022-03-16 16:26:33 +01:00
|
|
|
|
2022-03-29 15:09:07 +02:00
|
|
|
alerts := ar.alertsToSend(ts, resolveDuration, *resendDelay)
|
2020-06-09 14:21:20 +02:00
|
|
|
if len(alerts) < 1 {
|
|
|
|
return nil
|
|
|
|
}
|
2020-06-29 21:21:03 +02:00
|
|
|
|
2022-06-18 09:11:37 +02:00
|
|
|
wg := sync.WaitGroup{}
|
2022-12-06 15:36:46 +01:00
|
|
|
errGr := new(utils.ErrGroup)
|
2022-02-02 13:11:41 +01:00
|
|
|
for _, nt := range e.notifiers() {
|
2022-06-18 09:11:37 +02:00
|
|
|
wg.Add(1)
|
|
|
|
go func(nt notifier.Notifier) {
|
2023-04-27 12:17:26 +02:00
|
|
|
if err := nt.Send(ctx, alerts, e.notifierHeaders); err != nil {
|
2022-06-18 09:11:37 +02:00
|
|
|
errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
|
|
|
|
}
|
|
|
|
wg.Done()
|
|
|
|
}(nt)
|
2020-06-09 14:21:20 +02:00
|
|
|
}
|
2022-06-18 09:11:37 +02:00
|
|
|
wg.Wait()
|
2020-06-29 21:21:03 +02:00
|
|
|
return errGr.Err()
|
2020-05-10 18:58:17 +02:00
|
|
|
}
|
2022-03-29 15:09:07 +02:00
|
|
|
|
|
|
|
// getStaledSeries checks whether there are stale series from previously sent ones.
|
|
|
|
func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries {
|
2022-03-30 12:37:27 +02:00
|
|
|
ruleLabels := make(map[string][]prompbmarshal.Label, len(tss))
|
2022-03-29 15:09:07 +02:00
|
|
|
for _, ts := range tss {
|
|
|
|
// convert labels to strings so we can compare with previously sent series
|
|
|
|
key := labelsToString(ts.Labels)
|
|
|
|
ruleLabels[key] = ts.Labels
|
|
|
|
}
|
|
|
|
|
|
|
|
rID := rule.ID()
|
|
|
|
var staleS []prompbmarshal.TimeSeries
|
|
|
|
// check whether there are series which disappeared and need to be marked as stale
|
2022-03-30 12:37:27 +02:00
|
|
|
e.previouslySentSeriesToRWMu.Lock()
|
2022-03-29 15:09:07 +02:00
|
|
|
for key, labels := range e.previouslySentSeriesToRW[rID] {
|
|
|
|
if _, ok := ruleLabels[key]; ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
// previously sent series are missing in current series, so we mark them as stale
|
|
|
|
ss := newTimeSeriesPB([]float64{decimal.StaleNaN}, []int64{timestamp.Unix()}, labels)
|
|
|
|
staleS = append(staleS, ss)
|
|
|
|
}
|
|
|
|
// set previous series to current
|
|
|
|
e.previouslySentSeriesToRW[rID] = ruleLabels
|
2022-03-30 12:37:27 +02:00
|
|
|
e.previouslySentSeriesToRWMu.Unlock()
|
2022-03-29 15:09:07 +02:00
|
|
|
|
|
|
|
return staleS
|
|
|
|
}
|
|
|
|
|
2022-05-13 10:04:49 +02:00
|
|
|
// purgeStaleSeries deletes references in tracked
|
|
|
|
// previouslySentSeriesToRW list to Rules which aren't present
|
|
|
|
// in the given activeRules list. The method is used when the list
|
|
|
|
// of loaded rules has changed and executor has to remove
|
|
|
|
// references to non-existing rules.
|
|
|
|
func (e *executor) purgeStaleSeries(activeRules []Rule) {
|
|
|
|
newPreviouslySentSeriesToRW := make(map[uint64]map[string][]prompbmarshal.Label)
|
|
|
|
|
|
|
|
e.previouslySentSeriesToRWMu.Lock()
|
|
|
|
|
|
|
|
for _, rule := range activeRules {
|
|
|
|
id := rule.ID()
|
|
|
|
prev, ok := e.previouslySentSeriesToRW[id]
|
|
|
|
if ok {
|
|
|
|
// keep previous series for staleness detection
|
|
|
|
newPreviouslySentSeriesToRW[id] = prev
|
|
|
|
}
|
|
|
|
}
|
|
|
|
e.previouslySentSeriesToRW = nil
|
|
|
|
e.previouslySentSeriesToRW = newPreviouslySentSeriesToRW
|
|
|
|
|
|
|
|
e.previouslySentSeriesToRWMu.Unlock()
|
|
|
|
}
|
|
|
|
|
2022-03-29 15:09:07 +02:00
|
|
|
func labelsToString(labels []prompbmarshal.Label) string {
|
|
|
|
var b strings.Builder
|
|
|
|
b.WriteRune('{')
|
|
|
|
for i, label := range labels {
|
|
|
|
if len(label.Name) == 0 {
|
|
|
|
b.WriteString("__name__")
|
|
|
|
} else {
|
|
|
|
b.WriteString(label.Name)
|
|
|
|
}
|
|
|
|
b.WriteRune('=')
|
|
|
|
b.WriteString(strconv.Quote(label.Value))
|
|
|
|
if i < len(labels)-1 {
|
|
|
|
b.WriteRune(',')
|
|
|
|
}
|
|
|
|
}
|
|
|
|
b.WriteRune('}')
|
|
|
|
return b.String()
|
|
|
|
}
|