mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-12 13:32:25 +01:00
d6415b2572
'any' type is supported starting from Go1.18. Let's consistently use it instead of 'interface{}' type across the code base, since `any` is easier to read than 'interface{}'.
803 lines
23 KiB
Go
803 lines
23 KiB
Go
package rule
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"net/url"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
|
|
"github.com/cheggaaa/pb/v3"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
ruleUpdateEntriesLimit = flag.Int("rule.updateEntriesLimit", 20, "Defines the max number of rule's state updates stored in-memory. "+
|
|
"Rule's updates are available on rule's Details page and are used for debugging purposes. The number of stored updates can be overridden per rule via update_entries_limit param.")
|
|
resendDelay = flag.Duration("rule.resendDelay", 0, "MiniMum amount of time to wait before resending an alert to notifier")
|
|
maxResolveDuration = flag.Duration("rule.maxResolveDuration", 0, "Limits the maxiMum duration for automatic alert expiration, "+
|
|
"which by default is 4 times evaluationInterval of the parent group")
|
|
evalDelay = flag.Duration("rule.evalDelay", 30*time.Second, "Adjustment of the `time` parameter for rule evaluation requests to compensate intentional data delay from the datasource."+
|
|
"Normally, should be equal to `-search.latencyOffset` (cmd-line flag configured for VictoriaMetrics single-node or vmselect).")
|
|
disableAlertGroupLabel = flag.Bool("disableAlertgroupLabel", false, "Whether to disable adding group's Name as label to generated alerts and time series.")
|
|
remoteReadLookBack = flag.Duration("remoteRead.lookback", time.Hour, "Lookback defines how far to look into past for alerts timeseries."+
|
|
" For example, if lookback=1h then range from now() to now()-1h will be scanned.")
|
|
)
|
|
|
|
// Group is an entity for grouping rules
|
|
type Group struct {
|
|
mu sync.RWMutex
|
|
Name string
|
|
File string
|
|
Rules []Rule
|
|
Type config.Type
|
|
Interval time.Duration
|
|
EvalOffset *time.Duration
|
|
// EvalDelay will adjust timestamp for rule evaluation requests to compensate intentional query delay from datasource.
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5155
|
|
EvalDelay *time.Duration
|
|
Limit int
|
|
Concurrency int
|
|
Checksum string
|
|
LastEvaluation time.Time
|
|
|
|
Labels map[string]string
|
|
Params url.Values
|
|
Headers map[string]string
|
|
NotifierHeaders map[string]string
|
|
|
|
doneCh chan struct{}
|
|
finishedCh chan struct{}
|
|
// channel accepts new Group obj
|
|
// which supposed to update current group
|
|
updateCh chan *Group
|
|
// evalCancel stores the cancel fn for interrupting
|
|
// rules evaluation. Used on groups update() and close().
|
|
evalCancel context.CancelFunc
|
|
|
|
metrics *groupMetrics
|
|
// evalAlignment will make the timestamp of group query
|
|
// requests be aligned with interval
|
|
evalAlignment *bool
|
|
}
|
|
|
|
type groupMetrics struct {
|
|
iterationTotal *utils.Counter
|
|
iterationDuration *utils.Summary
|
|
iterationMissed *utils.Counter
|
|
iterationInterval *utils.Gauge
|
|
}
|
|
|
|
func newGroupMetrics(g *Group) *groupMetrics {
|
|
m := &groupMetrics{}
|
|
labels := fmt.Sprintf(`group=%q, file=%q`, g.Name, g.File)
|
|
m.iterationTotal = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
|
|
m.iterationDuration = utils.GetOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
|
|
m.iterationMissed = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels))
|
|
m.iterationInterval = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_iteration_interval_seconds{%s}`, labels), func() float64 {
|
|
g.mu.RLock()
|
|
i := g.Interval.Seconds()
|
|
g.mu.RUnlock()
|
|
return i
|
|
})
|
|
return m
|
|
}
|
|
|
|
// merges group rule labels into result map
|
|
// set2 has priority over set1.
|
|
func mergeLabels(groupName, ruleName string, set1, set2 map[string]string) map[string]string {
|
|
r := map[string]string{}
|
|
for k, v := range set1 {
|
|
r[k] = v
|
|
}
|
|
for k, v := range set2 {
|
|
if prevV, ok := r[k]; ok {
|
|
logger.Infof("label %q=%q for rule %q.%q overwritten with external label %q=%q",
|
|
k, prevV, groupName, ruleName, k, v)
|
|
}
|
|
r[k] = v
|
|
}
|
|
return r
|
|
}
|
|
|
|
// NewGroup returns a new group
|
|
func NewGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval time.Duration, labels map[string]string) *Group {
|
|
g := &Group{
|
|
Type: cfg.Type,
|
|
Name: cfg.Name,
|
|
File: cfg.File,
|
|
Interval: cfg.Interval.Duration(),
|
|
Limit: cfg.Limit,
|
|
Concurrency: cfg.Concurrency,
|
|
Checksum: cfg.Checksum,
|
|
Params: cfg.Params,
|
|
Headers: make(map[string]string),
|
|
NotifierHeaders: make(map[string]string),
|
|
Labels: cfg.Labels,
|
|
evalAlignment: cfg.EvalAlignment,
|
|
|
|
doneCh: make(chan struct{}),
|
|
finishedCh: make(chan struct{}),
|
|
updateCh: make(chan *Group),
|
|
}
|
|
if g.Interval == 0 {
|
|
g.Interval = defaultInterval
|
|
}
|
|
if g.Concurrency < 1 {
|
|
g.Concurrency = 1
|
|
}
|
|
if cfg.EvalOffset != nil {
|
|
g.EvalOffset = &cfg.EvalOffset.D
|
|
}
|
|
if cfg.EvalDelay != nil {
|
|
g.EvalDelay = &cfg.EvalDelay.D
|
|
}
|
|
for _, h := range cfg.Headers {
|
|
g.Headers[h.Key] = h.Value
|
|
}
|
|
for _, h := range cfg.NotifierHeaders {
|
|
g.NotifierHeaders[h.Key] = h.Value
|
|
}
|
|
g.metrics = newGroupMetrics(g)
|
|
rules := make([]Rule, len(cfg.Rules))
|
|
for i, r := range cfg.Rules {
|
|
var extraLabels map[string]string
|
|
// apply external labels
|
|
if len(labels) > 0 {
|
|
extraLabels = labels
|
|
}
|
|
// apply group labels, it has priority on external labels
|
|
if len(cfg.Labels) > 0 {
|
|
extraLabels = mergeLabels(g.Name, r.Name(), extraLabels, g.Labels)
|
|
}
|
|
// apply rules labels, it has priority on other labels
|
|
if len(extraLabels) > 0 {
|
|
r.Labels = mergeLabels(g.Name, r.Name(), extraLabels, r.Labels)
|
|
}
|
|
|
|
rules[i] = g.newRule(qb, r)
|
|
}
|
|
g.Rules = rules
|
|
return g
|
|
}
|
|
|
|
func (g *Group) newRule(qb datasource.QuerierBuilder, r config.Rule) Rule {
|
|
if r.Alert != "" {
|
|
return NewAlertingRule(qb, g, r)
|
|
}
|
|
return NewRecordingRule(qb, g, r)
|
|
}
|
|
|
|
// ID return unique group ID that consists of
|
|
// rules file and group Name
|
|
func (g *Group) ID() uint64 {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
hash := fnv.New64a()
|
|
hash.Write([]byte(g.File))
|
|
hash.Write([]byte("\xff"))
|
|
hash.Write([]byte(g.Name))
|
|
hash.Write([]byte(g.Type.Get()))
|
|
hash.Write([]byte(g.Interval.String()))
|
|
if g.EvalOffset != nil {
|
|
hash.Write([]byte(g.EvalOffset.String()))
|
|
}
|
|
return hash.Sum64()
|
|
}
|
|
|
|
// restore restores alerts state for group rules
|
|
func (g *Group) restore(ctx context.Context, qb datasource.QuerierBuilder, ts time.Time, lookback time.Duration) error {
|
|
for _, rule := range g.Rules {
|
|
ar, ok := rule.(*AlertingRule)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if ar.For < 1 {
|
|
continue
|
|
}
|
|
q := qb.BuildWithParams(datasource.QuerierParams{
|
|
DataSourceType: g.Type.String(),
|
|
EvaluationInterval: g.Interval,
|
|
QueryParams: g.Params,
|
|
Headers: g.Headers,
|
|
Debug: ar.Debug,
|
|
})
|
|
if err := ar.restore(ctx, q, ts, lookback); err != nil {
|
|
return fmt.Errorf("error while restoring rule %q: %w", rule, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateWith updates existing group with
|
|
// passed group object. This function ignores group
|
|
// evaluation interval change. It supposed to be updated
|
|
// in group.Start function.
|
|
// Not thread-safe.
|
|
func (g *Group) updateWith(newGroup *Group) error {
|
|
rulesRegistry := make(map[uint64]Rule)
|
|
for _, nr := range newGroup.Rules {
|
|
rulesRegistry[nr.ID()] = nr
|
|
}
|
|
|
|
for i, or := range g.Rules {
|
|
nr, ok := rulesRegistry[or.ID()]
|
|
if !ok {
|
|
// old rule is not present in the new list
|
|
// so we mark it for removing
|
|
g.Rules[i].close()
|
|
g.Rules[i] = nil
|
|
continue
|
|
}
|
|
if err := or.updateWith(nr); err != nil {
|
|
return err
|
|
}
|
|
delete(rulesRegistry, nr.ID())
|
|
}
|
|
|
|
var newRules []Rule
|
|
for _, r := range g.Rules {
|
|
if r == nil {
|
|
// skip nil rules
|
|
continue
|
|
}
|
|
newRules = append(newRules, r)
|
|
}
|
|
// add the rest of rules from registry
|
|
for _, nr := range rulesRegistry {
|
|
newRules = append(newRules, nr)
|
|
}
|
|
// note that g.Interval is not updated here
|
|
// so the value can be compared later in
|
|
// group.Start function
|
|
g.Type = newGroup.Type
|
|
g.Concurrency = newGroup.Concurrency
|
|
g.Params = newGroup.Params
|
|
g.Headers = newGroup.Headers
|
|
g.NotifierHeaders = newGroup.NotifierHeaders
|
|
g.Labels = newGroup.Labels
|
|
g.Limit = newGroup.Limit
|
|
g.Checksum = newGroup.Checksum
|
|
g.Rules = newRules
|
|
return nil
|
|
}
|
|
|
|
// InterruptEval interrupts in-flight rules evaluations
|
|
// within the group. It is expected that g.evalCancel
|
|
// will be repopulated after the call.
|
|
func (g *Group) InterruptEval() {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
if g.evalCancel != nil {
|
|
g.evalCancel()
|
|
}
|
|
}
|
|
|
|
// Close stops the group and it's rules, unregisters group metrics
|
|
func (g *Group) Close() {
|
|
if g.doneCh == nil {
|
|
return
|
|
}
|
|
close(g.doneCh)
|
|
g.InterruptEval()
|
|
<-g.finishedCh
|
|
|
|
g.metrics.iterationDuration.Unregister()
|
|
g.metrics.iterationTotal.Unregister()
|
|
g.metrics.iterationMissed.Unregister()
|
|
g.metrics.iterationInterval.Unregister()
|
|
for _, rule := range g.Rules {
|
|
rule.close()
|
|
}
|
|
}
|
|
|
|
// SkipRandSleepOnGroupStart will skip random sleep delay in group first evaluation
|
|
var SkipRandSleepOnGroupStart bool
|
|
|
|
// Start starts group's evaluation
|
|
func (g *Group) Start(ctx context.Context, nts func() []notifier.Notifier, rw remotewrite.RWClient, rr datasource.QuerierBuilder) {
|
|
defer func() { close(g.finishedCh) }()
|
|
|
|
evalTS := time.Now()
|
|
// sleep random duration to spread group rules evaluation
|
|
// over time in order to reduce load on datasource.
|
|
if !SkipRandSleepOnGroupStart {
|
|
sleepBeforeStart := delayBeforeStart(evalTS, g.ID(), g.Interval, g.EvalOffset)
|
|
g.infof("will start in %v", sleepBeforeStart)
|
|
|
|
sleepTimer := time.NewTimer(sleepBeforeStart)
|
|
select {
|
|
case <-ctx.Done():
|
|
sleepTimer.Stop()
|
|
return
|
|
case <-g.doneCh:
|
|
sleepTimer.Stop()
|
|
return
|
|
case <-sleepTimer.C:
|
|
}
|
|
evalTS = evalTS.Add(sleepBeforeStart)
|
|
}
|
|
|
|
e := &executor{
|
|
Rw: rw,
|
|
Notifiers: nts,
|
|
notifierHeaders: g.NotifierHeaders,
|
|
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
|
}
|
|
|
|
g.infof("started")
|
|
|
|
eval := func(ctx context.Context, ts time.Time) {
|
|
g.metrics.iterationTotal.Inc()
|
|
|
|
start := time.Now()
|
|
|
|
if len(g.Rules) < 1 {
|
|
g.metrics.iterationDuration.UpdateDuration(start)
|
|
g.LastEvaluation = start
|
|
return
|
|
}
|
|
|
|
resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
|
|
ts = g.adjustReqTimestamp(ts)
|
|
errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration, g.Limit)
|
|
for err := range errs {
|
|
if err != nil {
|
|
logger.Errorf("group %q: %s", g.Name, err)
|
|
}
|
|
}
|
|
g.metrics.iterationDuration.UpdateDuration(start)
|
|
g.LastEvaluation = start
|
|
}
|
|
|
|
evalCtx, cancel := context.WithCancel(ctx)
|
|
g.mu.Lock()
|
|
g.evalCancel = cancel
|
|
g.mu.Unlock()
|
|
defer g.evalCancel()
|
|
|
|
eval(evalCtx, evalTS)
|
|
|
|
t := time.NewTicker(g.Interval)
|
|
defer t.Stop()
|
|
|
|
// restore the rules state after the first evaluation
|
|
// so only active alerts can be restored.
|
|
if rr != nil {
|
|
err := g.restore(ctx, rr, evalTS, *remoteReadLookBack)
|
|
if err != nil {
|
|
logger.Errorf("error while restoring ruleState for group %q: %s", g.Name, err)
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Infof("group %q: context cancelled", g.Name)
|
|
return
|
|
case <-g.doneCh:
|
|
logger.Infof("group %q: received stop signal", g.Name)
|
|
return
|
|
case ng := <-g.updateCh:
|
|
g.mu.Lock()
|
|
|
|
// it is expected that g.evalCancel will be evoked
|
|
// somewhere else to unblock group from the rules evaluation.
|
|
// we recreate the evalCtx and g.evalCancel, so it can
|
|
// be called again.
|
|
evalCtx, cancel = context.WithCancel(ctx)
|
|
g.evalCancel = cancel
|
|
|
|
err := g.updateWith(ng)
|
|
if err != nil {
|
|
logger.Errorf("group %q: failed to update: %s", g.Name, err)
|
|
g.mu.Unlock()
|
|
continue
|
|
}
|
|
|
|
// ensure that staleness is tracked for existing rules only
|
|
e.purgeStaleSeries(g.Rules)
|
|
e.notifierHeaders = g.NotifierHeaders
|
|
g.mu.Unlock()
|
|
|
|
g.infof("re-started")
|
|
case <-t.C:
|
|
missed := (time.Since(evalTS) / g.Interval) - 1
|
|
if missed < 0 {
|
|
// missed can become < 0 due to irregular delays during evaluation
|
|
// which can result in time.Since(evalTS) < g.Interval
|
|
missed = 0
|
|
}
|
|
if missed > 0 {
|
|
g.metrics.iterationMissed.Inc()
|
|
}
|
|
evalTS = evalTS.Add((missed + 1) * g.Interval)
|
|
|
|
eval(evalCtx, evalTS)
|
|
}
|
|
}
|
|
}
|
|
|
|
// UpdateWith inserts new group to updateCh
|
|
func (g *Group) UpdateWith(new *Group) {
|
|
g.updateCh <- new
|
|
}
|
|
|
|
// DeepCopy returns a deep copy of group
|
|
func (g *Group) DeepCopy() *Group {
|
|
g.mu.RLock()
|
|
data, _ := json.Marshal(g)
|
|
g.mu.RUnlock()
|
|
newG := Group{}
|
|
_ = json.Unmarshal(data, &newG)
|
|
newG.Rules = g.Rules
|
|
return &newG
|
|
}
|
|
|
|
// delayBeforeStart returns a duration on the interval between [ts..ts+interval].
|
|
// delayBeforeStart accounts for `offset`, so returned duration should be always
|
|
// bigger than the `offset`.
|
|
func delayBeforeStart(ts time.Time, key uint64, interval time.Duration, offset *time.Duration) time.Duration {
|
|
var randSleep time.Duration
|
|
randSleep = time.Duration(float64(interval) * (float64(key) / (1 << 64)))
|
|
sleepOffset := time.Duration(ts.UnixNano() % interval.Nanoseconds())
|
|
if randSleep < sleepOffset {
|
|
randSleep += interval
|
|
}
|
|
randSleep -= sleepOffset
|
|
// check if `ts` after randSleep is before `offset`,
|
|
// if it is, add extra eval_offset to randSleep.
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3409.
|
|
if offset != nil {
|
|
tmpEvalTS := ts.Add(randSleep)
|
|
if tmpEvalTS.Before(tmpEvalTS.Truncate(interval).Add(*offset)) {
|
|
randSleep += *offset
|
|
}
|
|
}
|
|
return randSleep
|
|
}
|
|
|
|
func (g *Group) infof(format string, args ...any) {
|
|
msg := fmt.Sprintf(format, args...)
|
|
logger.Infof("group %q %s; interval=%v; eval_offset=%v; concurrency=%d",
|
|
g.Name, msg, g.Interval, g.EvalOffset, g.Concurrency)
|
|
}
|
|
|
|
// Replay performs group replay
|
|
func (g *Group) Replay(start, end time.Time, rw remotewrite.RWClient, maxDataPoint, replayRuleRetryAttempts int, replayDelay time.Duration, disableProgressBar bool) int {
|
|
var total int
|
|
step := g.Interval * time.Duration(maxDataPoint)
|
|
ri := rangeIterator{start: start, end: end, step: step}
|
|
iterations := int(end.Sub(start)/step) + 1
|
|
fmt.Printf("\nGroup %q"+
|
|
"\ninterval: \t%v"+
|
|
"\nrequests to make: \t%d"+
|
|
"\nmax range per request: \t%v\n",
|
|
g.Name, g.Interval, iterations, step)
|
|
if g.Limit > 0 {
|
|
fmt.Printf("\nPlease note, `limit: %d` param has no effect during replay.\n",
|
|
g.Limit)
|
|
}
|
|
for _, rule := range g.Rules {
|
|
fmt.Printf("> Rule %q (ID: %d)\n", rule, rule.ID())
|
|
var bar *pb.ProgressBar
|
|
if !disableProgressBar {
|
|
bar = pb.StartNew(iterations)
|
|
}
|
|
ri.reset()
|
|
for ri.next() {
|
|
n, err := replayRule(rule, ri.s, ri.e, rw, replayRuleRetryAttempts)
|
|
if err != nil {
|
|
logger.Fatalf("rule %q: %s", rule, err)
|
|
}
|
|
total += n
|
|
if bar != nil {
|
|
bar.Increment()
|
|
}
|
|
}
|
|
if bar != nil {
|
|
bar.Finish()
|
|
}
|
|
// sleep to let remote storage to flush data on-disk
|
|
// so chained rules could be calculated correctly
|
|
time.Sleep(replayDelay)
|
|
}
|
|
return total
|
|
}
|
|
|
|
// ExecOnce evaluates all the rules under group for once with given timestamp.
|
|
func (g *Group) ExecOnce(ctx context.Context, nts func() []notifier.Notifier, rw remotewrite.RWClient, evalTS time.Time) chan error {
|
|
e := &executor{
|
|
Rw: rw,
|
|
Notifiers: nts,
|
|
notifierHeaders: g.NotifierHeaders,
|
|
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
|
}
|
|
if len(g.Rules) < 1 {
|
|
return nil
|
|
}
|
|
resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
|
|
return e.execConcurrently(ctx, g.Rules, evalTS, g.Concurrency, resolveDuration, g.Limit)
|
|
}
|
|
|
|
type rangeIterator struct {
|
|
step time.Duration
|
|
start, end time.Time
|
|
|
|
iter int
|
|
s, e time.Time
|
|
}
|
|
|
|
func (ri *rangeIterator) reset() {
|
|
ri.iter = 0
|
|
ri.s, ri.e = time.Time{}, time.Time{}
|
|
}
|
|
|
|
func (ri *rangeIterator) next() bool {
|
|
ri.s = ri.start.Add(ri.step * time.Duration(ri.iter))
|
|
if !ri.end.After(ri.s) {
|
|
return false
|
|
}
|
|
ri.e = ri.s.Add(ri.step)
|
|
if ri.e.After(ri.end) {
|
|
ri.e = ri.end
|
|
}
|
|
ri.iter++
|
|
return true
|
|
}
|
|
|
|
// getResolveDuration returns the duration after which firing alert
|
|
// can be considered as resolved.
|
|
func getResolveDuration(groupInterval, delta, maxDuration time.Duration) time.Duration {
|
|
if groupInterval > delta {
|
|
delta = groupInterval
|
|
}
|
|
resolveDuration := delta * 4
|
|
if maxDuration > 0 && resolveDuration > maxDuration {
|
|
resolveDuration = maxDuration
|
|
}
|
|
return resolveDuration
|
|
}
|
|
|
|
func (g *Group) adjustReqTimestamp(timestamp time.Time) time.Time {
|
|
if g.EvalOffset != nil {
|
|
// calculate the min timestamp on the evaluationInterval
|
|
intervalStart := timestamp.Truncate(g.Interval)
|
|
ts := intervalStart.Add(*g.EvalOffset)
|
|
if timestamp.Before(ts) {
|
|
// if passed timestamp is before the expected evaluation offset,
|
|
// then we should adjust it to the previous evaluation round.
|
|
// E.g. request with evaluationInterval=1h and evaluationOffset=30m
|
|
// was evaluated at 11:20. Then the timestamp should be adjusted
|
|
// to 10:30, to the previous evaluationInterval.
|
|
return ts.Add(-g.Interval)
|
|
}
|
|
// when `eval_offset` is using, ts shouldn't be effect by `eval_alignment` and `eval_delay`
|
|
// since it should be always aligned.
|
|
return ts
|
|
}
|
|
|
|
timestamp = timestamp.Add(-g.getEvalDelay())
|
|
|
|
// always apply the alignment as a last step
|
|
if g.evalAlignment == nil || *g.evalAlignment {
|
|
// align query time with interval to get similar result with grafana when plotting time series.
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049
|
|
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
|
|
return timestamp.Truncate(g.Interval)
|
|
}
|
|
return timestamp
|
|
}
|
|
|
|
func (g *Group) getEvalDelay() time.Duration {
|
|
if g.EvalDelay != nil {
|
|
return *g.EvalDelay
|
|
}
|
|
return *evalDelay
|
|
}
|
|
|
|
// executor contains group's notify and rw configs
|
|
type executor struct {
|
|
Notifiers func() []notifier.Notifier
|
|
notifierHeaders map[string]string
|
|
|
|
Rw remotewrite.RWClient
|
|
|
|
previouslySentSeriesToRWMu sync.Mutex
|
|
// previouslySentSeriesToRW stores series sent to RW on previous iteration
|
|
// map[ruleID]map[ruleLabels][]prompb.Label
|
|
// where `ruleID` is ID of the Rule within a Group
|
|
// and `ruleLabels` is []prompb.Label marshalled to a string
|
|
previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
|
|
}
|
|
|
|
// execConcurrently executes rules concurrently if concurrency>1
|
|
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration, limit int) chan error {
|
|
res := make(chan error, len(rules))
|
|
if concurrency == 1 {
|
|
// fast path
|
|
for _, rule := range rules {
|
|
res <- e.exec(ctx, rule, ts, resolveDuration, limit)
|
|
}
|
|
close(res)
|
|
return res
|
|
}
|
|
|
|
sem := make(chan struct{}, concurrency)
|
|
go func() {
|
|
wg := sync.WaitGroup{}
|
|
for _, r := range rules {
|
|
sem <- struct{}{}
|
|
wg.Add(1)
|
|
go func(r Rule) {
|
|
res <- e.exec(ctx, r, ts, resolveDuration, limit)
|
|
<-sem
|
|
wg.Done()
|
|
}(r)
|
|
}
|
|
wg.Wait()
|
|
close(res)
|
|
}()
|
|
return res
|
|
}
|
|
|
|
var (
|
|
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
|
|
|
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
|
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
|
)
|
|
|
|
func (e *executor) exec(ctx context.Context, r Rule, ts time.Time, resolveDuration time.Duration, limit int) error {
|
|
execTotal.Inc()
|
|
|
|
tss, err := r.exec(ctx, ts, limit)
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
// the context can be cancelled on graceful shutdown
|
|
// or on group update. So no need to handle the error as usual.
|
|
return nil
|
|
}
|
|
execErrors.Inc()
|
|
return fmt.Errorf("rule %q: failed to execute: %w", r, err)
|
|
}
|
|
|
|
if e.Rw != nil {
|
|
pushToRW := func(tss []prompbmarshal.TimeSeries) error {
|
|
var lastErr error
|
|
for _, ts := range tss {
|
|
if err := e.Rw.Push(ts); err != nil {
|
|
lastErr = fmt.Errorf("rule %q: remote write failure: %w", r, err)
|
|
}
|
|
}
|
|
return lastErr
|
|
}
|
|
if err := pushToRW(tss); err != nil {
|
|
return err
|
|
}
|
|
|
|
staleSeries := e.getStaleSeries(r, tss, ts)
|
|
if err := pushToRW(staleSeries); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
ar, ok := r.(*AlertingRule)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
alerts := ar.alertsToSend(resolveDuration, *resendDelay)
|
|
if len(alerts) < 1 {
|
|
return nil
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
errGr := new(utils.ErrGroup)
|
|
for _, nt := range e.Notifiers() {
|
|
wg.Add(1)
|
|
go func(nt notifier.Notifier) {
|
|
if err := nt.Send(ctx, alerts, e.notifierHeaders); err != nil {
|
|
errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", r, nt.Addr(), err))
|
|
}
|
|
wg.Done()
|
|
}(nt)
|
|
}
|
|
wg.Wait()
|
|
return errGr.Err()
|
|
}
|
|
|
|
var bbPool bytesutil.ByteBufferPool
|
|
|
|
// getStaleSeries checks whether there are stale series from previously sent ones.
|
|
func (e *executor) getStaleSeries(r Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries {
|
|
bb := bbPool.Get()
|
|
defer bbPool.Put(bb)
|
|
|
|
ruleLabels := make(map[string][]prompbmarshal.Label, len(tss))
|
|
for _, ts := range tss {
|
|
// convert labels to strings, so we can compare with previously sent series
|
|
bb.B = labelsToString(bb.B, ts.Labels)
|
|
ruleLabels[string(bb.B)] = ts.Labels
|
|
bb.Reset()
|
|
}
|
|
|
|
rID := r.ID()
|
|
var staleS []prompbmarshal.TimeSeries
|
|
// check whether there are series which disappeared and need to be marked as stale
|
|
e.previouslySentSeriesToRWMu.Lock()
|
|
for key, labels := range e.previouslySentSeriesToRW[rID] {
|
|
if _, ok := ruleLabels[key]; ok {
|
|
continue
|
|
}
|
|
// previously sent series are missing in current series, so we mark them as stale
|
|
ss := newTimeSeriesPB([]float64{decimal.StaleNaN}, []int64{timestamp.Unix()}, labels)
|
|
staleS = append(staleS, ss)
|
|
}
|
|
// set previous series to current
|
|
e.previouslySentSeriesToRW[rID] = ruleLabels
|
|
e.previouslySentSeriesToRWMu.Unlock()
|
|
|
|
return staleS
|
|
}
|
|
|
|
// purgeStaleSeries deletes references in tracked
|
|
// previouslySentSeriesToRW list to Rules which aren't present
|
|
// in the given activeRules list. The method is used when the list
|
|
// of loaded rules has changed and executor has to remove
|
|
// references to non-existing rules.
|
|
func (e *executor) purgeStaleSeries(activeRules []Rule) {
|
|
newPreviouslySentSeriesToRW := make(map[uint64]map[string][]prompbmarshal.Label)
|
|
|
|
e.previouslySentSeriesToRWMu.Lock()
|
|
|
|
for _, rule := range activeRules {
|
|
id := rule.ID()
|
|
prev, ok := e.previouslySentSeriesToRW[id]
|
|
if ok {
|
|
// keep previous series for staleness detection
|
|
newPreviouslySentSeriesToRW[id] = prev
|
|
}
|
|
}
|
|
e.previouslySentSeriesToRW = nil
|
|
e.previouslySentSeriesToRW = newPreviouslySentSeriesToRW
|
|
|
|
e.previouslySentSeriesToRWMu.Unlock()
|
|
}
|
|
|
|
func labelsToString(dst []byte, labels []prompbmarshal.Label) []byte {
|
|
dst = append(dst, '{')
|
|
for i, label := range labels {
|
|
if len(label.Name) == 0 {
|
|
dst = append(dst, "__name__"...)
|
|
} else {
|
|
dst = append(dst, label.Name...)
|
|
}
|
|
dst = append(dst, '=')
|
|
dst = strconv.AppendQuote(dst, label.Value)
|
|
if i < len(labels)-1 {
|
|
dst = append(dst, ',')
|
|
}
|
|
}
|
|
dst = append(dst, '}')
|
|
return dst
|
|
}
|