VictoriaMetrics/app/vmalert/rule.go
Haleygo b52f1d1f0a
vmalert: add evalAlignment for rule group and fix evaluation timestamp (#5066)
* vmalert: add `query_time_alignment` for rule group

1. add the `eval_alignment` attribute for groups, which is true by default, so that the group's rule query timestamp is aligned with the interval and propagated to ALERT metrics and to the messages sent to alertmanager;
2. deprecate `datasource.queryTimeAlignment` flag.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049
(cherry picked from commit 2aa0f5fc41)
2023-10-10 12:45:37 +02:00

119 lines
2.9 KiB
Go

package main
import (
"context"
"errors"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// Rule represents an alerting or recording rule
// that has a unique ID, can be executed and
// can be updated with another Rule.
type Rule interface {
// ID returns the unique ID that may be used for
// identifying this Rule among others.
ID() uint64
// Exec executes the rule with the given context at the given timestamp and limit.
// It returns an error if the number of resulting time series exceeds the limit.
Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
// ExecRange executes the rule on the time range given by start and end.
ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error)
// UpdateWith modifies the current Rule
// with the fields of the given Rule.
UpdateWith(Rule) error
// ToAPI converts the Rule into an APIRule.
ToAPI() APIRule
// Close performs the shutdown procedures for the rule,
// such as unregistering its metrics.
Close()
}
// errDuplicate is the error value describing a result that contains metrics
// with the same labelset after rule labels have been applied.
var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels. See https://docs.victoriametrics.com/vmalert.html#series-with-the-same-labelset for details")
// ruleState keeps a fixed number of ruleStateEntry slots which are
// reused in a circular manner: add writes to the slot after cur and
// wraps to the first slot once the last one has been used.
// The embedded RWMutex is taken by every method that touches
// entries or cur.
type ruleState struct {
sync.RWMutex
// entries holds the slots; it is allocated once by newRuleState
entries []ruleStateEntry
// cur is the index in entries of the most recently added entry
cur int
}
// ruleStateEntry holds the details recorded for a single rule.Exec call.
type ruleStateEntry struct {
// stores the last moment of time rule.Exec was called
time time.Time
// stores the timestamp rule.Exec was called with
at time.Time
// stores the duration of the last rule.Exec call
duration time.Duration
// stores the last error that happened in the Exec func;
// resets on every successful Exec;
// may be used as the health state of the rule
err error
// stores the number of samples returned during
// the last evaluation
samples int
// stores the number of time series fetched during
// the last evaluation.
// Is supported by VictoriaMetrics only, starting from v1.90.0.
// If seriesFetched == nil, then this attribute was missing in
// the datasource response (unsupported).
seriesFetched *int
// stores the curl command reflecting the HTTP request used during rule.Exec
curl string
}
// newRuleState allocates a ruleState with room for size entries.
// Any size below 1 is raised to 1, so the returned state always
// has at least one slot.
func newRuleState(size int) *ruleState {
	slots := size
	if slots < 1 {
		slots = 1
	}
	st := new(ruleState)
	st.entries = make([]ruleStateEntry, slots)
	return st
}
// getLast returns a copy of the entry stored at the current position,
// i.e. the one written by the most recent add call.
// The read lock is held while the entry is copied.
func (s *ruleState) getLast() ruleStateEntry {
	s.RLock()
	defer s.RUnlock()

	pos := s.cur
	latest := s.entries[pos]
	return latest
}
// size reports how many entry slots the state has.
// The value is read under the read lock.
func (s *ruleState) size() int {
	s.RLock()
	n := len(s.entries)
	s.RUnlock()
	return n
}
// getAll returns the recorded entries ordered from the most recently
// added one (the slot at s.cur) backwards through the slots, wrapping
// from the first slot to the last one.
//
// Slots whose time and at fields are both unset are skipped, so a state
// that has not been filled yet yields fewer entries than size() reports.
// The returned slice is never nil and holds copies of the entry values.
// A state without any slots yields an empty slice.
func (s *ruleState) getAll() []ruleStateEntry {
	s.RLock()
	defer s.RUnlock()

	// The slots are bounded by len(s.entries), the same value size()
	// reports. Using cap() here would produce out-of-range indexes
	// whenever the capacity exceeds the length.
	n := len(s.entries)
	entries := make([]ruleStateEntry, 0, n)
	for i := 0; i < n; i++ {
		// step i slots back from the current position, wrapping around
		idx := s.cur - i
		if idx < 0 {
			idx += n
		}
		e := s.entries[idx]
		if !e.time.IsZero() || !e.at.IsZero() {
			entries = append(entries, e)
		}
	}
	return entries
}
// add stores e as the newest entry of the state.
//
// The write position moves one slot forward from s.cur and wraps to the
// first slot after the last one, so once every slot has been used the
// oldest entry is overwritten. On a fresh state with more than one slot
// the first add therefore lands in slot 1. After the call s.cur points
// at the slot holding e.
func (s *ruleState) add(e ruleStateEntry) {
	s.Lock()
	defer s.Unlock()

	// Wrap on len(s.entries), the same bound size() reports and the one
	// that limits valid indexes; cap() may be larger than that.
	next := s.cur + 1
	if next >= len(s.entries) {
		next = 0
	}
	// s.cur is updated only after the store, so a failed store does not
	// leave the position changed.
	s.entries[next] = e
	s.cur = next
}