mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-16 17:21:04 +01:00
ca49853664
This commit fixes a potential race condition that occurs when a group update and the generation of ID() happen simultaneously. Signed-off-by: hagen1778 <roman@victoriametrics.com>
408 lines
10 KiB
Go
408 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// Group is an entity for grouping rules
|
|
type Group struct {
|
|
mu sync.RWMutex
|
|
Name string
|
|
File string
|
|
Rules []Rule
|
|
Type datasource.Type
|
|
Interval time.Duration
|
|
Concurrency int
|
|
Checksum string
|
|
|
|
ExtraFilterLabels map[string]string
|
|
Labels map[string]string
|
|
|
|
doneCh chan struct{}
|
|
finishedCh chan struct{}
|
|
// channel accepts new Group obj
|
|
// which supposed to update current group
|
|
updateCh chan *Group
|
|
|
|
metrics *groupMetrics
|
|
}
|
|
|
|
type groupMetrics struct {
|
|
iterationTotal *counter
|
|
iterationDuration *summary
|
|
}
|
|
|
|
func newGroupMetrics(name, file string) *groupMetrics {
|
|
m := &groupMetrics{}
|
|
labels := fmt.Sprintf(`group=%q, file=%q`, name, file)
|
|
m.iterationTotal = getOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
|
|
m.iterationDuration = getOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
|
|
return m
|
|
}
|
|
|
|
// merges group rule labels into result map
|
|
// set2 has priority over set1.
|
|
func mergeLabels(groupName, ruleName string, set1, set2 map[string]string) map[string]string {
|
|
r := map[string]string{}
|
|
for k, v := range set1 {
|
|
r[k] = v
|
|
}
|
|
for k, v := range set2 {
|
|
if prevV, ok := r[k]; ok {
|
|
logger.Infof("label %q=%q for rule %q.%q overwritten with external label %q=%q",
|
|
k, prevV, groupName, ruleName, k, v)
|
|
}
|
|
r[k] = v
|
|
}
|
|
return r
|
|
}
|
|
|
|
func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval time.Duration, labels map[string]string) *Group {
|
|
g := &Group{
|
|
Type: cfg.Type,
|
|
Name: cfg.Name,
|
|
File: cfg.File,
|
|
Interval: cfg.Interval.Duration(),
|
|
Concurrency: cfg.Concurrency,
|
|
Checksum: cfg.Checksum,
|
|
ExtraFilterLabels: cfg.ExtraFilterLabels,
|
|
Labels: cfg.Labels,
|
|
|
|
doneCh: make(chan struct{}),
|
|
finishedCh: make(chan struct{}),
|
|
updateCh: make(chan *Group),
|
|
}
|
|
g.metrics = newGroupMetrics(g.Name, g.File)
|
|
if g.Interval == 0 {
|
|
g.Interval = defaultInterval
|
|
}
|
|
if g.Concurrency < 1 {
|
|
g.Concurrency = 1
|
|
}
|
|
rules := make([]Rule, len(cfg.Rules))
|
|
for i, r := range cfg.Rules {
|
|
var extraLabels map[string]string
|
|
// apply external labels
|
|
if len(labels) > 0 {
|
|
extraLabels = labels
|
|
}
|
|
// apply group labels, it has priority on external labels
|
|
if len(cfg.Labels) > 0 {
|
|
extraLabels = mergeLabels(g.Name, r.Name(), extraLabels, g.Labels)
|
|
}
|
|
// apply rules labels, it has priority on other labels
|
|
if len(extraLabels) > 0 {
|
|
r.Labels = mergeLabels(g.Name, r.Name(), extraLabels, r.Labels)
|
|
}
|
|
|
|
rules[i] = g.newRule(qb, r)
|
|
}
|
|
g.Rules = rules
|
|
return g
|
|
}
|
|
|
|
func (g *Group) newRule(qb datasource.QuerierBuilder, rule config.Rule) Rule {
|
|
if rule.Alert != "" {
|
|
return newAlertingRule(qb, g, rule)
|
|
}
|
|
return newRecordingRule(qb, g, rule)
|
|
}
|
|
|
|
// ID return unique group ID that consists of
|
|
// rules file and group name
|
|
func (g *Group) ID() uint64 {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
hash := fnv.New64a()
|
|
hash.Write([]byte(g.File))
|
|
hash.Write([]byte("\xff"))
|
|
hash.Write([]byte(g.Name))
|
|
hash.Write([]byte(g.Type.Get()))
|
|
return hash.Sum64()
|
|
}
|
|
|
|
// Restore restores alerts state for group rules
|
|
func (g *Group) Restore(ctx context.Context, qb datasource.QuerierBuilder, lookback time.Duration, labels map[string]string) error {
|
|
labels = mergeLabels(g.Name, "", labels, g.Labels)
|
|
for _, rule := range g.Rules {
|
|
rr, ok := rule.(*AlertingRule)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if rr.For < 1 {
|
|
continue
|
|
}
|
|
// ignore g.ExtraFilterLabels on purpose, so it
|
|
// won't affect the restore procedure.
|
|
q := qb.BuildWithParams(datasource.QuerierParams{})
|
|
if err := rr.Restore(ctx, q, lookback, labels); err != nil {
|
|
return fmt.Errorf("error while restoring rule %q: %w", rule, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateWith updates existing group with
|
|
// passed group object. This function ignores group
|
|
// evaluation interval change. It supposed to be updated
|
|
// in group.start function.
|
|
// Not thread-safe.
|
|
func (g *Group) updateWith(newGroup *Group) error {
|
|
rulesRegistry := make(map[uint64]Rule)
|
|
for _, nr := range newGroup.Rules {
|
|
rulesRegistry[nr.ID()] = nr
|
|
}
|
|
|
|
for i, or := range g.Rules {
|
|
nr, ok := rulesRegistry[or.ID()]
|
|
if !ok {
|
|
// old rule is not present in the new list
|
|
// so we mark it for removing
|
|
g.Rules[i].Close()
|
|
g.Rules[i] = nil
|
|
continue
|
|
}
|
|
if err := or.UpdateWith(nr); err != nil {
|
|
return err
|
|
}
|
|
delete(rulesRegistry, nr.ID())
|
|
}
|
|
|
|
var newRules []Rule
|
|
for _, r := range g.Rules {
|
|
if r == nil {
|
|
// skip nil rules
|
|
continue
|
|
}
|
|
newRules = append(newRules, r)
|
|
}
|
|
// add the rest of rules from registry
|
|
for _, nr := range rulesRegistry {
|
|
newRules = append(newRules, nr)
|
|
}
|
|
// note that g.Interval is not updated here
|
|
// so the value can be compared later in
|
|
// group.Start function
|
|
g.Type = newGroup.Type
|
|
g.Concurrency = newGroup.Concurrency
|
|
g.ExtraFilterLabels = newGroup.ExtraFilterLabels
|
|
g.Labels = newGroup.Labels
|
|
g.Checksum = newGroup.Checksum
|
|
g.Rules = newRules
|
|
return nil
|
|
}
|
|
|
|
func (g *Group) close() {
|
|
if g.doneCh == nil {
|
|
return
|
|
}
|
|
close(g.doneCh)
|
|
<-g.finishedCh
|
|
|
|
metrics.UnregisterMetric(g.metrics.iterationDuration.name)
|
|
metrics.UnregisterMetric(g.metrics.iterationTotal.name)
|
|
for _, rule := range g.Rules {
|
|
rule.Close()
|
|
}
|
|
}
|
|
|
|
// skipRandSleepOnGroupStart, when true, makes Group.start skip the initial
// sleep that spreads group evaluations over time.
// NOTE(review): presumably toggled by tests only — confirm against callers.
var skipRandSleepOnGroupStart bool
|
|
|
|
func (g *Group) start(ctx context.Context, nts []notifier.Notifier, rw *remotewrite.Client) {
|
|
defer func() { close(g.finishedCh) }()
|
|
|
|
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
|
|
if !skipRandSleepOnGroupStart {
|
|
randSleep := uint64(float64(g.Interval) * (float64(uint32(g.ID())) / (1 << 32)))
|
|
sleepOffset := uint64(time.Now().UnixNano()) % uint64(g.Interval)
|
|
if randSleep < sleepOffset {
|
|
randSleep += uint64(g.Interval)
|
|
}
|
|
randSleep -= sleepOffset
|
|
sleepTimer := time.NewTimer(time.Duration(randSleep))
|
|
select {
|
|
case <-ctx.Done():
|
|
sleepTimer.Stop()
|
|
return
|
|
case <-g.doneCh:
|
|
sleepTimer.Stop()
|
|
return
|
|
case <-sleepTimer.C:
|
|
}
|
|
}
|
|
|
|
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
|
|
e := &executor{rw: rw}
|
|
for _, nt := range nts {
|
|
ent := eNotifier{
|
|
Notifier: nt,
|
|
alertsSent: getOrCreateCounter(fmt.Sprintf("vmalert_alerts_sent_total{addr=%q}", nt.Addr())),
|
|
alertsSendErrors: getOrCreateCounter(fmt.Sprintf("vmalert_alerts_send_errors_total{addr=%q}", nt.Addr())),
|
|
}
|
|
e.notifiers = append(e.notifiers, ent)
|
|
}
|
|
|
|
t := time.NewTicker(g.Interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Infof("group %q: context cancelled", g.Name)
|
|
return
|
|
case <-g.doneCh:
|
|
logger.Infof("group %q: received stop signal", g.Name)
|
|
return
|
|
case ng := <-g.updateCh:
|
|
g.mu.Lock()
|
|
err := g.updateWith(ng)
|
|
if err != nil {
|
|
logger.Errorf("group %q: failed to update: %s", g.Name, err)
|
|
g.mu.Unlock()
|
|
continue
|
|
}
|
|
if g.Interval != ng.Interval {
|
|
g.Interval = ng.Interval
|
|
t.Stop()
|
|
t = time.NewTicker(g.Interval)
|
|
}
|
|
g.mu.Unlock()
|
|
logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
|
|
case <-t.C:
|
|
g.metrics.iterationTotal.Inc()
|
|
iterationStart := time.Now()
|
|
resolveDuration := getResolveDuration(g.Interval)
|
|
errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, resolveDuration)
|
|
for err := range errs {
|
|
if err != nil {
|
|
logger.Errorf("group %q: %s", g.Name, err)
|
|
}
|
|
}
|
|
|
|
g.metrics.iterationDuration.UpdateDuration(iterationStart)
|
|
}
|
|
}
|
|
}
|
|
|
|
// resolveDuration for alerts is equal to 3 interval evaluations
|
|
// so in case if vmalert stops sending updates for some reason,
|
|
// notifier could automatically resolve the alert.
|
|
func getResolveDuration(groupInterval time.Duration) time.Duration {
|
|
resolveInterval := groupInterval * 3
|
|
if *maxResolveDuration > 0 && (resolveInterval > *maxResolveDuration) {
|
|
return *maxResolveDuration
|
|
}
|
|
return resolveInterval
|
|
}
|
|
|
|
type executor struct {
|
|
notifiers []eNotifier
|
|
rw *remotewrite.Client
|
|
}
|
|
|
|
type eNotifier struct {
|
|
notifier.Notifier
|
|
alertsSent *counter
|
|
alertsSendErrors *counter
|
|
}
|
|
|
|
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, resolveDuration time.Duration) chan error {
|
|
res := make(chan error, len(rules))
|
|
if concurrency == 1 {
|
|
// fast path
|
|
for _, rule := range rules {
|
|
res <- e.exec(ctx, rule, resolveDuration)
|
|
}
|
|
close(res)
|
|
return res
|
|
}
|
|
|
|
sem := make(chan struct{}, concurrency)
|
|
go func() {
|
|
wg := sync.WaitGroup{}
|
|
for _, rule := range rules {
|
|
sem <- struct{}{}
|
|
wg.Add(1)
|
|
go func(r Rule) {
|
|
res <- e.exec(ctx, r, resolveDuration)
|
|
<-sem
|
|
wg.Done()
|
|
}(rule)
|
|
}
|
|
wg.Wait()
|
|
close(res)
|
|
}()
|
|
return res
|
|
}
|
|
|
|
var (
|
|
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
|
|
|
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
|
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
|
|
|
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
|
)
|
|
|
|
func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Duration) error {
|
|
execTotal.Inc()
|
|
|
|
tss, err := rule.Exec(ctx)
|
|
if err != nil {
|
|
execErrors.Inc()
|
|
return fmt.Errorf("rule %q: failed to execute: %w", rule, err)
|
|
}
|
|
|
|
if len(tss) > 0 && e.rw != nil {
|
|
for _, ts := range tss {
|
|
if err := e.rw.Push(ts); err != nil {
|
|
remoteWriteErrors.Inc()
|
|
return fmt.Errorf("rule %q: remote write failure: %w", rule, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
ar, ok := rule.(*AlertingRule)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
var alerts []notifier.Alert
|
|
for _, a := range ar.alerts {
|
|
switch a.State {
|
|
case notifier.StateFiring:
|
|
a.End = time.Now().Add(resolveDuration)
|
|
alerts = append(alerts, *a)
|
|
case notifier.StateInactive:
|
|
// set End to execStart to notify
|
|
// that it was just resolved
|
|
a.End = time.Now()
|
|
alerts = append(alerts, *a)
|
|
}
|
|
}
|
|
if len(alerts) < 1 {
|
|
return nil
|
|
}
|
|
|
|
errGr := new(utils.ErrGroup)
|
|
for _, nt := range e.notifiers {
|
|
nt.alertsSent.Add(len(alerts))
|
|
if err := nt.Send(ctx, alerts); err != nil {
|
|
nt.alertsSendErrors.Inc()
|
|
errGr.Add(fmt.Errorf("rule %q: failed to send alerts: %w", rule, err))
|
|
}
|
|
}
|
|
return errGr.Err()
|
|
}
|