diff --git a/app/vmalert/README.md b/app/vmalert/README.md
index b0d21a6d0..a9fe645e0 100644
--- a/app/vmalert/README.md
+++ b/app/vmalert/README.md
@@ -42,7 +42,7 @@ Then configure `vmalert` accordingly:
  -notifier.url=http://localhost:9093
 ```
 
-Example for `.rules` file bay be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata/rules0-good.rules)
+Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata/rules0-good.rules)
 
 `vmalert` runs evaluation for every group in a separate goroutine.
 Rules in group evaluated one-by-one sequentially.
@@ -68,8 +68,12 @@ Usage of vmalert:
   -evaluationInterval duration
     	How often to evaluate the rules. Default 1m (default 1m0s)
   -external.url string
     	External URL is used as alert's source for sent alerts to the notifier
+  -httpListenAddr string
+    	Address to listen for http connections (default ":8880")
   -notifier.url string
     	Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093
+  -remotewrite.url string
+    	Optional URL to remote-write compatible storage where to write timeseries based on active alerts. E.g. http://127.0.0.1:8428
   -rule value
     	Path to the file with alert rules.
     	Supports patterns. Flag can be specified multiple times.
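
With `-remotewrite.url` set, a vmalert invocation could look roughly like the following; all URLs are illustrative placeholders for your own deployment:

```
./vmalert -rule=alert.rules \
    -datasource.url=http://localhost:8428 \
    -notifier.url=http://localhost:9093 \
    -remotewrite.url=http://localhost:8428
```
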
http://127.0.0.1:8428") evaluationInterval = flag.Duration("evaluationInterval", 1*time.Minute, "How often to evaluate the rules. Default 1m") notifierURL = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093") externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier") @@ -56,7 +59,7 @@ func main() { logger.Infof("reading alert rules configuration file from %s", strings.Join(*rulePath, ";")) groups, err := Parse(*rulePath, *validateTemplates) if err != nil { - logger.Fatalf("Cannot parse configuration file: %s", err) + logger.Fatalf("cannot parse configuration file: %s", err) } w := &watchdog{ @@ -65,6 +68,18 @@ func main() { return fmt.Sprintf("%s/api/v1/%s/%s/status", eu, group, name) }, &http.Client{}), } + + if *remoteWriteURL != "" { + c, err := remotewrite.NewClient(ctx, remotewrite.Config{ + Addr: *remoteWriteURL, + FlushInterval: *evaluationInterval, + }) + if err != nil { + logger.Fatalf("failed to init remotewrite client: %s", err) + } + w.rw = c + } + wg := sync.WaitGroup{} for i := range groups { wg.Add(1) @@ -82,12 +97,19 @@ func main() { logger.Fatalf("cannot stop the webservice: %s", err) } cancel() + if w.rw != nil { + err := w.rw.Close() + if err != nil { + logger.Fatalf("cannot stop the remotewrite: %s", err) + } + } wg.Wait() } type watchdog struct { storage *datasource.VMStorage alertProvider notifier.Notifier + rw *remotewrite.Client } var ( @@ -97,6 +119,13 @@ var ( execTotal = metrics.NewCounter(`vmalert_execution_total`) execErrors = metrics.NewCounter(`vmalert_execution_errors_total`) execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`) + + alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`) + alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`) + alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`) + + remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`) + remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) ) func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration) { @@ -122,7 +151,26 @@ func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time continue } - if err := rule.Send(ctx, w.alertProvider); err != nil { + var alertsToSend []notifier.Alert + for _, a := range rule.alerts { + if a.State != notifier.StatePending { + alertsToSend = append(alertsToSend, *a) + } + if a.State == notifier.StateInactive || w.rw == nil { + continue + } + tss := rule.AlertToTimeSeries(a, execStart) + for _, ts := range tss { + remoteWriteSent.Inc() + if err := w.rw.Push(ts); err != nil { + remoteWriteErrors.Inc() + logger.Errorf("failed to push timeseries to remotewrite: %s", err) + } + } + } + alertsSent.Add(len(alertsToSend)) + if err := w.alertProvider.Send(alertsToSend); err != nil { + alertsSendErrors.Inc() logger.Errorf("failed to send alert for rule %q.%q: %s", group.Name, rule.Name, err) } } diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go index 766791307..3a2b5de62 100644 --- a/app/vmalert/notifier/alert.go +++ b/app/vmalert/notifier/alert.go @@ -103,18 +103,3 @@ func templateAnnotation(dst io.Writer, text string, data alertTplData) error { } return nil } - -type errGroup struct { - errs []string -} - -func (eg *errGroup) err() error { - if eg == nil || len(eg.errs) == 0 { - return nil - } - return eg -} - -func (eg *errGroup) Error() string { - return fmt.Sprintf("errors:%s", 
diff --git a/app/vmalert/notifier/utils.go b/app/vmalert/notifier/utils.go
new file mode 100644
index 000000000..8cf7c901c
--- /dev/null
+++ b/app/vmalert/notifier/utils.go
@@ -0,0 +1,21 @@
+package notifier
+
+import (
+	"fmt"
+	"strings"
+)
+
+type errGroup struct {
+	errs []string
+}
+
+func (eg *errGroup) err() error {
+	if eg == nil || len(eg.errs) == 0 {
+		return nil
+	}
+	return eg
+}
+
+func (eg *errGroup) Error() string {
+	return fmt.Sprintf("errors:%s", strings.Join(eg.errs, "\n"))
+}
diff --git a/app/vmalert/remotewrite/remotewrite.go b/app/vmalert/remotewrite/remotewrite.go
new file mode 100644
index 000000000..716e3aa07
--- /dev/null
+++ b/app/vmalert/remotewrite/remotewrite.go
@@ -0,0 +1,187 @@
+package remotewrite
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
+	"github.com/golang/snappy"
+)
+
+// Client is an asynchronous HTTP client for writing
+// timeseries via remote write protocol.
+type Client struct {
+	addr           string
+	c              *http.Client
+	input          chan prompbmarshal.TimeSeries
+	baUser, baPass string
+	flushInterval  time.Duration
+	maxBatchSize   int
+	maxQueueSize   int
+
+	wg     sync.WaitGroup
+	doneCh chan struct{}
+}
+
+type Config struct {
+	// Addr of remote storage
+	Addr string
+
+	BasicAuthUser string
+	BasicAuthPass string
+
+	// MaxBatchSize defines max number of timeseries
+	// to be flushed at once
+	MaxBatchSize int
+	// MaxQueueSize defines max length of input queue
+	// populated by Push method
+	MaxQueueSize int
+	// FlushInterval defines time interval for flushing batches
+	FlushInterval time.Duration
+	// WriteTimeout defines timeout for HTTP write request
+	// to remote storage
+	WriteTimeout time.Duration
+}
+
+const (
+	defaultMaxBatchSize  = 1e3
+	defaultMaxQueueSize  = 100
+	defaultFlushInterval = 5 * time.Second
+	defaultWriteTimeout  = 30 * time.Second
+)
+
+const writePath = "/api/v1/write"
+
+// NewClient returns asynchronous client for
+// writing timeseries via remotewrite protocol.
+func NewClient(ctx context.Context, cfg Config) (*Client, error) {
+	if cfg.Addr == "" {
+		return nil, fmt.Errorf("config.Addr can't be empty")
+	}
+	if cfg.MaxBatchSize == 0 {
+		cfg.MaxBatchSize = defaultMaxBatchSize
+	}
+	if cfg.MaxQueueSize == 0 {
+		cfg.MaxQueueSize = defaultMaxQueueSize
+	}
+	if cfg.FlushInterval == 0 {
+		cfg.FlushInterval = defaultFlushInterval
+	}
+	if cfg.WriteTimeout == 0 {
+		cfg.WriteTimeout = defaultWriteTimeout
+	}
+	c := &Client{
+		c: &http.Client{
+			Timeout: cfg.WriteTimeout,
+		},
+		addr:          strings.TrimSuffix(cfg.Addr, "/") + writePath,
+		baUser:        cfg.BasicAuthUser,
+		baPass:        cfg.BasicAuthPass,
+		flushInterval: cfg.FlushInterval,
+		maxBatchSize:  cfg.MaxBatchSize,
+		maxQueueSize:  cfg.MaxQueueSize,
+		doneCh:        make(chan struct{}),
+		input:         make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
+	}
+	c.run(ctx)
+	return c, nil
+}
+
+// Push adds timeseries into queue for writing into remote storage.
+// Push returns an error if client is stopped or if queue is full.
+func (c *Client) Push(s prompbmarshal.TimeSeries) error {
+	select {
+	case <-c.doneCh:
+		return fmt.Errorf("client is closed")
+	case c.input <- s:
+		return nil
+	default:
+		return fmt.Errorf("failed to push timeseries - queue is full (%d entries)",
+			c.maxQueueSize)
+	}
+}
+
+// Close stops the client and waits for all goroutines
+// to exit.
+func (c *Client) Close() error {
+	if c.doneCh == nil {
+		return fmt.Errorf("client is already closed")
+	}
+	close(c.input)
+	close(c.doneCh)
+	c.wg.Wait()
+	return nil
+}
+
+func (c *Client) run(ctx context.Context) {
+	ticker := time.NewTicker(c.flushInterval)
+	wr := prompbmarshal.WriteRequest{}
+	shutdown := func() {
+		for ts := range c.input {
+			wr.Timeseries = append(wr.Timeseries, ts)
+		}
+		lastCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+		c.flush(lastCtx, wr)
+		cancel()
+	}
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+		defer ticker.Stop()
+		for {
+			select {
+			case <-c.doneCh:
+				shutdown()
+				return
+			case <-ctx.Done():
+				shutdown()
+				return
+			case <-ticker.C:
+				c.flush(ctx, wr)
+				wr = prompbmarshal.WriteRequest{}
+			case ts := <-c.input:
+				wr.Timeseries = append(wr.Timeseries, ts)
+				if len(wr.Timeseries) >= c.maxBatchSize {
+					c.flush(ctx, wr)
+					wr = prompbmarshal.WriteRequest{}
+				}
+			}
+		}
+	}()
+}
+
+func (c *Client) flush(ctx context.Context, wr prompbmarshal.WriteRequest) {
+	if len(wr.Timeseries) < 1 {
+		return
+	}
+	data, err := wr.Marshal()
+	if err != nil {
+		logger.Errorf("failed to marshal WriteRequest: %s", err)
+		return
+	}
+	req, err := http.NewRequest("POST", c.addr, bytes.NewReader(snappy.Encode(nil, data)))
+	if err != nil {
+		logger.Errorf("failed to create new HTTP request: %s", err)
+		return
+	}
+	if c.baPass != "" {
+		req.SetBasicAuth(c.baUser, c.baPass)
+	}
+	resp, err := c.c.Do(req.WithContext(ctx))
+	if err != nil {
+		logger.Errorf("error getting response from %s: %s", req.URL, err)
+		return
+	}
+	defer func() { _ = resp.Body.Close() }()
+	if resp.StatusCode != http.StatusNoContent {
+		body, _ := ioutil.ReadAll(resp.Body)
+		logger.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body)
+		return
+	}
+}
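
A minimal sketch of how this client is meant to be driven, mirroring the wiring in `main.go` above. The endpoint URL and the sample series are illustrative placeholders; `Push` only enqueues a sample, while batching and flushing happen in the background goroutine started by `NewClient`:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Addr is a placeholder; any remote-write compatible endpoint works.
	rw, err := remotewrite.NewClient(ctx, remotewrite.Config{
		Addr:          "http://localhost:8428",
		FlushInterval: 30 * time.Second,
	})
	if err != nil {
		log.Fatalf("cannot create remotewrite client: %s", err)
	}

	// Push is non-blocking: it returns an error only if the internal
	// queue is full or the client is already closed.
	ts := prompbmarshal.TimeSeries{
		Labels:  []prompbmarshal.Label{{Name: "__name__", Value: "ALERTS"}},
		Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: time.Now().UnixNano() / 1e6}},
	}
	if err := rw.Push(ts); err != nil {
		log.Printf("failed to push timeseries: %s", err)
	}

	// Close drains the queue and flushes the final batch.
	if err := rw.Close(); err != nil {
		log.Printf("cannot close remotewrite client: %s", err)
	}
}
```
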
diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go
index b68538937..1a6a78c91 100644
--- a/app/vmalert/rule.go
+++ b/app/vmalert/rule.go
@@ -13,7 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
-	"github.com/VictoriaMetrics/metrics"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 )
 
 // Group grouping array of alert
@@ -117,36 +117,6 @@ func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
 	return nil
 }
 
-// Send sends the active alerts via given
-// notifier.Notifier.
-// See for reference https://prometheus.io/docs/alerting/clients/
-// TODO: add tests for endAt value
-func (r *Rule) Send(_ context.Context, ap notifier.Notifier) error {
-	// copy alerts to new list to avoid locks
-	var alertsCopy []notifier.Alert
-	r.mu.Lock()
-	for _, a := range r.alerts {
-		if a.State == notifier.StatePending {
-			continue
-		}
-		// it is safe to dereference instead of deep-copy
-		// because only simple types may be changed during rule.Exec
-		alertsCopy = append(alertsCopy, *a)
-	}
-	r.mu.Unlock()
-
-	if len(alertsCopy) < 1 {
-		return nil
-	}
-	alertsSent.Add(len(alertsCopy))
-	return ap.Send(alertsCopy)
-}
-
-var (
-	alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
-	alertsSent  = metrics.NewCounter(`vmalert_alerts_sent_total`)
-)
-
 // TODO: consider hashing algorithm in VM
 func hash(m datasource.Metric) uint64 {
 	hash := fnv.New64a()
@@ -235,3 +205,65 @@ func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert {
 		Value: strconv.FormatFloat(a.Value, 'e', -1, 64),
 	}
 }
+
+const (
+	// alertMetricName is the metric name for synthetic alert timeseries.
+	alertMetricName = "ALERTS"
+	// alertForStateMetricName is the metric name for 'for' state of alert.
+	alertForStateMetricName = "ALERTS_FOR_STATE"
+
+	// alertNameLabel is the label name indicating the name of an alert.
+	alertNameLabel = "alertname"
+	// alertStateLabel is the label name indicating the state of an alert.
+	alertStateLabel = "alertstate"
+)
+
+// AlertToTimeSeries converts the given alert into synthetic timeseries for the given timestamp.
+func (r *Rule) AlertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries {
+	var tss []prompbmarshal.TimeSeries
+	tss = append(tss, alertToTimeSeries(r.Name, a, timestamp))
+	if r.For > 0 {
+		tss = append(tss, alertForToTimeSeries(r.Name, a, timestamp))
+	}
+	return tss
+}
+
+func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
+	labels := make(map[string]string)
+	for k, v := range a.Labels {
+		labels[k] = v
+	}
+	labels["__name__"] = alertMetricName
+	labels[alertNameLabel] = name
+	labels[alertStateLabel] = a.State.String()
+	return newTimeSeries(1, labels, timestamp)
+}
+
+func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
+	labels := make(map[string]string)
+	for k, v := range a.Labels {
+		labels[k] = v
+	}
+	labels["__name__"] = alertForStateMetricName
+	labels[alertNameLabel] = name
+	return newTimeSeries(float64(a.Start.Unix()), labels, timestamp)
+}
+
+func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries {
+	ts := prompbmarshal.TimeSeries{}
+	ts.Samples = append(ts.Samples, prompbmarshal.Sample{
+		Value:     value,
+		Timestamp: timestamp.UnixNano() / 1e6,
+	})
+	keys := make([]string, 0, len(labels))
+	for k := range labels {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	for _, key := range keys {
+		ts.Labels = append(ts.Labels, prompbmarshal.Label{
+			Name:  key,
+			Value: labels[key],
+		})
+	}
+	return ts
+}
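
To make the conversion concrete: for a firing alert on a rule named `ExampleAlert` with a non-zero `for` duration and an extra `job="foo"` label, the functions above would produce series roughly like the following; the label values are illustrative, and the `ALERTS_FOR_STATE` sample value is the alert's start time in Unix seconds:

```
ALERTS{alertname="ExampleAlert", alertstate="firing", job="foo"}   1
ALERTS_FOR_STATE{alertname="ExampleAlert", job="foo"}              <unix seconds of alert start>
```

Note that `__name__`, `alertname` and `alertstate` always override identically named labels carried by the alert, which is what the "labels override" test case below exercises.
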
diff --git a/app/vmalert/rule_test.go b/app/vmalert/rule_test.go
index 9538c94f2..f3ccded06 100644
--- a/app/vmalert/rule_test.go
+++ b/app/vmalert/rule_test.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 )
 
 func TestRule_Validate(t *testing.T) {
@@ -24,6 +25,122 @@ func TestRule_Validate(t *testing.T) {
 	}
 }
 
+func TestRule_AlertToTimeSeries(t *testing.T) {
+	timestamp := time.Now()
+	testCases := []struct {
+		rule  *Rule
+		alert *notifier.Alert
+		expTS []prompbmarshal.TimeSeries
+	}{
+		{
+			newTestRule("instant", 0),
+			&notifier.Alert{State: notifier.StateFiring},
+			[]prompbmarshal.TimeSeries{
+				newTimeSeries(1, map[string]string{
+					"__name__":      alertMetricName,
+					alertStateLabel: notifier.StateFiring.String(),
+					alertNameLabel:  "instant",
+				}, timestamp),
+			},
+		},
+		{
+			newTestRule("instant extra labels", 0),
+			&notifier.Alert{State: notifier.StateFiring, Labels: map[string]string{
+				"job":      "foo",
+				"instance": "bar",
+			}},
+			[]prompbmarshal.TimeSeries{
+				newTimeSeries(1, map[string]string{
+					"__name__":      alertMetricName,
+					alertStateLabel: notifier.StateFiring.String(),
+					alertNameLabel:  "instant extra labels",
+					"job":           "foo",
+					"instance":      "bar",
+				}, timestamp),
+			},
+		},
+		{
+			newTestRule("instant labels override", 0),
+			&notifier.Alert{State: notifier.StateFiring, Labels: map[string]string{
+				alertStateLabel: "foo",
+				"__name__":      "bar",
+			}},
+			[]prompbmarshal.TimeSeries{
+				newTimeSeries(1, map[string]string{
+					"__name__":      alertMetricName,
+					alertStateLabel: notifier.StateFiring.String(),
+					alertNameLabel:  "instant labels override",
+				}, timestamp),
+			},
+		},
+		{
+			newTestRule("for", time.Second),
+			&notifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)},
+			[]prompbmarshal.TimeSeries{
+				newTimeSeries(1, map[string]string{
+					"__name__":      alertMetricName,
+					alertStateLabel: notifier.StateFiring.String(),
+					alertNameLabel:  "for",
+				}, timestamp),
+				newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{
+					"__name__":     alertForStateMetricName,
+					alertNameLabel: "for",
+				}, timestamp),
+			},
+		},
+		{
+			newTestRule("for pending", 10*time.Second),
+			&notifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)},
+			[]prompbmarshal.TimeSeries{
+				newTimeSeries(1, map[string]string{
+					"__name__":      alertMetricName,
+					alertStateLabel: notifier.StatePending.String(),
+					alertNameLabel:  "for pending",
+				}, timestamp),
+				newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{
+					"__name__":     alertForStateMetricName,
+					alertNameLabel: "for pending",
+				}, timestamp),
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.rule.Name, func(t *testing.T) {
+			tss := tc.rule.AlertToTimeSeries(tc.alert, timestamp)
+			if len(tc.expTS) != len(tss) {
+				t.Fatalf("expected number of timeseries %d; got %d", len(tc.expTS), len(tss))
+			}
+			for i := range tc.expTS {
+				expTS, gotTS := tc.expTS[i], tss[i]
+				if len(expTS.Samples) != len(gotTS.Samples) {
+					t.Fatalf("expected number of samples %d; got %d", len(expTS.Samples), len(gotTS.Samples))
+				}
+				for i, exp := range expTS.Samples {
+					got := gotTS.Samples[i]
+					if got.Value != exp.Value {
+						t.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value)
+					}
+					if got.Timestamp != exp.Timestamp {
+						t.Errorf("expected timestamp %d; got %d", exp.Timestamp, got.Timestamp)
+					}
+				}
+				if len(expTS.Labels) != len(gotTS.Labels) {
+					t.Fatalf("expected number of labels %d; got %d", len(expTS.Labels), len(gotTS.Labels))
+				}
+				for i, exp := range expTS.Labels {
+					got := gotTS.Labels[i]
+					if got.Name != exp.Name {
+						t.Errorf("expected label name %q; got %q", exp.Name, got.Name)
+					}
+					if got.Value != exp.Value {
+						t.Errorf("expected label value %q; got %q", exp.Value, got.Value)
+					}
+				}
+			}
+		})
+	}
+}
+
 func newTestRule(name string, waitFor time.Duration) *Rule {
 	return &Rule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor}
 }