mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
app/vmalert: initial remote-write support for alerts state persistence. (#442)
* app/vmalert: initial remote-write support for alerts state persistence. If `remotewrite.url` flag is set, vmalert will send alerts state via remote-write protocol to remote storage. The sending is asynchronous to avoid blocking calls in rules evaluation loop. * app/vmalert: merge with master * app/vmalert: write both `instant` and `for` alerts timeseries states in remote storage.
This commit is contained in:
parent
90670cb55e
commit
3bfa41a95c
@ -42,7 +42,7 @@ Then configure `vmalert` accordingly:
|
||||
-notifier.url=http://localhost:9093
|
||||
```
|
||||
|
||||
Example for `.rules` file bay be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata/rules0-good.rules)
|
||||
Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata/rules0-good.rules)
|
||||
|
||||
`vmalert` runs evaluation for every group in a separate goroutine.
|
||||
Rules in group evaluated one-by-one sequentially.
|
||||
@ -68,8 +68,12 @@ Usage of vmalert:
|
||||
How often to evaluate the rules. Default 1m (default 1m0s)
|
||||
-external.url string
|
||||
External URL is used as alert's source for sent alerts to the notifier
|
||||
-httpListenAddr string
|
||||
Address to listen for http connections (default ":8880")
|
||||
-notifier.url string
|
||||
Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093
|
||||
-remotewrite.url string
|
||||
Optional URL to remote-write compatible storage where to write timeseriesbased on active alerts. E.g. http://127.0.0.1:8428
|
||||
-rule value
|
||||
Path to the file with alert rules.
|
||||
Supports patterns. Flag can be specified multiple times.
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
@ -29,11 +30,13 @@ Examples:
|
||||
-rule /path/to/file. Path to a single file with alerting rules
|
||||
-rule dir/*.yaml -rule /*.yaml. Relative path to all .yaml files in "dir" folder,
|
||||
absolute path to all .yaml files in root.`)
|
||||
validateTemplates = flag.Bool("rule.validateTemplates", true, "Indicates to validate annotation and label templates")
|
||||
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections")
|
||||
datasourceURL = flag.String("datasource.url", "", "Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428")
|
||||
basicAuthUsername = flag.String("datasource.basicAuth.username", "", "Optional basic auth username to use for -datasource.url")
|
||||
basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password to use for -datasource.url")
|
||||
validateTemplates = flag.Bool("rule.validateTemplates", true, "Indicates to validate annotation and label templates")
|
||||
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections")
|
||||
datasourceURL = flag.String("datasource.url", "", "Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428")
|
||||
basicAuthUsername = flag.String("datasource.basicAuth.username", "", "Optional basic auth username to use for -datasource.url")
|
||||
basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password to use for -datasource.url")
|
||||
remoteWriteURL = flag.String("remotewrite.url", "", "Optional URL to remote-write compatible storage where to write timeseries"+
|
||||
"based on active alerts. E.g. http://127.0.0.1:8428")
|
||||
evaluationInterval = flag.Duration("evaluationInterval", 1*time.Minute, "How often to evaluate the rules. Default 1m")
|
||||
notifierURL = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093")
|
||||
externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier")
|
||||
@ -56,7 +59,7 @@ func main() {
|
||||
logger.Infof("reading alert rules configuration file from %s", strings.Join(*rulePath, ";"))
|
||||
groups, err := Parse(*rulePath, *validateTemplates)
|
||||
if err != nil {
|
||||
logger.Fatalf("Cannot parse configuration file: %s", err)
|
||||
logger.Fatalf("cannot parse configuration file: %s", err)
|
||||
}
|
||||
|
||||
w := &watchdog{
|
||||
@ -65,6 +68,18 @@ func main() {
|
||||
return fmt.Sprintf("%s/api/v1/%s/%s/status", eu, group, name)
|
||||
}, &http.Client{}),
|
||||
}
|
||||
|
||||
if *remoteWriteURL != "" {
|
||||
c, err := remotewrite.NewClient(ctx, remotewrite.Config{
|
||||
Addr: *remoteWriteURL,
|
||||
FlushInterval: *evaluationInterval,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Fatalf("failed to init remotewrite client: %s", err)
|
||||
}
|
||||
w.rw = c
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for i := range groups {
|
||||
wg.Add(1)
|
||||
@ -82,12 +97,19 @@ func main() {
|
||||
logger.Fatalf("cannot stop the webservice: %s", err)
|
||||
}
|
||||
cancel()
|
||||
if w.rw != nil {
|
||||
err := w.rw.Close()
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot stop the remotewrite: %s", err)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
type watchdog struct {
|
||||
storage *datasource.VMStorage
|
||||
alertProvider notifier.Notifier
|
||||
rw *remotewrite.Client
|
||||
}
|
||||
|
||||
var (
|
||||
@ -97,6 +119,13 @@ var (
|
||||
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
||||
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
||||
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
|
||||
|
||||
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
||||
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
||||
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
|
||||
|
||||
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
|
||||
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
||||
)
|
||||
|
||||
func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration) {
|
||||
@ -122,7 +151,26 @@ func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time
|
||||
continue
|
||||
}
|
||||
|
||||
if err := rule.Send(ctx, w.alertProvider); err != nil {
|
||||
var alertsToSend []notifier.Alert
|
||||
for _, a := range rule.alerts {
|
||||
if a.State != notifier.StatePending {
|
||||
alertsToSend = append(alertsToSend, *a)
|
||||
}
|
||||
if a.State == notifier.StateInactive || w.rw == nil {
|
||||
continue
|
||||
}
|
||||
tss := rule.AlertToTimeSeries(a, execStart)
|
||||
for _, ts := range tss {
|
||||
remoteWriteSent.Inc()
|
||||
if err := w.rw.Push(ts); err != nil {
|
||||
remoteWriteErrors.Inc()
|
||||
logger.Errorf("failed to push timeseries to remotewrite: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
alertsSent.Add(len(alertsToSend))
|
||||
if err := w.alertProvider.Send(alertsToSend); err != nil {
|
||||
alertsSendErrors.Inc()
|
||||
logger.Errorf("failed to send alert for rule %q.%q: %s", group.Name, rule.Name, err)
|
||||
}
|
||||
}
|
||||
|
@ -103,18 +103,3 @@ func templateAnnotation(dst io.Writer, text string, data alertTplData) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type errGroup struct {
|
||||
errs []string
|
||||
}
|
||||
|
||||
func (eg *errGroup) err() error {
|
||||
if eg == nil || len(eg.errs) == 0 {
|
||||
return nil
|
||||
}
|
||||
return eg
|
||||
}
|
||||
|
||||
func (eg *errGroup) Error() string {
|
||||
return fmt.Sprintf("errors:%s", strings.Join(eg.errs, "\n"))
|
||||
}
|
||||
|
21
app/vmalert/notifier/utils.go
Normal file
21
app/vmalert/notifier/utils.go
Normal file
@ -0,0 +1,21 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type errGroup struct {
|
||||
errs []string
|
||||
}
|
||||
|
||||
func (eg *errGroup) err() error {
|
||||
if eg == nil || len(eg.errs) == 0 {
|
||||
return nil
|
||||
}
|
||||
return eg
|
||||
}
|
||||
|
||||
func (eg *errGroup) Error() string {
|
||||
return fmt.Sprintf("errors:%s", strings.Join(eg.errs, "\n"))
|
||||
}
|
187
app/vmalert/remotewrite/remotewrite.go
Normal file
187
app/vmalert/remotewrite/remotewrite.go
Normal file
@ -0,0 +1,187 @@
|
||||
package remotewrite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
// Client is an asynchronous HTTP client for writing
|
||||
// timeseries via remote write protocol.
|
||||
type Client struct {
|
||||
addr string
|
||||
c *http.Client
|
||||
input chan prompbmarshal.TimeSeries
|
||||
baUser, baPass string
|
||||
flushInterval time.Duration
|
||||
maxBatchSize int
|
||||
maxQueueSize int
|
||||
|
||||
wg sync.WaitGroup
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
// Addr of remote storage
|
||||
Addr string
|
||||
|
||||
BasicAuthUser string
|
||||
BasicAuthPass string
|
||||
|
||||
// MaxBatchSize defines max number of timeseries
|
||||
// to be flushed at once
|
||||
MaxBatchSize int
|
||||
// MaxQueueSize defines max length of input queue
|
||||
// populated by Push method
|
||||
MaxQueueSize int
|
||||
// FlushInterval defines time interval for flushing batches
|
||||
FlushInterval time.Duration
|
||||
// WriteTimeout defines timeout for HTTP write request
|
||||
// to remote storage
|
||||
WriteTimeout time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
defaultMaxBatchSize = 1e3
|
||||
defaultMaxQueueSize = 100
|
||||
defaultFlushInterval = 5 * time.Second
|
||||
defaultWriteTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
const writePath = "/api/v1/write"
|
||||
|
||||
// NewClient returns asynchronous client for
|
||||
// writing timeseries via remotewrite protocol.
|
||||
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
||||
if cfg.Addr == "" {
|
||||
return nil, fmt.Errorf("config.Addr can't be empty")
|
||||
}
|
||||
if cfg.MaxBatchSize == 0 {
|
||||
cfg.MaxBatchSize = defaultMaxBatchSize
|
||||
}
|
||||
if cfg.MaxQueueSize == 0 {
|
||||
cfg.MaxQueueSize = defaultMaxQueueSize
|
||||
}
|
||||
if cfg.FlushInterval == 0 {
|
||||
cfg.FlushInterval = defaultFlushInterval
|
||||
}
|
||||
if cfg.WriteTimeout == 0 {
|
||||
cfg.WriteTimeout = defaultWriteTimeout
|
||||
}
|
||||
c := &Client{
|
||||
c: &http.Client{
|
||||
Timeout: cfg.WriteTimeout,
|
||||
},
|
||||
addr: strings.TrimSuffix(cfg.Addr, "/") + writePath,
|
||||
baUser: cfg.BasicAuthUser,
|
||||
baPass: cfg.BasicAuthPass,
|
||||
flushInterval: cfg.FlushInterval,
|
||||
maxBatchSize: cfg.MaxBatchSize,
|
||||
doneCh: make(chan struct{}),
|
||||
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
|
||||
}
|
||||
c.run(ctx)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Push adds timeseries into queue for writing into remote storage.
|
||||
// Push returns and error if client is stopped or if queue is full.
|
||||
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
return fmt.Errorf("client is closed")
|
||||
case c.input <- s:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("failed to push timeseries - queue is full (%d entries)",
|
||||
c.maxQueueSize)
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops the client and waits for all goroutines
|
||||
// to exit.
|
||||
func (c *Client) Close() error {
|
||||
if c.doneCh == nil {
|
||||
return fmt.Errorf("client is already closed")
|
||||
}
|
||||
close(c.input)
|
||||
close(c.doneCh)
|
||||
c.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) run(ctx context.Context) {
|
||||
ticker := time.NewTicker(c.flushInterval)
|
||||
wr := prompbmarshal.WriteRequest{}
|
||||
shutdown := func() {
|
||||
for ts := range c.input {
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
}
|
||||
lastCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
c.flush(lastCtx, wr)
|
||||
cancel()
|
||||
}
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
shutdown()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
shutdown()
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.flush(ctx, wr)
|
||||
wr = prompbmarshal.WriteRequest{}
|
||||
case ts := <-c.input:
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
if len(wr.Timeseries) >= c.maxBatchSize {
|
||||
c.flush(ctx, wr)
|
||||
wr = prompbmarshal.WriteRequest{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *Client) flush(ctx context.Context, wr prompbmarshal.WriteRequest) {
|
||||
if len(wr.Timeseries) < 1 {
|
||||
return
|
||||
}
|
||||
data, err := wr.Marshal()
|
||||
if err != nil {
|
||||
logger.Errorf("failed to marshal WriteRequest: %s", err)
|
||||
return
|
||||
}
|
||||
req, err := http.NewRequest("POST", c.addr, bytes.NewReader(snappy.Encode(nil, data)))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create new HTTP request: %s", err)
|
||||
return
|
||||
}
|
||||
if c.baPass != "" {
|
||||
req.SetBasicAuth(c.baUser, c.baPass)
|
||||
}
|
||||
resp, err := c.c.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
logger.Errorf("error getting response from %s:%s", req.URL, err)
|
||||
return
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
if resp.StatusCode != http.StatusNoContent {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
logger.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body)
|
||||
return
|
||||
}
|
||||
}
|
@ -13,7 +13,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
// Group grouping array of alert
|
||||
@ -117,36 +117,6 @@ func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send sends the active alerts via given
|
||||
// notifier.Notifier.
|
||||
// See for reference https://prometheus.io/docs/alerting/clients/
|
||||
// TODO: add tests for endAt value
|
||||
func (r *Rule) Send(_ context.Context, ap notifier.Notifier) error {
|
||||
// copy alerts to new list to avoid locks
|
||||
var alertsCopy []notifier.Alert
|
||||
r.mu.Lock()
|
||||
for _, a := range r.alerts {
|
||||
if a.State == notifier.StatePending {
|
||||
continue
|
||||
}
|
||||
// it is safe to dereference instead of deep-copy
|
||||
// because only simple types may be changed during rule.Exec
|
||||
alertsCopy = append(alertsCopy, *a)
|
||||
}
|
||||
r.mu.Unlock()
|
||||
|
||||
if len(alertsCopy) < 1 {
|
||||
return nil
|
||||
}
|
||||
alertsSent.Add(len(alertsCopy))
|
||||
return ap.Send(alertsCopy)
|
||||
}
|
||||
|
||||
var (
|
||||
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
||||
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
||||
)
|
||||
|
||||
// TODO: consider hashing algorithm in VM
|
||||
func hash(m datasource.Metric) uint64 {
|
||||
hash := fnv.New64a()
|
||||
@ -235,3 +205,65 @@ func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert {
|
||||
Value: strconv.FormatFloat(a.Value, 'e', -1, 64),
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
// AlertMetricName is the metric name for synthetic alert timeseries.
|
||||
alertMetricName = "ALERTS"
|
||||
// AlertForStateMetricName is the metric name for 'for' state of alert.
|
||||
alertForStateMetricName = "ALERTS_FOR_STATE"
|
||||
|
||||
// AlertNameLabel is the label name indicating the name of an alert.
|
||||
alertNameLabel = "alertname"
|
||||
// AlertStateLabel is the label name indicating the state of an alert.
|
||||
alertStateLabel = "alertstate"
|
||||
)
|
||||
|
||||
func (r *Rule) AlertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries {
|
||||
var tss []prompbmarshal.TimeSeries
|
||||
tss = append(tss, alertToTimeSeries(r.Name, a, timestamp))
|
||||
if r.For > 0 {
|
||||
tss = append(tss, alertForToTimeSeries(r.Name, a, timestamp))
|
||||
}
|
||||
return tss
|
||||
}
|
||||
|
||||
func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
|
||||
labels := make(map[string]string)
|
||||
for k, v := range a.Labels {
|
||||
labels[k] = v
|
||||
}
|
||||
labels["__name__"] = alertMetricName
|
||||
labels[alertNameLabel] = name
|
||||
labels[alertStateLabel] = a.State.String()
|
||||
return newTimeSeries(1, labels, timestamp)
|
||||
}
|
||||
|
||||
func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
|
||||
labels := make(map[string]string)
|
||||
for k, v := range a.Labels {
|
||||
labels[k] = v
|
||||
}
|
||||
labels["__name__"] = alertForStateMetricName
|
||||
labels[alertNameLabel] = name
|
||||
return newTimeSeries(float64(a.Start.Unix()), labels, timestamp)
|
||||
}
|
||||
|
||||
func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries {
|
||||
ts := prompbmarshal.TimeSeries{}
|
||||
ts.Samples = append(ts.Samples, prompbmarshal.Sample{
|
||||
Value: value,
|
||||
Timestamp: timestamp.UnixNano() / 1e6,
|
||||
})
|
||||
keys := make([]string, 0, len(labels))
|
||||
for k := range labels {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
for _, key := range keys {
|
||||
ts.Labels = append(ts.Labels, prompbmarshal.Label{
|
||||
Name: key,
|
||||
Value: labels[key],
|
||||
})
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
func TestRule_Validate(t *testing.T) {
|
||||
@ -24,6 +25,122 @@ func TestRule_Validate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRule_AlertToTimeSeries(t *testing.T) {
|
||||
timestamp := time.Now()
|
||||
testCases := []struct {
|
||||
rule *Rule
|
||||
alert *notifier.Alert
|
||||
expTS []prompbmarshal.TimeSeries
|
||||
}{
|
||||
{
|
||||
newTestRule("instant", 0),
|
||||
¬ifier.Alert{State: notifier.StateFiring},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "instant",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("instant extra labels", 0),
|
||||
¬ifier.Alert{State: notifier.StateFiring, Labels: map[string]string{
|
||||
"job": "foo",
|
||||
"instance": "bar",
|
||||
}},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "instant extra labels",
|
||||
"job": "foo",
|
||||
"instance": "bar",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("instant labels override", 0),
|
||||
¬ifier.Alert{State: notifier.StateFiring, Labels: map[string]string{
|
||||
alertStateLabel: "foo",
|
||||
"__name__": "bar",
|
||||
}},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "instant labels override",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for", time.Second),
|
||||
¬ifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "for",
|
||||
}, timestamp),
|
||||
newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{
|
||||
"__name__": alertForStateMetricName,
|
||||
alertNameLabel: "for",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for pending", 10*time.Second),
|
||||
¬ifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StatePending.String(),
|
||||
alertNameLabel: "for pending",
|
||||
}, timestamp),
|
||||
newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{
|
||||
"__name__": alertForStateMetricName,
|
||||
alertNameLabel: "for pending",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.rule.Name, func(t *testing.T) {
|
||||
tss := tc.rule.AlertToTimeSeries(tc.alert, timestamp)
|
||||
if len(tc.expTS) != len(tss) {
|
||||
t.Fatalf("expected number of timeseries %d; got %d", len(tc.expTS), len(tss))
|
||||
}
|
||||
for i := range tc.expTS {
|
||||
expTS, gotTS := tc.expTS[i], tss[i]
|
||||
if len(expTS.Samples) != len(gotTS.Samples) {
|
||||
t.Fatalf("expected number of samples %d; got %d", len(expTS.Samples), len(gotTS.Samples))
|
||||
}
|
||||
for i, exp := range expTS.Samples {
|
||||
got := gotTS.Samples[i]
|
||||
if got.Value != exp.Value {
|
||||
t.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value)
|
||||
}
|
||||
if got.Timestamp != exp.Timestamp {
|
||||
t.Errorf("expected timestamp %d; got %d", exp.Timestamp, got.Timestamp)
|
||||
}
|
||||
}
|
||||
if len(expTS.Labels) != len(gotTS.Labels) {
|
||||
t.Fatalf("expected number of labels %d; got %d", len(expTS.Labels), len(gotTS.Labels))
|
||||
}
|
||||
for i, exp := range expTS.Labels {
|
||||
got := gotTS.Labels[i]
|
||||
if got.Name != exp.Name {
|
||||
t.Errorf("expected label name %q; got %q", exp.Name, got.Name)
|
||||
}
|
||||
if got.Value != exp.Value {
|
||||
t.Errorf("expected label value %q; got %q", exp.Value, got.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newTestRule(name string, waitFor time.Duration) *Rule {
|
||||
return &Rule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user