mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-22 08:10:44 +01:00
b52f1d1f0a
* vmalert: add `query_time_alignment` for rule group
1. add `eval_alignment` attribute for group which by default is true. So group rule query stamp will be aligned with interval and propagated to ALERT metrics and the messages for alertmanager;
2. deprecate `datasource.queryTimeAlignment` flag.
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049
(cherry picked from commit 2aa0f5fc41
)
119 lines
2.9 KiB
Go
119 lines
2.9 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
)
|
|
|
|
// Rule represents alerting or recording rule
|
|
// that has unique ID, can be Executed and
|
|
// updated with other Rule.
|
|
type Rule interface {
|
|
// ID returns unique ID that may be used for
|
|
// identifying this Rule among others.
|
|
ID() uint64
|
|
// Exec executes the rule with given context at the given timestamp and limit.
|
|
// returns an err if number of resulting time series exceeds the limit.
|
|
Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
|
|
// ExecRange executes the rule on the given time range.
|
|
ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error)
|
|
// UpdateWith performs modification of current Rule
|
|
// with fields of the given Rule.
|
|
UpdateWith(Rule) error
|
|
// ToAPI converts Rule into APIRule
|
|
ToAPI() APIRule
|
|
// Close performs the shutdown procedures for rule
|
|
// such as metrics unregister
|
|
Close()
|
|
}
|
|
|
|
var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels. See https://docs.victoriametrics.com/vmalert.html#series-with-the-same-labelset for details")
|
|
|
|
type ruleState struct {
|
|
sync.RWMutex
|
|
entries []ruleStateEntry
|
|
cur int
|
|
}
|
|
|
|
type ruleStateEntry struct {
|
|
// stores last moment of time rule.Exec was called
|
|
time time.Time
|
|
// stores the timestamp rule.Exec was called with
|
|
at time.Time
|
|
// stores the duration of the last rule.Exec call
|
|
duration time.Duration
|
|
// stores last error that happened in Exec func
|
|
// resets on every successful Exec
|
|
// may be used as Health ruleState
|
|
err error
|
|
// stores the number of samples returned during
|
|
// the last evaluation
|
|
samples int
|
|
// stores the number of time series fetched during
|
|
// the last evaluation.
|
|
// Is supported by VictoriaMetrics only, starting from v1.90.0
|
|
// If seriesFetched == nil, then this attribute was missing in
|
|
// datasource response (unsupported).
|
|
seriesFetched *int
|
|
// stores the curl command reflecting the HTTP request used during rule.Exec
|
|
curl string
|
|
}
|
|
|
|
func newRuleState(size int) *ruleState {
|
|
if size < 1 {
|
|
size = 1
|
|
}
|
|
return &ruleState{
|
|
entries: make([]ruleStateEntry, size),
|
|
}
|
|
}
|
|
|
|
func (s *ruleState) getLast() ruleStateEntry {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
return s.entries[s.cur]
|
|
}
|
|
|
|
func (s *ruleState) size() int {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
return len(s.entries)
|
|
}
|
|
|
|
func (s *ruleState) getAll() []ruleStateEntry {
|
|
entries := make([]ruleStateEntry, 0)
|
|
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
cur := s.cur
|
|
for {
|
|
e := s.entries[cur]
|
|
if !e.time.IsZero() || !e.at.IsZero() {
|
|
entries = append(entries, e)
|
|
}
|
|
cur--
|
|
if cur < 0 {
|
|
cur = cap(s.entries) - 1
|
|
}
|
|
if cur == s.cur {
|
|
return entries
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *ruleState) add(e ruleStateEntry) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
s.cur++
|
|
if s.cur > cap(s.entries)-1 {
|
|
s.cur = 0
|
|
}
|
|
s.entries[s.cur] = e
|
|
}
|