diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go index 51eee637fd..3b607eb4cd 100644 --- a/app/vmalert/alerting.go +++ b/app/vmalert/alerting.go @@ -140,7 +140,17 @@ func (ar *AlertingRule) Exec(ctx context.Context, q datasource.Querier, series b updated := make(map[uint64]struct{}) // update list of active alerts for _, m := range qMetrics { + for k, v := range ar.Labels { + // apply extra labels + m.SetLabel(k, v) + } + h := hash(m) + if _, ok := updated[h]; ok { + // duplicate may be caused by extra labels + // conflicting with the metric labels + return nil, fmt.Errorf("labels %v: %w", m.Labels, errDuplicate) + } updated[h] = struct{}{} if a, ok := ar.alerts[h]; ok { if a.Value != m.Value { @@ -258,25 +268,11 @@ func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time) (*notifie } func (ar *AlertingRule) template(a *notifier.Alert) error { - // 1. template rule labels with data labels - rLabels, err := a.ExecTemplate(ar.Labels) - if err != nil { - return err - } - - // 2. merge data labels and rule labels - // metric labels may be overridden by - // rule labels - for k, v := range rLabels { - a.Labels[k] = v - } - - // 3. 
template merged labels + var err error a.Labels, err = a.ExecTemplate(a.Labels) if err != nil { return err } - a.Annotations, err = a.ExecTemplate(ar.Annotations) return err } @@ -419,14 +415,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb // drop all extra labels, so hash key will // be identical to time series received in Exec for _, l := range labels { - if l.Name == alertNameLabel { - continue - } - if l.Name == alertGroupNameLabel { - continue - } - // drop all overridden labels - if _, ok := ar.Labels[l.Name]; ok { + if l.Name == alertNameLabel || l.Name == alertGroupNameLabel { continue } m.Labels = append(m.Labels, l) diff --git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go index 55aa2b48da..c646c17298 100644 --- a/app/vmalert/alerting_test.go +++ b/app/vmalert/alerting_test.go @@ -2,6 +2,8 @@ package main import ( "context" + "errors" + "strings" "testing" "time" @@ -218,19 +220,6 @@ func TestAlertingRule_Exec(t *testing.T) { hash(metricWithLabels(t, "name", "foo2")): {State: notifier.StateFiring}, }, }, - { - newTestAlertingRule("duplicate", 0), - [][]datasource.Metric{ - { - // metrics with the same labelset should result in one alert - metricWithLabels(t, "name", "foo", "type", "bar"), - metricWithLabels(t, "type", "bar", "name", "foo"), - }, - }, - map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, "name", "foo", "type", "bar")): {State: notifier.StateFiring}, - }, - }, { newTestAlertingRule("for-pending", time.Minute), [][]datasource.Metric{ @@ -376,7 +365,7 @@ func TestAlertingRule_Restore(t *testing.T) { alertNameLabel, "", "foo", "bar", "namespace", "baz", - // following pair supposed to be dropped + // extra labels set by rule "source", "vm", ), }, @@ -384,6 +373,7 @@ func TestAlertingRule_Restore(t *testing.T) { hash(metricWithLabels(t, "foo", "bar", "namespace", "baz", + "source", "vm", )): {State: notifier.StatePending, Start: time.Now().Truncate(time.Hour)}, }, @@ -442,6 +432,38 @@ func 
TestAlertingRule_Restore(t *testing.T) { } } +func TestAlertingRule_Exec_Negative(t *testing.T) { + fq := &fakeQuerier{} + ar := newTestAlertingRule("test", 0) + ar.Labels = map[string]string{"job": "test"} + + // successful attempt + fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar")) + _, err := ar.Exec(context.TODO(), fq, false) + if err != nil { + t.Fatal(err) + } + + // label `job` will collide with rule extra label and will make both time series equal + fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz")) + _, err = ar.Exec(context.TODO(), fq, false) + if !errors.Is(err, errDuplicate) { + t.Fatalf("expected to have %s error; got %s", errDuplicate, err) + } + + fq.reset() + + expErr := "connection reset by peer" + fq.setErr(errors.New(expErr)) + _, err = ar.Exec(context.TODO(), fq, false) + if err == nil { + t.Fatalf("expected to get err; got nil") + } + if !strings.Contains(err.Error(), expErr) { + t.Fatalf("expected to get err %q; got %q instead", expErr, err) + } +} + func newTestRuleWithLabels(name string, labels ...string) *AlertingRule { r := newTestAlertingRule(name, 0) r.Labels = make(map[string]string) diff --git a/app/vmalert/datasource/datasource.go b/app/vmalert/datasource/datasource.go index 3525f0ed57..de0265d24b 100644 --- a/app/vmalert/datasource/datasource.go +++ b/app/vmalert/datasource/datasource.go @@ -17,6 +17,23 @@ type Metric struct { Value float64 } +// SetLabel adds a new label or updates an existing one +// with the given key and value +func (m *Metric) SetLabel(key, value string) { + for i, l := range m.Labels { + if l.Name == key { + m.Labels[i].Value = value + return + } + } + m.AddLabel(key, value) +} + +// AddLabel appends the given label to the label set +func (m *Metric) AddLabel(key, value string) { + m.Labels = append(m.Labels, Label{Name: key, Value: value}) +} + // Label represents metric's label type Label struct { Name string diff --git a/app/vmalert/datasource/vm.go
b/app/vmalert/datasource/vm.go index 71fe7f71d6..bf57a04be7 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -37,7 +37,7 @@ func (r response) metrics() ([]Metric, error) { } m.Labels = nil for k, v := range r.Data.Result[i].Labels { - m.Labels = append(m.Labels, Label{Name: k, Value: v}) + m.AddLabel(k, v) } m.Timestamp = int64(res.TV[0].(float64)) m.Value = f diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index 4a3246a1c7..d8b98dcdd2 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -172,6 +172,11 @@ func TestGroupStart(t *testing.T) { t.Fatalf("faield to create alert: %s", err) } alert1.State = notifier.StateFiring + // add external label + alert1.Labels["cluster"] = "east-1" + // add rule labels - see config/testdata/rules1-good.rules + alert1.Labels["label"] = "bar" + alert1.Labels["host"] = inst1 alert1.ID = hash(m1) alert2, err := r.newAlert(m2, time.Now()) @@ -179,6 +184,11 @@ func TestGroupStart(t *testing.T) { t.Fatalf("faield to create alert: %s", err) } alert2.State = notifier.StateFiring + // add external label + alert2.Labels["cluster"] = "east-1" + // add rule labels - see config/testdata/rules1-good.rules + alert2.Labels["label"] = "bar" + alert2.Labels["host"] = inst2 alert2.ID = hash(m2) finished := make(chan struct{}) diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go index 9dd7b4de2c..89f0c57e18 100644 --- a/app/vmalert/recording.go +++ b/app/vmalert/recording.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "fmt" "hash/fnv" "sort" @@ -79,8 +78,6 @@ func (rr *RecordingRule) Close() { metrics.UnregisterMetric(rr.metrics.errors.name) } -var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels") - // Exec executes RecordingRule expression via the given Querier. 
func (rr *RecordingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) { if !series { diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go index 1410d2bd67..293e929bf7 100644 --- a/app/vmalert/rule.go +++ b/app/vmalert/rule.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -25,3 +26,5 @@ type Rule interface { // such as metrics unregister Close() } + +var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels")