mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-11 20:52:24 +01:00
6588fcbfca
Allow configuring the default number of stored rule's update states in memory via global `-rule.updateEntriesLimit` command-line flag or per-rule via rule's `update_entries_limit` configuration param. Signed-off-by: hagen1778 <roman@victoriametrics.com>
127 lines
2.7 KiB
Go
127 lines
2.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
)
|
|
|
|
// Rule represents alerting or recording rule
|
|
// that has unique ID, can be Executed and
|
|
// updated with other Rule.
|
|
type Rule interface {
|
|
// ID returns unique ID that may be used for
|
|
// identifying this Rule among others.
|
|
ID() uint64
|
|
// Exec executes the rule with given context at the given timestamp and limit.
|
|
// returns an err if number of resulting time series exceeds the limit.
|
|
Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
|
|
// ExecRange executes the rule on the given time range.
|
|
ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error)
|
|
// UpdateWith performs modification of current Rule
|
|
// with fields of the given Rule.
|
|
UpdateWith(Rule) error
|
|
// ToAPI converts Rule into APIRule
|
|
ToAPI() APIRule
|
|
// Close performs the shutdown procedures for rule
|
|
// such as metrics unregister
|
|
Close()
|
|
}
|
|
|
|
// errDuplicate describes a result in which several metrics end up
// with the same labelset once the rule labels have been applied.
var errDuplicate = errors.New(
	"result contains metrics with the same labelset after applying rule labels",
)
|
|
|
|
type ruleState struct {
|
|
sync.RWMutex
|
|
entries []ruleStateEntry
|
|
cur int
|
|
// disabled defines whether ruleState tracks ruleStateEntry
|
|
disabled bool
|
|
}
|
|
|
|
// ruleStateEntry describes the outcome of a single rule.Exec call.
type ruleStateEntry struct {
	// time is the moment at which rule.Exec was last called
	time time.Time
	// at is the timestamp that was passed to rule.Exec
	at time.Time
	// duration is how long the last rule.Exec call took
	duration time.Duration
	// err is the last error that happened in Exec.
	// It is reset on every successful Exec and
	// may be used as the health state of the rule.
	err error
	// samples is the number of samples returned
	// by the last evaluation
	samples int
	// curl is the curl command reflecting the HTTP request
	// used during rule.Exec
	curl string
}
|
|
|
|
func newRuleState(size int) *ruleState {
|
|
if size < 1 {
|
|
return &ruleState{disabled: true}
|
|
}
|
|
return &ruleState{
|
|
entries: make([]ruleStateEntry, size),
|
|
}
|
|
}
|
|
|
|
func (s *ruleState) getLast() ruleStateEntry {
|
|
if s.disabled {
|
|
return ruleStateEntry{}
|
|
}
|
|
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
return s.entries[s.cur]
|
|
}
|
|
|
|
func (s *ruleState) size() int {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
return len(s.entries)
|
|
}
|
|
|
|
func (s *ruleState) getAll() []ruleStateEntry {
|
|
if s.disabled {
|
|
return nil
|
|
}
|
|
|
|
entries := make([]ruleStateEntry, 0)
|
|
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
cur := s.cur
|
|
for {
|
|
e := s.entries[cur]
|
|
if !e.time.IsZero() || !e.at.IsZero() {
|
|
entries = append(entries, e)
|
|
}
|
|
cur--
|
|
if cur < 0 {
|
|
cur = cap(s.entries) - 1
|
|
}
|
|
if cur == s.cur {
|
|
return entries
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *ruleState) add(e ruleStateEntry) {
|
|
if s.disabled {
|
|
return
|
|
}
|
|
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
s.cur++
|
|
if s.cur > cap(s.entries)-1 {
|
|
s.cur = 0
|
|
}
|
|
s.entries[s.cur] = e
|
|
}
|