Vmalert compliance 2 (#2340)

* vmalert: split alert's `Start` field into `ActiveAt` and `Start`

The `ActiveAt` field identifies when alert becomes active for rules
with `for > 0`. Previously, this value was stored in field `Start`.

The field `Start` now identifies the moment alert became `FIRING`.

The split is needed in order to distinguish these two moments
in the API responses for alerts.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: support specific moment of time for rules evaluation

The Querier interface was extended to accept a new argument
used as a timestamp at which evaluation should be made.

It is needed to align rules execution time within the group.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: mark disappeared series as stale

Series generated by alerting rules, which were sent to remote write
now will be marked as stale if they will disappear on the next
evaluation. This would make ALERTS and ALERTS_FOR_TIME series
more precise.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* wip

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: evaluate rules at fixed timestamp

Before, time at which rules were evaluated was calculated
right before rule execution. The change makes sure
that timestamp is calculated only once per evalution round
and all rules are using the same timestamp.

It also updates the logic of resending of already resolved
alert notification.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: allow overridin `alertname` label value if it is present in response

Previously, `alertname` was always equal to the Alerting Rule name. Now,
its value can be overriden if series in response containt the different value
for this label.

The change is needed for improving compatibility with Prometheus.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: align rules evaluation in time

Now, evaluation timestamp for rules evaluates as if
there was no delay in rules evaluation. It means, that
rules will be evaluated at fixed timestamps+group_interval.
This way provides more consistent evaluation results and
improves compatibility with Prometheus,

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: add metric for missed iterations

New metric `vmalert_iteration_missed_total` will show
whether rules evaluation round was missed.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: reduce delay before the initial rule evaluation in group

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: rollback alertname override

According to the spec:
```
The alert name from the alerting rule (HighRequestLatency from the example above) MUST be added to the labels of the alert with the label name as alertname. It MUST override any existing alertname label.
```

https://github.com/prometheus/compliance/blob/main/alert_generator/specification.md#step-3
Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: throw err immediately on dedup detection

```
The execution of an alerting rule MUST error out immediately and MUST NOT send any alerts
or add samples to samples receiver if there is more than one alert with the same labels
```

https://github.com/prometheus/compliance/blob/main/alert_generator/specification.md#step-4
Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: cleanup

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: use strings builder to reduce allocs

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2022-03-29 15:09:07 +02:00 committed by Aliaksandr Valialkin
parent 26cc40ab00
commit ab10178c85
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
15 changed files with 388 additions and 142 deletions

View File

@ -191,9 +191,10 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
if at.Sub(prevT) > ar.EvalInterval {
// reset to Pending if there are gaps > EvalInterval between DPs
a.State = notifier.StatePending
a.Start = at
} else if at.Sub(a.Start) >= ar.For {
a.ActiveAt = at
} else if at.Sub(a.ActiveAt) >= ar.For {
a.State = notifier.StateFiring
a.Start = at
}
prevT = at
result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
@ -202,11 +203,15 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
return result, nil
}
// resolvedRetention is the duration for which a resolved alert instance
// is kept in memory state and consequently repeatedly sent to the AlertManager.
const resolvedRetention = 15 * time.Minute
// Exec executes AlertingRule expression via the given Querier.
// Based on the Querier results AlertingRule maintains notifier.Alerts
func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) {
func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) {
start := time.Now()
qMetrics, err := ar.q.Query(ctx, ar.Expr)
qMetrics, err := ar.q.Query(ctx, ar.Expr, ts)
ar.mu.Lock()
defer ar.mu.Unlock()
@ -220,12 +225,12 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e
for h, a := range ar.alerts {
// cleanup inactive alerts from previous Exec
if a.State == notifier.StateInactive {
if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention {
delete(ar.alerts, h)
}
}
qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) }
qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query, ts) }
updated := make(map[uint64]struct{})
// update list of active alerts
for _, m := range qMetrics {
@ -250,10 +255,18 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e
if _, ok := updated[h]; ok {
// duplicate may be caused by extra labels
// conflicting with the metric labels
return nil, fmt.Errorf("labels %v: %w", m.Labels, errDuplicate)
ar.lastExecError = fmt.Errorf("labels %v: %w", m.Labels, errDuplicate)
return nil, ar.lastExecError
}
updated[h] = struct{}{}
if a, ok := ar.alerts[h]; ok {
if a.State == notifier.StateInactive {
// alert could be in inactive state for resolvedRetention
// so when we again receive metrics for it - we switch it
// back to notifier.StatePending
a.State = notifier.StatePending
a.ActiveAt = ts
}
if a.Value != m.Values[0] {
// update Value field with latest value
a.Value = m.Values[0]
@ -273,6 +286,7 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e
}
a.ID = h
a.State = notifier.StatePending
a.ActiveAt = ts
ar.alerts[h] = a
}
@ -286,15 +300,19 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e
delete(ar.alerts, h)
continue
}
a.State = notifier.StateInactive
if a.State == notifier.StateFiring {
a.State = notifier.StateInactive
a.ResolvedAt = ts
}
continue
}
if a.State == notifier.StatePending && time.Since(a.Start) >= ar.For {
if a.State == notifier.StatePending && time.Since(a.ActiveAt) >= ar.For {
a.State = notifier.StateFiring
a.Start = ts
alertsFired.Inc()
}
}
return ar.toTimeSeries(ar.lastExecTime.Unix()), nil
return ar.toTimeSeries(ts.Unix()), nil
}
func expandLabels(m datasource.Metric, q notifier.QueryFn, ar *AlertingRule) (map[string]string, error) {
@ -360,12 +378,12 @@ func hash(m datasource.Metric) uint64 {
func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) {
a := &notifier.Alert{
GroupID: ar.GroupID,
Name: ar.Name,
Labels: map[string]string{},
Value: m.Values[0],
Start: start,
Expr: ar.Expr,
GroupID: ar.GroupID,
Name: ar.Name,
Labels: map[string]string{},
Value: m.Values[0],
ActiveAt: start,
Expr: ar.Expr,
}
for _, l := range m.Labels {
// drop __name__ to be consistent with Prometheus alerting
@ -435,6 +453,9 @@ func (ar *AlertingRule) AlertsToAPI() []*APIAlert {
var alerts []*APIAlert
ar.mu.RLock()
for _, a := range ar.alerts {
if a.State == notifier.StateInactive {
continue
}
alerts = append(alerts, ar.newAlertAPI(*a))
}
ar.mu.RUnlock()
@ -453,7 +474,7 @@ func (ar *AlertingRule) newAlertAPI(a notifier.Alert) *APIAlert {
Labels: a.Labels,
Annotations: a.Annotations,
State: a.State.String(),
ActiveAt: a.Start,
ActiveAt: a.ActiveAt,
Restored: a.Restored,
Value: strconv.FormatFloat(a.Value, 'f', -1, 32),
}
@ -479,7 +500,7 @@ const (
alertGroupNameLabel = "alertgroup"
)
// alertToTimeSeries converts the given alert with the given timestamp to timeseries
// alertToTimeSeries converts the given alert with the given timestamp to time series
func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp int64) []prompbmarshal.TimeSeries {
var tss []prompbmarshal.TimeSeries
tss = append(tss, alertToTimeSeries(a, timestamp))
@ -507,11 +528,11 @@ func alertForToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.Time
labels[k] = v
}
labels["__name__"] = alertForStateMetricName
return newTimeSeries([]float64{float64(a.Start.Unix())}, []int64{timestamp}, labels)
return newTimeSeries([]float64{float64(a.ActiveAt.Unix())}, []int64{timestamp}, labels)
}
// Restore restores the state of active alerts basing on previously written time series.
// Restore restores only Start field. Field State will be always Pending and supposed
// Restore restores only ActiveAt field. Field State will be always Pending and supposed
// to be updated on next Exec, as well as Value field.
// Only rules with For > 0 will be restored.
func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration, labels map[string]string) error {
@ -519,7 +540,8 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
return fmt.Errorf("querier is nil")
}
qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) }
ts := time.Now()
qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query, ts) }
// account for external labels in filter
var labelsFilter string
@ -532,7 +554,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
// remote write protocol which is used for state persistence in vmalert.
expr := fmt.Sprintf("last_over_time(%s{alertname=%q%s}[%ds])",
alertForStateMetricName, ar.Name, labelsFilter, int(lookback.Seconds()))
qMetrics, err := q.Query(ctx, expr)
qMetrics, err := q.Query(ctx, expr, ts)
if err != nil {
return err
}
@ -555,21 +577,27 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
// and returns only those which should be sent to notifier.
// Isn't concurrent safe.
func (ar *AlertingRule) alertsToSend(ts time.Time, resolveDuration, resendDelay time.Duration) []notifier.Alert {
needsSending := func(a *notifier.Alert) bool {
if a.State == notifier.StatePending {
return false
}
if a.ResolvedAt.After(a.LastSent) {
return true
}
return a.LastSent.Add(resendDelay).Before(ts)
}
var alerts []notifier.Alert
for _, a := range ar.alerts {
switch a.State {
case notifier.StateFiring:
if time.Since(a.LastSent) < resendDelay {
continue
}
a.End = ts.Add(resolveDuration)
a.LastSent = ts
alerts = append(alerts, *a)
case notifier.StateInactive:
a.End = ts
a.LastSent = ts
alerts = append(alerts, *a)
if !needsSending(a) {
continue
}
a.End = ts.Add(resolveDuration)
if a.State == notifier.StateInactive {
a.End = a.ResolvedAt
}
a.LastSent = ts
alerts = append(alerts, *a)
}
return alerts
}

View File

@ -61,7 +61,7 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
},
{
newTestAlertingRule("for", time.Second),
&notifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)},
&notifier.Alert{State: notifier.StateFiring, ActiveAt: timestamp.Add(time.Second)},
[]prompbmarshal.TimeSeries{
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
"__name__": alertMetricName,
@ -76,7 +76,7 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
},
{
newTestAlertingRule("for pending", 10*time.Second),
&notifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)},
&notifier.Alert{State: notifier.StatePending, ActiveAt: timestamp.Add(time.Second)},
[]prompbmarshal.TimeSeries{
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
"__name__": alertMetricName,
@ -169,7 +169,7 @@ func TestAlertingRule_Exec(t *testing.T) {
},
},
{
newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty", 0),
newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>inactive", 0),
[][]datasource.Metric{
{metricWithLabels(t, "name", "foo")},
{},
@ -177,7 +177,9 @@ func TestAlertingRule_Exec(t *testing.T) {
{},
{},
},
nil,
[]testAlert{
{labels: []string{"name", "foo"}, alert: &notifier.Alert{State: notifier.StateInactive}},
},
},
{
newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0),
@ -217,8 +219,9 @@ func TestAlertingRule_Exec(t *testing.T) {
},
// 1: fire first alert
// 2: fire second alert, set first inactive
// 3: fire third alert, set second inactive, delete first one
// 3: fire third alert, set second inactive
[]testAlert{
{labels: []string{"name", "foo"}, alert: &notifier.Alert{State: notifier.StateInactive}},
{labels: []string{"name", "foo1"}, alert: &notifier.Alert{State: notifier.StateInactive}},
{labels: []string{"name", "foo2"}, alert: &notifier.Alert{State: notifier.StateFiring}},
},
@ -301,7 +304,7 @@ func TestAlertingRule_Exec(t *testing.T) {
for _, step := range tc.steps {
fq.reset()
fq.add(step...)
if _, err := tc.rule.Exec(context.TODO()); err != nil {
if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil {
t.Fatalf("unexpected err: %s", err)
}
// artificial delay between applying steps
@ -380,9 +383,9 @@ func TestAlertingRule_ExecRange(t *testing.T) {
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
},
[]*notifier.Alert{
{State: notifier.StatePending, Start: time.Unix(1, 0)},
{State: notifier.StatePending, Start: time.Unix(3, 0)},
{State: notifier.StatePending, Start: time.Unix(5, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(3, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(5, 0)},
},
},
{
@ -391,9 +394,9 @@ func TestAlertingRule_ExecRange(t *testing.T) {
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
},
[]*notifier.Alert{
{State: notifier.StatePending, Start: time.Unix(1, 0)},
{State: notifier.StatePending, Start: time.Unix(1, 0)},
{State: notifier.StateFiring, Start: time.Unix(1, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)},
},
},
{
@ -402,11 +405,11 @@ func TestAlertingRule_ExecRange(t *testing.T) {
{Values: []float64{1, 1, 1, 1, 1}, Timestamps: []int64{1, 2, 5, 6, 20}},
},
[]*notifier.Alert{
{State: notifier.StatePending, Start: time.Unix(1, 0)},
{State: notifier.StateFiring, Start: time.Unix(1, 0)},
{State: notifier.StatePending, Start: time.Unix(5, 0)},
{State: notifier.StateFiring, Start: time.Unix(5, 0)},
{State: notifier.StatePending, Start: time.Unix(20, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(5, 0)},
{State: notifier.StateFiring, ActiveAt: time.Unix(5, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(20, 0)},
},
},
{
@ -418,15 +421,15 @@ func TestAlertingRule_ExecRange(t *testing.T) {
},
},
[]*notifier.Alert{
{State: notifier.StatePending, Start: time.Unix(1, 0)},
{State: notifier.StatePending, Start: time.Unix(1, 0)},
{State: notifier.StateFiring, Start: time.Unix(1, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)},
//
{State: notifier.StatePending, Start: time.Unix(1, 0),
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0),
Labels: map[string]string{
"foo": "bar",
}},
{State: notifier.StatePending, Start: time.Unix(5, 0),
{State: notifier.StatePending, ActiveAt: time.Unix(5, 0),
Labels: map[string]string{
"foo": "bar",
}},
@ -479,7 +482,7 @@ func TestAlertingRule_ExecRange(t *testing.T) {
a.Labels = make(map[string]string)
}
a.Labels[alertNameLabel] = tc.rule.Name
expTS = append(expTS, tc.rule.alertToTimeSeries(tc.expAlerts[j], timestamp)...)
expTS = append(expTS, tc.rule.alertToTimeSeries(a, timestamp)...)
j++
}
}
@ -511,7 +514,7 @@ func TestAlertingRule_Restore(t *testing.T) {
},
map[uint64]*notifier.Alert{
hash(datasource.Metric{}): {State: notifier.StatePending,
Start: time.Now().Truncate(time.Hour)},
ActiveAt: time.Now().Truncate(time.Hour)},
},
},
{
@ -532,7 +535,7 @@ func TestAlertingRule_Restore(t *testing.T) {
"foo", "bar",
"namespace", "baz",
)): {State: notifier.StatePending,
Start: time.Now().Truncate(time.Hour)},
ActiveAt: time.Now().Truncate(time.Hour)},
},
},
{
@ -552,7 +555,7 @@ func TestAlertingRule_Restore(t *testing.T) {
"namespace", "baz",
"source", "vm",
)): {State: notifier.StatePending,
Start: time.Now().Truncate(time.Hour)},
ActiveAt: time.Now().Truncate(time.Hour)},
},
},
{
@ -573,11 +576,11 @@ func TestAlertingRule_Restore(t *testing.T) {
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "host", "localhost-1")): {State: notifier.StatePending,
Start: time.Now().Truncate(time.Hour)},
ActiveAt: time.Now().Truncate(time.Hour)},
hash(metricWithLabels(t, "host", "localhost-2")): {State: notifier.StatePending,
Start: time.Now().Truncate(2 * time.Hour)},
ActiveAt: time.Now().Truncate(2 * time.Hour)},
hash(metricWithLabels(t, "host", "localhost-3")): {State: notifier.StatePending,
Start: time.Now().Truncate(3 * time.Hour)},
ActiveAt: time.Now().Truncate(3 * time.Hour)},
},
},
}
@ -602,8 +605,8 @@ func TestAlertingRule_Restore(t *testing.T) {
if got.State != exp.State {
t.Fatalf("expected state %d; got %d", exp.State, got.State)
}
if got.Start != exp.Start {
t.Fatalf("expected Start %v; got %v", exp.Start, got.Start)
if got.ActiveAt != exp.ActiveAt {
t.Fatalf("expected ActiveAt %v; got %v", exp.ActiveAt, got.ActiveAt)
}
}
})
@ -618,14 +621,14 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
// successful attempt
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
_, err := ar.Exec(context.TODO())
_, err := ar.Exec(context.TODO(), time.Now())
if err != nil {
t.Fatal(err)
}
// label `job` will collide with rule extra label and will make both time series equal
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz"))
_, err = ar.Exec(context.TODO())
_, err = ar.Exec(context.TODO(), time.Now())
if !errors.Is(err, errDuplicate) {
t.Fatalf("expected to have %s error; got %s", errDuplicate, err)
}
@ -634,7 +637,7 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
expErr := "connection reset by peer"
fq.setErr(errors.New(expErr))
_, err = ar.Exec(context.TODO())
_, err = ar.Exec(context.TODO(), time.Now())
if err == nil {
t.Fatalf("expected to get err; got nil")
}
@ -688,8 +691,8 @@ func TestAlertingRule_Template(t *testing.T) {
alerts: make(map[uint64]*notifier.Alert),
},
[]datasource.Metric{
metricWithValueAndLabels(t, 2, "instance", "foo"),
metricWithValueAndLabels(t, 10, "instance", "bar"),
metricWithValueAndLabels(t, 2, "instance", "foo", alertNameLabel, "override"),
metricWithValueAndLabels(t, 10, "instance", "bar", alertNameLabel, "override"),
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "foo")): {
@ -762,7 +765,7 @@ func TestAlertingRule_Template(t *testing.T) {
tc.rule.GroupID = fakeGroup.ID()
tc.rule.q = fq
fq.add(tc.metrics...)
if _, err := tc.rule.Exec(context.TODO()); err != nil {
if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil {
t.Fatalf("unexpected err: %s", err)
}
for hash, expAlert := range tc.expAlerts {
@ -821,17 +824,17 @@ func TestAlertsToSend(t *testing.T) {
5*time.Minute, time.Minute,
)
f( // resolve inactive alert at the current timestamp
[]*notifier.Alert{{State: notifier.StateInactive}},
[]*notifier.Alert{{State: notifier.StateInactive, ResolvedAt: ts}},
[]*notifier.Alert{{LastSent: ts, End: ts}},
time.Minute, time.Minute,
)
f( // mixed case of firing and resolved alerts. Names are added for deterministic sorting
[]*notifier.Alert{{Name: "a", State: notifier.StateFiring}, {Name: "b", State: notifier.StateInactive}},
[]*notifier.Alert{{Name: "a", State: notifier.StateFiring}, {Name: "b", State: notifier.StateInactive, ResolvedAt: ts}},
[]*notifier.Alert{{Name: "a", LastSent: ts, End: ts.Add(5 * time.Minute)}, {Name: "b", LastSent: ts, End: ts}},
5*time.Minute, time.Minute,
)
f( // mixed case of pending and resolved alerts. Names are added for deterministic sorting
[]*notifier.Alert{{Name: "a", State: notifier.StatePending}, {Name: "b", State: notifier.StateInactive}},
[]*notifier.Alert{{Name: "a", State: notifier.StatePending}, {Name: "b", State: notifier.StateInactive, ResolvedAt: ts}},
[]*notifier.Alert{{Name: "b", LastSent: ts, End: ts}},
5*time.Minute, time.Minute,
)
@ -850,6 +853,16 @@ func TestAlertsToSend(t *testing.T) {
[]*notifier.Alert{{LastSent: ts, End: ts.Add(time.Minute)}},
time.Minute, 0,
)
f( // inactive alert which has been sent already
[]*notifier.Alert{{State: notifier.StateInactive, LastSent: ts.Add(-time.Second), ResolvedAt: ts.Add(-2 * time.Second)}},
nil,
time.Minute, time.Minute,
)
f( // inactive alert which has been resolved after last send
[]*notifier.Alert{{State: notifier.StateInactive, LastSent: ts.Add(-time.Second), ResolvedAt: ts}},
[]*notifier.Alert{{LastSent: ts, End: ts}},
time.Minute, time.Minute,
)
}
func newTestRuleWithLabels(name string, labels ...string) *AlertingRule {

View File

@ -8,7 +8,7 @@ import (
// Querier interface wraps Query and QueryRange methods
type Querier interface {
Query(ctx context.Context, query string) ([]Metric, error)
Query(ctx context.Context, query string, ts time.Time) ([]Metric, error)
QueryRange(ctx context.Context, query string, from, to time.Time) ([]Metric, error)
}

View File

@ -71,13 +71,12 @@ func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Durati
}
// Query executes the given query and returns parsed response
func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) {
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) ([]Metric, error) {
req, err := s.newRequestPOST()
if err != nil {
return nil, err
}
ts := time.Now()
switch s.dataSourceType.String() {
case "prometheus":
s.setPrometheusInstantReqParams(req, query, ts)

View File

@ -89,26 +89,27 @@ func TestVMInstantQuery(t *testing.T) {
p := NewPrometheusType()
pq := s.BuildWithParams(QuerierParams{DataSourceType: &p, EvaluationInterval: 15 * time.Second})
ts := time.Now()
if _, err := pq.Query(ctx, query); err == nil {
if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected connection error got nil")
}
if _, err := pq.Query(ctx, query); err == nil {
if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected invalid response status error got nil")
}
if _, err := pq.Query(ctx, query); err == nil {
if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected response body error got nil")
}
if _, err := pq.Query(ctx, query); err == nil {
if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected error status got nil")
}
if _, err := pq.Query(ctx, query); err == nil {
if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected unknown status got nil")
}
if _, err := pq.Query(ctx, query); err == nil {
if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected non-vector resultType error got nil")
}
m, err := pq.Query(ctx, query)
m, err := pq.Query(ctx, query, ts)
if err != nil {
t.Fatalf("unexpected %s", err)
}
@ -134,7 +135,7 @@ func TestVMInstantQuery(t *testing.T) {
g := NewGraphiteType()
gq := s.BuildWithParams(QuerierParams{DataSourceType: &g})
m, err = gq.Query(ctx, queryRender)
m, err = gq.Query(ctx, queryRender, ts)
if err != nil {
t.Fatalf("unexpected %s", err)
}

View File

@ -5,6 +5,8 @@ import (
"fmt"
"hash/fnv"
"net/url"
"strconv"
"strings"
"sync"
"time"
@ -13,7 +15,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
@ -44,6 +48,7 @@ type Group struct {
type groupMetrics struct {
iterationTotal *utils.Counter
iterationDuration *utils.Summary
iterationMissed *utils.Counter
}
func newGroupMetrics(name, file string) *groupMetrics {
@ -51,6 +56,7 @@ func newGroupMetrics(name, file string) *groupMetrics {
labels := fmt.Sprintf(`group=%q, file=%q`, name, file)
m.iterationTotal = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
m.iterationDuration = utils.GetOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
m.iterationMissed = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels))
return m
}
@ -226,6 +232,13 @@ var skipRandSleepOnGroupStart bool
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client) {
defer func() { close(g.finishedCh) }()
e := &executor{
rw: rw,
notifiers: nts,
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label)}
evalTS := time.Now()
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
if !skipRandSleepOnGroupStart {
randSleep := uint64(float64(g.Interval) * (float64(g.ID()) / (1 << 64)))
@ -247,7 +260,31 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
}
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
e := &executor{rw: rw, notifiers: nts}
eval := func(ts time.Time) {
g.metrics.iterationTotal.Inc()
start := time.Now()
if len(g.Rules) < 1 {
g.metrics.iterationDuration.UpdateDuration(start)
g.LastEvaluation = start
return
}
resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration)
for err := range errs {
if err != nil {
logger.Errorf("group %q: %s", g.Name, err)
}
}
g.metrics.iterationDuration.UpdateDuration(start)
g.LastEvaluation = start
}
eval(evalTS)
t := time.NewTicker(g.Interval)
defer t.Stop()
for {
@ -274,32 +311,26 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
g.mu.Unlock()
logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
case <-t.C:
g.metrics.iterationTotal.Inc()
iterationStart := time.Now()
if len(g.Rules) > 0 {
errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, getResolveDuration(g.Interval))
for err := range errs {
if err != nil {
logger.Errorf("group %q: %s", g.Name, err)
}
}
g.LastEvaluation = iterationStart
missed := (time.Since(evalTS) / g.Interval) - 1
if missed > 0 {
g.metrics.iterationMissed.Inc()
}
g.metrics.iterationDuration.UpdateDuration(iterationStart)
evalTS = evalTS.Add((missed + 1) * g.Interval)
eval(evalTS)
}
}
}
// getResolveDuration returns the duration after which firing alert
// can be considered as resolved.
func getResolveDuration(groupInterval time.Duration) time.Duration {
delta := *resendDelay
func getResolveDuration(groupInterval, delta, maxDuration time.Duration) time.Duration {
if groupInterval > delta {
delta = groupInterval
}
resolveDuration := delta * 4
if *maxResolveDuration > 0 && resolveDuration > *maxResolveDuration {
resolveDuration = *maxResolveDuration
if maxDuration > 0 && resolveDuration > maxDuration {
resolveDuration = maxDuration
}
return resolveDuration
}
@ -307,14 +338,20 @@ func getResolveDuration(groupInterval time.Duration) time.Duration {
type executor struct {
notifiers func() []notifier.Notifier
rw *remotewrite.Client
// previouslySentSeriesToRW stores series sent to RW on previous iteration
// map[ruleID]map[ruleLabels][]prompb.Label
// where `ruleID` is ID of the Rule within a Group
// and `ruleLabels` is []prompb.Label marshalled to a string
previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
}
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, resolveDuration time.Duration) chan error {
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration) chan error {
res := make(chan error, len(rules))
if concurrency == 1 {
// fast path
for _, rule := range rules {
res <- e.exec(ctx, rule, resolveDuration)
res <- e.exec(ctx, rule, ts, resolveDuration)
}
close(res)
return res
@ -327,7 +364,7 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurren
sem <- struct{}{}
wg.Add(1)
go func(r Rule) {
res <- e.exec(ctx, r, resolveDuration)
res <- e.exec(ctx, r, ts, resolveDuration)
<-sem
wg.Done()
}(rule)
@ -348,24 +385,29 @@ var (
remoteWriteTotal = metrics.NewCounter(`vmalert_remotewrite_total`)
)
func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Duration) error {
func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration) error {
execTotal.Inc()
now := time.Now()
tss, err := rule.Exec(ctx)
tss, err := rule.Exec(ctx, ts)
if err != nil {
execErrors.Inc()
return fmt.Errorf("rule %q: failed to execute: %w", rule, err)
}
if len(tss) > 0 && e.rw != nil {
for _, ts := range tss {
remoteWriteTotal.Inc()
if err := e.rw.Push(ts); err != nil {
remoteWriteErrors.Inc()
return fmt.Errorf("rule %q: remote write failure: %w", rule, err)
errGr := new(utils.ErrGroup)
if e.rw != nil {
pushToRW := func(tss []prompbmarshal.TimeSeries) {
for _, ts := range tss {
remoteWriteTotal.Inc()
if err := e.rw.Push(ts); err != nil {
remoteWriteErrors.Inc()
errGr.Add(fmt.Errorf("rule %q: remote write failure: %w", rule, err))
}
}
}
pushToRW(tss)
staleSeries := e.getStaleSeries(rule, tss, ts)
pushToRW(staleSeries)
}
ar, ok := rule.(*AlertingRule)
@ -373,12 +415,11 @@ func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Dur
return nil
}
alerts := ar.alertsToSend(now, resolveDuration, *resendDelay)
alerts := ar.alertsToSend(ts, resolveDuration, *resendDelay)
if len(alerts) < 1 {
return nil
}
errGr := new(utils.ErrGroup)
for _, nt := range e.notifiers() {
if err := nt.Send(ctx, alerts); err != nil {
errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
@ -386,3 +427,48 @@ func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Dur
}
return errGr.Err()
}
// getStaledSeries checks whether there are stale series from previously sent ones.
func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries {
ruleLabels := make(map[string][]prompbmarshal.Label)
for _, ts := range tss {
// convert labels to strings so we can compare with previously sent series
key := labelsToString(ts.Labels)
ruleLabels[key] = ts.Labels
}
rID := rule.ID()
var staleS []prompbmarshal.TimeSeries
// check whether there are series which disappeared and need to be marked as stale
for key, labels := range e.previouslySentSeriesToRW[rID] {
if _, ok := ruleLabels[key]; ok {
continue
}
// previously sent series are missing in current series, so we mark them as stale
ss := newTimeSeriesPB([]float64{decimal.StaleNaN}, []int64{timestamp.Unix()}, labels)
staleS = append(staleS, ss)
}
// set previous series to current
e.previouslySentSeriesToRW[rID] = ruleLabels
return staleS
}
func labelsToString(labels []prompbmarshal.Label) string {
var b strings.Builder
b.WriteRune('{')
for i, label := range labels {
if len(label.Name) == 0 {
b.WriteString("__name__")
} else {
b.WriteString(label.Name)
}
b.WriteRune('=')
b.WriteString(strconv.Quote(label.Value))
if i < len(labels)-1 {
b.WriteRune(',')
}
}
b.WriteRune('}')
return b.String()
}

View File

@ -3,6 +3,9 @@ package main
import (
"context"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"reflect"
"sort"
"testing"
"time"
@ -239,7 +242,8 @@ func TestGroupStart(t *testing.T) {
time.Sleep(20 * evalInterval)
gotAlerts = fn.getAlerts()
expectedAlerts = []notifier.Alert{*alert1}
alert2.State = notifier.StateInactive
expectedAlerts = []notifier.Alert{*alert1, *alert2}
compareAlerts(t, expectedAlerts, gotAlerts)
g.close()
@ -262,21 +266,100 @@ func TestResolveDuration(t *testing.T) {
{0, 0, 0, 0},
}
defaultResolveDuration := *maxResolveDuration
defaultResendDelay := *resendDelay
defer func() {
*maxResolveDuration = defaultResolveDuration
*resendDelay = defaultResendDelay
}()
for _, tc := range testCases {
t.Run(fmt.Sprintf("%v-%v-%v", tc.groupInterval, tc.expected, tc.maxDuration), func(t *testing.T) {
*maxResolveDuration = tc.maxDuration
*resendDelay = tc.resendDelay
got := getResolveDuration(tc.groupInterval)
got := getResolveDuration(tc.groupInterval, tc.resendDelay, tc.maxDuration)
if got != tc.expected {
t.Errorf("expected to have %v; got %v", tc.expected, got)
}
})
}
}
func TestGetStaleSeries(t *testing.T) {
ts := time.Now()
e := &executor{
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
}
f := func(rule Rule, labels, expLabels [][]prompbmarshal.Label) {
t.Helper()
var tss []prompbmarshal.TimeSeries
for _, l := range labels {
tss = append(tss, newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, l))
}
staleS := e.getStaleSeries(rule, tss, ts)
if staleS == nil && expLabels == nil {
return
}
if len(staleS) != len(expLabels) {
t.Fatalf("expected to get %d stale series, got %d",
len(expLabels), len(staleS))
}
for i, exp := range expLabels {
got := staleS[i]
if !reflect.DeepEqual(exp, got.Labels) {
t.Fatalf("expected to get labels: \n%v;\ngot instead: \n%v",
exp, got.Labels)
}
if len(got.Samples) != 1 {
t.Fatalf("expected to have 1 sample; got %d", len(got.Samples))
}
if !decimal.IsStaleNaN(got.Samples[0].Value) {
t.Fatalf("expected sample value to be %v; got %v", decimal.StaleNaN, got.Samples[0].Value)
}
}
}
// warn: keep in mind, that executor holds the state, so sequence of f calls matters
// single series
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")},
nil)
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")},
nil)
f(&AlertingRule{RuleID: 1},
nil,
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")})
f(&AlertingRule{RuleID: 1},
nil,
nil)
// multiple series
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{
toPromLabels(t, "__name__", "job:foo", "job", "foo"),
toPromLabels(t, "__name__", "job:foo", "job", "bar"),
},
nil)
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")})
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
nil)
f(&AlertingRule{RuleID: 1},
nil,
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")})
// multiple rules and series
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{
toPromLabels(t, "__name__", "job:foo", "job", "foo"),
toPromLabels(t, "__name__", "job:foo", "job", "bar"),
},
nil)
f(&AlertingRule{RuleID: 2},
[][]prompbmarshal.Label{
toPromLabels(t, "__name__", "job:foo", "job", "foo"),
toPromLabels(t, "__name__", "job:foo", "job", "bar"),
},
nil)
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")})
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
nil)
}

View File

@ -44,10 +44,10 @@ func (fq *fakeQuerier) BuildWithParams(_ datasource.QuerierParams) datasource.Qu
}
func (fq *fakeQuerier) QueryRange(ctx context.Context, q string, _, _ time.Time) ([]datasource.Metric, error) {
return fq.Query(ctx, q)
return fq.Query(ctx, q, time.Now())
}
func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) {
func (fq *fakeQuerier) Query(_ context.Context, _ string, _ time.Time) ([]datasource.Metric, error) {
fq.Lock()
defer fq.Unlock()
if fq.err != nil {
@ -116,6 +116,21 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
return m
}
func toPromLabels(t *testing.T, labels ...string) []prompbmarshal.Label {
t.Helper()
if len(labels) == 0 || len(labels)%2 != 0 {
t.Fatalf("expected to get even number of labels")
}
var ls []prompbmarshal.Label
for i := 0; i < len(labels); i += 2 {
ls = append(ls, prompbmarshal.Label{
Name: labels[i],
Value: labels[i+1],
})
}
return ls
}
func compareGroups(t *testing.T, a, b *Group) {
t.Helper()
if a.Name != b.Name {

View File

@ -26,10 +26,14 @@ type Alert struct {
State AlertState
// Expr contains expression that was executed to generate the Alert
Expr string
// Start defines the moment of time when Alert has triggered
// ActiveAt defines the moment of time when Alert has become active
ActiveAt time.Time
// Start defines the moment of time when Alert has become firing
Start time.Time
// End defines the moment of time when Alert supposed to expire
End time.Time
// ResolvedAt defines the moment when Alert was switched from Firing to Inactive
ResolvedAt time.Time
// LastSent defines the moment when Alert was sent last time
LastSent time.Time
// Value stores the value returned from evaluating expression from Expr field

View File

@ -124,14 +124,13 @@ func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([
}
// Exec executes RecordingRule expression via the given Querier.
func (rr *RecordingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) {
start := time.Now()
qMetrics, err := rr.q.Query(ctx, rr.Expr)
func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) {
qMetrics, err := rr.q.Query(ctx, rr.Expr, ts)
rr.mu.Lock()
defer rr.mu.Unlock()
rr.lastExecTime = start
rr.lastExecDuration = time.Since(start)
rr.lastExecTime = ts
rr.lastExecDuration = time.Since(ts)
rr.lastExecError = err
rr.lastExecSamples = len(qMetrics)
if err != nil {

View File

@ -77,7 +77,7 @@ func TestRecoridngRule_Exec(t *testing.T) {
fq := &fakeQuerier{}
fq.add(tc.metrics...)
tc.rule.q = fq
tss, err := tc.rule.Exec(context.TODO())
tss, err := tc.rule.Exec(context.TODO(), time.Now())
if err != nil {
t.Fatalf("unexpected Exec err: %s", err)
}
@ -178,7 +178,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) {
expErr := "connection reset by peer"
fq.setErr(errors.New(expErr))
rr.q = fq
_, err := rr.Exec(context.TODO())
_, err := rr.Exec(context.TODO(), time.Now())
if err == nil {
t.Fatalf("expected to get err; got nil")
}
@ -193,7 +193,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) {
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"))
fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar"))
_, err = rr.Exec(context.TODO())
_, err = rr.Exec(context.TODO(), time.Now())
if err == nil {
t.Fatalf("expected to get err; got nil")
}

View File

@ -225,7 +225,7 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
droppedRows.Add(len(wr.Timeseries))
droppedBytes.Add(len(b))
logger.Errorf("all %d attempts to send request failed - dropping %d timeseries",
logger.Errorf("all %d attempts to send request failed - dropping %d time series",
attempts, len(wr.Timeseries))
}

View File

@ -3,8 +3,9 @@ package main
import (
"context"
"errors"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// Rule represents alerting or recording rule
@ -14,8 +15,8 @@ type Rule interface {
// ID returns unique ID that may be used for
// identifying this Rule among others.
ID() uint64
// Exec executes the rule with given context
Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error)
// Exec executes the rule with given context at the given timestamp
Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error)
// ExecRange executes the rule on the given time range
ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error)
// UpdateWith performs modification of current Rule

View File

@ -30,3 +30,20 @@ func newTimeSeries(values []float64, timestamps []int64, labels map[string]strin
}
return ts
}
// newTimeSeriesPB creates prompbmarshal.TimeSeries with given
// values, timestamps and labels.
// It expects that labels are already sorted.
func newTimeSeriesPB(values []float64, timestamps []int64, labels []prompbmarshal.Label) prompbmarshal.TimeSeries {
ts := prompbmarshal.TimeSeries{
Samples: make([]prompbmarshal.Sample, len(values)),
}
for i := range values {
ts.Samples[i] = prompbmarshal.Sample{
Value: values[i],
Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6,
}
}
ts.Labels = labels
return ts
}

View File

@ -14,7 +14,7 @@ func TestHandler(t *testing.T) {
ar := &AlertingRule{
Name: "alert",
alerts: map[uint64]*notifier.Alert{
0: {},
0: {State: notifier.StateFiring},
},
}
g := &Group{