vmalert: avoid blocking APIs when alerting rule uses template functio… (#6129)

* vmalert: avoid blocking APIs when alerting rule uses template function `query`

* app/vmalert: small refactoring

* simplify labels and templates expanding
* simplify `newAlert` interface
* fix `TestGroupStart` which mistakenly skipped annotations
and response labels check

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* reduce alerts lock time when restore

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Hui Wang 2024-04-19 15:16:26 +08:00 committed by Aliaksandr Valialkin
parent 95b0f82c9b
commit e0d47ab6af
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
3 changed files with 97 additions and 66 deletions

View File

@ -314,23 +314,20 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([]
return nil, fmt.Errorf("`query` template isn't supported in replay mode") return nil, fmt.Errorf("`query` template isn't supported in replay mode")
} }
for _, s := range res.Data { for _, s := range res.Data {
ls, err := ar.toLabels(s, qFn) ls, as, err := ar.expandTemplates(s, qFn, time.Time{})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to expand labels: %s", err) return nil, fmt.Errorf("failed to expand templates: %s", err)
}
h := hash(ls.processed)
a, err := ar.newAlert(s, nil, time.Time{}, qFn) // initial alert
if err != nil {
return nil, fmt.Errorf("failed to create alert: %w", err)
} }
alertID := hash(ls.processed)
a := ar.newAlert(s, time.Time{}, ls.processed, as) // initial alert
prevT := time.Time{} prevT := time.Time{}
for i := range s.Values { for i := range s.Values {
at := time.Unix(s.Timestamps[i], 0) at := time.Unix(s.Timestamps[i], 0)
// try to restore alert's state on the first iteration // try to restore alert's state on the first iteration
if at.Equal(start) { if at.Equal(start) {
if _, ok := ar.alerts[h]; ok { if _, ok := ar.alerts[alertID]; ok {
a = ar.alerts[h] a = ar.alerts[alertID]
prevT = at prevT = at
} }
} }
@ -352,7 +349,7 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([]
// save alert's state on last iteration, so it can be used on the next execRange call // save alert's state on last iteration, so it can be used on the next execRange call
if at.Equal(end) { if at.Equal(end) {
holdAlertState[h] = a holdAlertState[alertID] = a
} }
} }
} }
@ -386,15 +383,34 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
} }
}() }()
ar.alertsMu.Lock()
defer ar.alertsMu.Unlock()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err) return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err)
} }
ar.logDebugf(ts, nil, "query returned %d samples (elapsed: %s)", curState.Samples, curState.Duration) ar.logDebugf(ts, nil, "query returned %d samples (elapsed: %s)", curState.Samples, curState.Duration)
qFn := func(query string) ([]datasource.Metric, error) {
res, _, err := ar.q.Query(ctx, query, ts)
return res.Data, err
}
// template labels and annotations before updating ar.alerts,
// since they could use `query` function which takes a while to execute,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079.
expandedLabels := make([]*labelSet, len(res.Data))
expandedAnnotations := make([]map[string]string, len(res.Data))
for i, m := range res.Data {
ls, as, err := ar.expandTemplates(m, qFn, ts)
if err != nil {
curState.Err = fmt.Errorf("failed to expand templates: %w", err)
return nil, curState.Err
}
expandedLabels[i] = ls
expandedAnnotations[i] = as
}
ar.alertsMu.Lock()
defer ar.alertsMu.Unlock()
for h, a := range ar.alerts { for h, a := range ar.alerts {
// cleanup inactive alerts from previous Exec // cleanup inactive alerts from previous Exec
if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention { if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention {
@ -403,26 +419,18 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
} }
} }
qFn := func(query string) ([]datasource.Metric, error) {
res, _, err := ar.q.Query(ctx, query, ts)
return res.Data, err
}
updated := make(map[uint64]struct{}) updated := make(map[uint64]struct{})
// update list of active alerts // update list of active alerts
for _, m := range res.Data { for i, m := range res.Data {
ls, err := ar.toLabels(m, qFn) labels, annotations := expandedLabels[i], expandedAnnotations[i]
if err != nil { alertID := hash(labels.processed)
curState.Err = fmt.Errorf("failed to expand labels: %w", err) if _, ok := updated[alertID]; ok {
return nil, curState.Err
}
h := hash(ls.processed)
if _, ok := updated[h]; ok {
// duplicate may be caused the removal of `__name__` label // duplicate may be caused the removal of `__name__` label
curState.Err = fmt.Errorf("labels %v: %w", ls.processed, errDuplicate) curState.Err = fmt.Errorf("labels %v: %w", labels.processed, errDuplicate)
return nil, curState.Err return nil, curState.Err
} }
updated[h] = struct{}{} updated[alertID] = struct{}{}
if a, ok := ar.alerts[h]; ok { if a, ok := ar.alerts[alertID]; ok {
if a.State == notifier.StateInactive { if a.State == notifier.StateInactive {
// alert could be in inactive state for resolvedRetention // alert could be in inactive state for resolvedRetention
// so when we again receive metrics for it - we switch it // so when we again receive metrics for it - we switch it
@ -432,22 +440,17 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
ar.logDebugf(ts, a, "INACTIVE => PENDING") ar.logDebugf(ts, a, "INACTIVE => PENDING")
} }
a.Value = m.Values[0] a.Value = m.Values[0]
// re-exec template since Value or query can be used in annotations a.Annotations = annotations
a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
if err != nil { if err != nil {
return nil, err return nil, err
} }
a.KeepFiringSince = time.Time{} a.KeepFiringSince = time.Time{}
continue continue
} }
a, err := ar.newAlert(m, ls, ts, qFn) a := ar.newAlert(m, ts, labels.processed, annotations)
if err != nil { a.ID = alertID
curState.Err = fmt.Errorf("failed to create alert: %w", err)
return nil, curState.Err
}
a.ID = h
a.State = notifier.StatePending a.State = notifier.StatePending
ar.alerts[h] = a ar.alerts[alertID] = a
ar.logDebugf(ts, a, "created in state PENDING") ar.logDebugf(ts, a, "created in state PENDING")
} }
var numActivePending int var numActivePending int
@ -497,6 +500,28 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
return ar.toTimeSeries(ts.Unix()), nil return ar.toTimeSeries(ts.Unix()), nil
} }
func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.QueryFn, ts time.Time) (*labelSet, map[string]string, error) {
ls, err := ar.toLabels(m, qFn)
if err != nil {
return nil, nil, fmt.Errorf("failed to expand labels: %w", err)
}
tplData := notifier.AlertTplData{
Value: m.Values[0],
Labels: ls.origin,
Expr: ar.Expr,
AlertID: hash(ls.processed),
GroupID: ar.GroupID,
ActiveAt: ts,
For: ar.For,
}
as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData)
if err != nil {
return nil, nil, fmt.Errorf("failed to template annotations: %w", err)
}
return ls, as, nil
}
func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries { func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries {
var tss []prompbmarshal.TimeSeries var tss []prompbmarshal.TimeSeries
for _, a := range ar.alerts { for _, a := range ar.alerts {
@ -530,25 +555,25 @@ func hash(labels map[string]string) uint64 {
return hash.Sum64() return hash.Sum64()
} }
func (ar *AlertingRule) newAlert(m datasource.Metric, ls *labelSet, start time.Time, qFn templates.QueryFn) (*notifier.Alert, error) { func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, labels, annotations map[string]string) *notifier.Alert {
var err error as := make(map[string]string)
if ls == nil { if annotations != nil {
ls, err = ar.toLabels(m, qFn) as = annotations
if err != nil {
return nil, fmt.Errorf("failed to expand labels: %w", err)
}
} }
a := &notifier.Alert{ ls := make(map[string]string)
GroupID: ar.GroupID, if labels != nil {
Name: ar.Name, ls = labels
Labels: ls.processed, }
Value: m.Values[0], return &notifier.Alert{
ActiveAt: start, GroupID: ar.GroupID,
Expr: ar.Expr, Name: ar.Name,
For: ar.For, Expr: ar.Expr,
For: ar.For,
ActiveAt: start,
Value: m.Values[0],
Labels: ls,
Annotations: as,
} }
a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
return a, err
} }
const ( const (
@ -604,9 +629,6 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti
return nil return nil
} }
ar.alertsMu.Lock()
defer ar.alertsMu.Unlock()
if len(ar.alerts) < 1 { if len(ar.alerts) < 1 {
return nil return nil
} }
@ -631,6 +653,10 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti
ar.logDebugf(ts, nil, "no response was received from restore query") ar.logDebugf(ts, nil, "no response was received from restore query")
return nil return nil
} }
ar.alertsMu.Lock()
defer ar.alertsMu.Unlock()
for _, series := range res.Data { for _, series := range res.Data {
series.DelLabel("__name__") series.DelLabel("__name__")
labelSet := make(map[string]string, len(series.Labels)) labelSet := make(map[string]string, len(series.Labels))

View File

@ -223,13 +223,15 @@ func TestGroupStart(t *testing.T) {
m2 := metricWithLabels(t, "instance", inst2, "job", job) m2 := metricWithLabels(t, "instance", inst2, "job", job)
r := g.Rules[0].(*AlertingRule) r := g.Rules[0].(*AlertingRule)
alert1, err := r.newAlert(m1, nil, time.Now(), nil) alert1 := r.newAlert(m1, time.Now(), nil, nil)
if err != nil {
t.Fatalf("faield to create alert: %s", err)
}
alert1.State = notifier.StateFiring alert1.State = notifier.StateFiring
// add annotations
alert1.Annotations["summary"] = "1"
// add external label // add external label
alert1.Labels["cluster"] = "east-1" alert1.Labels["cluster"] = "east-1"
// add labels from response
alert1.Labels["job"] = job
alert1.Labels["instance"] = inst1
// add rule labels // add rule labels
alert1.Labels["label"] = "bar" alert1.Labels["label"] = "bar"
alert1.Labels["host"] = inst1 alert1.Labels["host"] = inst1
@ -238,13 +240,15 @@ func TestGroupStart(t *testing.T) {
alert1.Labels[alertGroupNameLabel] = g.Name alert1.Labels[alertGroupNameLabel] = g.Name
alert1.ID = hash(alert1.Labels) alert1.ID = hash(alert1.Labels)
alert2, err := r.newAlert(m2, nil, time.Now(), nil) alert2 := r.newAlert(m2, time.Now(), nil, nil)
if err != nil {
t.Fatalf("faield to create alert: %s", err)
}
alert2.State = notifier.StateFiring alert2.State = notifier.StateFiring
// add annotations
alert2.Annotations["summary"] = "1"
// add external label // add external label
alert2.Labels["cluster"] = "east-1" alert2.Labels["cluster"] = "east-1"
// add labels from response
alert2.Labels["job"] = job
alert2.Labels["instance"] = inst2
// add rule labels // add rule labels
alert2.Labels["label"] = "bar" alert2.Labels["label"] = "bar"
alert2.Labels["host"] = inst2 alert2.Labels["host"] = inst2

View File

@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): in the Select component, user-entered values are now preserved on blur if they match options in the list. * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): in the Select component, user-entered values are now preserved on blur if they match options in the list.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): supported any status codes from the range 200-299 from alertmanager. Previously, only 200 status code considered a successful action. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6110). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): supported any status codes from the range 200-299 from alertmanager. Previously, only 200 status code considered a successful action. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6110).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): avoid blocking `/api/v1/rules`, `/api/v1/alerts`, `/metrics` APIs when alerting rule uses template functions `query`, which could takes a while to execute. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): don't treat concurrency limit hit as an error of the backend. Previously, hitting the concurrency limit would increment both `vmauth_concurrent_requests_limit_reached_total` and `vmauth_user_request_backend_errors_total` counters. Now, only concurrency limit counter is incremented. Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5565). * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): don't treat concurrency limit hit as an error of the backend. Previously, hitting the concurrency limit would increment both `vmauth_concurrent_requests_limit_reached_total` and `vmauth_user_request_backend_errors_total` counters. Now, only concurrency limit counter is incremented. Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5565).
## [v1.100.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.100.1) ## [v1.100.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.100.1)