mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-02 01:00:07 +01:00
877940a131
vmalert: add experimental feature of storing Rule's evaluation state The new feature keeps last 20 state changes of each Rule in memory. The state are available for view on the Rule's view page. The page can be opened by clicking on `Details` link next to Rule's name on the `/groups` page. States change suppose to help in investigating cases when Rule doesn't generate alerts or records. Signed-off-by: hagen1778 <roman@victoriametrics.com>
239 lines
5.3 KiB
Go
239 lines
5.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/url"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// manager controls group states
|
|
type manager struct {
|
|
querierBuilder datasource.QuerierBuilder
|
|
notifiers func() []notifier.Notifier
|
|
|
|
rw *remotewrite.Client
|
|
// remote read builder.
|
|
rr datasource.QuerierBuilder
|
|
|
|
wg sync.WaitGroup
|
|
labels map[string]string
|
|
|
|
groupsMu sync.RWMutex
|
|
groups map[uint64]*Group
|
|
}
|
|
|
|
// RuleAPI generates APIRule object from alert by its ID(hash)
|
|
func (m *manager) RuleAPI(gID, rID uint64) (APIRule, error) {
|
|
m.groupsMu.RLock()
|
|
defer m.groupsMu.RUnlock()
|
|
|
|
g, ok := m.groups[gID]
|
|
if !ok {
|
|
return APIRule{}, fmt.Errorf("can't find group with id %d", gID)
|
|
}
|
|
for _, rule := range g.Rules {
|
|
if rule.ID() == rID {
|
|
return rule.ToAPI(), nil
|
|
}
|
|
}
|
|
return APIRule{}, fmt.Errorf("can't find rule with id %d in group %q", rID, g.Name)
|
|
}
|
|
|
|
// AlertAPI generates APIAlert object from alert by its ID(hash)
|
|
func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
|
|
m.groupsMu.RLock()
|
|
defer m.groupsMu.RUnlock()
|
|
|
|
g, ok := m.groups[gID]
|
|
if !ok {
|
|
return nil, fmt.Errorf("can't find group with id %d", gID)
|
|
}
|
|
for _, rule := range g.Rules {
|
|
ar, ok := rule.(*AlertingRule)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if apiAlert := ar.AlertAPI(aID); apiAlert != nil {
|
|
return apiAlert, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("can't find alert with id %d in group %q", aID, g.Name)
|
|
}
|
|
|
|
func (m *manager) start(ctx context.Context, groupsCfg []config.Group) error {
|
|
return m.update(ctx, groupsCfg, true)
|
|
}
|
|
|
|
func (m *manager) close() {
|
|
if m.rw != nil {
|
|
err := m.rw.Close()
|
|
if err != nil {
|
|
logger.Fatalf("cannot stop the remotewrite: %s", err)
|
|
}
|
|
}
|
|
m.wg.Wait()
|
|
}
|
|
|
|
func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) error {
|
|
if restore && m.rr != nil {
|
|
err := group.Restore(ctx, m.rr, *remoteReadLookBack, m.labels)
|
|
if err != nil {
|
|
if !*remoteReadIgnoreRestoreErrors {
|
|
return fmt.Errorf("failed to restore ruleState for group %q: %w", group.Name, err)
|
|
}
|
|
logger.Errorf("error while restoring ruleState for group %q: %s", group.Name, err)
|
|
}
|
|
}
|
|
|
|
m.wg.Add(1)
|
|
id := group.ID()
|
|
go func() {
|
|
group.start(ctx, m.notifiers, m.rw)
|
|
m.wg.Done()
|
|
}()
|
|
m.groups[id] = group
|
|
return nil
|
|
}
|
|
|
|
func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore bool) error {
|
|
var rrPresent, arPresent bool
|
|
groupsRegistry := make(map[uint64]*Group)
|
|
for _, cfg := range groupsCfg {
|
|
for _, r := range cfg.Rules {
|
|
if rrPresent && arPresent {
|
|
continue
|
|
}
|
|
if r.Record != "" {
|
|
rrPresent = true
|
|
}
|
|
if r.Alert != "" {
|
|
arPresent = true
|
|
}
|
|
}
|
|
ng := newGroup(cfg, m.querierBuilder, *evaluationInterval, m.labels)
|
|
groupsRegistry[ng.ID()] = ng
|
|
}
|
|
|
|
if rrPresent && m.rw == nil {
|
|
return fmt.Errorf("config contains recording rules but `-remoteWrite.url` isn't set")
|
|
}
|
|
if arPresent && m.notifiers == nil {
|
|
return fmt.Errorf("config contains alerting rules but neither `-notifier.url` nor `-notifier.config` aren't set")
|
|
}
|
|
|
|
type updateItem struct {
|
|
old *Group
|
|
new *Group
|
|
}
|
|
var toUpdate []updateItem
|
|
|
|
m.groupsMu.Lock()
|
|
for _, og := range m.groups {
|
|
ng, ok := groupsRegistry[og.ID()]
|
|
if !ok {
|
|
// old group is not present in new list,
|
|
// so must be stopped and deleted
|
|
og.close()
|
|
delete(m.groups, og.ID())
|
|
og = nil
|
|
continue
|
|
}
|
|
delete(groupsRegistry, ng.ID())
|
|
if og.Checksum != ng.Checksum {
|
|
toUpdate = append(toUpdate, updateItem{old: og, new: ng})
|
|
}
|
|
}
|
|
for _, ng := range groupsRegistry {
|
|
if err := m.startGroup(ctx, ng, restore); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
m.groupsMu.Unlock()
|
|
|
|
if len(toUpdate) > 0 {
|
|
var wg sync.WaitGroup
|
|
for _, item := range toUpdate {
|
|
wg.Add(1)
|
|
go func(old *Group, new *Group) {
|
|
old.updateCh <- new
|
|
wg.Done()
|
|
}(item.old, item.new)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (g *Group) toAPI() APIGroup {
|
|
g.mu.RLock()
|
|
defer g.mu.RUnlock()
|
|
|
|
ag := APIGroup{
|
|
// encode as string to avoid rounding
|
|
ID: fmt.Sprintf("%d", g.ID()),
|
|
|
|
Name: g.Name,
|
|
Type: g.Type.String(),
|
|
File: g.File,
|
|
Interval: g.Interval.Seconds(),
|
|
LastEvaluation: g.LastEvaluation,
|
|
Concurrency: g.Concurrency,
|
|
Params: urlValuesToStrings(g.Params),
|
|
Headers: headersToStrings(g.Headers),
|
|
Labels: g.Labels,
|
|
}
|
|
for _, r := range g.Rules {
|
|
ag.Rules = append(ag.Rules, r.ToAPI())
|
|
}
|
|
return ag
|
|
}
|
|
|
|
func urlValuesToStrings(values url.Values) []string {
|
|
if len(values) < 1 {
|
|
return nil
|
|
}
|
|
|
|
keys := make([]string, 0, len(values))
|
|
for k := range values {
|
|
keys = append(keys, k)
|
|
}
|
|
sort.Strings(keys)
|
|
|
|
var res []string
|
|
for _, k := range keys {
|
|
params := values[k]
|
|
for _, v := range params {
|
|
res = append(res, fmt.Sprintf("%s=%s", k, v))
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
func headersToStrings(headers map[string]string) []string {
|
|
if len(headers) < 1 {
|
|
return nil
|
|
}
|
|
|
|
keys := make([]string, 0, len(headers))
|
|
for k := range headers {
|
|
keys = append(keys, k)
|
|
}
|
|
sort.Strings(keys)
|
|
|
|
var res []string
|
|
for _, k := range keys {
|
|
v := headers[k]
|
|
res = append(res, fmt.Sprintf("%s: %s", k, v))
|
|
}
|
|
|
|
return res
|
|
}
|