mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
app/vmalert: improve performances when rules produce large volumes of results
Some checks are pending
build / Build (push) Waiting to run
CodeQL Go / Analyze (push) Waiting to run
main / lint (push) Waiting to run
main / test (test-full) (push) Blocked by required conditions
main / test (test-full-386) (push) Blocked by required conditions
main / test (test-pure) (push) Blocked by required conditions
publish-docs / Build (push) Waiting to run
Some checks are pending
build / Build (push) Waiting to run
CodeQL Go / Analyze (push) Waiting to run
main / lint (push) Waiting to run
main / test (test-full) (push) Blocked by required conditions
main / test (test-full-386) (push) Blocked by required conditions
main / test (test-pure) (push) Blocked by required conditions
publish-docs / Build (push) Waiting to run
1. Avoid storing the last evaluation results outside of rules, check for stale time series as soon as possible; 2. remove duplicated template `Clone()`. This pull request is primarily reducing memory usage when rules produce large volumes of results, as seen in https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6894. The CPU time spent on garbage collection remains high and may be addressed in a separate PR.
This commit is contained in:
parent
0a6d58b4ca
commit
b09272ccac
@ -9,6 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||||
"github.com/VictoriaMetrics/metricsql"
|
"github.com/VictoriaMetrics/metricsql"
|
||||||
)
|
)
|
||||||
@ -48,7 +49,7 @@ Outer:
|
|||||||
}
|
}
|
||||||
var expSamples []parsedSample
|
var expSamples []parsedSample
|
||||||
for _, s := range mt.ExpSamples {
|
for _, s := range mt.ExpSamples {
|
||||||
expLb := datasource.Labels{}
|
expLb := []prompbmarshal.Label{}
|
||||||
if s.Labels != "" {
|
if s.Labels != "" {
|
||||||
metricsqlExpr, err := metricsql.Parse(s.Labels)
|
metricsqlExpr, err := metricsql.Parse(s.Labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -64,7 +65,7 @@ Outer:
|
|||||||
}
|
}
|
||||||
if len(metricsqlMetricExpr.LabelFilterss) > 0 {
|
if len(metricsqlMetricExpr.LabelFilterss) > 0 {
|
||||||
for _, l := range metricsqlMetricExpr.LabelFilterss[0] {
|
for _, l := range metricsqlMetricExpr.LabelFilterss[0] {
|
||||||
expLb = append(expLb, datasource.Label{
|
expLb = append(expLb, prompbmarshal.Label{
|
||||||
Name: l.Label,
|
Name: l.Label,
|
||||||
Value: l.Value,
|
Value: l.Value,
|
||||||
})
|
})
|
||||||
|
@ -46,8 +46,8 @@ const (
|
|||||||
graphitePrefix = "/graphite"
|
graphitePrefix = "/graphite"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Client) setGraphiteReqParams(r *http.Request, query string) {
|
func (c *Client) setGraphiteReqParams(r *http.Request, query string) {
|
||||||
if s.appendTypePrefix {
|
if c.appendTypePrefix {
|
||||||
r.URL.Path += graphitePrefix
|
r.URL.Path += graphitePrefix
|
||||||
}
|
}
|
||||||
r.URL.Path += graphitePath
|
r.URL.Path += graphitePath
|
||||||
@ -58,7 +58,7 @@ func (s *Client) setGraphiteReqParams(r *http.Request, query string) {
|
|||||||
q.Set("target", query)
|
q.Set("target", query)
|
||||||
q.Set("until", "now")
|
q.Set("until", "now")
|
||||||
|
|
||||||
for k, vs := range s.extraParams {
|
for k, vs := range c.extraParams {
|
||||||
if q.Has(k) { // extraParams are prior to params in URL
|
if q.Has(k) { // extraParams are prior to params in URL
|
||||||
q.Del(k)
|
q.Del(k)
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
|
||||||
"github.com/valyala/fastjson"
|
"github.com/valyala/fastjson"
|
||||||
)
|
)
|
||||||
@ -82,14 +83,14 @@ func (pi *promInstant) Unmarshal(b []byte) error {
|
|||||||
labels := metric.GetObject()
|
labels := metric.GetObject()
|
||||||
|
|
||||||
r := &pi.ms[i]
|
r := &pi.ms[i]
|
||||||
r.Labels = make([]Label, 0, labels.Len())
|
r.Labels = make([]prompbmarshal.Label, 0, labels.Len())
|
||||||
labels.Visit(func(key []byte, v *fastjson.Value) {
|
labels.Visit(func(key []byte, v *fastjson.Value) {
|
||||||
lv, errLocal := v.StringBytes()
|
lv, errLocal := v.StringBytes()
|
||||||
if errLocal != nil {
|
if errLocal != nil {
|
||||||
err = fmt.Errorf("error when parsing label value %q: %s", v, errLocal)
|
err = fmt.Errorf("error when parsing label value %q: %s", v, errLocal)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
r.Labels = append(r.Labels, Label{
|
r.Labels = append(r.Labels, prompbmarshal.Label{
|
||||||
Name: string(key),
|
Name: string(key),
|
||||||
Value: string(lv),
|
Value: string(lv),
|
||||||
})
|
})
|
||||||
@ -219,8 +220,8 @@ func parsePrometheusResponse(req *http.Request, resp *http.Response) (res Result
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Client) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) {
|
func (c *Client) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) {
|
||||||
if s.appendTypePrefix {
|
if c.appendTypePrefix {
|
||||||
r.URL.Path += "/prometheus"
|
r.URL.Path += "/prometheus"
|
||||||
}
|
}
|
||||||
if !*disablePathAppend {
|
if !*disablePathAppend {
|
||||||
@ -228,22 +229,22 @@ func (s *Client) setPrometheusInstantReqParams(r *http.Request, query string, ti
|
|||||||
}
|
}
|
||||||
q := r.URL.Query()
|
q := r.URL.Query()
|
||||||
q.Set("time", timestamp.Format(time.RFC3339))
|
q.Set("time", timestamp.Format(time.RFC3339))
|
||||||
if !*disableStepParam && s.evaluationInterval > 0 { // set step as evaluationInterval by default
|
if !*disableStepParam && c.evaluationInterval > 0 { // set step as evaluationInterval by default
|
||||||
// always convert to seconds to keep compatibility with older
|
// always convert to seconds to keep compatibility with older
|
||||||
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
|
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
|
||||||
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
|
q.Set("step", fmt.Sprintf("%ds", int(c.evaluationInterval.Seconds())))
|
||||||
}
|
}
|
||||||
if !*disableStepParam && s.queryStep > 0 { // override step with user-specified value
|
if !*disableStepParam && c.queryStep > 0 { // override step with user-specified value
|
||||||
// always convert to seconds to keep compatibility with older
|
// always convert to seconds to keep compatibility with older
|
||||||
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
|
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
|
||||||
q.Set("step", fmt.Sprintf("%ds", int(s.queryStep.Seconds())))
|
q.Set("step", fmt.Sprintf("%ds", int(c.queryStep.Seconds())))
|
||||||
}
|
}
|
||||||
r.URL.RawQuery = q.Encode()
|
r.URL.RawQuery = q.Encode()
|
||||||
s.setReqParams(r, query)
|
c.setReqParams(r, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Client) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) {
|
func (c *Client) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) {
|
||||||
if s.appendTypePrefix {
|
if c.appendTypePrefix {
|
||||||
r.URL.Path += "/prometheus"
|
r.URL.Path += "/prometheus"
|
||||||
}
|
}
|
||||||
if !*disablePathAppend {
|
if !*disablePathAppend {
|
||||||
@ -252,11 +253,11 @@ func (s *Client) setPrometheusRangeReqParams(r *http.Request, query string, star
|
|||||||
q := r.URL.Query()
|
q := r.URL.Query()
|
||||||
q.Add("start", start.Format(time.RFC3339))
|
q.Add("start", start.Format(time.RFC3339))
|
||||||
q.Add("end", end.Format(time.RFC3339))
|
q.Add("end", end.Format(time.RFC3339))
|
||||||
if s.evaluationInterval > 0 { // set step as evaluationInterval by default
|
if c.evaluationInterval > 0 { // set step as evaluationInterval by default
|
||||||
// always convert to seconds to keep compatibility with older
|
// always convert to seconds to keep compatibility with older
|
||||||
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
|
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
|
||||||
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
|
q.Set("step", fmt.Sprintf("%ds", int(c.evaluationInterval.Seconds())))
|
||||||
}
|
}
|
||||||
r.URL.RawQuery = q.Encode()
|
r.URL.RawQuery = q.Encode()
|
||||||
s.setReqParams(r, query)
|
c.setReqParams(r, query)
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -144,12 +145,12 @@ func TestVMInstantQuery(t *testing.T) {
|
|||||||
}
|
}
|
||||||
expected := []Metric{
|
expected := []Metric{
|
||||||
{
|
{
|
||||||
Labels: []Label{{Value: "vm_rows", Name: "__name__"}, {Value: "bar", Name: "foo"}},
|
Labels: []prompbmarshal.Label{{Value: "vm_rows", Name: "__name__"}, {Value: "bar", Name: "foo"}},
|
||||||
Timestamps: []int64{1583786142},
|
Timestamps: []int64{1583786142},
|
||||||
Values: []float64{13763},
|
Values: []float64{13763},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Labels: []Label{{Value: "vm_requests", Name: "__name__"}, {Value: "baz", Name: "foo"}},
|
Labels: []prompbmarshal.Label{{Value: "vm_requests", Name: "__name__"}, {Value: "baz", Name: "foo"}},
|
||||||
Timestamps: []int64{1583786140},
|
Timestamps: []int64{1583786140},
|
||||||
Values: []float64{2000},
|
Values: []float64{2000},
|
||||||
},
|
},
|
||||||
@ -214,7 +215,7 @@ func TestVMInstantQuery(t *testing.T) {
|
|||||||
}
|
}
|
||||||
exp := []Metric{
|
exp := []Metric{
|
||||||
{
|
{
|
||||||
Labels: []Label{{Value: "constantLine(10)", Name: "name"}},
|
Labels: []prompbmarshal.Label{{Value: "constantLine(10)", Name: "name"}},
|
||||||
Timestamps: []int64{1611758403},
|
Timestamps: []int64{1611758403},
|
||||||
Values: []float64{10},
|
Values: []float64{10},
|
||||||
},
|
},
|
||||||
@ -236,12 +237,12 @@ func TestVMInstantQuery(t *testing.T) {
|
|||||||
}
|
}
|
||||||
expected = []Metric{
|
expected = []Metric{
|
||||||
{
|
{
|
||||||
Labels: []Label{{Value: "total", Name: "stats_result"}, {Value: "bar", Name: "foo"}},
|
Labels: []prompbmarshal.Label{{Value: "total", Name: "stats_result"}, {Value: "bar", Name: "foo"}},
|
||||||
Timestamps: []int64{1583786142},
|
Timestamps: []int64{1583786142},
|
||||||
Values: []float64{13763},
|
Values: []float64{13763},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Labels: []Label{{Value: "total", Name: "stats_result"}, {Value: "baz", Name: "foo"}},
|
Labels: []prompbmarshal.Label{{Value: "total", Name: "stats_result"}, {Value: "baz", Name: "foo"}},
|
||||||
Timestamps: []int64{1583786140},
|
Timestamps: []int64{1583786140},
|
||||||
Values: []float64{2000},
|
Values: []float64{2000},
|
||||||
},
|
},
|
||||||
@ -444,7 +445,7 @@ func TestVMRangeQuery(t *testing.T) {
|
|||||||
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
|
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
|
||||||
}
|
}
|
||||||
expected := Metric{
|
expected := Metric{
|
||||||
Labels: []Label{{Value: "vm_rows", Name: "__name__"}},
|
Labels: []prompbmarshal.Label{{Value: "vm_rows", Name: "__name__"}},
|
||||||
Timestamps: []int64{1583786142},
|
Timestamps: []int64{1583786142},
|
||||||
Values: []float64{13763},
|
Values: []float64{13763},
|
||||||
}
|
}
|
||||||
@ -475,7 +476,7 @@ func TestVMRangeQuery(t *testing.T) {
|
|||||||
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
|
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
|
||||||
}
|
}
|
||||||
expected = Metric{
|
expected = Metric{
|
||||||
Labels: []Label{{Value: "total", Name: "stats_result"}},
|
Labels: []prompbmarshal.Label{{Value: "total", Name: "stats_result"}},
|
||||||
Timestamps: []int64{1583786142},
|
Timestamps: []int64{1583786142},
|
||||||
Values: []float64{10},
|
Values: []float64{10},
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Client) setVLogsInstantReqParams(r *http.Request, query string, timestamp time.Time) {
|
func (c *Client) setVLogsInstantReqParams(r *http.Request, query string, timestamp time.Time) {
|
||||||
// there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix.
|
// there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix.
|
||||||
if !*disablePathAppend {
|
if !*disablePathAppend {
|
||||||
r.URL.Path += "/select/logsql/stats_query"
|
r.URL.Path += "/select/logsql/stats_query"
|
||||||
@ -16,15 +16,15 @@ func (s *Client) setVLogsInstantReqParams(r *http.Request, query string, timesta
|
|||||||
q.Set("time", timestamp.Format(time.RFC3339))
|
q.Set("time", timestamp.Format(time.RFC3339))
|
||||||
// set the `start` and `end` params if applyIntervalAsTimeFilter is enabled(time filter is missing in the rule expr),
|
// set the `start` and `end` params if applyIntervalAsTimeFilter is enabled(time filter is missing in the rule expr),
|
||||||
// so the query will be executed in time range [timestamp - evaluationInterval, timestamp].
|
// so the query will be executed in time range [timestamp - evaluationInterval, timestamp].
|
||||||
if s.applyIntervalAsTimeFilter && s.evaluationInterval > 0 {
|
if c.applyIntervalAsTimeFilter && c.evaluationInterval > 0 {
|
||||||
q.Set("start", timestamp.Add(-s.evaluationInterval).Format(time.RFC3339))
|
q.Set("start", timestamp.Add(-c.evaluationInterval).Format(time.RFC3339))
|
||||||
q.Set("end", timestamp.Format(time.RFC3339))
|
q.Set("end", timestamp.Format(time.RFC3339))
|
||||||
}
|
}
|
||||||
r.URL.RawQuery = q.Encode()
|
r.URL.RawQuery = q.Encode()
|
||||||
s.setReqParams(r, query)
|
c.setReqParams(r, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Client) setVLogsRangeReqParams(r *http.Request, query string, start, end time.Time) {
|
func (c *Client) setVLogsRangeReqParams(r *http.Request, query string, start, end time.Time) {
|
||||||
// there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix.
|
// there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix.
|
||||||
if !*disablePathAppend {
|
if !*disablePathAppend {
|
||||||
r.URL.Path += "/select/logsql/stats_query_range"
|
r.URL.Path += "/select/logsql/stats_query_range"
|
||||||
@ -33,11 +33,11 @@ func (s *Client) setVLogsRangeReqParams(r *http.Request, query string, start, en
|
|||||||
q.Add("start", start.Format(time.RFC3339))
|
q.Add("start", start.Format(time.RFC3339))
|
||||||
q.Add("end", end.Format(time.RFC3339))
|
q.Add("end", end.Format(time.RFC3339))
|
||||||
// set step as evaluationInterval by default
|
// set step as evaluationInterval by default
|
||||||
if s.evaluationInterval > 0 {
|
if c.evaluationInterval > 0 {
|
||||||
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
|
q.Set("step", fmt.Sprintf("%ds", int(c.evaluationInterval.Seconds())))
|
||||||
}
|
}
|
||||||
r.URL.RawQuery = q.Encode()
|
r.URL.RawQuery = q.Encode()
|
||||||
s.setReqParams(r, query)
|
c.setReqParams(r, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseVLogsResponse(req *http.Request, resp *http.Response) (res Result, err error) {
|
func parseVLogsResponse(req *http.Request, resp *http.Response) (res Result, err error) {
|
||||||
|
@ -8,6 +8,8 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Querier interface wraps Query and QueryRange methods
|
// Querier interface wraps Query and QueryRange methods
|
||||||
@ -55,7 +57,7 @@ type QuerierParams struct {
|
|||||||
|
|
||||||
// Metric is the basic entity which should be return by datasource
|
// Metric is the basic entity which should be return by datasource
|
||||||
type Metric struct {
|
type Metric struct {
|
||||||
Labels []Label
|
Labels []prompbmarshal.Label
|
||||||
Timestamps []int64
|
Timestamps []int64
|
||||||
Values []float64
|
Values []float64
|
||||||
}
|
}
|
||||||
@ -72,22 +74,9 @@ func (m *Metric) SetLabel(key, value string) {
|
|||||||
m.AddLabel(key, value)
|
m.AddLabel(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetLabels sets the given map as Metric labels
|
|
||||||
func (m *Metric) SetLabels(ls map[string]string) {
|
|
||||||
var i int
|
|
||||||
m.Labels = make([]Label, len(ls))
|
|
||||||
for k, v := range ls {
|
|
||||||
m.Labels[i] = Label{
|
|
||||||
Name: k,
|
|
||||||
Value: v,
|
|
||||||
}
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddLabel appends the given label to the label set
|
// AddLabel appends the given label to the label set
|
||||||
func (m *Metric) AddLabel(key, value string) {
|
func (m *Metric) AddLabel(key, value string) {
|
||||||
m.Labels = append(m.Labels, Label{Name: key, Value: value})
|
m.Labels = append(m.Labels, prompbmarshal.Label{Name: key, Value: value})
|
||||||
}
|
}
|
||||||
|
|
||||||
// DelLabel deletes the given label from the label set
|
// DelLabel deletes the given label from the label set
|
||||||
@ -110,14 +99,8 @@ func (m *Metric) Label(key string) string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// Label represents metric's label
|
|
||||||
type Label struct {
|
|
||||||
Name string
|
|
||||||
Value string
|
|
||||||
}
|
|
||||||
|
|
||||||
// Labels is collection of Label
|
// Labels is collection of Label
|
||||||
type Labels []Label
|
type Labels []prompbmarshal.Label
|
||||||
|
|
||||||
func (ls Labels) Len() int { return len(ls) }
|
func (ls Labels) Len() int { return len(ls) }
|
||||||
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
|
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
|
||||||
@ -172,7 +155,7 @@ func LabelCompare(a, b Labels) int {
|
|||||||
// ConvertToLabels convert map to Labels
|
// ConvertToLabels convert map to Labels
|
||||||
func ConvertToLabels(m map[string]string) (labelset Labels) {
|
func ConvertToLabels(m map[string]string) (labelset Labels) {
|
||||||
for k, v := range m {
|
for k, v := range m {
|
||||||
labelset = append(labelset, Label{
|
labelset = append(labelset, prompbmarshal.Label{
|
||||||
Name: k,
|
Name: k,
|
||||||
Value: v,
|
Value: v,
|
||||||
})
|
})
|
||||||
|
@ -3,6 +3,8 @@ package datasource
|
|||||||
import (
|
import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPromInstant_UnmarshalPositive(t *testing.T) {
|
func TestPromInstant_UnmarshalPositive(t *testing.T) {
|
||||||
@ -21,7 +23,7 @@ func TestPromInstant_UnmarshalPositive(t *testing.T) {
|
|||||||
|
|
||||||
f(`[{"metric":{"__name__":"up"},"value":[1583780000,"42"]}]`, []Metric{
|
f(`[{"metric":{"__name__":"up"},"value":[1583780000,"42"]}]`, []Metric{
|
||||||
{
|
{
|
||||||
Labels: []Label{{Name: "__name__", Value: "up"}},
|
Labels: []prompbmarshal.Label{{Name: "__name__", Value: "up"}},
|
||||||
Timestamps: []int64{1583780000},
|
Timestamps: []int64{1583780000},
|
||||||
Values: []float64{42},
|
Values: []float64{42},
|
||||||
},
|
},
|
||||||
@ -31,17 +33,17 @@ func TestPromInstant_UnmarshalPositive(t *testing.T) {
|
|||||||
{"metric":{"__name__":"foo"},"value":[1583780001,"7"]},
|
{"metric":{"__name__":"foo"},"value":[1583780001,"7"]},
|
||||||
{"metric":{"__name__":"baz", "instance":"bar"},"value":[1583780002,"8"]}]`, []Metric{
|
{"metric":{"__name__":"baz", "instance":"bar"},"value":[1583780002,"8"]}]`, []Metric{
|
||||||
{
|
{
|
||||||
Labels: []Label{{Name: "__name__", Value: "up"}},
|
Labels: []prompbmarshal.Label{{Name: "__name__", Value: "up"}},
|
||||||
Timestamps: []int64{1583780000},
|
Timestamps: []int64{1583780000},
|
||||||
Values: []float64{42},
|
Values: []float64{42},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Labels: []Label{{Name: "__name__", Value: "foo"}},
|
Labels: []prompbmarshal.Label{{Name: "__name__", Value: "foo"}},
|
||||||
Timestamps: []int64{1583780001},
|
Timestamps: []int64{1583780001},
|
||||||
Values: []float64{7},
|
Values: []float64{7},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Labels: []Label{{Name: "__name__", Value: "baz"}, {Name: "instance", Value: "bar"}},
|
Labels: []prompbmarshal.Label{{Name: "__name__", Value: "baz"}, {Name: "instance", Value: "bar"}},
|
||||||
Timestamps: []int64{1583780002},
|
Timestamps: []int64{1583780002},
|
||||||
Values: []float64{8},
|
Values: []float64{8},
|
||||||
},
|
},
|
||||||
|
@ -167,14 +167,8 @@ type tplData struct {
|
|||||||
ExternalURL string
|
ExternalURL string
|
||||||
}
|
}
|
||||||
|
|
||||||
func templateAnnotation(dst io.Writer, text string, data tplData, tmpl *textTpl.Template, execute bool) error {
|
func templateAnnotation(dst io.Writer, text string, data tplData, tpl *textTpl.Template, execute bool) error {
|
||||||
tpl, err := tmpl.Clone()
|
tpl, err := tpl.Parse(text)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error cloning template before parse annotation: %w", err)
|
|
||||||
}
|
|
||||||
// Clone() doesn't copy tpl Options, so we set them manually
|
|
||||||
tpl = tpl.Option("missingkey=zero")
|
|
||||||
tpl, err = tpl.Parse(text)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing annotation template: %w", err)
|
return fmt.Errorf("error parsing annotation template: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ func TestAlertExecTemplate(t *testing.T) {
|
|||||||
qFn := func(_ string) ([]datasource.Metric, error) {
|
qFn := func(_ string) ([]datasource.Metric, error) {
|
||||||
return []datasource.Metric{
|
return []datasource.Metric{
|
||||||
{
|
{
|
||||||
Labels: []datasource.Label{
|
Labels: []prompbmarshal.Label{
|
||||||
{Name: "foo", Value: "bar"},
|
{Name: "foo", Value: "bar"},
|
||||||
{Name: "baz", Value: "qux"},
|
{Name: "baz", Value: "qux"},
|
||||||
},
|
},
|
||||||
@ -41,7 +41,7 @@ func TestAlertExecTemplate(t *testing.T) {
|
|||||||
Timestamps: []int64{1},
|
Timestamps: []int64{1},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Labels: []datasource.Label{
|
Labels: []prompbmarshal.Label{
|
||||||
{Name: "foo", Value: "garply"},
|
{Name: "foo", Value: "garply"},
|
||||||
{Name: "baz", Value: "fred"},
|
{Name: "baz", Value: "fred"},
|
||||||
},
|
},
|
||||||
|
@ -14,8 +14,10 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/templates"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/templates"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AlertingRule is basic alert entity
|
// AlertingRule is basic alert entity
|
||||||
@ -454,13 +456,16 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
|
|||||||
ar.logDebugf(ts, a, "created in state PENDING")
|
ar.logDebugf(ts, a, "created in state PENDING")
|
||||||
}
|
}
|
||||||
var numActivePending int
|
var numActivePending int
|
||||||
|
var tss []prompbmarshal.TimeSeries
|
||||||
for h, a := range ar.alerts {
|
for h, a := range ar.alerts {
|
||||||
// if alert wasn't updated in this iteration
|
// if alert wasn't updated in this iteration
|
||||||
// means it is resolved already
|
// means it is resolved already
|
||||||
if _, ok := updated[h]; !ok {
|
if _, ok := updated[h]; !ok {
|
||||||
if a.State == notifier.StatePending {
|
if a.State == notifier.StatePending {
|
||||||
// alert was in Pending state - it is not
|
// alert was in Pending state - it is not active anymore
|
||||||
// active anymore
|
// add stale time series
|
||||||
|
tss = append(tss, pendingAlertStaleTimeSeries(a.Labels, ts.Unix(), true)...)
|
||||||
|
|
||||||
delete(ar.alerts, h)
|
delete(ar.alerts, h)
|
||||||
ar.logDebugf(ts, a, "PENDING => DELETED: is absent in current evaluation round")
|
ar.logDebugf(ts, a, "PENDING => DELETED: is absent in current evaluation round")
|
||||||
continue
|
continue
|
||||||
@ -478,6 +483,9 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
|
|||||||
if ts.Sub(a.KeepFiringSince) >= ar.KeepFiringFor {
|
if ts.Sub(a.KeepFiringSince) >= ar.KeepFiringFor {
|
||||||
a.State = notifier.StateInactive
|
a.State = notifier.StateInactive
|
||||||
a.ResolvedAt = ts
|
a.ResolvedAt = ts
|
||||||
|
// add stale time series
|
||||||
|
tss = append(tss, firingAlertStaleTimeSeries(a.Labels, ts.Unix())...)
|
||||||
|
|
||||||
ar.logDebugf(ts, a, "FIRING => INACTIVE: is absent in current evaluation round")
|
ar.logDebugf(ts, a, "FIRING => INACTIVE: is absent in current evaluation round")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -489,6 +497,10 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
|
|||||||
a.State = notifier.StateFiring
|
a.State = notifier.StateFiring
|
||||||
a.Start = ts
|
a.Start = ts
|
||||||
alertsFired.Inc()
|
alertsFired.Inc()
|
||||||
|
if ar.For > 0 {
|
||||||
|
// add stale time series
|
||||||
|
tss = append(tss, pendingAlertStaleTimeSeries(a.Labels, ts.Unix(), false)...)
|
||||||
|
}
|
||||||
ar.logDebugf(ts, a, "PENDING => FIRING: %s since becoming active at %v", ts.Sub(a.ActiveAt), a.ActiveAt)
|
ar.logDebugf(ts, a, "PENDING => FIRING: %s since becoming active at %v", ts.Sub(a.ActiveAt), a.ActiveAt)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -497,7 +509,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
|
|||||||
curState.Err = fmt.Errorf("exec exceeded limit of %d with %d alerts", limit, numActivePending)
|
curState.Err = fmt.Errorf("exec exceeded limit of %d with %d alerts", limit, numActivePending)
|
||||||
return nil, curState.Err
|
return nil, curState.Err
|
||||||
}
|
}
|
||||||
return ar.toTimeSeries(ts.Unix()), nil
|
return append(tss, ar.toTimeSeries(ts.Unix())...), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.QueryFn, ts time.Time) (*labelSet, map[string]string, error) {
|
func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.QueryFn, ts time.Time) (*labelSet, map[string]string, error) {
|
||||||
@ -522,6 +534,7 @@ func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.Query
|
|||||||
return ls, as, nil
|
return ls, as, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// toTimeSeries creates `ALERTS` and `ALERTS_FOR_STATE` for active alerts
|
||||||
func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries {
|
func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries {
|
||||||
var tss []prompbmarshal.TimeSeries
|
var tss []prompbmarshal.TimeSeries
|
||||||
for _, a := range ar.alerts {
|
for _, a := range ar.alerts {
|
||||||
@ -601,26 +614,83 @@ func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp int64) []
|
|||||||
}
|
}
|
||||||
|
|
||||||
func alertToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.TimeSeries {
|
func alertToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.TimeSeries {
|
||||||
labels := make(map[string]string)
|
var labels []prompbmarshal.Label
|
||||||
for k, v := range a.Labels {
|
for k, v := range a.Labels {
|
||||||
labels[k] = v
|
labels = append(labels, prompbmarshal.Label{
|
||||||
|
Name: k,
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// __name__ already been dropped, no need to check duplication
|
||||||
|
labels = append(labels, prompbmarshal.Label{Name: "__name__", Value: alertMetricName})
|
||||||
|
if ol := promrelabel.GetLabelByName(labels, alertStateLabel); ol != nil {
|
||||||
|
ol.Value = a.State.String()
|
||||||
|
} else {
|
||||||
|
labels = append(labels, prompbmarshal.Label{Name: alertStateLabel, Value: a.State.String()})
|
||||||
}
|
}
|
||||||
labels["__name__"] = alertMetricName
|
|
||||||
labels[alertStateLabel] = a.State.String()
|
|
||||||
return newTimeSeries([]float64{1}, []int64{timestamp}, labels)
|
return newTimeSeries([]float64{1}, []int64{timestamp}, labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
// alertForToTimeSeries returns a timeseries that represents
|
// alertForToTimeSeries returns a time series that represents
|
||||||
// state of active alerts, where value is time when alert become active
|
// state of active alerts, where value is time when alert become active
|
||||||
func alertForToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.TimeSeries {
|
func alertForToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.TimeSeries {
|
||||||
labels := make(map[string]string)
|
var labels []prompbmarshal.Label
|
||||||
for k, v := range a.Labels {
|
for k, v := range a.Labels {
|
||||||
labels[k] = v
|
labels = append(labels, prompbmarshal.Label{
|
||||||
|
Name: k,
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
labels["__name__"] = alertForStateMetricName
|
// __name__ already been dropped, no need to check duplication
|
||||||
|
labels = append(labels, prompbmarshal.Label{Name: "__name__", Value: alertForStateMetricName})
|
||||||
return newTimeSeries([]float64{float64(a.ActiveAt.Unix())}, []int64{timestamp}, labels)
|
return newTimeSeries([]float64{float64(a.ActiveAt.Unix())}, []int64{timestamp}, labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pendingAlertStaleTimeSeries returns stale `ALERTS` and `ALERTS_FOR_STATE` time series
|
||||||
|
// for alerts which changed their state from Pending to Inactive or Firing.
|
||||||
|
func pendingAlertStaleTimeSeries(ls map[string]string, timestamp int64, includeAlertForState bool) []prompbmarshal.TimeSeries {
|
||||||
|
var result []prompbmarshal.TimeSeries
|
||||||
|
var baseLabels []prompbmarshal.Label
|
||||||
|
for k, v := range ls {
|
||||||
|
baseLabels = append(baseLabels, prompbmarshal.Label{
|
||||||
|
Name: k,
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// __name__ already been dropped, no need to check duplication
|
||||||
|
alertsLabels := append(baseLabels, prompbmarshal.Label{Name: "__name__", Value: alertMetricName})
|
||||||
|
alertsLabels = append(alertsLabels, prompbmarshal.Label{Name: alertStateLabel, Value: notifier.StatePending.String()})
|
||||||
|
result = append(result, newTimeSeries([]float64{decimal.StaleNaN}, []int64{timestamp}, alertsLabels))
|
||||||
|
|
||||||
|
if includeAlertForState {
|
||||||
|
alertsForStateLabels := append(baseLabels, prompbmarshal.Label{Name: "__name__", Value: alertForStateMetricName})
|
||||||
|
result = append(result, newTimeSeries([]float64{decimal.StaleNaN}, []int64{timestamp}, alertsForStateLabels))
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// firingAlertStaleTimeSeries returns stale `ALERTS` and `ALERTS_FOR_STATE` time series
|
||||||
|
// for alerts which changed their state from Firing to Inactive.
|
||||||
|
func firingAlertStaleTimeSeries(ls map[string]string, timestamp int64) []prompbmarshal.TimeSeries {
|
||||||
|
var baseLabels []prompbmarshal.Label
|
||||||
|
for k, v := range ls {
|
||||||
|
baseLabels = append(baseLabels, prompbmarshal.Label{
|
||||||
|
Name: k,
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// __name__ already been dropped, no need to check duplication
|
||||||
|
alertsLabels := append(baseLabels, prompbmarshal.Label{Name: "__name__", Value: alertMetricName})
|
||||||
|
alertsLabels = append(alertsLabels, prompbmarshal.Label{Name: alertStateLabel, Value: notifier.StateFiring.String()})
|
||||||
|
|
||||||
|
alertsForStateLabels := append(baseLabels, prompbmarshal.Label{Name: "__name__", Value: alertForStateMetricName})
|
||||||
|
|
||||||
|
return []prompbmarshal.TimeSeries{
|
||||||
|
newTimeSeries([]float64{decimal.StaleNaN}, []int64{timestamp}, alertsLabels),
|
||||||
|
newTimeSeries([]float64{decimal.StaleNaN}, []int64{timestamp}, alertsForStateLabels),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// restore restores the value of ActiveAt field for active alerts,
|
// restore restores the value of ActiveAt field for active alerts,
|
||||||
// based on previously written time series `alertForStateMetricName`.
|
// based on previously written time series `alertForStateMetricName`.
|
||||||
// Only rules with For > 0 can be restored.
|
// Only rules with For > 0 can be restored.
|
||||||
|
@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||||
)
|
)
|
||||||
@ -28,7 +29,7 @@ func TestAlertingRuleToTimeSeries(t *testing.T) {
|
|||||||
rule.alerts[alert.ID] = alert
|
rule.alerts[alert.ID] = alert
|
||||||
tss := rule.toTimeSeries(timestamp.Unix())
|
tss := rule.toTimeSeries(timestamp.Unix())
|
||||||
if err := compareTimeSeries(t, tssExpected, tss); err != nil {
|
if err := compareTimeSeries(t, tssExpected, tss); err != nil {
|
||||||
t.Fatalf("timeseries mismatch: %s", err)
|
t.Fatalf("timeseries mismatch for rule %q: %s", rule.Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,14 +37,23 @@ func TestAlertingRuleToTimeSeries(t *testing.T) {
|
|||||||
State: notifier.StateFiring,
|
State: notifier.StateFiring,
|
||||||
ActiveAt: timestamp.Add(time.Second),
|
ActiveAt: timestamp.Add(time.Second),
|
||||||
}, []prompbmarshal.TimeSeries{
|
}, []prompbmarshal.TimeSeries{
|
||||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": alertMetricName,
|
{
|
||||||
alertStateLabel: notifier.StateFiring.String(),
|
Name: "__name__",
|
||||||
|
Value: alertMetricName,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: alertStateLabel,
|
||||||
|
Value: notifier.StateFiring.String(),
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
||||||
[]int64{timestamp.UnixNano()},
|
[]int64{timestamp.UnixNano()},
|
||||||
map[string]string{
|
[]prompbmarshal.Label{
|
||||||
"__name__": alertForStateMetricName,
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: alertForStateMetricName,
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -54,18 +64,40 @@ func TestAlertingRuleToTimeSeries(t *testing.T) {
|
|||||||
"instance": "bar",
|
"instance": "bar",
|
||||||
},
|
},
|
||||||
}, []prompbmarshal.TimeSeries{
|
}, []prompbmarshal.TimeSeries{
|
||||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()},
|
||||||
"__name__": alertMetricName,
|
[]prompbmarshal.Label{
|
||||||
alertStateLabel: notifier.StateFiring.String(),
|
{
|
||||||
"job": "foo",
|
Name: "__name__",
|
||||||
"instance": "bar",
|
Value: alertMetricName,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: alertStateLabel,
|
||||||
|
Value: notifier.StateFiring.String(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "instance",
|
||||||
|
Value: "bar",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
||||||
[]int64{timestamp.UnixNano()},
|
[]int64{timestamp.UnixNano()},
|
||||||
map[string]string{
|
[]prompbmarshal.Label{
|
||||||
"__name__": alertForStateMetricName,
|
{
|
||||||
"job": "foo",
|
Name: "__name__",
|
||||||
"instance": "bar",
|
Value: alertForStateMetricName,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "instance",
|
||||||
|
Value: "bar",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -73,18 +105,29 @@ func TestAlertingRuleToTimeSeries(t *testing.T) {
|
|||||||
State: notifier.StateFiring, ActiveAt: timestamp.Add(time.Second),
|
State: notifier.StateFiring, ActiveAt: timestamp.Add(time.Second),
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
alertStateLabel: "foo",
|
alertStateLabel: "foo",
|
||||||
"__name__": "bar",
|
|
||||||
},
|
},
|
||||||
}, []prompbmarshal.TimeSeries{
|
}, []prompbmarshal.TimeSeries{
|
||||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": alertMetricName,
|
{
|
||||||
alertStateLabel: notifier.StateFiring.String(),
|
Name: "__name__",
|
||||||
|
Value: alertMetricName,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: alertStateLabel,
|
||||||
|
Value: notifier.StateFiring.String(),
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
||||||
[]int64{timestamp.UnixNano()},
|
[]int64{timestamp.UnixNano()},
|
||||||
map[string]string{
|
[]prompbmarshal.Label{
|
||||||
"__name__": alertForStateMetricName,
|
{
|
||||||
alertStateLabel: "foo",
|
Name: "__name__",
|
||||||
|
Value: alertForStateMetricName,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: alertStateLabel,
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -92,14 +135,23 @@ func TestAlertingRuleToTimeSeries(t *testing.T) {
|
|||||||
State: notifier.StateFiring,
|
State: notifier.StateFiring,
|
||||||
ActiveAt: timestamp.Add(time.Second),
|
ActiveAt: timestamp.Add(time.Second),
|
||||||
}, []prompbmarshal.TimeSeries{
|
}, []prompbmarshal.TimeSeries{
|
||||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": alertMetricName,
|
{
|
||||||
alertStateLabel: notifier.StateFiring.String(),
|
Name: "__name__",
|
||||||
|
Value: alertMetricName,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: alertStateLabel,
|
||||||
|
Value: notifier.StateFiring.String(),
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
||||||
[]int64{timestamp.UnixNano()},
|
[]int64{timestamp.UnixNano()},
|
||||||
map[string]string{
|
[]prompbmarshal.Label{
|
||||||
"__name__": alertForStateMetricName,
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: alertForStateMetricName,
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -107,12 +159,21 @@ func TestAlertingRuleToTimeSeries(t *testing.T) {
|
|||||||
State: notifier.StatePending,
|
State: notifier.StatePending,
|
||||||
ActiveAt: timestamp.Add(time.Second),
|
ActiveAt: timestamp.Add(time.Second),
|
||||||
}, []prompbmarshal.TimeSeries{
|
}, []prompbmarshal.TimeSeries{
|
||||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": alertMetricName,
|
{
|
||||||
alertStateLabel: notifier.StatePending.String(),
|
Name: "__name__",
|
||||||
|
Value: alertMetricName,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: alertStateLabel,
|
||||||
|
Value: notifier.StatePending.String(),
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": alertForStateMetricName,
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: alertForStateMetricName,
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -124,7 +185,9 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
alert *notifier.Alert
|
alert *notifier.Alert
|
||||||
}
|
}
|
||||||
|
|
||||||
f := func(rule *AlertingRule, steps [][]datasource.Metric, alertsExpected map[int][]testAlert) {
|
ts, _ := time.Parse(time.RFC3339, "2024-10-29T00:00:00Z")
|
||||||
|
|
||||||
|
f := func(rule *AlertingRule, steps [][]datasource.Metric, alertsExpected map[int][]testAlert, tssExpected map[int][]prompbmarshal.TimeSeries) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
fq := &datasource.FakeQuerier{}
|
fq := &datasource.FakeQuerier{}
|
||||||
@ -134,13 +197,19 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
Name: "TestRule_Exec",
|
Name: "TestRule_Exec",
|
||||||
}
|
}
|
||||||
rule.GroupID = fakeGroup.ID()
|
rule.GroupID = fakeGroup.ID()
|
||||||
ts := time.Now()
|
|
||||||
for i, step := range steps {
|
for i, step := range steps {
|
||||||
fq.Reset()
|
fq.Reset()
|
||||||
fq.Add(step...)
|
fq.Add(step...)
|
||||||
if _, err := rule.exec(context.TODO(), ts, 0); err != nil {
|
tss, err := rule.exec(context.TODO(), ts, 0)
|
||||||
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %s", err)
|
t.Fatalf("unexpected error: %s", err)
|
||||||
}
|
}
|
||||||
|
// check generate time series
|
||||||
|
if _, ok := tssExpected[i]; ok {
|
||||||
|
if err := compareTimeSeries(t, tssExpected[i], tss); err != nil {
|
||||||
|
t.Fatalf("generated time series mismatch for rule %q in step %d: %s", rule.Name, i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// shift the execution timestamp before the next iteration
|
// shift the execution timestamp before the next iteration
|
||||||
ts = ts.Add(defaultStep)
|
ts = ts.Add(defaultStep)
|
||||||
@ -174,12 +243,20 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
f(newTestAlertingRule("empty", 0), [][]datasource.Metric{}, nil)
|
f(newTestAlertingRule("empty", 0), [][]datasource.Metric{}, nil, nil)
|
||||||
|
|
||||||
f(newTestAlertingRule("empty labels", 0), [][]datasource.Metric{
|
f(newTestAlertingRule("empty_labels", 0), [][]datasource.Metric{
|
||||||
{datasource.Metric{Values: []float64{1}, Timestamps: []int64{1}}},
|
{datasource.Metric{Values: []float64{1}, Timestamps: []int64{1}}},
|
||||||
}, map[int][]testAlert{
|
}, map[int][]testAlert{
|
||||||
0: {{alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
0: {{alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
||||||
|
},
|
||||||
|
map[int][]prompbmarshal.TimeSeries{
|
||||||
|
0: {
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "empty_labels"}, {Name: "alertstate", Value: "firing"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "empty_labels"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
f(newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>inactive", 0), [][]datasource.Metric{
|
f(newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>inactive", 0), [][]datasource.Metric{
|
||||||
@ -194,6 +271,25 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
2: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
2: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
||||||
3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
||||||
4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
||||||
|
}, map[int][]prompbmarshal.TimeSeries{
|
||||||
|
0: {
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
|
1: {
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
|
2: {
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Add(2 * defaultStep).Unix()), Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
f(newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>inactive=>firing", 0), [][]datasource.Metric{
|
f(newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>inactive=>firing", 0), [][]datasource.Metric{
|
||||||
@ -210,7 +306,7 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
||||||
4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
||||||
5: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
5: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
||||||
})
|
}, nil)
|
||||||
|
|
||||||
f(newTestAlertingRule("multiple-firing", 0), [][]datasource.Metric{
|
f(newTestAlertingRule("multiple-firing", 0), [][]datasource.Metric{
|
||||||
{
|
{
|
||||||
@ -224,7 +320,7 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
{labels: []string{"name", "foo1"}, alert: ¬ifier.Alert{State: notifier.StateFiring}},
|
{labels: []string{"name", "foo1"}, alert: ¬ifier.Alert{State: notifier.StateFiring}},
|
||||||
{labels: []string{"name", "foo2"}, alert: ¬ifier.Alert{State: notifier.StateFiring}},
|
{labels: []string{"name", "foo2"}, alert: ¬ifier.Alert{State: notifier.StateFiring}},
|
||||||
},
|
},
|
||||||
})
|
}, nil)
|
||||||
|
|
||||||
// 1: fire first alert
|
// 1: fire first alert
|
||||||
// 2: fire second alert, set first inactive
|
// 2: fire second alert, set first inactive
|
||||||
@ -233,8 +329,7 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
{metricWithLabels(t, "name", "foo")},
|
{metricWithLabels(t, "name", "foo")},
|
||||||
{metricWithLabels(t, "name", "foo1")},
|
{metricWithLabels(t, "name", "foo1")},
|
||||||
{metricWithLabels(t, "name", "foo2")},
|
{metricWithLabels(t, "name", "foo2")},
|
||||||
},
|
}, map[int][]testAlert{
|
||||||
map[int][]testAlert{
|
|
||||||
0: {
|
0: {
|
||||||
{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}},
|
{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}},
|
||||||
},
|
},
|
||||||
@ -247,13 +342,44 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
{labels: []string{"name", "foo1"}, alert: ¬ifier.Alert{State: notifier.StateInactive}},
|
{labels: []string{"name", "foo1"}, alert: ¬ifier.Alert{State: notifier.StateInactive}},
|
||||||
{labels: []string{"name", "foo2"}, alert: ¬ifier.Alert{State: notifier.StateFiring}},
|
{labels: []string{"name", "foo2"}, alert: ¬ifier.Alert{State: notifier.StateFiring}},
|
||||||
},
|
},
|
||||||
|
}, map[int][]prompbmarshal.TimeSeries{
|
||||||
|
0: {
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
|
1: {
|
||||||
|
// stale time series for foo, `firing -> inactive`
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
// new time series for foo1
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo1"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo1"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Add(defaultStep).Unix()), Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
|
2: {
|
||||||
|
// stale time series for foo1
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo1"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo1"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}},
|
||||||
|
// new time series for foo2
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo2"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo2"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Add(2 * defaultStep).Unix()), Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
f(newTestAlertingRule("for-pending", time.Minute), [][]datasource.Metric{
|
f(newTestAlertingRule("for-pending", time.Minute), [][]datasource.Metric{
|
||||||
{metricWithLabels(t, "name", "foo")},
|
{metricWithLabels(t, "name", "foo")},
|
||||||
}, map[int][]testAlert{
|
}, map[int][]testAlert{
|
||||||
0: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
0: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
||||||
})
|
}, nil)
|
||||||
|
|
||||||
f(newTestAlertingRule("for-fired", defaultStep), [][]datasource.Metric{
|
f(newTestAlertingRule("for-fired", defaultStep), [][]datasource.Metric{
|
||||||
{metricWithLabels(t, "name", "foo")},
|
{metricWithLabels(t, "name", "foo")},
|
||||||
@ -261,6 +387,22 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
}, map[int][]testAlert{
|
}, map[int][]testAlert{
|
||||||
0: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
0: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
||||||
1: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
1: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
||||||
|
}, map[int][]prompbmarshal.TimeSeries{
|
||||||
|
0: {
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
|
1: {
|
||||||
|
// stale time series for `pending -> firing`
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Add(defaultStep).Unix()), Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
f(newTestAlertingRule("for-pending=>empty", time.Second), [][]datasource.Metric{
|
f(newTestAlertingRule("for-pending=>empty", time.Second), [][]datasource.Metric{
|
||||||
@ -272,6 +414,26 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
0: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
0: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
||||||
1: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
1: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
||||||
2: {},
|
2: {},
|
||||||
|
}, map[int][]prompbmarshal.TimeSeries{
|
||||||
|
0: {
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
|
1: {
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
|
// stale time series for `pending -> inactive`
|
||||||
|
2: {
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}},
|
||||||
|
{Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "name", Value: "foo"}},
|
||||||
|
Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
f(newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", defaultStep), [][]datasource.Metric{
|
f(newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", defaultStep), [][]datasource.Metric{
|
||||||
@ -287,7 +449,7 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
2: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
2: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
||||||
3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
||||||
4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
||||||
})
|
}, nil)
|
||||||
|
|
||||||
f(newTestAlertingRuleWithCustomFields("for-pending=>firing=>keepfiring=>firing", defaultStep, 0, defaultStep, nil), [][]datasource.Metric{
|
f(newTestAlertingRuleWithCustomFields("for-pending=>firing=>keepfiring=>firing", defaultStep, 0, defaultStep, nil), [][]datasource.Metric{
|
||||||
{metricWithLabels(t, "name", "foo")},
|
{metricWithLabels(t, "name", "foo")},
|
||||||
@ -300,7 +462,7 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
1: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
1: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
||||||
2: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
2: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
||||||
3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
||||||
})
|
}, nil)
|
||||||
|
|
||||||
f(newTestAlertingRuleWithCustomFields("for-pending=>firing=>keepfiring=>keepfiring=>inactive=>pending=>firing", defaultStep, 0, 2*defaultStep, nil), [][]datasource.Metric{
|
f(newTestAlertingRuleWithCustomFields("for-pending=>firing=>keepfiring=>keepfiring=>inactive=>pending=>firing", defaultStep, 0, 2*defaultStep, nil), [][]datasource.Metric{
|
||||||
{metricWithLabels(t, "name", "foo")},
|
{metricWithLabels(t, "name", "foo")},
|
||||||
@ -321,7 +483,7 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||||||
4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}},
|
||||||
5: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
5: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}},
|
||||||
6: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
6: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}},
|
||||||
})
|
}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAlertingRuleExecRange(t *testing.T) {
|
func TestAlertingRuleExecRange(t *testing.T) {
|
||||||
@ -477,7 +639,7 @@ func TestAlertingRuleExecRange(t *testing.T) {
|
|||||||
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
|
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
|
||||||
{
|
{
|
||||||
Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
||||||
Labels: []datasource.Label{{Name: "foo", Value: "bar"}},
|
Labels: []prompbmarshal.Label{{Name: "foo", Value: "bar"}},
|
||||||
},
|
},
|
||||||
}, []*notifier.Alert{
|
}, []*notifier.Alert{
|
||||||
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
|
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
|
||||||
@ -523,7 +685,7 @@ func TestAlertingRuleExecRange(t *testing.T) {
|
|||||||
{Values: []float64{1, 1}, Timestamps: []int64{1, 100}},
|
{Values: []float64{1, 1}, Timestamps: []int64{1, 100}},
|
||||||
{
|
{
|
||||||
Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
||||||
Labels: []datasource.Label{{Name: "foo", Value: "bar"}},
|
Labels: []prompbmarshal.Label{{Name: "foo", Value: "bar"}},
|
||||||
},
|
},
|
||||||
}, []*notifier.Alert{
|
}, []*notifier.Alert{
|
||||||
{
|
{
|
||||||
@ -1047,7 +1209,7 @@ func newTestAlertingRuleWithCustomFields(name string, waitFor, evalInterval, kee
|
|||||||
|
|
||||||
func TestAlertingRule_ToLabels(t *testing.T) {
|
func TestAlertingRule_ToLabels(t *testing.T) {
|
||||||
metric := datasource.Metric{
|
metric := datasource.Metric{
|
||||||
Labels: []datasource.Label{
|
Labels: []prompbmarshal.Label{
|
||||||
{Name: "instance", Value: "0.0.0.0:8800"},
|
{Name: "instance", Value: "0.0.0.0:8800"},
|
||||||
{Name: "group", Value: "vmalert"},
|
{Name: "group", Value: "vmalert"},
|
||||||
{Name: "alertname", Value: "ConfigurationReloadFailure"},
|
{Name: "alertname", Value: "ConfigurationReloadFailure"},
|
||||||
|
@ -8,12 +8,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
||||||
|
|
||||||
"github.com/cheggaaa/pb/v3"
|
"github.com/cheggaaa/pb/v3"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
||||||
@ -21,7 +18,6 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
@ -353,7 +349,6 @@ func (g *Group) Start(ctx context.Context, nts func() []notifier.Notifier, rw re
|
|||||||
Rw: rw,
|
Rw: rw,
|
||||||
Notifiers: nts,
|
Notifiers: nts,
|
||||||
notifierHeaders: g.NotifierHeaders,
|
notifierHeaders: g.NotifierHeaders,
|
||||||
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
g.infof("started")
|
g.infof("started")
|
||||||
@ -426,8 +421,6 @@ func (g *Group) Start(ctx context.Context, nts func() []notifier.Notifier, rw re
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure that staleness is tracked for existing rules only
|
|
||||||
e.purgeStaleSeries(g.Rules)
|
|
||||||
e.notifierHeaders = g.NotifierHeaders
|
e.notifierHeaders = g.NotifierHeaders
|
||||||
g.mu.Unlock()
|
g.mu.Unlock()
|
||||||
|
|
||||||
@ -542,7 +535,6 @@ func (g *Group) ExecOnce(ctx context.Context, nts func() []notifier.Notifier, rw
|
|||||||
Rw: rw,
|
Rw: rw,
|
||||||
Notifiers: nts,
|
Notifiers: nts,
|
||||||
notifierHeaders: g.NotifierHeaders,
|
notifierHeaders: g.NotifierHeaders,
|
||||||
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
|
||||||
}
|
}
|
||||||
if len(g.Rules) < 1 {
|
if len(g.Rules) < 1 {
|
||||||
return nil
|
return nil
|
||||||
@ -633,13 +625,6 @@ type executor struct {
|
|||||||
notifierHeaders map[string]string
|
notifierHeaders map[string]string
|
||||||
|
|
||||||
Rw remotewrite.RWClient
|
Rw remotewrite.RWClient
|
||||||
|
|
||||||
previouslySentSeriesToRWMu sync.Mutex
|
|
||||||
// previouslySentSeriesToRW stores series sent to RW on previous iteration
|
|
||||||
// map[ruleID]map[ruleLabels][]prompb.Label
|
|
||||||
// where `ruleID` is ID of the Rule within a Group
|
|
||||||
// and `ruleLabels` is []prompb.Label marshalled to a string
|
|
||||||
previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// execConcurrently executes rules concurrently if concurrency>1
|
// execConcurrently executes rules concurrently if concurrency>1
|
||||||
@ -706,11 +691,6 @@ func (e *executor) exec(ctx context.Context, r Rule, ts time.Time, resolveDurati
|
|||||||
if err := pushToRW(tss); err != nil {
|
if err := pushToRW(tss); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
staleSeries := e.getStaleSeries(r, tss, ts)
|
|
||||||
if err := pushToRW(staleSeries); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ar, ok := r.(*AlertingRule)
|
ar, ok := r.(*AlertingRule)
|
||||||
@ -737,79 +717,3 @@ func (e *executor) exec(ctx context.Context, r Rule, ts time.Time, resolveDurati
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
return errGr.Err()
|
return errGr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
var bbPool bytesutil.ByteBufferPool
|
|
||||||
|
|
||||||
// getStaleSeries checks whether there are stale series from previously sent ones.
|
|
||||||
func (e *executor) getStaleSeries(r Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries {
|
|
||||||
bb := bbPool.Get()
|
|
||||||
defer bbPool.Put(bb)
|
|
||||||
|
|
||||||
ruleLabels := make(map[string][]prompbmarshal.Label, len(tss))
|
|
||||||
for _, ts := range tss {
|
|
||||||
// convert labels to strings, so we can compare with previously sent series
|
|
||||||
bb.B = labelsToString(bb.B, ts.Labels)
|
|
||||||
ruleLabels[string(bb.B)] = ts.Labels
|
|
||||||
bb.Reset()
|
|
||||||
}
|
|
||||||
|
|
||||||
rID := r.ID()
|
|
||||||
var staleS []prompbmarshal.TimeSeries
|
|
||||||
// check whether there are series which disappeared and need to be marked as stale
|
|
||||||
e.previouslySentSeriesToRWMu.Lock()
|
|
||||||
for key, labels := range e.previouslySentSeriesToRW[rID] {
|
|
||||||
if _, ok := ruleLabels[key]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// previously sent series are missing in current series, so we mark them as stale
|
|
||||||
ss := newTimeSeriesPB([]float64{decimal.StaleNaN}, []int64{timestamp.Unix()}, labels)
|
|
||||||
staleS = append(staleS, ss)
|
|
||||||
}
|
|
||||||
// set previous series to current
|
|
||||||
e.previouslySentSeriesToRW[rID] = ruleLabels
|
|
||||||
e.previouslySentSeriesToRWMu.Unlock()
|
|
||||||
|
|
||||||
return staleS
|
|
||||||
}
|
|
||||||
|
|
||||||
// purgeStaleSeries deletes references in tracked
|
|
||||||
// previouslySentSeriesToRW list to Rules which aren't present
|
|
||||||
// in the given activeRules list. The method is used when the list
|
|
||||||
// of loaded rules has changed and executor has to remove
|
|
||||||
// references to non-existing rules.
|
|
||||||
func (e *executor) purgeStaleSeries(activeRules []Rule) {
|
|
||||||
newPreviouslySentSeriesToRW := make(map[uint64]map[string][]prompbmarshal.Label)
|
|
||||||
|
|
||||||
e.previouslySentSeriesToRWMu.Lock()
|
|
||||||
|
|
||||||
for _, rule := range activeRules {
|
|
||||||
id := rule.ID()
|
|
||||||
prev, ok := e.previouslySentSeriesToRW[id]
|
|
||||||
if ok {
|
|
||||||
// keep previous series for staleness detection
|
|
||||||
newPreviouslySentSeriesToRW[id] = prev
|
|
||||||
}
|
|
||||||
}
|
|
||||||
e.previouslySentSeriesToRW = nil
|
|
||||||
e.previouslySentSeriesToRW = newPreviouslySentSeriesToRW
|
|
||||||
|
|
||||||
e.previouslySentSeriesToRWMu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func labelsToString(dst []byte, labels []prompbmarshal.Label) []byte {
|
|
||||||
dst = append(dst, '{')
|
|
||||||
for i, label := range labels {
|
|
||||||
if len(label.Name) == 0 {
|
|
||||||
dst = append(dst, "__name__"...)
|
|
||||||
} else {
|
|
||||||
dst = append(dst, label.Name...)
|
|
||||||
}
|
|
||||||
dst = append(dst, '=')
|
|
||||||
dst = strconv.AppendQuote(dst, label.Value)
|
|
||||||
if i < len(labels)-1 {
|
|
||||||
dst = append(dst, ',')
|
|
||||||
}
|
|
||||||
}
|
|
||||||
dst = append(dst, '}')
|
|
||||||
return dst
|
|
||||||
}
|
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -17,8 +16,6 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/templates"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/templates"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -383,153 +380,6 @@ func TestGetResolveDuration(t *testing.T) {
|
|||||||
f(2*time.Minute, 0, 1*time.Minute, 8*time.Minute)
|
f(2*time.Minute, 0, 1*time.Minute, 8*time.Minute)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetStaleSeries(t *testing.T) {
|
|
||||||
ts := time.Now()
|
|
||||||
e := &executor{
|
|
||||||
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
|
||||||
}
|
|
||||||
f := func(r Rule, labels, expLabels [][]prompbmarshal.Label) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
var tss []prompbmarshal.TimeSeries
|
|
||||||
for _, l := range labels {
|
|
||||||
tss = append(tss, newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, l))
|
|
||||||
}
|
|
||||||
staleS := e.getStaleSeries(r, tss, ts)
|
|
||||||
if staleS == nil && expLabels == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(staleS) != len(expLabels) {
|
|
||||||
t.Fatalf("expected to get %d stale series, got %d",
|
|
||||||
len(expLabels), len(staleS))
|
|
||||||
}
|
|
||||||
for i, exp := range expLabels {
|
|
||||||
got := staleS[i]
|
|
||||||
if !reflect.DeepEqual(exp, got.Labels) {
|
|
||||||
t.Fatalf("expected to get labels: \n%v;\ngot instead: \n%v",
|
|
||||||
exp, got.Labels)
|
|
||||||
}
|
|
||||||
if len(got.Samples) != 1 {
|
|
||||||
t.Fatalf("expected to have 1 sample; got %d", len(got.Samples))
|
|
||||||
}
|
|
||||||
if !decimal.IsStaleNaN(got.Samples[0].Value) {
|
|
||||||
t.Fatalf("expected sample value to be %v; got %v", decimal.StaleNaN, got.Samples[0].Value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// warn: keep in mind, that executor holds the state, so sequence of f calls matters
|
|
||||||
|
|
||||||
// single series
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")},
|
|
||||||
nil)
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")},
|
|
||||||
nil)
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
nil,
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")})
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
nil,
|
|
||||||
nil)
|
|
||||||
|
|
||||||
// multiple series
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
[][]prompbmarshal.Label{
|
|
||||||
toPromLabels(t, "__name__", "job:foo", "job", "foo"),
|
|
||||||
toPromLabels(t, "__name__", "job:foo", "job", "bar"),
|
|
||||||
},
|
|
||||||
nil)
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")})
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
|
|
||||||
nil)
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
nil,
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")})
|
|
||||||
|
|
||||||
// multiple rules and series
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
[][]prompbmarshal.Label{
|
|
||||||
toPromLabels(t, "__name__", "job:foo", "job", "foo"),
|
|
||||||
toPromLabels(t, "__name__", "job:foo", "job", "bar"),
|
|
||||||
},
|
|
||||||
nil)
|
|
||||||
f(&AlertingRule{RuleID: 2},
|
|
||||||
[][]prompbmarshal.Label{
|
|
||||||
toPromLabels(t, "__name__", "job:foo", "job", "foo"),
|
|
||||||
toPromLabels(t, "__name__", "job:foo", "job", "bar"),
|
|
||||||
},
|
|
||||||
nil)
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")})
|
|
||||||
f(&AlertingRule{RuleID: 1},
|
|
||||||
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
|
|
||||||
nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPurgeStaleSeries(t *testing.T) {
|
|
||||||
ts := time.Now()
|
|
||||||
labels := toPromLabels(t, "__name__", "job:foo", "job", "foo")
|
|
||||||
tss := []prompbmarshal.TimeSeries{newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, labels)}
|
|
||||||
|
|
||||||
f := func(curRules, newRules, expStaleRules []Rule) {
|
|
||||||
t.Helper()
|
|
||||||
e := &executor{
|
|
||||||
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
|
||||||
}
|
|
||||||
// seed executor with series for
|
|
||||||
// current rules
|
|
||||||
for _, rule := range curRules {
|
|
||||||
e.getStaleSeries(rule, tss, ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
e.purgeStaleSeries(newRules)
|
|
||||||
|
|
||||||
if len(e.previouslySentSeriesToRW) != len(expStaleRules) {
|
|
||||||
t.Fatalf("expected to get %d stale series, got %d",
|
|
||||||
len(expStaleRules), len(e.previouslySentSeriesToRW))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, exp := range expStaleRules {
|
|
||||||
if _, ok := e.previouslySentSeriesToRW[exp.ID()]; !ok {
|
|
||||||
t.Fatalf("expected to have rule %d; got nil instead", exp.ID())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
f(nil, nil, nil)
|
|
||||||
f(
|
|
||||||
nil,
|
|
||||||
[]Rule{&AlertingRule{RuleID: 1}},
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
f(
|
|
||||||
[]Rule{&AlertingRule{RuleID: 1}},
|
|
||||||
nil,
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
f(
|
|
||||||
[]Rule{&AlertingRule{RuleID: 1}},
|
|
||||||
[]Rule{&AlertingRule{RuleID: 2}},
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
f(
|
|
||||||
[]Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}},
|
|
||||||
[]Rule{&AlertingRule{RuleID: 2}},
|
|
||||||
[]Rule{&AlertingRule{RuleID: 2}},
|
|
||||||
)
|
|
||||||
f(
|
|
||||||
[]Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}},
|
|
||||||
[]Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}},
|
|
||||||
[]Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFaultyNotifier(t *testing.T) {
|
func TestFaultyNotifier(t *testing.T) {
|
||||||
fq := &datasource.FakeQuerier{}
|
fq := &datasource.FakeQuerier{}
|
||||||
fq.Add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
|
fq.Add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
|
||||||
@ -581,7 +431,6 @@ func TestFaultyRW(t *testing.T) {
|
|||||||
|
|
||||||
e := &executor{
|
e := &executor{
|
||||||
Rw: &remotewrite.Client{},
|
Rw: &remotewrite.Client{},
|
||||||
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err := e.exec(context.Background(), r, time.Now(), 0, 10)
|
err := e.exec(context.Background(), r, time.Now(), 0, 10)
|
||||||
|
@ -1,36 +0,0 @@
|
|||||||
package rule
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
||||||
)
|
|
||||||
|
|
||||||
func BenchmarkGetStaleSeries(b *testing.B) {
|
|
||||||
ts := time.Now()
|
|
||||||
n := 100
|
|
||||||
payload := make([]prompbmarshal.TimeSeries, 0, n)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
s := fmt.Sprintf("%d", i)
|
|
||||||
labels := toPromLabels(b,
|
|
||||||
"__name__", "foo", ""+
|
|
||||||
"instance", s,
|
|
||||||
"job", s,
|
|
||||||
"state", s,
|
|
||||||
)
|
|
||||||
payload = append(payload, newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, labels))
|
|
||||||
}
|
|
||||||
|
|
||||||
e := &executor{
|
|
||||||
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
|
||||||
}
|
|
||||||
ar := &AlertingRule{RuleID: 1}
|
|
||||||
|
|
||||||
b.ResetTimer()
|
|
||||||
b.ReportAllocs()
|
|
||||||
for i := 0; i < b.N; i++ {
|
|
||||||
e.getStaleSeries(ar, payload, ts)
|
|
||||||
}
|
|
||||||
}
|
|
@ -3,16 +3,17 @@ package rule
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RecordingRule is a Rule that supposed
|
// RecordingRule is a Rule that supposed
|
||||||
@ -34,6 +35,8 @@ type RecordingRule struct {
|
|||||||
// during evaluations
|
// during evaluations
|
||||||
state *ruleState
|
state *ruleState
|
||||||
|
|
||||||
|
lastEvaluation map[string]struct{}
|
||||||
|
|
||||||
metrics *recordingRuleMetrics
|
metrics *recordingRuleMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,7 +116,7 @@ func (rr *RecordingRule) execRange(ctx context.Context, start, end time.Time) ([
|
|||||||
var tss []prompbmarshal.TimeSeries
|
var tss []prompbmarshal.TimeSeries
|
||||||
for _, s := range res.Data {
|
for _, s := range res.Data {
|
||||||
ts := rr.toTimeSeries(s)
|
ts := rr.toTimeSeries(s)
|
||||||
key := stringifyLabels(ts)
|
key := stringifyLabels(ts.Labels)
|
||||||
if _, ok := duplicates[key]; ok {
|
if _, ok := duplicates[key]; ok {
|
||||||
return nil, fmt.Errorf("original metric %v; resulting labels %q: %w", s.Labels, key, errDuplicate)
|
return nil, fmt.Errorf("original metric %v; resulting labels %q: %w", s.Labels, key, errDuplicate)
|
||||||
}
|
}
|
||||||
@ -155,28 +158,47 @@ func (rr *RecordingRule) exec(ctx context.Context, ts time.Time, limit int) ([]p
|
|||||||
return nil, curState.Err
|
return nil, curState.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
duplicates := make(map[string]struct{}, len(qMetrics))
|
curEvaluation := make(map[string]struct{}, len(qMetrics))
|
||||||
|
lastEvaluation := rr.lastEvaluation
|
||||||
var tss []prompbmarshal.TimeSeries
|
var tss []prompbmarshal.TimeSeries
|
||||||
for _, r := range qMetrics {
|
for _, r := range qMetrics {
|
||||||
ts := rr.toTimeSeries(r)
|
ts := rr.toTimeSeries(r)
|
||||||
key := stringifyLabels(ts)
|
key := stringifyLabels(ts.Labels)
|
||||||
if _, ok := duplicates[key]; ok {
|
if _, ok := curEvaluation[key]; ok {
|
||||||
curState.Err = fmt.Errorf("original metric %v; resulting labels %q: %w", r, key, errDuplicate)
|
curState.Err = fmt.Errorf("original metric %v; resulting labels %q: %w", r, key, errDuplicate)
|
||||||
return nil, curState.Err
|
return nil, curState.Err
|
||||||
}
|
}
|
||||||
duplicates[key] = struct{}{}
|
curEvaluation[key] = struct{}{}
|
||||||
|
delete(lastEvaluation, key)
|
||||||
tss = append(tss, ts)
|
tss = append(tss, ts)
|
||||||
}
|
}
|
||||||
|
// check for stale time series
|
||||||
|
for k := range lastEvaluation {
|
||||||
|
tss = append(tss, prompbmarshal.TimeSeries{
|
||||||
|
Labels: stringToLabels(k),
|
||||||
|
Samples: []prompbmarshal.Sample{
|
||||||
|
{Value: decimal.StaleNaN, Timestamp: ts.UnixNano() / 1e6},
|
||||||
|
}})
|
||||||
|
}
|
||||||
|
rr.lastEvaluation = curEvaluation
|
||||||
return tss, nil
|
return tss, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func stringifyLabels(ts prompbmarshal.TimeSeries) string {
|
func stringToLabels(s string) []prompbmarshal.Label {
|
||||||
labels := ts.Labels
|
labels := strings.Split(s, ",")
|
||||||
if len(labels) > 1 {
|
rLabels := make([]prompbmarshal.Label, 0, len(labels))
|
||||||
sort.Slice(labels, func(i, j int) bool {
|
for i := range labels {
|
||||||
return labels[i].Name < labels[j].Name
|
if label := strings.Split(labels[i], "="); len(label) == 2 {
|
||||||
|
rLabels = append(rLabels, prompbmarshal.Label{
|
||||||
|
Name: label[0],
|
||||||
|
Value: label[1],
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
return rLabels
|
||||||
|
}
|
||||||
|
|
||||||
|
func stringifyLabels(labels []prompbmarshal.Label) string {
|
||||||
b := strings.Builder{}
|
b := strings.Builder{}
|
||||||
for i, l := range labels {
|
for i, l := range labels {
|
||||||
b.WriteString(l.Name)
|
b.WriteString(l.Name)
|
||||||
@ -190,19 +212,27 @@ func stringifyLabels(ts prompbmarshal.TimeSeries) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rr *RecordingRule) toTimeSeries(m datasource.Metric) prompbmarshal.TimeSeries {
|
func (rr *RecordingRule) toTimeSeries(m datasource.Metric) prompbmarshal.TimeSeries {
|
||||||
labels := make(map[string]string)
|
if preN := promrelabel.GetLabelByName(m.Labels, "__name__"); preN != nil {
|
||||||
for _, l := range m.Labels {
|
preN.Value = rr.Name
|
||||||
labels[l.Name] = l.Value
|
} else {
|
||||||
|
m.Labels = append(m.Labels, prompbmarshal.Label{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: rr.Name,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
labels["__name__"] = rr.Name
|
for k := range rr.Labels {
|
||||||
// override existing labels with configured ones
|
prevLabel := promrelabel.GetLabelByName(m.Labels, k)
|
||||||
for k, v := range rr.Labels {
|
if prevLabel != nil && prevLabel.Value != rr.Labels[k] {
|
||||||
if _, ok := labels[k]; ok && labels[k] != v {
|
// Rename the prevLabel to "exported_" + label.Name
|
||||||
labels[fmt.Sprintf("exported_%s", k)] = labels[k]
|
prevLabel.Name = fmt.Sprintf("exported_%s", prevLabel.Name)
|
||||||
}
|
}
|
||||||
labels[k] = v
|
m.Labels = append(m.Labels, prompbmarshal.Label{
|
||||||
|
Name: k,
|
||||||
|
Value: rr.Labels[k],
|
||||||
|
})
|
||||||
}
|
}
|
||||||
return newTimeSeries(m.Values, m.Timestamps, labels)
|
ts := newTimeSeries(m.Values, m.Timestamps, m.Labels)
|
||||||
|
return ts
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateWith copies all significant fields.
|
// updateWith copies all significant fields.
|
||||||
|
@ -9,59 +9,151 @@ import (
|
|||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRecordingRule_Exec(t *testing.T) {
|
func TestRecordingRule_Exec(t *testing.T) {
|
||||||
f := func(rule *RecordingRule, metrics []datasource.Metric, tssExpected []prompbmarshal.TimeSeries) {
|
ts, _ := time.Parse(time.RFC3339, "2024-10-29T00:00:00Z")
|
||||||
|
const defaultStep = 5 * time.Millisecond
|
||||||
|
|
||||||
|
f := func(rule *RecordingRule, steps [][]datasource.Metric, tssExpected [][]prompbmarshal.TimeSeries) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
fq := &datasource.FakeQuerier{}
|
fq := &datasource.FakeQuerier{}
|
||||||
fq.Add(metrics...)
|
for i, step := range steps {
|
||||||
|
fq.Reset()
|
||||||
|
fq.Add(step...)
|
||||||
rule.q = fq
|
rule.q = fq
|
||||||
rule.state = &ruleState{
|
rule.state = &ruleState{
|
||||||
entries: make([]StateEntry, 10),
|
entries: make([]StateEntry, 10),
|
||||||
}
|
}
|
||||||
tss, err := rule.exec(context.TODO(), time.Now(), 0)
|
tss, err := rule.exec(context.TODO(), ts, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected RecordingRule.exec error: %s", err)
|
t.Fatalf("unexpected RecordingRule.exec error: %s", err)
|
||||||
}
|
}
|
||||||
if err := compareTimeSeries(t, tssExpected, tss); err != nil {
|
if err := compareTimeSeries(t, tssExpected[i], tss); err != nil {
|
||||||
t.Fatalf("timeseries missmatch: %s", err)
|
t.Fatalf("time series mismatch: %s", err)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
timestamp := time.Now()
|
ts = ts.Add(defaultStep)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
f(&RecordingRule{
|
f(&RecordingRule{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
}, []datasource.Metric{
|
}, [][]datasource.Metric{{
|
||||||
metricWithValueAndLabels(t, 10, "__name__", "bar"),
|
metricWithValueAndLabels(t, 10, "__name__", "bar"),
|
||||||
}, []prompbmarshal.TimeSeries{
|
}}, [][]prompbmarshal.TimeSeries{{
|
||||||
newTimeSeries([]float64{10}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{10}, []int64{ts.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": "foo",
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
}})
|
||||||
|
|
||||||
f(&RecordingRule{
|
f(&RecordingRule{
|
||||||
Name: "foobarbaz",
|
Name: "foobarbaz",
|
||||||
}, []datasource.Metric{
|
}, [][]datasource.Metric{
|
||||||
|
{
|
||||||
metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"),
|
metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"),
|
||||||
metricWithValueAndLabels(t, 2, "__name__", "bar", "job", "bar"),
|
metricWithValueAndLabels(t, 2, "__name__", "bar", "job", "bar"),
|
||||||
metricWithValueAndLabels(t, 3, "__name__", "baz", "job", "baz"),
|
metricWithValueAndLabels(t, 3, "__name__", "baz", "job", "baz"),
|
||||||
}, []prompbmarshal.TimeSeries{
|
},
|
||||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
{
|
||||||
"__name__": "foobarbaz",
|
metricWithValueAndLabels(t, 10, "__name__", "foo", "job", "foo"),
|
||||||
"job": "foo",
|
},
|
||||||
|
{
|
||||||
|
metricWithValueAndLabels(t, 10, "__name__", "foo", "job", "bar"),
|
||||||
|
},
|
||||||
|
}, [][]prompbmarshal.TimeSeries{{
|
||||||
|
newTimeSeries([]float64{1}, []int64{ts.UnixNano()}, []prompbmarshal.Label{
|
||||||
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{2}, []int64{ts.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": "foobarbaz",
|
{
|
||||||
"job": "bar",
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "bar",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{3}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{3}, []int64{ts.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": "foobarbaz",
|
{
|
||||||
"job": "baz",
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "baz",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
newTimeSeries([]float64{10}, []int64{ts.Add(defaultStep).UnixNano()}, []prompbmarshal.Label{
|
||||||
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
// other series are with NaN values
|
||||||
|
newTimeSeries([]float64{decimal.StaleNaN}, []int64{ts.Add(defaultStep).UnixNano()}, []prompbmarshal.Label{
|
||||||
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "bar",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
newTimeSeries([]float64{decimal.StaleNaN}, []int64{ts.Add(defaultStep).UnixNano()}, []prompbmarshal.Label{
|
||||||
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "baz",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
newTimeSeries([]float64{10}, []int64{ts.Add(2 * defaultStep).UnixNano()}, []prompbmarshal.Label{
|
||||||
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "bar",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
newTimeSeries([]float64{decimal.StaleNaN}, []int64{ts.Add(2 * defaultStep).UnixNano()}, []prompbmarshal.Label{
|
||||||
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
f(&RecordingRule{
|
f(&RecordingRule{
|
||||||
@ -69,22 +161,44 @@ func TestRecordingRule_Exec(t *testing.T) {
|
|||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
"source": "test",
|
"source": "test",
|
||||||
},
|
},
|
||||||
}, []datasource.Metric{
|
}, [][]datasource.Metric{{
|
||||||
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"),
|
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"),
|
||||||
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar", "source", "origin"),
|
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar", "source", "origin"),
|
||||||
}, []prompbmarshal.TimeSeries{
|
}}, [][]prompbmarshal.TimeSeries{{
|
||||||
newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{2}, []int64{ts.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": "job:foo",
|
{
|
||||||
"job": "foo",
|
Name: "__name__",
|
||||||
"source": "test",
|
Value: "job:foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "source",
|
||||||
|
Value: "test",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{1}, []int64{ts.UnixNano()},
|
||||||
"__name__": "job:foo",
|
[]prompbmarshal.Label{
|
||||||
"job": "bar",
|
{
|
||||||
"source": "test",
|
Name: "__name__",
|
||||||
"exported_source": "origin",
|
Value: "job:foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "bar",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "source",
|
||||||
|
Value: "test",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "exported_source",
|
||||||
|
Value: "origin",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
}})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRecordingRule_ExecRange(t *testing.T) {
|
func TestRecordingRule_ExecRange(t *testing.T) {
|
||||||
@ -110,8 +224,12 @@ func TestRecordingRule_ExecRange(t *testing.T) {
|
|||||||
}, []datasource.Metric{
|
}, []datasource.Metric{
|
||||||
metricWithValuesAndLabels(t, []float64{10, 20, 30}, "__name__", "bar"),
|
metricWithValuesAndLabels(t, []float64{10, 20, 30}, "__name__", "bar"),
|
||||||
}, []prompbmarshal.TimeSeries{
|
}, []prompbmarshal.TimeSeries{
|
||||||
newTimeSeries([]float64{10, 20, 30}, []int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{10, 20, 30}, []int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()},
|
||||||
"__name__": "foo",
|
[]prompbmarshal.Label{
|
||||||
|
{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -122,18 +240,36 @@ func TestRecordingRule_ExecRange(t *testing.T) {
|
|||||||
metricWithValuesAndLabels(t, []float64{2, 3}, "__name__", "bar", "job", "bar"),
|
metricWithValuesAndLabels(t, []float64{2, 3}, "__name__", "bar", "job", "bar"),
|
||||||
metricWithValuesAndLabels(t, []float64{4, 5, 6}, "__name__", "baz", "job", "baz"),
|
metricWithValuesAndLabels(t, []float64{4, 5, 6}, "__name__", "baz", "job", "baz"),
|
||||||
}, []prompbmarshal.TimeSeries{
|
}, []prompbmarshal.TimeSeries{
|
||||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": "foobarbaz",
|
{
|
||||||
"job": "foo",
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{2, 3}, []int64{timestamp.UnixNano(), timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{2, 3}, []int64{timestamp.UnixNano(), timestamp.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": "foobarbaz",
|
{
|
||||||
"job": "bar",
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "bar",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{4, 5, 6},
|
newTimeSeries([]float64{4, 5, 6},
|
||||||
[]int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()}, map[string]string{
|
[]int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": "foobarbaz",
|
{
|
||||||
"job": "baz",
|
Name: "__name__",
|
||||||
|
Value: "foobarbaz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "baz",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -146,15 +282,34 @@ func TestRecordingRule_ExecRange(t *testing.T) {
|
|||||||
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"),
|
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"),
|
||||||
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar"),
|
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar"),
|
||||||
}, []prompbmarshal.TimeSeries{
|
}, []prompbmarshal.TimeSeries{
|
||||||
newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{
|
||||||
"__name__": "job:foo",
|
{
|
||||||
"job": "foo",
|
Name: "__name__",
|
||||||
"source": "test",
|
Value: "job:foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "source",
|
||||||
|
Value: "test",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()},
|
||||||
"__name__": "job:foo",
|
[]prompbmarshal.Label{
|
||||||
"job": "bar",
|
{
|
||||||
"source": "test",
|
Name: "__name__",
|
||||||
|
Value: "job:foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "job",
|
||||||
|
Value: "bar",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "source",
|
||||||
|
Value: "test",
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -87,7 +88,7 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
|
|||||||
}
|
}
|
||||||
m := datasource.Metric{Values: []float64{1}, Timestamps: []int64{1}}
|
m := datasource.Metric{Values: []float64{1}, Timestamps: []int64{1}}
|
||||||
for i := 0; i < len(labels); i += 2 {
|
for i := 0; i < len(labels); i += 2 {
|
||||||
m.Labels = append(m.Labels, datasource.Label{
|
m.Labels = append(m.Labels, prompbmarshal.Label{
|
||||||
Name: labels[i],
|
Name: labels[i],
|
||||||
Value: labels[i+1],
|
Value: labels[i+1],
|
||||||
})
|
})
|
||||||
@ -95,21 +96,6 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
|
|||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func toPromLabels(t testing.TB, labels ...string) []prompbmarshal.Label {
|
|
||||||
t.Helper()
|
|
||||||
if len(labels) == 0 || len(labels)%2 != 0 {
|
|
||||||
t.Fatalf("expected to get even number of labels")
|
|
||||||
}
|
|
||||||
var ls []prompbmarshal.Label
|
|
||||||
for i := 0; i < len(labels); i += 2 {
|
|
||||||
ls = append(ls, prompbmarshal.Label{
|
|
||||||
Name: labels[i],
|
|
||||||
Value: labels[i+1],
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return ls
|
|
||||||
}
|
|
||||||
|
|
||||||
func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error {
|
func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
if len(a) != len(b) {
|
if len(a) != len(b) {
|
||||||
@ -122,7 +108,7 @@ func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error {
|
|||||||
}
|
}
|
||||||
for i, exp := range expTS.Samples {
|
for i, exp := range expTS.Samples {
|
||||||
got := gotTS.Samples[i]
|
got := gotTS.Samples[i]
|
||||||
if got.Value != exp.Value {
|
if got.Value != exp.Value && (!decimal.IsStaleNaN(got.Value) || !decimal.IsStaleNaN(exp.Value)) {
|
||||||
return fmt.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value)
|
return fmt.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value)
|
||||||
}
|
}
|
||||||
// timestamp validation isn't always correct for now.
|
// timestamp validation isn't always correct for now.
|
||||||
|
@ -9,10 +9,14 @@ import (
|
|||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newTimeSeries(values []float64, timestamps []int64, labels map[string]string) prompbmarshal.TimeSeries {
|
// newTimeSeries first sorts given labels, then returns new time series.
|
||||||
|
func newTimeSeries(values []float64, timestamps []int64, labels []prompbmarshal.Label) prompbmarshal.TimeSeries {
|
||||||
|
promrelabel.SortLabels(labels)
|
||||||
ts := prompbmarshal.TimeSeries{
|
ts := prompbmarshal.TimeSeries{
|
||||||
|
Labels: labels,
|
||||||
Samples: make([]prompbmarshal.Sample, len(values)),
|
Samples: make([]prompbmarshal.Sample, len(values)),
|
||||||
}
|
}
|
||||||
for i := range values {
|
for i := range values {
|
||||||
@ -21,34 +25,6 @@ func newTimeSeries(values []float64, timestamps []int64, labels map[string]strin
|
|||||||
Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6,
|
Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
keys := make([]string, 0, len(labels))
|
|
||||||
for k := range labels {
|
|
||||||
keys = append(keys, k)
|
|
||||||
}
|
|
||||||
sort.Strings(keys) // make order deterministic
|
|
||||||
for _, key := range keys {
|
|
||||||
ts.Labels = append(ts.Labels, prompbmarshal.Label{
|
|
||||||
Name: key,
|
|
||||||
Value: labels[key],
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return ts
|
|
||||||
}
|
|
||||||
|
|
||||||
// newTimeSeriesPB creates prompbmarshal.TimeSeries with given
|
|
||||||
// values, timestamps and labels.
|
|
||||||
// It expects that labels are already sorted.
|
|
||||||
func newTimeSeriesPB(values []float64, timestamps []int64, labels []prompbmarshal.Label) prompbmarshal.TimeSeries {
|
|
||||||
ts := prompbmarshal.TimeSeries{
|
|
||||||
Samples: make([]prompbmarshal.Sample, len(values)),
|
|
||||||
}
|
|
||||||
for i := range values {
|
|
||||||
ts.Samples[i] = prompbmarshal.Sample{
|
|
||||||
Value: values[i],
|
|
||||||
Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ts.Labels = labels
|
|
||||||
return ts
|
return ts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,6 +169,8 @@ func GetWithFuncs(funcs textTpl.FuncMap) (*textTpl.Template, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
// Clone() doesn't copy tpl Options, so we set them manually
|
||||||
|
tmpl = tmpl.Option("missingkey=zero")
|
||||||
return tmpl.Funcs(funcs), nil
|
return tmpl.Funcs(funcs), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user