mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-23 08:41:15 +01:00
d71b6e6584
This feature speeds up the execution of group rules by running them concurrently. The change also updates the README to reflect the configuration details.
136 lines
3.2 KiB
Go
136 lines
3.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// manager controls group states
|
|
type manager struct {
|
|
storage datasource.Querier
|
|
notifier notifier.Notifier
|
|
|
|
rw *remotewrite.Client
|
|
rr datasource.Querier
|
|
|
|
wg sync.WaitGroup
|
|
|
|
groupsMu sync.RWMutex
|
|
groups map[uint64]*Group
|
|
}
|
|
|
|
// AlertAPI generates APIAlert object from alert by its ID(hash)
|
|
func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
|
|
m.groupsMu.RLock()
|
|
defer m.groupsMu.RUnlock()
|
|
|
|
g, ok := m.groups[gID]
|
|
if !ok {
|
|
return nil, fmt.Errorf("can't find group with id %q", gID)
|
|
}
|
|
for _, rule := range g.Rules {
|
|
ar, ok := rule.(*AlertingRule)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if apiAlert := ar.AlertAPI(aID); apiAlert != nil {
|
|
return apiAlert, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("can't find alert with id %q in group %q", aID, g.Name)
|
|
}
|
|
|
|
func (m *manager) start(ctx context.Context, path []string, validateTpl, validateExpr bool) error {
|
|
return m.update(ctx, path, validateTpl, validateExpr, true)
|
|
}
|
|
|
|
func (m *manager) close() {
|
|
if m.rw != nil {
|
|
err := m.rw.Close()
|
|
if err != nil {
|
|
logger.Fatalf("cannot stop the remotewrite: %s", err)
|
|
}
|
|
}
|
|
m.wg.Wait()
|
|
}
|
|
|
|
func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) {
|
|
if restore && m.rr != nil {
|
|
err := group.Restore(ctx, m.rr, *remoteReadLookBack)
|
|
if err != nil {
|
|
logger.Errorf("error while restoring state for group %q: %s", group.Name, err)
|
|
}
|
|
}
|
|
|
|
m.wg.Add(1)
|
|
id := group.ID()
|
|
go func() {
|
|
group.start(ctx, m.storage, m.notifier, m.rw)
|
|
m.wg.Done()
|
|
}()
|
|
m.groups[id] = group
|
|
}
|
|
|
|
func (m *manager) update(ctx context.Context, path []string, validateTpl, validateExpr, restore bool) error {
|
|
logger.Infof("reading rules configuration file from %q", strings.Join(path, ";"))
|
|
groupsCfg, err := config.Parse(path, validateTpl, validateExpr)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse configuration file: %s", err)
|
|
}
|
|
|
|
groupsRegistry := make(map[uint64]*Group)
|
|
for _, cfg := range groupsCfg {
|
|
ng := newGroup(cfg, *evaluationInterval)
|
|
groupsRegistry[ng.ID()] = ng
|
|
}
|
|
|
|
m.groupsMu.Lock()
|
|
for _, og := range m.groups {
|
|
ng, ok := groupsRegistry[og.ID()]
|
|
if !ok {
|
|
// old group is not present in new list
|
|
// and must be stopped and deleted
|
|
og.close()
|
|
delete(m.groups, og.ID())
|
|
og = nil
|
|
continue
|
|
}
|
|
og.updateCh <- ng
|
|
delete(groupsRegistry, ng.ID())
|
|
}
|
|
|
|
for _, ng := range groupsRegistry {
|
|
m.startGroup(ctx, ng, restore)
|
|
}
|
|
m.groupsMu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (g *Group) toAPI() APIGroup {
|
|
ag := APIGroup{
|
|
// encode as strings to avoid rounding
|
|
ID: fmt.Sprintf("%d", g.ID()),
|
|
Name: g.Name,
|
|
File: g.File,
|
|
Interval: g.Interval.String(),
|
|
Concurrency: g.Concurrency,
|
|
}
|
|
for _, r := range g.Rules {
|
|
switch v := r.(type) {
|
|
case *AlertingRule:
|
|
ag.AlertingRules = append(ag.AlertingRules, v.RuleAPI())
|
|
case *RecordingRule:
|
|
ag.RecordingRules = append(ag.RecordingRules, v.RuleAPI())
|
|
}
|
|
}
|
|
return ag
|
|
}
|