mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 14:22:15 +01:00
e850bf0eff
During group's update rules deletion was causing slice mutations while slice index was assumed to be unchanged. This caused "slice bounds out of range" errors when multiple rules were deleted sequentially.
176 lines
4.5 KiB
Go
176 lines
4.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// Group is an entity for grouping rules
|
|
type Group struct {
|
|
Name string
|
|
File string
|
|
Rules []*Rule
|
|
|
|
done chan struct{}
|
|
finished chan struct{}
|
|
}
|
|
|
|
// ID return unique group ID that consists of
|
|
// rules file and group name
|
|
func (g Group) ID() uint64 {
|
|
hash := fnv.New64a()
|
|
hash.Write([]byte(g.File))
|
|
hash.Write([]byte("\xff"))
|
|
hash.Write([]byte(g.Name))
|
|
return hash.Sum64()
|
|
}
|
|
|
|
// Restore restores alerts state for all group rules with For > 0
|
|
func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
|
|
for _, rule := range g.Rules {
|
|
if rule.For == 0 {
|
|
return nil
|
|
}
|
|
if err := rule.Restore(ctx, q, lookback); err != nil {
|
|
return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateWith updates existing group with
|
|
// passed group object. Must be called
|
|
// under mutex lock.
|
|
func (g *Group) updateWith(newGroup Group) {
|
|
rulesRegistry := make(map[string]*Rule)
|
|
for _, nr := range newGroup.Rules {
|
|
rulesRegistry[nr.id()] = nr
|
|
}
|
|
|
|
for i, or := range g.Rules {
|
|
nr, ok := rulesRegistry[or.id()]
|
|
if !ok {
|
|
// old rule is not present in the new list
|
|
// so we mark it for removing
|
|
g.Rules[i] = nil
|
|
continue
|
|
}
|
|
|
|
// copy all significant fields.
|
|
// alerts state isn't copied since
|
|
// it should be updated in next 2 Evals
|
|
or.For = nr.For
|
|
or.Expr = nr.Expr
|
|
or.Labels = nr.Labels
|
|
or.Annotations = nr.Annotations
|
|
delete(rulesRegistry, nr.id())
|
|
}
|
|
|
|
var newRules []*Rule
|
|
for _, r := range g.Rules {
|
|
if r == nil {
|
|
// skip nil rules
|
|
continue
|
|
}
|
|
newRules = append(newRules, r)
|
|
}
|
|
// add the rest of rules from registry
|
|
for _, nr := range rulesRegistry {
|
|
newRules = append(newRules, nr)
|
|
}
|
|
g.Rules = newRules
|
|
}
|
|
|
|
var (
|
|
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
|
|
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
|
|
|
|
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
|
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
|
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
|
|
|
|
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
|
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
|
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
|
|
|
|
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
|
|
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
|
)
|
|
|
|
func (g *Group) close() {
|
|
if g.done == nil {
|
|
return
|
|
}
|
|
close(g.done)
|
|
<-g.finished
|
|
}
|
|
|
|
func (g *Group) start(ctx context.Context, interval time.Duration,
|
|
querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
|
|
logger.Infof("group %q started", g.Name)
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Infof("group %q: context cancelled", g.Name)
|
|
close(g.finished)
|
|
return
|
|
case <-g.done:
|
|
logger.Infof("group %q: received stop signal", g.Name)
|
|
close(g.finished)
|
|
return
|
|
case <-t.C:
|
|
iterationTotal.Inc()
|
|
iterationStart := time.Now()
|
|
for _, rule := range g.Rules {
|
|
execTotal.Inc()
|
|
|
|
execStart := time.Now()
|
|
err := rule.Exec(ctx, querier)
|
|
execDuration.UpdateDuration(execStart)
|
|
|
|
if err != nil {
|
|
execErrors.Inc()
|
|
logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule.Name, err)
|
|
continue
|
|
}
|
|
|
|
var alertsToSend []notifier.Alert
|
|
for _, a := range rule.alerts {
|
|
if a.State != notifier.StatePending {
|
|
alertsToSend = append(alertsToSend, *a)
|
|
}
|
|
if a.State == notifier.StateInactive || rw == nil {
|
|
continue
|
|
}
|
|
tss := rule.AlertToTimeSeries(a, execStart)
|
|
for _, ts := range tss {
|
|
remoteWriteSent.Inc()
|
|
if err := rw.Push(ts); err != nil {
|
|
remoteWriteErrors.Inc()
|
|
logger.Errorf("failed to push timeseries to remotewrite: %s", err)
|
|
}
|
|
}
|
|
}
|
|
if len(alertsToSend) > 0 {
|
|
alertsSent.Add(len(alertsToSend))
|
|
if err := nr.Send(ctx, alertsToSend); err != nil {
|
|
alertsSendErrors.Inc()
|
|
logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule.Name, err)
|
|
}
|
|
}
|
|
}
|
|
iterationDuration.UpdateDuration(iterationStart)
|
|
}
|
|
}
|
|
}
|