vmalert: followup for 76f05f8670 (#2706)

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Roman Khavronenko 2022-06-09 08:58:25 +02:00 committed by GitHub
parent 76f05f8670
commit 48a60eb593
14 changed files with 39 additions and 54 deletions

View File

@@ -88,7 +88,7 @@ run-vmalert-sd: vmalert
 		-configCheckInterval=10s
 
 replay-vmalert: vmalert
-	./bin/vmalert -rule=app/vmalert/config/testdata/rules-replay-good.rules \
+	./bin/vmalert -rule=app/vmalert/config/testdata/rules/rules-replay-good.rules \
 		-datasource.url=http://localhost:8428 \
 		-remoteWrite.url=http://localhost:8428 \
 		-external.label=cluster=east-1 \

View File

@@ -539,6 +539,7 @@ See full description for these flags in `./vmalert --help`.
 * Graphite engine isn't supported yet;
 * `query` template function is disabled for performance reasons (might be changed in future);
+* `limit` group's param has no effect during replay (might be changed in future);
 
 ## Monitoring
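
For context on the bullet added above: the group-level `limit` still applies during regular (non-replay) evaluation; only the replay/backfilling path skips the check. The following is a minimal, self-contained Go sketch of that kind of check — a simplified illustration assuming limiting stays on the regular `Exec` path; the `enforceLimit` helper is hypothetical and is not the literal vmalert code.

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

// enforceLimit sketches the check kept on the regular evaluation path:
// with a positive group `limit`, producing more series than allowed is an error.
// During replay this check is not performed, as the bullet above notes.
func enforceLimit(tss []prompbmarshal.TimeSeries, limit int) ([]prompbmarshal.TimeSeries, error) {
	if limit > 0 && len(tss) > limit {
		return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, len(tss))
	}
	return tss, nil
}

func main() {
	tss := make([]prompbmarshal.TimeSeries, 3)
	if _, err := enforceLimit(tss, 2); err != nil {
		fmt.Println(err) // exec exceeded limit of 2 with 3 series
	}
}
```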

View File

@@ -193,13 +193,12 @@ func (ar *AlertingRule) toLabels(m datasource.Metric, qFn templates.QueryFn) (*l
 // It doesn't update internal states of the Rule and meant to be used just
 // to get time series for backfilling.
 // It returns ALERT and ALERT_FOR_STATE time series as result.
-func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
+func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
 	series, err := ar.q.QueryRange(ctx, ar.Expr, start, end)
 	if err != nil {
 		return nil, err
 	}
 	var result []prompbmarshal.TimeSeries
-	timestamp2Series := make(map[int64][]prompbmarshal.TimeSeries, 0)
 	qFn := func(query string) ([]datasource.Metric, error) {
 		return nil, fmt.Errorf("`query` template isn't supported in replay mode")
 	}
@@ -211,14 +210,11 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time, lim
 		if ar.For == 0 { // if alert is instant
 			a.State = notifier.StateFiring
 			for i := range s.Values {
-				if limit > 0 {
-					timestamp2Series[s.Timestamps[i]] = append(timestamp2Series[s.Timestamps[i]], ar.alertToTimeSeries(a, s.Timestamps[i])...)
-				} else {
-					result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
-				}
+				result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
 			}
 			continue
 		}
 		// if alert with For > 0
 		prevT := time.Time{}
 		for i := range s.Values {
@@ -232,28 +228,9 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time, lim
 				a.Start = at
 			}
 			prevT = at
-			if limit > 0 {
-				timestamp2Series[s.Timestamps[i]] = append(timestamp2Series[s.Timestamps[i]], ar.alertToTimeSeries(a, s.Timestamps[i])...)
-			} else {
-				result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
-			}
+			result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
 		}
 	}
-	if limit <= 0 {
-		return result, nil
-	}
-	sortedTimestamp := make([]int64, 0)
-	for timestamp := range timestamp2Series {
-		sortedTimestamp = append(sortedTimestamp, timestamp)
-	}
-	sort.Slice(sortedTimestamp, func(i, j int) bool { return sortedTimestamp[i] < sortedTimestamp[j] })
-	for _, timestamp := range sortedTimestamp {
-		if len(timestamp2Series[timestamp]) > limit {
-			logger.Errorf("exec exceeded limit of %d with %d alerts", limit, len(timestamp2Series[timestamp]))
-			continue
-		}
-		result = append(result, timestamp2Series[timestamp]...)
-	}
 	return result, nil
 }

View File

@@ -3,7 +3,6 @@ package main
 import (
 	"context"
 	"errors"
-	"fmt"
 	"reflect"
 	"sort"
 	"strings"
@@ -473,7 +472,7 @@ func TestAlertingRule_ExecRange(t *testing.T) {
 			tc.rule.q = fq
 			tc.rule.GroupID = fakeGroup.ID()
 			fq.add(tc.data...)
-			gotTS, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now(), 0)
+			gotTS, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now())
 			if err != nil {
 				t.Fatalf("unexpected err: %s", err)
 			}
@@ -691,15 +690,6 @@ func TestAlertingRuleLimit(t *testing.T) {
 			t.Fatal(err)
 		}
 	}
-	for _, testCase := range testCases {
-		tss, err := ar.ExecRange(context.TODO(), timestamp, timestamp, testCase.limit)
-		if err != nil {
-			t.Fatal(err)
-		}
-		if len(tss) != testCase.tssNum {
-			t.Fatal(fmt.Errorf("tss len %d is not equal to supposed %d", len(tss), testCase.tssNum))
-		}
-	}
 	fq.reset()
 }

View File

@@ -489,6 +489,22 @@ rules:
 name: TestGroup
 params:
   nocache: ["0"]
+rules:
+- alert: foo
+  expr: sum by(job) (up == 1)
+`)
+	})
+	t.Run("`limit` change", func(t *testing.T) {
+		f(t, `
+name: TestGroup
+limit: 5
+rules:
+- alert: foo
+  expr: sum by(job) (up == 1)
+`, `
+name: TestGroup
+limit: 10
 rules:
 - alert: foo
   expr: sum by(job) (up == 1)

View File

@@ -2,6 +2,7 @@ groups:
 - name: ReplayGroup
   interval: 1m
   concurrency: 1
+  limit: 1000
   rules:
   - record: type:vm_cache_entries:rate5m
     expr: sum(rate(vm_cache_entries[5m])) by (type)

View File

@@ -2,6 +2,7 @@ groups:
 - name: TestGroup
   interval: 2s
   concurrency: 2
+  limit: 1000
   params:
     denyPartialResponse: ["true"]
     extra_label: ["env=dev"]

View File

@@ -218,6 +218,7 @@ func (g *Group) updateWith(newGroup *Group) error {
 	g.Concurrency = newGroup.Concurrency
 	g.Params = newGroup.Params
 	g.Labels = newGroup.Labels
+	g.Limit = newGroup.Limit
 	g.Checksum = newGroup.Checksum
 	g.Rules = newRules
 	return nil
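
This one-line addition is what makes a changed `limit` take effect on configuration hot-reload (the new "`limit` change" test case earlier in this commit exercises it). Below is a self-contained, simplified Go sketch of the copy-on-update idea; the `Group` struct and `updateWith` method here are stand-ins for illustration, not the actual vmalert types.

```go
package main

import "fmt"

// Group is a stripped-down stand-in for vmalert's rule group,
// used only to illustrate the reload behaviour below.
type Group struct {
	Name        string
	Concurrency int
	Limit       int
}

// updateWith copies the reloadable settings from newGroup.
// If a field is not copied here, its new value is silently ignored on reload,
// which is what this commit fixes for Limit.
func (g *Group) updateWith(newGroup *Group) {
	g.Concurrency = newGroup.Concurrency
	g.Limit = newGroup.Limit
}

func main() {
	g := &Group{Name: "TestGroup", Concurrency: 2, Limit: 5}
	g.updateWith(&Group{Concurrency: 2, Limit: 10})
	fmt.Println(g.Limit) // 10: the reloaded limit is now applied
}
```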

View File

@@ -104,7 +104,7 @@ func (rr *RecordingRule) Close() {
 // ExecRange executes recording rule on the given time range similarly to Exec.
 // It doesn't update internal states of the Rule and meant to be used just
 // to get time series for backfilling.
-func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
+func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
 	series, err := rr.q.QueryRange(ctx, rr.Expr, start, end)
 	if err != nil {
 		return nil, err
@@ -120,9 +120,6 @@ func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time, li
 		duplicates[key] = struct{}{}
 		tss = append(tss, ts)
 	}
-	if limit > 0 && len(tss) > limit {
-		return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, len(tss))
-	}
 	return tss, nil
 }

View File

@@ -158,7 +158,7 @@ func TestRecordingRule_ExecRange(t *testing.T) {
 		fq := &fakeQuerier{}
 		fq.add(tc.metrics...)
 		tc.rule.q = fq
-		tss, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now(), 0)
+		tss, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now())
 		if err != nil {
 			t.Fatalf("unexpected Exec err: %s", err)
 		}
@@ -207,10 +207,6 @@ func TestRecordingRuleLimit(t *testing.T) {
 		if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
 			t.Fatal(err)
 		}
-		_, err = rule.ExecRange(context.TODO(), timestamp.Add(-2*time.Second), timestamp, testCase.limit)
-		if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
-			t.Fatal(err)
-		}
 	}
 }

View File

@@ -88,6 +88,10 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
 		"\nrequests to make: \t%d"+
 		"\nmax range per request: \t%v\n",
 		g.Name, g.Interval, iterations, step)
+	if g.Limit > 0 {
+		fmt.Printf("\nPlease note, `limit: %d` param has no effect during replay.\n",
+			g.Limit)
+	}
 	for _, rule := range g.Rules {
 		fmt.Printf("> Rule %q (ID: %d)\n", rule, rule.ID())
 		var bar *pb.ProgressBar
@@ -96,7 +100,7 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
 		}
 		ri.reset()
 		for ri.next() {
-			n, err := replayRule(rule, ri.s, ri.e, rw, g.Limit)
+			n, err := replayRule(rule, ri.s, ri.e, rw)
 			if err != nil {
 				logger.Fatalf("rule %q: %s", rule, err)
 			}
@@ -115,11 +119,11 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
 	return total
 }
 
-func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client, limit int) (int, error) {
+func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) {
 	var err error
 	var tss []prompbmarshal.TimeSeries
 	for i := 0; i < *replayRuleRetryAttempts; i++ {
-		tss, err = rule.ExecRange(context.Background(), start, end, limit)
+		tss, err = rule.ExecRange(context.Background(), start, end)
 		if err == nil {
 			break
 		}

View File

@@ -18,9 +18,8 @@ type Rule interface {
 	// Exec executes the rule with given context at the given timestamp and limit.
 	// returns an err if number of resulting time series exceeds the limit.
 	Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
-	// ExecRange executes the rule on the given time range and limit.
-	// returns an err if number of resulting time series exceeds the limit.
-	ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
+	// ExecRange executes the rule on the given time range.
+	ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error)
 	// UpdateWith performs modification of current Rule
 	// with fields of the given Rule.
 	UpdateWith(Rule) error
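
To make the updated contract concrete: `Exec` keeps its per-evaluation series limit, while `ExecRange` (used by replay) no longer takes one. Below is a minimal, self-contained Go sketch of that shape, with a stand-in `Series` type and a hypothetical `dummyRule` — an illustration of the interface after this change, not the actual vmalert implementation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Series is a stand-in for prompbmarshal.TimeSeries in this sketch.
type Series struct{ Name string }

// Rule mirrors the shape of the updated vmalert interface:
// Exec still accepts a limit, ExecRange no longer does.
type Rule interface {
	Exec(ctx context.Context, ts time.Time, limit int) ([]Series, error)
	ExecRange(ctx context.Context, start, end time.Time) ([]Series, error)
}

// dummyRule always produces three series (hypothetical, for illustration only).
type dummyRule struct{}

func (dummyRule) Exec(_ context.Context, _ time.Time, limit int) ([]Series, error) {
	out := []Series{{"a"}, {"b"}, {"c"}}
	if limit > 0 && len(out) > limit {
		return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, len(out))
	}
	return out, nil
}

func (dummyRule) ExecRange(_ context.Context, _, _ time.Time) ([]Series, error) {
	// No limit parameter and no check: during replay the group's `limit` is ignored.
	return []Series{{"a"}, {"b"}, {"c"}}, nil
}

func main() {
	var r Rule = dummyRule{}
	if _, err := r.Exec(context.Background(), time.Now(), 2); err != nil {
		fmt.Println("Exec:", err) // the limit is enforced here
	}
	tss, _ := r.ExecRange(context.Background(), time.Now().Add(-time.Hour), time.Now())
	fmt.Println("ExecRange series:", len(tss)) // 3: no limit applied on the replay path
}
```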

View File

@@ -23,6 +23,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: add support of `lowercase` and `uppercase` relabeling actions in the same way as [Prometheus 2.36.0 does](https://github.com/prometheus/prometheus/releases/tag/v2.36.0). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2664).
 * FEATURE: add ability to change the `indexdb` rotation timezone offset via `-retentionTimezoneOffset` command-line flag. Previously it was performed at 4am UTC time. This could lead to performance degradation in the middle of the day when VictoriaMetrics runs in time zones located too far from UTC. Thanks to @cnych for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2574).
 * FEATURE: limit the number of background merge threads on systems with big number of CPU cores by default. This increases the max size of parts, which can be created during background merge when `-storageDataPath` directory has limited free disk space. This may improve on-disk data compression efficiency and query performance. The limits can be tuned if needed with `-smallMergeConcurrency` and `-bigMergeConcurrency` command-line flags. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2673).
+* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `limit` param per-group for limiting number of produced samples per each rule. Thanks to @Howie59 for [implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2676).
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): remove dependency on Internet access at [web API pages](https://docs.victoriametrics.com/vmalert.html#web). Previously the functionality and the layout of these pages was broken without Internet access. See [shis issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2594).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): implement the `http://vmagent:8429/service-discovery` page in the same way as Prometheus does. This page shows the original labels for all the discovered targets alongside the resulting labels after the relabeling. This simplifies service discovery debugging.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): remove dependency on Internet access at `http://vmagent:8429/targets` page. Previously the page layout was broken without Internet access. See [shis issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2594).

View File

@@ -543,6 +543,7 @@ See full description for these flags in `./vmalert --help`.
 * Graphite engine isn't supported yet;
 * `query` template function is disabled for performance reasons (might be changed in future);
+* `limit` group's param has no effect during replay (might be changed in future);
 
 ## Monitoring