diff --git a/app/vmalert/Makefile b/app/vmalert/Makefile new file mode 100644 index 0000000000..64b9e27ad4 --- /dev/null +++ b/app/vmalert/Makefile @@ -0,0 +1,78 @@ +# All these commands must run from repository root. + +vmalert: + APP_NAME=vmalert $(MAKE) app-local + +vmalert-race: + APP_NAME=vmalert RACE=-race $(MAKE) app-local + +vmalert-prod: + APP_NAME=vmalert $(MAKE) app-via-docker + +vmalert-pure-prod: + APP_NAME=vmalert $(MAKE) app-via-docker-pure + +vmalert-amd64-prod: + APP_NAME=vmalert $(MAKE) app-via-docker-amd64 + +vmalert-arm-prod: + APP_NAME=vmalert $(MAKE) app-via-docker-arm + +vmalert-arm64-prod: + APP_NAME=vmalert $(MAKE) app-via-docker-arm64 + +vmalert-ppc64le-prod: + APP_NAME=vmalert $(MAKE) app-via-docker-ppc64le + +vmalert-386-prod: + APP_NAME=vmalert $(MAKE) app-via-docker-386 + +package-vmalert: + APP_NAME=vmalert $(MAKE) package-via-docker + +package-vmalert-pure: + APP_NAME=vmalert $(MAKE) package-via-docker-pure + +package-vmalert-amd64: + APP_NAME=vmalert $(MAKE) package-via-docker-amd64 + +package-vmalert-arm: + APP_NAME=vmalert $(MAKE) package-via-docker-arm + +package-vmalert-arm64: + APP_NAME=vmalert $(MAKE) package-via-docker-arm64 + +package-vmalert-ppc64le: + APP_NAME=vmalert $(MAKE) package-via-docker-ppc64le + +package-vmalert-386: + APP_NAME=vmalert $(MAKE) package-via-docker-386 + +publish-vmalert: + APP_NAME=vmalert $(MAKE) publish-via-docker + +test-vmalert: + go test -race -cover ./app/vmalert + +run-vmalert: vmalert + ./bin/vmalert -rule=app/vmalert/testdata/rules0-good.rules \ + -datasource.url=http://localhost:8428 -notifier.url=http://localhost:9093 \ + -evaluationInterval=3s + +vmalert-amd64: + CGO_ENABLED=1 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-amd64 ./app/vmalert + +vmalert-arm: + CGO_ENABLED=0 GOOS=linux GOARCH=arm GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-arm ./app/vmalert + +vmalert-arm64: + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-arm64 ./app/vmalert + +vmalert-ppc64le: + CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-ppc64le ./app/vmalert + +vmalert-386: + CGO_ENABLED=0 GOOS=linux GOARCH=386 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-386 ./app/vmalert + +vmalert-pure: + APP_NAME=vmalert $(MAKE) app-local-pure diff --git a/app/vmalert/README.md b/app/vmalert/README.md new file mode 100644 index 0000000000..a9fe645e04 --- /dev/null +++ b/app/vmalert/README.md @@ -0,0 +1,95 @@ +## VM Alert + +`vmalert` executes a list of given MetricsQL expressions (rules) and +sends alerts to [Alert Manager](https://github.com/prometheus/alertmanager). + +NOTE: `vmalert` is in early alpha and wasn't tested in production systems yet. + +### Features: +* Integration with [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) TSDB; +* VictoriaMetrics [MetricsQL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/MetricsQL) + expressions validation; +* Prometheus [alerting rules definition format](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#defining-alerting-rules) + support; +* Integration with [Alertmanager](https://github.com/prometheus/alertmanager); +* Lightweight without extra dependencies. + +### TODO: +* Persist alerts state as timeseries in TSDB. Currently, alerts state is stored +in process memory only and will be lost on restart; +* Configuration hot reload. + +### QuickStart + +To build `vmalert` from sources: +``` +git clone https://github.com/VictoriaMetrics/VictoriaMetrics +cd VictoriaMetrics +make vmalert +``` +The build binary will be placed to `VictoriaMetrics/bin` folder. + +To start using `vmalert` you will need the following things: +* list of alert rules - PromQL/MetricsQL expressions to execute; +* datasource address - reachable VictoriaMetrics instance for rules execution; +* notifier address - reachable Alertmanager instance for processing, +aggregating alerts and sending notifications. + +Then configure `vmalert` accordingly: +``` +./bin/vmalert -rule=alert.rules \ + -datasource.url=http://localhost:8428 \ + -notifier.url=http://localhost:9093 +``` + +Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata/rules0-good.rules) + +`vmalert` runs evaluation for every group in a separate goroutine. +Rules in group evaluated one-by-one sequentially. + +`vmalert` also runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints: +* `http:///api/v1/alerts` - list of all active alerts; +* `http:///api/v1///status" ` - get alert status by ID. +Used as alert source in AlertManager. +* `http:///metrics` - application metrics. + +### Configuration + +The shortlist of configuration flags is the following: +``` +Usage of vmalert: + -datasource.url string + Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428 + -datasource.basicAuth.password string + Optional basic auth password to use for -datasource.url + -datasource.basicAuth.username string + Optional basic auth username to use for -datasource.url + -evaluationInterval duration + How often to evaluate the rules. Default 1m (default 1m0s) + -external.url string + External URL is used as alert's source for sent alerts to the notifier + -httpListenAddr string + Address to listen for http connections (default ":8880") + -notifier.url string + Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093 + -remotewrite.url string + Optional URL to remote-write compatible storage where to write timeseriesbased on active alerts. E.g. http://127.0.0.1:8428 + -rule value + Path to the file with alert rules. + Supports patterns. Flag can be specified multiple times. + Examples: + -rule /path/to/file. Path to a single file with alerting rules + -rule dir/*.yaml -rule /*.yaml. Relative path to all .yaml files in "dir" folder, + absolute path to all .yaml files in root. + -rule.validateTemplates + Indicates to validate annotation and label templates (default true) +``` + +Pass `-help` to `vmalert` in order to see the full list of supported +command-line flags with their descriptions. + +### Contributing + +`vmalert` is mostly designed and built by VictoriaMetrics community. +Feel free to share your experience and ideas for improving this +software. Please keep simplicity as the main priority. \ No newline at end of file diff --git a/app/vmalert/config.go b/app/vmalert/config.go new file mode 100644 index 0000000000..ba15956a11 --- /dev/null +++ b/app/vmalert/config.go @@ -0,0 +1,70 @@ +package main + +import ( + "fmt" + "gopkg.in/yaml.v2" + "io/ioutil" + "path/filepath" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" +) + +// Parse parses rule configs from given file patterns +func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) { + var fp []string + for _, pattern := range pathPatterns { + matches, err := filepath.Glob(pattern) + if err != nil { + return nil, fmt.Errorf("error reading file patther %s:%v", pattern, err) + } + fp = append(fp, matches...) + } + var groups []Group + for _, file := range fp { + groupsNames := map[string]struct{}{} + gr, err := parseFile(file) + if err != nil { + return nil, fmt.Errorf("file %s: %w", file, err) + } + for _, group := range gr { + if _, ok := groupsNames[group.Name]; ok { + return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", file, group.Name) + } + groupsNames[group.Name] = struct{}{} + for _, rule := range group.Rules { + if err = rule.Validate(); err != nil { + return nil, fmt.Errorf("invalid rule filepath:%s, group %s:%w", file, group.Name, err) + } + // TODO: this init looks weird here + rule.alerts = make(map[uint64]*notifier.Alert) + if validateAnnotations { + if err = notifier.ValidateTemplates(rule.Annotations); err != nil { + return nil, fmt.Errorf("invalid annotations filepath:%s, group %s:%w", file, group.Name, err) + } + if err = notifier.ValidateTemplates(rule.Labels); err != nil { + return nil, fmt.Errorf("invalid labels filepath:%s, group %s:%w", file, group.Name, err) + } + } + rule.group = &group + } + } + groups = append(groups, gr...) + } + if len(groups) < 1 { + return nil, fmt.Errorf("no groups found in %s", strings.Join(pathPatterns, ";")) + } + return groups, nil +} + +func parseFile(path string) ([]Group, error) { + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("error reading alert rule file: %w", err) + } + g := struct { + Groups []Group `yaml:"groups"` + }{} + err = yaml.Unmarshal(data, &g) + return g.Groups, err +} diff --git a/app/vmalert/config_test.go b/app/vmalert/config_test.go new file mode 100644 index 0000000000..057d1ab8bc --- /dev/null +++ b/app/vmalert/config_test.go @@ -0,0 +1,39 @@ +package main + +import ( + "net/url" + "os" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" +) + +func TestMain(m *testing.M) { + u, _ := url.Parse("https://victoriametrics.com/path") + notifier.InitTemplateFunc(u) + os.Exit(m.Run()) +} + +func TestParseGood(t *testing.T) { + if _, err := Parse([]string{"testdata/*good.rules", "testdata/dir/*good.*"}, true); err != nil { + t.Errorf("error parsing files %s", err) + } +} + +func TestParseBad(t *testing.T) { + if _, err := Parse([]string{"testdata/rules0-bad.rules"}, true); err == nil { + t.Errorf("expected syntaxt error") + } + if _, err := Parse([]string{"testdata/dir/rules0-bad.rules"}, true); err == nil { + t.Errorf("expected template annotation error") + } + if _, err := Parse([]string{"testdata/dir/rules1-bad.rules"}, true); err == nil { + t.Errorf("expected same group error") + } + if _, err := Parse([]string{"testdata/dir/rules2-bad.rules"}, true); err == nil { + t.Errorf("expected template label error") + } + if _, err := Parse([]string{"testdata/*.yaml"}, true); err == nil { + t.Errorf("expected empty group") + } +} diff --git a/app/vmalert/datasource/datasource.go b/app/vmalert/datasource/datasource.go new file mode 100644 index 0000000000..3525f0ed57 --- /dev/null +++ b/app/vmalert/datasource/datasource.go @@ -0,0 +1,24 @@ +package datasource + +import "context" + +// Querier interface wraps Query method which +// executes given query and returns list of Metrics +// as result +type Querier interface { + Query(ctx context.Context, query string) ([]Metric, error) +} + +// Metric is the basic entity which should be return by datasource +// It represents single data point with full list of labels +type Metric struct { + Labels []Label + Timestamp int64 + Value float64 +} + +// Label represents metric's label +type Label struct { + Name string + Value string +} diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go new file mode 100644 index 0000000000..8ec74193d4 --- /dev/null +++ b/app/vmalert/datasource/vm.go @@ -0,0 +1,103 @@ +package datasource + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" +) + +type response struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + Result []struct { + Labels map[string]string `json:"metric"` + TV [2]interface{} `json:"value"` + } `json:"result"` + } `json:"data"` + ErrorType string `json:"errorType"` + Error string `json:"error"` +} + +func (r response) metrics() ([]Metric, error) { + var ms []Metric + var m Metric + var f float64 + var err error + for i, res := range r.Data.Result { + f, err = strconv.ParseFloat(res.TV[1].(string), 64) + if err != nil { + return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %s", res, res.TV[1], err) + } + m.Labels = nil + for k, v := range r.Data.Result[i].Labels { + m.Labels = append(m.Labels, Label{Name: k, Value: v}) + } + m.Timestamp = int64(res.TV[0].(float64)) + m.Value = f + ms = append(ms, m) + } + return ms, nil +} + +const queryPath = "/api/v1/query?query=" + +// VMStorage represents vmstorage entity with ability to read and write metrics +type VMStorage struct { + c *http.Client + queryURL string + basicAuthUser, basicAuthPass string +} + +// NewVMStorage is a constructor for VMStorage +func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, c *http.Client) *VMStorage { + return &VMStorage{ + c: c, + basicAuthUser: basicAuthUser, + basicAuthPass: basicAuthPass, + queryURL: strings.TrimSuffix(baseURL, "/") + queryPath, + } +} + +// Query reads metrics from datasource by given query +func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) { + const ( + statusSuccess, statusError, rtVector = "success", "error", "vector" + ) + req, err := http.NewRequest("POST", s.queryURL+url.QueryEscape(query), nil) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + if s.basicAuthPass != "" { + req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass) + } + resp, err := s.c.Do(req.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("error getting response from %s:%s", req.URL, err) + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + return nil, fmt.Errorf("datasource returns unxeprected response code %d for %s with err %s. Reponse body %s", resp.StatusCode, req.URL, err, body) + } + r := &response{} + if err := json.NewDecoder(resp.Body).Decode(r); err != nil { + return nil, fmt.Errorf("error parsing metrics for %s:%s", req.URL, err) + } + if r.Status == statusError { + return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL, r.ErrorType, r.Error) + } + if r.Status != statusSuccess { + return nil, fmt.Errorf("unkown status:%s, Expected success or error ", r.Status) + } + if r.Data.ResultType != rtVector { + return nil, fmt.Errorf("unkown restul type:%s. Expected vector", r.Data.ResultType) + } + return r.metrics() +} diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go new file mode 100644 index 0000000000..cc1382815b --- /dev/null +++ b/app/vmalert/datasource/vm_test.go @@ -0,0 +1,93 @@ +package datasource + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" +) + +var ( + ctx = context.Background() + basicAuthName = "foo" + basicAuthPass = "bar" + query = "vm_rows" +) + +func TestVMSelectQuery(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) { + t.Errorf("should not be called") + }) + c := -1 + mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) { + c++ + if r.Method != http.MethodPost { + t.Errorf("expected POST method got %s", r.Method) + } + if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass { + t.Errorf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass) + } + if r.URL.Query().Get("query") != query { + t.Errorf("exptected %s in query param, got %s", query, r.URL.Query().Get("query")) + } + switch c { + case 0: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + case 1: + w.WriteHeader(500) + case 2: + w.Write([]byte("[]")) + case 3: + w.Write([]byte(`{"status":"error", "errorType":"type:", "error":"some error msg"}`)) + case 4: + w.Write([]byte(`{"status":"unknown"}`)) + case 5: + w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`)) + case 6: + w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]}]}}`)) + } + }) + + srv := httptest.NewServer(mux) + defer srv.Close() + am := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, srv.Client()) + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected connection error got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected invalid response status error got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected response body error got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected error status got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected unkown status got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected non-vector resultType error got nil") + } + m, err := am.Query(ctx, query) + if err != nil { + t.Fatalf("unexpected %s", err) + } + if len(m) != 1 { + t.Fatalf("exptected 1 metric got %d in %+v", len(m), m) + } + expected := Metric{ + Labels: []Label{{Value: "vm_rows", Name: "__name__"}}, + Timestamp: 1583786142, + Value: 13763, + } + if m[0].Timestamp != expected.Timestamp && + m[0].Value != expected.Value && + m[0].Labels[0].Value != expected.Labels[0].Value && + m[0].Labels[0].Name != expected.Labels[0].Name { + t.Fatalf("unexpected metric %+v want %+v", m[0], expected) + } + +} diff --git a/app/vmalert/main.go b/app/vmalert/main.go new file mode 100644 index 0000000000..d600bdc550 --- /dev/null +++ b/app/vmalert/main.go @@ -0,0 +1,213 @@ +package main + +import ( + "context" + "flag" + "fmt" + "net/http" + "net/url" + "os" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rulePath = flagutil.NewArray("rule", `Path to the file with alert rules. +Supports patterns. Flag can be specified multiple times. +Examples: + -rule /path/to/file. Path to a single file with alerting rules + -rule dir/*.yaml -rule /*.yaml. Relative path to all .yaml files in "dir" folder, +absolute path to all .yaml files in root.`) + validateTemplates = flag.Bool("rule.validateTemplates", true, "Indicates to validate annotation and label templates") + httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections") + datasourceURL = flag.String("datasource.url", "", "Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428") + basicAuthUsername = flag.String("datasource.basicAuth.username", "", "Optional basic auth username to use for -datasource.url") + basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password to use for -datasource.url") + remoteWriteURL = flag.String("remotewrite.url", "", "Optional URL to remote-write compatible storage where to write timeseries"+ + "based on active alerts. E.g. http://127.0.0.1:8428") + evaluationInterval = flag.Duration("evaluationInterval", 1*time.Minute, "How often to evaluate the rules. Default 1m") + notifierURL = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093") + externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier") +) + +// TODO: hot configuration reload +// TODO: alerts state persistence +func main() { + envflag.Parse() + buildinfo.Init() + logger.Init() + checkFlags() + ctx, cancel := context.WithCancel(context.Background()) + eu, err := getExternalURL(*externalURL, *httpListenAddr, httpserver.IsTLS()) + if err != nil { + logger.Fatalf("can not get external url:%s ", err) + } + notifier.InitTemplateFunc(eu) + + logger.Infof("reading alert rules configuration file from %s", strings.Join(*rulePath, ";")) + groups, err := Parse(*rulePath, *validateTemplates) + if err != nil { + logger.Fatalf("cannot parse configuration file: %s", err) + } + + w := &watchdog{ + storage: datasource.NewVMStorage(*datasourceURL, *basicAuthUsername, *basicAuthPassword, &http.Client{}), + alertProvider: notifier.NewAlertManager(*notifierURL, func(group, name string) string { + return fmt.Sprintf("%s/api/v1/%s/%s/status", eu, group, name) + }, &http.Client{}), + } + + if *remoteWriteURL != "" { + c, err := remotewrite.NewClient(ctx, remotewrite.Config{ + Addr: *remoteWriteURL, + FlushInterval: *evaluationInterval, + }) + if err != nil { + logger.Fatalf("failed to init remotewrite client: %s", err) + } + w.rw = c + } + + wg := sync.WaitGroup{} + for i := range groups { + wg.Add(1) + go func(group Group) { + w.run(ctx, group, *evaluationInterval) + wg.Done() + }(groups[i]) + } + + go httpserver.Serve(*httpListenAddr, (&requestHandler{groups: groups}).handler) + + sig := procutil.WaitForSigterm() + logger.Infof("service received signal %s", sig) + if err := httpserver.Stop(*httpListenAddr); err != nil { + logger.Fatalf("cannot stop the webservice: %s", err) + } + cancel() + if w.rw != nil { + err := w.rw.Close() + if err != nil { + logger.Fatalf("cannot stop the remotewrite: %s", err) + } + } + wg.Wait() +} + +type watchdog struct { + storage *datasource.VMStorage + alertProvider notifier.Notifier + rw *remotewrite.Client +} + +var ( + iterationTotal = metrics.NewCounter(`vmalert_iteration_total`) + iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`) + + execTotal = metrics.NewCounter(`vmalert_execution_total`) + execErrors = metrics.NewCounter(`vmalert_execution_errors_total`) + execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`) + + alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`) + alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`) + alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`) + + remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`) + remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) +) + +func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration) { + logger.Infof("watchdog for %s has been started", group.Name) + t := time.NewTicker(evaluationInterval) + defer t.Stop() + for { + + select { + case <-t.C: + iterationTotal.Inc() + iterationStart := time.Now() + for _, rule := range group.Rules { + execTotal.Inc() + + execStart := time.Now() + err := rule.Exec(ctx, w.storage) + execDuration.UpdateDuration(execStart) + + if err != nil { + execErrors.Inc() + logger.Errorf("failed to execute rule %q.%q: %s", group.Name, rule.Name, err) + continue + } + + var alertsToSend []notifier.Alert + for _, a := range rule.alerts { + if a.State != notifier.StatePending { + alertsToSend = append(alertsToSend, *a) + } + if a.State == notifier.StateInactive || w.rw == nil { + continue + } + tss := rule.AlertToTimeSeries(a, execStart) + for _, ts := range tss { + remoteWriteSent.Inc() + if err := w.rw.Push(ts); err != nil { + remoteWriteErrors.Inc() + logger.Errorf("failed to push timeseries to remotewrite: %s", err) + } + } + } + alertsSent.Add(len(alertsToSend)) + if err := w.alertProvider.Send(alertsToSend); err != nil { + alertsSendErrors.Inc() + logger.Errorf("failed to send alert for rule %q.%q: %s", group.Name, rule.Name, err) + } + } + iterationDuration.UpdateDuration(iterationStart) + case <-ctx.Done(): + logger.Infof("%s received stop signal", group.Name) + return + } + } +} + +func getExternalURL(externalURL, httpListenAddr string, isSecure bool) (*url.URL, error) { + if externalURL != "" { + return url.Parse(externalURL) + } + hname, err := os.Hostname() + if err != nil { + return nil, err + } + port := "" + if ipport := strings.Split(httpListenAddr, ":"); len(ipport) > 1 { + port = ":" + ipport[1] + } + schema := "http://" + if isSecure { + schema = "https://" + } + return url.Parse(fmt.Sprintf("%s%s%s", schema, hname, port)) +} + +func checkFlags() { + if *notifierURL == "" { + flag.PrintDefaults() + logger.Fatalf("notifier.url is empty") + } + if *datasourceURL == "" { + flag.PrintDefaults() + logger.Fatalf("datasource.url is empty") + } +} diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go new file mode 100644 index 0000000000..3a2b5de626 --- /dev/null +++ b/app/vmalert/notifier/alert.go @@ -0,0 +1,105 @@ +package notifier + +import ( + "bytes" + "fmt" + "io" + "strings" + "text/template" + "time" +) + +// Alert the triggered alert +// TODO: Looks like alert name isn't unique +type Alert struct { + Group string + Name string + Labels map[string]string + Annotations map[string]string + State AlertState + + Start time.Time + End time.Time + Value float64 + ID uint64 +} + +// AlertState type indicates the Alert state +type AlertState int + +const ( + // StateInactive is the state of an alert that is neither firing nor pending. + StateInactive AlertState = iota + // StatePending is the state of an alert that has been active for less than + // the configured threshold duration. + StatePending + // StateFiring is the state of an alert that has been active for longer than + // the configured threshold duration. + StateFiring +) + +// String stringer for AlertState +func (as AlertState) String() string { + switch as { + case StateFiring: + return "firing" + case StatePending: + return "pending" + } + return "inactive" +} + +type alertTplData struct { + Labels map[string]string + Value float64 +} + +const tplHeader = `{{ $value := .Value }}{{ $labels := .Labels }}` + +// ExecTemplate executes the Alert template for give +// map of annotations. +func (a *Alert) ExecTemplate(annotations map[string]string) (map[string]string, error) { + tplData := alertTplData{Value: a.Value, Labels: a.Labels} + return templateAnnotations(annotations, tplHeader, tplData) +} + +// ValidateTemplates validate annotations for possible template error, uses empty data for template population +func ValidateTemplates(annotations map[string]string) error { + _, err := templateAnnotations(annotations, tplHeader, alertTplData{ + Labels: map[string]string{}, + Value: 0, + }) + return err +} + +func templateAnnotations(annotations map[string]string, header string, data alertTplData) (map[string]string, error) { + var builder strings.Builder + var buf bytes.Buffer + eg := errGroup{} + r := make(map[string]string, len(annotations)) + for key, text := range annotations { + r[key] = text + buf.Reset() + builder.Reset() + builder.Grow(len(header) + len(text)) + builder.WriteString(header) + builder.WriteString(text) + if err := templateAnnotation(&buf, builder.String(), data); err != nil { + eg.errs = append(eg.errs, fmt.Sprintf("key %s, template %s:%s", key, text, err)) + continue + } + r[key] = buf.String() + } + return r, eg.err() +} + +func templateAnnotation(dst io.Writer, text string, data alertTplData) error { + tpl, err := template.New("").Funcs(tmplFunc).Option("missingkey=zero").Parse(text) + if err != nil { + return fmt.Errorf("error parsing annotation:%w", err) + } + if err = tpl.Execute(dst, data); err != nil { + return fmt.Errorf("error evaluating annotation template:%w", err) + } + return nil +} diff --git a/app/vmalert/notifier/alert_test.go b/app/vmalert/notifier/alert_test.go new file mode 100644 index 0000000000..9d7cc3608d --- /dev/null +++ b/app/vmalert/notifier/alert_test.go @@ -0,0 +1,65 @@ +package notifier + +import ( + "fmt" + "testing" +) + +func TestAlert_ExecTemplate(t *testing.T) { + testCases := []struct { + alert *Alert + annotations map[string]string + expTpl map[string]string + }{ + { + alert: &Alert{}, + annotations: map[string]string{}, + expTpl: map[string]string{}, + }, + { + alert: &Alert{ + Value: 1e4, + Labels: map[string]string{ + "instance": "localhost", + }, + }, + annotations: map[string]string{}, + expTpl: map[string]string{}, + }, + { + alert: &Alert{ + Value: 1e4, + Labels: map[string]string{ + "job": "staging", + "instance": "localhost", + }, + }, + annotations: map[string]string{ + "summary": "Too high connection number for {{$labels.instance}} for job {{$labels.job}}", + "description": "It is {{ $value }} connections for {{$labels.instance}}", + }, + expTpl: map[string]string{ + "summary": "Too high connection number for localhost for job staging", + "description": "It is 10000 connections for localhost", + }, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + tpl, err := tc.alert.ExecTemplate(tc.annotations) + if err != nil { + t.Fatal(err) + } + if len(tpl) != len(tc.expTpl) { + t.Fatalf("expected %d elements; got %d", len(tc.expTpl), len(tpl)) + } + for k := range tc.expTpl { + got, exp := tpl[k], tc.expTpl[k] + if got != exp { + t.Fatalf("expected %q=%q; got %q=%q", k, exp, k, got) + } + } + }) + } +} diff --git a/app/vmalert/notifier/alertmanager.go b/app/vmalert/notifier/alertmanager.go new file mode 100644 index 0000000000..ab1ba6cc63 --- /dev/null +++ b/app/vmalert/notifier/alertmanager.go @@ -0,0 +1,51 @@ +package notifier + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "strings" +) + +// AlertManager represents integration provider with Prometheus alert manager +// https://github.com/prometheus/alertmanager +type AlertManager struct { + alertURL string + argFunc AlertURLGenerator + client *http.Client +} + +// Send an alert or resolve message +func (am *AlertManager) Send(alerts []Alert) error { + b := &bytes.Buffer{} + writeamRequest(b, alerts, am.argFunc) + resp, err := am.client.Post(am.alertURL, "application/json", b) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response from %q: %s", am.alertURL, err) + } + return fmt.Errorf("invalid SC %d from %q; response body: %s", resp.StatusCode, am.alertURL, string(body)) + } + return nil +} + +// AlertURLGenerator returns URL to single alert by given name +type AlertURLGenerator func(group, id string) string + +const alertManagerPath = "/api/v2/alerts" + +// NewAlertManager is a constructor for AlertManager +func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager { + return &AlertManager{ + alertURL: strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath, + argFunc: fn, + client: c, + } +} diff --git a/app/vmalert/notifier/alertmanager_request.qtpl b/app/vmalert/notifier/alertmanager_request.qtpl new file mode 100644 index 0000000000..1c64f1f373 --- /dev/null +++ b/app/vmalert/notifier/alertmanager_request.qtpl @@ -0,0 +1,34 @@ +{% import ( + "strconv" + "time" +) %} +{% stripspace %} + +{% func amRequest(alerts []Alert, generatorURL func(string, string) string) %} +[ +{% for i, alert := range alerts %} +{ + "startsAt":{%q= alert.Start.Format(time.RFC3339Nano) %}, + "generatorURL": {%q= generatorURL(alert.Group, strconv.FormatUint(alert.ID, 10)) %}, + {% if !alert.End.IsZero() %} + "endsAt":{%q= alert.End.Format(time.RFC3339Nano) %}, + {% endif %} + "labels": { + "alertname":{%q= alert.Name %} + {% for k,v := range alert.Labels %} + ,{%q= k %}:{%q= v %} + {% endfor %} + }, + "annotations": { + {% code c := len(alert.Annotations) %} + {% for k,v := range alert.Annotations %} + {% code c = c-1 %} + {%q= k %}:{%q= v %}{% if c > 0 %},{% endif %} + {% endfor %} + } +} +{% if i != len(alerts)-1 %},{% endif %} +{% endfor %} +] +{% endfunc %} +{% endstripspace %} diff --git a/app/vmalert/notifier/alertmanager_request.qtpl.go b/app/vmalert/notifier/alertmanager_request.qtpl.go new file mode 100644 index 0000000000..26aebf5848 --- /dev/null +++ b/app/vmalert/notifier/alertmanager_request.qtpl.go @@ -0,0 +1,131 @@ +// Code generated by qtc from "alertmanager_request.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +//line app/vmalert/notifier/alertmanager_request.qtpl:1 +package notifier + +//line app/vmalert/notifier/alertmanager_request.qtpl:1 +import ( + "strconv" + "time" +) + +//line app/vmalert/notifier/alertmanager_request.qtpl:7 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vmalert/notifier/alertmanager_request.qtpl:7 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vmalert/notifier/alertmanager_request.qtpl:7 +func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) { +//line app/vmalert/notifier/alertmanager_request.qtpl:7 + qw422016.N().S(`[`) +//line app/vmalert/notifier/alertmanager_request.qtpl:9 + for i, alert := range alerts { +//line app/vmalert/notifier/alertmanager_request.qtpl:9 + qw422016.N().S(`{"startsAt":`) +//line app/vmalert/notifier/alertmanager_request.qtpl:11 + qw422016.N().Q(alert.Start.Format(time.RFC3339Nano)) +//line app/vmalert/notifier/alertmanager_request.qtpl:11 + qw422016.N().S(`,"generatorURL":`) +//line app/vmalert/notifier/alertmanager_request.qtpl:12 + qw422016.N().Q(generatorURL(alert.Group, strconv.FormatUint(alert.ID, 10))) +//line app/vmalert/notifier/alertmanager_request.qtpl:12 + qw422016.N().S(`,`) +//line app/vmalert/notifier/alertmanager_request.qtpl:13 + if !alert.End.IsZero() { +//line app/vmalert/notifier/alertmanager_request.qtpl:13 + qw422016.N().S(`"endsAt":`) +//line app/vmalert/notifier/alertmanager_request.qtpl:14 + qw422016.N().Q(alert.End.Format(time.RFC3339Nano)) +//line app/vmalert/notifier/alertmanager_request.qtpl:14 + qw422016.N().S(`,`) +//line app/vmalert/notifier/alertmanager_request.qtpl:15 + } +//line app/vmalert/notifier/alertmanager_request.qtpl:15 + qw422016.N().S(`"labels": {"alertname":`) +//line app/vmalert/notifier/alertmanager_request.qtpl:17 + qw422016.N().Q(alert.Name) +//line app/vmalert/notifier/alertmanager_request.qtpl:18 + for k, v := range alert.Labels { +//line app/vmalert/notifier/alertmanager_request.qtpl:18 + qw422016.N().S(`,`) +//line app/vmalert/notifier/alertmanager_request.qtpl:19 + qw422016.N().Q(k) +//line app/vmalert/notifier/alertmanager_request.qtpl:19 + qw422016.N().S(`:`) +//line app/vmalert/notifier/alertmanager_request.qtpl:19 + qw422016.N().Q(v) +//line app/vmalert/notifier/alertmanager_request.qtpl:20 + } +//line app/vmalert/notifier/alertmanager_request.qtpl:20 + qw422016.N().S(`},"annotations": {`) +//line app/vmalert/notifier/alertmanager_request.qtpl:23 + c := len(alert.Annotations) + +//line app/vmalert/notifier/alertmanager_request.qtpl:24 + for k, v := range alert.Annotations { +//line app/vmalert/notifier/alertmanager_request.qtpl:25 + c = c - 1 + +//line app/vmalert/notifier/alertmanager_request.qtpl:26 + qw422016.N().Q(k) +//line app/vmalert/notifier/alertmanager_request.qtpl:26 + qw422016.N().S(`:`) +//line app/vmalert/notifier/alertmanager_request.qtpl:26 + qw422016.N().Q(v) +//line app/vmalert/notifier/alertmanager_request.qtpl:26 + if c > 0 { +//line app/vmalert/notifier/alertmanager_request.qtpl:26 + qw422016.N().S(`,`) +//line app/vmalert/notifier/alertmanager_request.qtpl:26 + } +//line app/vmalert/notifier/alertmanager_request.qtpl:27 + } +//line app/vmalert/notifier/alertmanager_request.qtpl:27 + qw422016.N().S(`}}`) +//line app/vmalert/notifier/alertmanager_request.qtpl:30 + if i != len(alerts)-1 { +//line app/vmalert/notifier/alertmanager_request.qtpl:30 + qw422016.N().S(`,`) +//line app/vmalert/notifier/alertmanager_request.qtpl:30 + } +//line app/vmalert/notifier/alertmanager_request.qtpl:31 + } +//line app/vmalert/notifier/alertmanager_request.qtpl:31 + qw422016.N().S(`]`) +//line app/vmalert/notifier/alertmanager_request.qtpl:33 +} + +//line app/vmalert/notifier/alertmanager_request.qtpl:33 +func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) { +//line app/vmalert/notifier/alertmanager_request.qtpl:33 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vmalert/notifier/alertmanager_request.qtpl:33 + streamamRequest(qw422016, alerts, generatorURL) +//line app/vmalert/notifier/alertmanager_request.qtpl:33 + qt422016.ReleaseWriter(qw422016) +//line app/vmalert/notifier/alertmanager_request.qtpl:33 +} + +//line app/vmalert/notifier/alertmanager_request.qtpl:33 +func amRequest(alerts []Alert, generatorURL func(string, string) string) string { +//line app/vmalert/notifier/alertmanager_request.qtpl:33 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vmalert/notifier/alertmanager_request.qtpl:33 + writeamRequest(qb422016, alerts, generatorURL) +//line app/vmalert/notifier/alertmanager_request.qtpl:33 + qs422016 := string(qb422016.B) +//line app/vmalert/notifier/alertmanager_request.qtpl:33 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vmalert/notifier/alertmanager_request.qtpl:33 + return qs422016 +//line app/vmalert/notifier/alertmanager_request.qtpl:33 +} diff --git a/app/vmalert/notifier/alertmanager_test.go b/app/vmalert/notifier/alertmanager_test.go new file mode 100644 index 0000000000..710d147bc9 --- /dev/null +++ b/app/vmalert/notifier/alertmanager_test.go @@ -0,0 +1,80 @@ +package notifier + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestAlertManager_Send(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) { + t.Errorf("should not be called") + }) + c := -1 + mux.HandleFunc(alertManagerPath, func(w http.ResponseWriter, r *http.Request) { + c++ + if r.Method != http.MethodPost { + t.Errorf("expected POST method got %s", r.Method) + } + switch c { + case 0: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + case 1: + w.WriteHeader(500) + case 2: + var a []struct { + Labels map[string]string `json:"labels"` + StartsAt time.Time `json:"startsAt"` + EndAt time.Time `json:"endsAt"` + Annotations map[string]string `json:"annotations"` + GeneratorURL string `json:"generatorURL"` + } + if err := json.NewDecoder(r.Body).Decode(&a); err != nil { + t.Errorf("can not unmarshal data into alert %s", err) + t.FailNow() + } + if len(a) != 1 { + t.Errorf("expected 1 alert in array got %d", len(a)) + } + if a[0].GeneratorURL != "group0" { + t.Errorf("exptected alert0 as generatorURL got %s", a[0].GeneratorURL) + } + if a[0].Labels["alertname"] != "alert0" { + t.Errorf("exptected alert0 as alert name got %s", a[0].Labels["alertname"]) + } + if a[0].StartsAt.IsZero() { + t.Errorf("exptected non-zero start time") + } + if a[0].EndAt.IsZero() { + t.Errorf("exptected non-zero end time") + } + } + }) + srv := httptest.NewServer(mux) + defer srv.Close() + am := NewAlertManager(srv.URL, func(group, name string) string { + return group + name + }, srv.Client()) + if err := am.Send([]Alert{{}, {}}); err == nil { + t.Error("expected connection error got nil") + } + if err := am.Send([]Alert{}); err == nil { + t.Error("expected wrong http code error got nil") + } + if err := am.Send([]Alert{{ + Group: "group", + Name: "alert0", + Start: time.Now().UTC(), + End: time.Now().UTC(), + Annotations: map[string]string{"a": "b", "c": "d", "e": "f"}, + }}); err != nil { + t.Errorf("unexpected error %s", err) + } + if c != 2 { + t.Errorf("expected 2 calls(count from zero) to server got %d", c) + } +} diff --git a/app/vmalert/notifier/notifier.go b/app/vmalert/notifier/notifier.go new file mode 100644 index 0000000000..598be77d00 --- /dev/null +++ b/app/vmalert/notifier/notifier.go @@ -0,0 +1,6 @@ +package notifier + +// Notifier is common interface for alert manager provider +type Notifier interface { + Send(alerts []Alert) error +} diff --git a/app/vmalert/notifier/template_func.go b/app/vmalert/notifier/template_func.go new file mode 100644 index 0000000000..45f5e349f4 --- /dev/null +++ b/app/vmalert/notifier/template_func.go @@ -0,0 +1,171 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package notifier + +import ( + "fmt" + html_template "html/template" + "math" + "net/url" + "regexp" + "strings" + text_template "text/template" + "time" +) + +var tmplFunc text_template.FuncMap + +// InitTemplateFunc returns template helper functions +func InitTemplateFunc(externalURL *url.URL) { + tmplFunc = text_template.FuncMap{ + "args": func(args ...interface{}) map[string]interface{} { + result := make(map[string]interface{}) + for i, a := range args { + result[fmt.Sprintf("arg%d", i)] = a + } + return result + }, + "reReplaceAll": func(pattern, repl, text string) string { + re := regexp.MustCompile(pattern) + return re.ReplaceAllString(text, repl) + }, + "safeHtml": func(text string) html_template.HTML { + return html_template.HTML(text) + }, + "match": regexp.MatchString, + "title": strings.Title, + "toUpper": strings.ToUpper, + "toLower": strings.ToLower, + "humanize": func(v float64) string { + if v == 0 || math.IsNaN(v) || math.IsInf(v, 0) { + return fmt.Sprintf("%.4g", v) + } + if math.Abs(v) >= 1 { + prefix := "" + for _, p := range []string{"k", "M", "G", "T", "P", "E", "Z", "Y"} { + if math.Abs(v) < 1000 { + break + } + prefix = p + v /= 1000 + } + return fmt.Sprintf("%.4g%s", v, prefix) + } + prefix := "" + for _, p := range []string{"m", "u", "n", "p", "f", "a", "z", "y"} { + if math.Abs(v) >= 1 { + break + } + prefix = p + v *= 1000 + } + return fmt.Sprintf("%.4g%s", v, prefix) + }, + "humanize1024": func(v float64) string { + if math.Abs(v) <= 1 || math.IsNaN(v) || math.IsInf(v, 0) { + return fmt.Sprintf("%.4g", v) + } + prefix := "" + for _, p := range []string{"ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi"} { + if math.Abs(v) < 1024 { + break + } + prefix = p + v /= 1024 + } + return fmt.Sprintf("%.4g%s", v, prefix) + }, + "humanizeDuration": func(v float64) string { + if math.IsNaN(v) || math.IsInf(v, 0) { + return fmt.Sprintf("%.4g", v) + } + if v == 0 { + return fmt.Sprintf("%.4gs", v) + } + if math.Abs(v) >= 1 { + sign := "" + if v < 0 { + sign = "-" + v = -v + } + seconds := int64(v) % 60 + minutes := (int64(v) / 60) % 60 + hours := (int64(v) / 60 / 60) % 24 + days := int64(v) / 60 / 60 / 24 + // For days to minutes, we display seconds as an integer. + if days != 0 { + return fmt.Sprintf("%s%dd %dh %dm %ds", sign, days, hours, minutes, seconds) + } + if hours != 0 { + return fmt.Sprintf("%s%dh %dm %ds", sign, hours, minutes, seconds) + } + if minutes != 0 { + return fmt.Sprintf("%s%dm %ds", sign, minutes, seconds) + } + // For seconds, we display 4 significant digits. + return fmt.Sprintf("%s%.4gs", sign, v) + } + prefix := "" + for _, p := range []string{"m", "u", "n", "p", "f", "a", "z", "y"} { + if math.Abs(v) >= 1 { + break + } + prefix = p + v *= 1000 + } + return fmt.Sprintf("%.4g%ss", v, prefix) + }, + "humanizePercentage": func(v float64) string { + return fmt.Sprintf("%.4g%%", v*100) + }, + "humanizeTimestamp": func(v float64) string { + if math.IsNaN(v) || math.IsInf(v, 0) { + return fmt.Sprintf("%.4g", v) + } + t := TimeFromUnixNano(int64(v * 1e9)).Time().UTC() + return fmt.Sprint(t) + }, + "pathPrefix": func() string { + return externalURL.Path + }, + "externalURL": func() string { + return externalURL.String() + }, + } +} + +// Time is the number of milliseconds since the epoch +// (1970-01-01 00:00 UTC) excluding leap seconds. +type Time int64 + +// TimeFromUnixNano returns the Time equivalent to the Unix Time +// t provided in nanoseconds. +func TimeFromUnixNano(t int64) Time { + return Time(t / nanosPerTick) +} + +// The number of nanoseconds per minimum tick. +const nanosPerTick = int64(minimumTick / time.Nanosecond) + +// MinimumTick is the minimum supported time resolution. This has to be +// at least time.Second in order for the code below to work. +const minimumTick = time.Millisecond + +// second is the Time duration equivalent to one second. +const second = int64(time.Second / minimumTick) + +// Time returns the time.Time representation of t. +func (t Time) Time() time.Time { + return time.Unix(int64(t)/second, (int64(t)%second)*nanosPerTick) +} diff --git a/app/vmalert/notifier/utils.go b/app/vmalert/notifier/utils.go new file mode 100644 index 0000000000..8cf7c901cd --- /dev/null +++ b/app/vmalert/notifier/utils.go @@ -0,0 +1,21 @@ +package notifier + +import ( + "fmt" + "strings" +) + +type errGroup struct { + errs []string +} + +func (eg *errGroup) err() error { + if eg == nil || len(eg.errs) == 0 { + return nil + } + return eg +} + +func (eg *errGroup) Error() string { + return fmt.Sprintf("errors:%s", strings.Join(eg.errs, "\n")) +} diff --git a/app/vmalert/remotewrite/remotewrite.go b/app/vmalert/remotewrite/remotewrite.go new file mode 100644 index 0000000000..716e3aa077 --- /dev/null +++ b/app/vmalert/remotewrite/remotewrite.go @@ -0,0 +1,187 @@ +package remotewrite + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/golang/snappy" +) + +// Client is an asynchronous HTTP client for writing +// timeseries via remote write protocol. +type Client struct { + addr string + c *http.Client + input chan prompbmarshal.TimeSeries + baUser, baPass string + flushInterval time.Duration + maxBatchSize int + maxQueueSize int + + wg sync.WaitGroup + doneCh chan struct{} +} + +type Config struct { + // Addr of remote storage + Addr string + + BasicAuthUser string + BasicAuthPass string + + // MaxBatchSize defines max number of timeseries + // to be flushed at once + MaxBatchSize int + // MaxQueueSize defines max length of input queue + // populated by Push method + MaxQueueSize int + // FlushInterval defines time interval for flushing batches + FlushInterval time.Duration + // WriteTimeout defines timeout for HTTP write request + // to remote storage + WriteTimeout time.Duration +} + +const ( + defaultMaxBatchSize = 1e3 + defaultMaxQueueSize = 100 + defaultFlushInterval = 5 * time.Second + defaultWriteTimeout = 30 * time.Second +) + +const writePath = "/api/v1/write" + +// NewClient returns asynchronous client for +// writing timeseries via remotewrite protocol. +func NewClient(ctx context.Context, cfg Config) (*Client, error) { + if cfg.Addr == "" { + return nil, fmt.Errorf("config.Addr can't be empty") + } + if cfg.MaxBatchSize == 0 { + cfg.MaxBatchSize = defaultMaxBatchSize + } + if cfg.MaxQueueSize == 0 { + cfg.MaxQueueSize = defaultMaxQueueSize + } + if cfg.FlushInterval == 0 { + cfg.FlushInterval = defaultFlushInterval + } + if cfg.WriteTimeout == 0 { + cfg.WriteTimeout = defaultWriteTimeout + } + c := &Client{ + c: &http.Client{ + Timeout: cfg.WriteTimeout, + }, + addr: strings.TrimSuffix(cfg.Addr, "/") + writePath, + baUser: cfg.BasicAuthUser, + baPass: cfg.BasicAuthPass, + flushInterval: cfg.FlushInterval, + maxBatchSize: cfg.MaxBatchSize, + doneCh: make(chan struct{}), + input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize), + } + c.run(ctx) + return c, nil +} + +// Push adds timeseries into queue for writing into remote storage. +// Push returns and error if client is stopped or if queue is full. +func (c *Client) Push(s prompbmarshal.TimeSeries) error { + select { + case <-c.doneCh: + return fmt.Errorf("client is closed") + case c.input <- s: + return nil + default: + return fmt.Errorf("failed to push timeseries - queue is full (%d entries)", + c.maxQueueSize) + } +} + +// Close stops the client and waits for all goroutines +// to exit. +func (c *Client) Close() error { + if c.doneCh == nil { + return fmt.Errorf("client is already closed") + } + close(c.input) + close(c.doneCh) + c.wg.Wait() + return nil +} + +func (c *Client) run(ctx context.Context) { + ticker := time.NewTicker(c.flushInterval) + wr := prompbmarshal.WriteRequest{} + shutdown := func() { + for ts := range c.input { + wr.Timeseries = append(wr.Timeseries, ts) + } + lastCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) + c.flush(lastCtx, wr) + cancel() + } + c.wg.Add(1) + go func() { + defer c.wg.Done() + defer ticker.Stop() + for { + select { + case <-c.doneCh: + shutdown() + return + case <-ctx.Done(): + shutdown() + return + case <-ticker.C: + c.flush(ctx, wr) + wr = prompbmarshal.WriteRequest{} + case ts := <-c.input: + wr.Timeseries = append(wr.Timeseries, ts) + if len(wr.Timeseries) >= c.maxBatchSize { + c.flush(ctx, wr) + wr = prompbmarshal.WriteRequest{} + } + } + } + }() +} + +func (c *Client) flush(ctx context.Context, wr prompbmarshal.WriteRequest) { + if len(wr.Timeseries) < 1 { + return + } + data, err := wr.Marshal() + if err != nil { + logger.Errorf("failed to marshal WriteRequest: %s", err) + return + } + req, err := http.NewRequest("POST", c.addr, bytes.NewReader(snappy.Encode(nil, data))) + if err != nil { + logger.Errorf("failed to create new HTTP request: %s", err) + return + } + if c.baPass != "" { + req.SetBasicAuth(c.baUser, c.baPass) + } + resp, err := c.c.Do(req.WithContext(ctx)) + if err != nil { + logger.Errorf("error getting response from %s:%s", req.URL, err) + return + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode != http.StatusNoContent { + body, _ := ioutil.ReadAll(resp.Body) + logger.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body) + return + } +} diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go new file mode 100644 index 0000000000..1a6a78c91e --- /dev/null +++ b/app/vmalert/rule.go @@ -0,0 +1,269 @@ +package main + +import ( + "context" + "errors" + "fmt" + "hash/fnv" + "sort" + "strconv" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +// Group grouping array of alert +type Group struct { + Name string + Rules []*Rule +} + +// Rule is basic alert entity +type Rule struct { + Name string `yaml:"alert"` + Expr string `yaml:"expr"` + For time.Duration `yaml:"for"` + Labels map[string]string `yaml:"labels"` + Annotations map[string]string `yaml:"annotations"` + + group *Group + + // guard status fields + mu sync.RWMutex + // stores list of active alerts + alerts map[uint64]*notifier.Alert + // stores last moment of time Exec was called + lastExecTime time.Time + // stores last error that happened in Exec func + // resets on every successful Exec + // may be used as Health state + lastExecError error +} + +// Validate validates rule +func (r *Rule) Validate() error { + if r.Name == "" { + return errors.New("rule name can not be empty") + } + if r.Expr == "" { + return fmt.Errorf("expression for rule %q can't be empty", r.Name) + } + if _, err := metricsql.Parse(r.Expr); err != nil { + return fmt.Errorf("invalid expression for rule %q: %w", r.Name, err) + } + return nil +} + +// Exec executes Rule expression via the given Querier. +// Based on the Querier results Rule maintains notifier.Alerts +func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error { + qMetrics, err := q.Query(ctx, r.Expr) + r.mu.Lock() + defer r.mu.Unlock() + + r.lastExecError = err + r.lastExecTime = time.Now() + if err != nil { + return fmt.Errorf("failed to execute query %q: %s", r.Expr, err) + } + + for h, a := range r.alerts { + // cleanup inactive alerts from previous Eval + if a.State == notifier.StateInactive { + delete(r.alerts, h) + } + } + + updated := make(map[uint64]struct{}) + // update list of active alerts + for _, m := range qMetrics { + h := hash(m) + updated[h] = struct{}{} + if _, ok := r.alerts[h]; ok { + continue + } + a, err := r.newAlert(m) + if err != nil { + r.lastExecError = err + return fmt.Errorf("failed to create alert: %s", err) + } + a.ID = h + a.State = notifier.StatePending + r.alerts[h] = a + } + + for h, a := range r.alerts { + // if alert wasn't updated in this iteration + // means it is resolved already + if _, ok := updated[h]; !ok { + a.State = notifier.StateInactive + // set endTime to last execution time + // so it can be sent by notifier on next step + a.End = r.lastExecTime + continue + } + if a.State == notifier.StatePending && time.Since(a.Start) >= r.For { + a.State = notifier.StateFiring + alertsFired.Inc() + } + if a.State == notifier.StateFiring { + a.End = r.lastExecTime.Add(3 * *evaluationInterval) + } + } + return nil +} + +// TODO: consider hashing algorithm in VM +func hash(m datasource.Metric) uint64 { + hash := fnv.New64a() + labels := m.Labels + sort.Slice(labels, func(i, j int) bool { + return labels[i].Name < labels[j].Name + }) + for _, l := range labels { + hash.Write([]byte(l.Name)) + hash.Write([]byte(l.Value)) + hash.Write([]byte("\xff")) + } + return hash.Sum64() +} + +func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) { + a := ¬ifier.Alert{ + Group: r.group.Name, + Name: r.Name, + Labels: map[string]string{}, + Value: m.Value, + Start: time.Now(), + // TODO: support End time + } + + // 1. use data labels + for _, l := range m.Labels { + a.Labels[l.Name] = l.Value + } + + // 2. template rule labels with data labels + rLabels, err := a.ExecTemplate(r.Labels) + if err != nil { + return a, err + } + + // 3. merge data labels and rule labels + // metric labels may be overridden by + // rule labels + for k, v := range rLabels { + a.Labels[k] = v + } + + // 4. template merged labels + a.Labels, err = a.ExecTemplate(a.Labels) + if err != nil { + return a, err + } + + a.Annotations, err = a.ExecTemplate(r.Annotations) + return a, err +} + +// AlertAPI generates APIAlert object from alert by its id(hash) +func (r *Rule) AlertAPI(id uint64) *APIAlert { + r.mu.RLock() + defer r.mu.RUnlock() + a, ok := r.alerts[id] + if !ok { + return nil + } + return r.newAlertAPI(*a) +} + +// AlertsAPI generates list of APIAlert objects from existing alerts +func (r *Rule) AlertsAPI() []*APIAlert { + var alerts []*APIAlert + r.mu.RLock() + for _, a := range r.alerts { + alerts = append(alerts, r.newAlertAPI(*a)) + } + r.mu.RUnlock() + return alerts +} + +func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert { + return &APIAlert{ + ID: a.ID, + Name: a.Name, + Group: a.Group, + Expression: r.Expr, + Labels: a.Labels, + Annotations: a.Annotations, + State: a.State.String(), + ActiveAt: a.Start, + Value: strconv.FormatFloat(a.Value, 'e', -1, 64), + } +} + +const ( + // AlertMetricName is the metric name for synthetic alert timeseries. + alertMetricName = "ALERTS" + // AlertForStateMetricName is the metric name for 'for' state of alert. + alertForStateMetricName = "ALERTS_FOR_STATE" + + // AlertNameLabel is the label name indicating the name of an alert. + alertNameLabel = "alertname" + // AlertStateLabel is the label name indicating the state of an alert. + alertStateLabel = "alertstate" +) + +func (r *Rule) AlertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries { + var tss []prompbmarshal.TimeSeries + tss = append(tss, alertToTimeSeries(r.Name, a, timestamp)) + if r.For > 0 { + tss = append(tss, alertForToTimeSeries(r.Name, a, timestamp)) + } + return tss +} + +func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries { + labels := make(map[string]string) + for k, v := range a.Labels { + labels[k] = v + } + labels["__name__"] = alertMetricName + labels[alertNameLabel] = name + labels[alertStateLabel] = a.State.String() + return newTimeSeries(1, labels, timestamp) +} + +func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries { + labels := make(map[string]string) + for k, v := range a.Labels { + labels[k] = v + } + labels["__name__"] = alertForStateMetricName + labels[alertNameLabel] = name + return newTimeSeries(float64(a.Start.Unix()), labels, timestamp) +} + +func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries { + ts := prompbmarshal.TimeSeries{} + ts.Samples = append(ts.Samples, prompbmarshal.Sample{ + Value: value, + Timestamp: timestamp.UnixNano() / 1e6, + }) + keys := make([]string, 0, len(labels)) + for k := range labels { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + ts.Labels = append(ts.Labels, prompbmarshal.Label{ + Name: key, + Value: labels[key], + }) + } + return ts +} diff --git a/app/vmalert/rule_test.go b/app/vmalert/rule_test.go new file mode 100644 index 0000000000..f3ccded063 --- /dev/null +++ b/app/vmalert/rule_test.go @@ -0,0 +1,399 @@ +package main + +import ( + "context" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestRule_Validate(t *testing.T) { + if err := (&Rule{}).Validate(); err == nil { + t.Errorf("exptected empty name error") + } + if err := (&Rule{Name: "alert"}).Validate(); err == nil { + t.Errorf("exptected empty expr error") + } + if err := (&Rule{Name: "alert", Expr: "test{"}).Validate(); err == nil { + t.Errorf("exptected invalid expr error") + } + if err := (&Rule{Name: "alert", Expr: "test>0"}).Validate(); err != nil { + t.Errorf("exptected valid rule got %s", err) + } +} + +func TestRule_AlertToTimeSeries(t *testing.T) { + timestamp := time.Now() + testCases := []struct { + rule *Rule + alert *notifier.Alert + expTS []prompbmarshal.TimeSeries + }{ + { + newTestRule("instant", 0), + ¬ifier.Alert{State: notifier.StateFiring}, + []prompbmarshal.TimeSeries{ + newTimeSeries(1, map[string]string{ + "__name__": alertMetricName, + alertStateLabel: notifier.StateFiring.String(), + alertNameLabel: "instant", + }, timestamp), + }, + }, + { + newTestRule("instant extra labels", 0), + ¬ifier.Alert{State: notifier.StateFiring, Labels: map[string]string{ + "job": "foo", + "instance": "bar", + }}, + []prompbmarshal.TimeSeries{ + newTimeSeries(1, map[string]string{ + "__name__": alertMetricName, + alertStateLabel: notifier.StateFiring.String(), + alertNameLabel: "instant extra labels", + "job": "foo", + "instance": "bar", + }, timestamp), + }, + }, + { + newTestRule("instant labels override", 0), + ¬ifier.Alert{State: notifier.StateFiring, Labels: map[string]string{ + alertStateLabel: "foo", + "__name__": "bar", + }}, + []prompbmarshal.TimeSeries{ + newTimeSeries(1, map[string]string{ + "__name__": alertMetricName, + alertStateLabel: notifier.StateFiring.String(), + alertNameLabel: "instant labels override", + }, timestamp), + }, + }, + { + newTestRule("for", time.Second), + ¬ifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)}, + []prompbmarshal.TimeSeries{ + newTimeSeries(1, map[string]string{ + "__name__": alertMetricName, + alertStateLabel: notifier.StateFiring.String(), + alertNameLabel: "for", + }, timestamp), + newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{ + "__name__": alertForStateMetricName, + alertNameLabel: "for", + }, timestamp), + }, + }, + { + newTestRule("for pending", 10*time.Second), + ¬ifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)}, + []prompbmarshal.TimeSeries{ + newTimeSeries(1, map[string]string{ + "__name__": alertMetricName, + alertStateLabel: notifier.StatePending.String(), + alertNameLabel: "for pending", + }, timestamp), + newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{ + "__name__": alertForStateMetricName, + alertNameLabel: "for pending", + }, timestamp), + }, + }, + } + for _, tc := range testCases { + t.Run(tc.rule.Name, func(t *testing.T) { + tss := tc.rule.AlertToTimeSeries(tc.alert, timestamp) + if len(tc.expTS) != len(tss) { + t.Fatalf("expected number of timeseries %d; got %d", len(tc.expTS), len(tss)) + } + for i := range tc.expTS { + expTS, gotTS := tc.expTS[i], tss[i] + if len(expTS.Samples) != len(gotTS.Samples) { + t.Fatalf("expected number of samples %d; got %d", len(expTS.Samples), len(gotTS.Samples)) + } + for i, exp := range expTS.Samples { + got := gotTS.Samples[i] + if got.Value != exp.Value { + t.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value) + } + if got.Timestamp != exp.Timestamp { + t.Errorf("expected timestamp %d; got %d", exp.Timestamp, got.Timestamp) + } + } + if len(expTS.Labels) != len(gotTS.Labels) { + t.Fatalf("expected number of labels %d; got %d", len(expTS.Labels), len(gotTS.Labels)) + } + for i, exp := range expTS.Labels { + got := gotTS.Labels[i] + if got.Name != exp.Name { + t.Errorf("expected label name %q; got %q", exp.Name, got.Name) + } + if got.Value != exp.Value { + t.Errorf("expected label value %q; got %q", exp.Value, got.Value) + } + } + } + }) + } +} + +func newTestRule(name string, waitFor time.Duration) *Rule { + return &Rule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor} +} + +func TestRule_Exec(t *testing.T) { + testCases := []struct { + rule *Rule + steps [][]datasource.Metric + expAlerts map[uint64]*notifier.Alert + }{ + { + newTestRule("empty", 0), + [][]datasource.Metric{}, + map[uint64]*notifier.Alert{}, + }, + { + newTestRule("single-firing", 0), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring}, + }, + }, + { + newTestRule("single-firing=>inactive", 0), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive}, + }, + }, + { + newTestRule("single-firing=>inactive=>firing", 0), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {}, + {metricWithLabels(t, "__name__", "foo")}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring}, + }, + }, + { + newTestRule("single-firing=>inactive=>firing=>inactive", 0), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {}, + {metricWithLabels(t, "__name__", "foo")}, + {}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive}, + }, + }, + { + newTestRule("single-firing=>inactive=>firing=>inactive=>empty", 0), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {}, + {metricWithLabels(t, "__name__", "foo")}, + {}, + {}, + }, + map[uint64]*notifier.Alert{}, + }, + { + newTestRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {}, + {metricWithLabels(t, "__name__", "foo")}, + {}, + {}, + {metricWithLabels(t, "__name__", "foo")}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring}, + }, + }, + { + newTestRule("multiple-firing", 0), + [][]datasource.Metric{ + { + metricWithLabels(t, "__name__", "foo"), + metricWithLabels(t, "__name__", "foo1"), + metricWithLabels(t, "__name__", "foo2"), + }, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring}, + hash(metricWithLabels(t, "__name__", "foo1")): {State: notifier.StateFiring}, + hash(metricWithLabels(t, "__name__", "foo2")): {State: notifier.StateFiring}, + }, + }, + { + newTestRule("multiple-steps-firing", 0), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {metricWithLabels(t, "__name__", "foo1")}, + {metricWithLabels(t, "__name__", "foo2")}, + }, + // 1: fire first alert + // 2: fire second alert, set first inactive + // 3: fire third alert, set second inactive, delete first one + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo1")): {State: notifier.StateInactive}, + hash(metricWithLabels(t, "__name__", "foo2")): {State: notifier.StateFiring}, + }, + }, + { + newTestRule("duplicate", 0), + [][]datasource.Metric{ + { + // metrics with the same labelset should result in one alert + metricWithLabels(t, "__name__", "foo", "type", "bar"), + metricWithLabels(t, "type", "bar", "__name__", "foo"), + }, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo", "type", "bar")): {State: notifier.StateFiring}, + }, + }, + { + newTestRule("for-pending", time.Minute), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StatePending}, + }, + }, + { + newTestRule("for-fired", time.Millisecond), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {metricWithLabels(t, "__name__", "foo")}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring}, + }, + }, + { + newTestRule("for-pending=>inactive", time.Millisecond), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {metricWithLabels(t, "__name__", "foo")}, + // empty step to reset pending alerts + {}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive}, + }, + }, + { + newTestRule("for-pending=>firing=>inactive", time.Millisecond), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {metricWithLabels(t, "__name__", "foo")}, + // empty step to reset pending alerts + {}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive}, + }, + }, + { + newTestRule("for-pending=>firing=>inactive=>pending", time.Millisecond), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {metricWithLabels(t, "__name__", "foo")}, + // empty step to reset pending alerts + {}, + {metricWithLabels(t, "__name__", "foo")}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StatePending}, + }, + }, + { + newTestRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond), + [][]datasource.Metric{ + {metricWithLabels(t, "__name__", "foo")}, + {metricWithLabels(t, "__name__", "foo")}, + // empty step to reset pending alerts + {}, + {metricWithLabels(t, "__name__", "foo")}, + {metricWithLabels(t, "__name__", "foo")}, + }, + map[uint64]*notifier.Alert{ + hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring}, + }, + }, + } + fakeGroup := &Group{Name: "TestRule_Exec"} + for _, tc := range testCases { + t.Run(tc.rule.Name, func(t *testing.T) { + fq := &fakeQuerier{} + tc.rule.group = fakeGroup + for _, step := range tc.steps { + fq.reset() + fq.add(t, step...) + if err := tc.rule.Exec(context.TODO(), fq); err != nil { + t.Fatalf("unexpected err: %s", err) + } + // artificial delay between applying steps + time.Sleep(time.Millisecond) + } + if len(tc.rule.alerts) != len(tc.expAlerts) { + t.Fatalf("expected %d alerts; got %d", len(tc.expAlerts), len(tc.rule.alerts)) + } + for key, exp := range tc.expAlerts { + got, ok := tc.rule.alerts[key] + if !ok { + t.Fatalf("expected to have key %d", key) + } + if got.State != exp.State { + t.Fatalf("expected state %d; got %d", exp.State, got.State) + } + } + }) + } +} + +func metricWithLabels(t *testing.T, labels ...string) datasource.Metric { + t.Helper() + if len(labels) == 0 || len(labels)%2 != 0 { + t.Fatalf("expected to get even number of labels") + } + m := datasource.Metric{} + for i := 0; i < len(labels); i += 2 { + m.Labels = append(m.Labels, datasource.Label{ + Name: labels[i], + Value: labels[i+1], + }) + } + return m +} + +type fakeQuerier struct { + metrics []datasource.Metric +} + +func (fq *fakeQuerier) reset() { + fq.metrics = fq.metrics[:0] +} + +func (fq *fakeQuerier) add(t *testing.T, metrics ...datasource.Metric) { + fq.metrics = append(fq.metrics, metrics...) +} + +func (fq fakeQuerier) Query(ctx context.Context, query string) ([]datasource.Metric, error) { + return fq.metrics, nil +} diff --git a/app/vmalert/testdata/dir/rules0-bad.rules b/app/vmalert/testdata/dir/rules0-bad.rules new file mode 100644 index 0000000000..c4a971a9b5 --- /dev/null +++ b/app/vmalert/testdata/dir/rules0-bad.rules @@ -0,0 +1,19 @@ +groups: + - name: group + rules: + - alert: InvalidAnnotations + for: 5m + expr: vm_rows > 0 + labels: + label: bar + annotations: + summary: "{{ $value }" + description: "{{$labels}}" + - alert: UnkownAnnotationsFunction + for: 5m + expr: vm_rows > 0 + labels: + label: bar + annotations: + summary: "{{ value|query }}" + description: "{{$labels}}" diff --git a/app/vmalert/testdata/dir/rules0-good.rules b/app/vmalert/testdata/dir/rules0-good.rules new file mode 100644 index 0000000000..ec5b0bc9d2 --- /dev/null +++ b/app/vmalert/testdata/dir/rules0-good.rules @@ -0,0 +1,13 @@ +groups: + - name: duplicatedGroupDiffFiles + rules: + - alert: VMRows + for: 5m + expr: vm_rows > 0 + labels: + label: bar + annotations: + summary: "{{ $value|humanize }}" + description: "{{$labels}}" + + diff --git a/app/vmalert/testdata/dir/rules1-bad.rules b/app/vmalert/testdata/dir/rules1-bad.rules new file mode 100644 index 0000000000..205ff5883d --- /dev/null +++ b/app/vmalert/testdata/dir/rules1-bad.rules @@ -0,0 +1,22 @@ +groups: + - name: sameGroup + rules: + - alert: alert + for: 5m + expr: vm_rows > 0 + labels: + label: bar + annotations: + summary: "{{ $value }}" + description: "{{$labels}}" + - name: sameGroup + rules: + - alert: alert + for: 5m + expr: vm_rows > 0 + labels: + label: bar + annotations: + summary: "{{ $value }}" + description: "{{$labels}}" + diff --git a/app/vmalert/testdata/dir/rules1-good.rules b/app/vmalert/testdata/dir/rules1-good.rules new file mode 100644 index 0000000000..1e602e0312 --- /dev/null +++ b/app/vmalert/testdata/dir/rules1-good.rules @@ -0,0 +1,13 @@ +groups: + - name: duplicatedGroupDiffFiles + rules: + - alert: VMRows + for: 5m + expr: vm_rows > 0 + labels: + label: bar + annotations: + summary: "{{ $value }}" + description: "{{$labels}}" + + diff --git a/app/vmalert/testdata/dir/rules2-bad.rules b/app/vmalert/testdata/dir/rules2-bad.rules new file mode 100644 index 0000000000..57ed213576 --- /dev/null +++ b/app/vmalert/testdata/dir/rules2-bad.rules @@ -0,0 +1,11 @@ +groups: + - name: group + rules: + - alert: UnkownLabelFunction + for: 5m + expr: vm_rows > 0 + labels: + label: bar + summary: "{{ value|query }}" + annotations: + description: "{{$labels}}" diff --git a/app/vmalert/testdata/rules0-bad.rules b/app/vmalert/testdata/rules0-bad.rules new file mode 100644 index 0000000000..0353e54b4b --- /dev/null +++ b/app/vmalert/testdata/rules0-bad.rules @@ -0,0 +1,28 @@ +groups: + - name: group + rules: + - alert: InvalidExpr + for: 5m + expr: vm_rows{ > 0 + labels: + label: bar + annotations: + summary: "{{ $value }}" + description: "{{$labels}}" + - alert: EmptyExpr + for: 5m + expr: "" + labels: + label: bar + annotations: + summary: "{{ $value }}" + description: "{{$labels}}" + - alert: "" + for: 5m + expr: vm_rows > 0 + labels: + label: foo + annotations: + summary: "{{ $value }}" + description: "{{$labels}}" + diff --git a/app/vmalert/testdata/rules0-good.rules b/app/vmalert/testdata/rules0-good.rules new file mode 100644 index 0000000000..ddb55a1d81 --- /dev/null +++ b/app/vmalert/testdata/rules0-good.rules @@ -0,0 +1,23 @@ +groups: + - name: groupGorSingleAlert + rules: + - alert: VMRows + for: 10s + expr: vm_rows > 0 + labels: + label: bar + template: "{{ $value|humanize }}" + annotations: + summary: "{{ $value|humanize }}" + description: "{{$labels}}" + + - name: TestGroup + rules: + - alert: Conns + expr: sum(vm_tcplistener_conns) by(instance) > 1 + annotations: + summary: "Too high connection number for {{$labels.instance}}" + description: "It is {{ $value }} connections for {{$labels.instance}}" + - alert: ExampleAlertAlwaysFiring + expr: sum by(job) + (up == 1) \ No newline at end of file diff --git a/app/vmalert/web.go b/app/vmalert/web.go new file mode 100644 index 0000000000..8269a35ba4 --- /dev/null +++ b/app/vmalert/web.go @@ -0,0 +1,134 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" +) + +// APIAlert has info for an alert. +type APIAlert struct { + ID uint64 `json:"id"` + Name string `json:"name"` + Group string `json:"group"` + Expression string `json:"expression"` + State string `json:"state"` + Value string `json:"value"` + Labels map[string]string `json:"labels"` + Annotations map[string]string `json:"annotations"` + ActiveAt time.Time `json:"activeAt"` +} + +type requestHandler struct { + groups []Group +} + +var pathList = [][]string{ + {"/api/v1/alerts", "list all active alerts"}, + {"/api/v1/groupName/alertID/status", "get alert status by ID"}, + // /metrics is served by httpserver by default + {"/metrics", "list of application metrics"}, +} + +func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool { + resph := responseHandler{w} + switch r.URL.Path { + case "/": + for _, path := range pathList { + p, doc := path[0], path[1] + fmt.Fprintf(w, "%q - %s
", p, p, doc) + } + return true + case "/api/v1/alerts": + resph.handle(rh.list()) + return true + default: + // /api/v1///status + if strings.HasSuffix(r.URL.Path, "/status") { + resph.handle(rh.alert(r.URL.Path)) + return true + } + return false + } +} + +type listAlertsResponse struct { + Data struct { + Alerts []*APIAlert `json:"alerts"` + } `json:"data"` + Status string `json:"status"` +} + +func (rh *requestHandler) list() ([]byte, error) { + lr := listAlertsResponse{Status: "success"} + for _, g := range rh.groups { + for _, r := range g.Rules { + lr.Data.Alerts = append(lr.Data.Alerts, r.AlertsAPI()...) + } + } + + // sort list of alerts for deterministic output + sort.Slice(lr.Data.Alerts, func(i, j int) bool { + return lr.Data.Alerts[i].Name < lr.Data.Alerts[j].Name + }) + + b, err := json.Marshal(lr) + if err != nil { + return nil, &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf(`error encoding list of active alerts: %s`, err), + StatusCode: http.StatusInternalServerError, + } + } + return b, nil +} + +func (rh *requestHandler) alert(path string) ([]byte, error) { + parts := strings.SplitN(strings.TrimPrefix(path, "/api/v1/"), "/", 3) + if len(parts) != 3 { + return nil, &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf(`path %q cointains /status suffix but doesn't match pattern "/group/alert/status"`, path), + StatusCode: http.StatusBadRequest, + } + } + group := strings.TrimRight(parts[0], "/") + idStr := strings.TrimRight(parts[1], "/") + id, err := strconv.ParseUint(idStr, 10, 0) + if err != nil { + return nil, &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf(`cannot parse int from %q`, idStr), + StatusCode: http.StatusBadRequest, + } + } + for _, g := range rh.groups { + if g.Name != group { + continue + } + for i := range g.Rules { + if apiAlert := g.Rules[i].AlertAPI(id); apiAlert != nil { + return json.Marshal(apiAlert) + } + } + } + return nil, &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf(`cannot find alert %s in %q`, idStr, group), + StatusCode: http.StatusNotFound, + } +} + +// responseHandler wrapper on http.ResponseWriter with sugar +type responseHandler struct{ http.ResponseWriter } + +func (w responseHandler) handle(b []byte, err error) { + if err != nil { + httpserver.Errorf(w, "%s", err) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(b) +} diff --git a/app/vmalert/web_test.go b/app/vmalert/web_test.go new file mode 100644 index 0000000000..8d2c02d76c --- /dev/null +++ b/app/vmalert/web_test.go @@ -0,0 +1,72 @@ +package main + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "reflect" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" +) + +func TestHandler(t *testing.T) { + rule := &Rule{ + Name: "alert", + alerts: map[uint64]*notifier.Alert{ + 0: {}, + }, + } + rh := &requestHandler{ + groups: []Group{{ + Name: "group", + Rules: []*Rule{rule}, + }}, + } + getResp := func(url string, to interface{}, code int) { + t.Helper() + resp, err := http.Get(url) + if err != nil { + t.Errorf("unexpected err %s", err) + } + if code != resp.StatusCode { + t.Errorf("unexpected status code %d want %d", resp.StatusCode, code) + } + defer func() { + if err := resp.Body.Close(); err != nil { + t.Errorf("err closing body %s", err) + } + }() + if to != nil { + if err = json.NewDecoder(resp.Body).Decode(to); err != nil { + t.Errorf("unexpected err %s", err) + } + } + } + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { rh.handler(w, r) })) + defer ts.Close() + t.Run("/api/v1/alerts", func(t *testing.T) { + lr := listAlertsResponse{} + getResp(ts.URL+"/api/v1/alerts", &lr, 200) + if length := len(lr.Data.Alerts); length != 1 { + t.Errorf("expected 1 alert got %d", length) + } + }) + t.Run("/api/v1/group/0/status", func(t *testing.T) { + alert := &APIAlert{} + getResp(ts.URL+"/api/v1/group/0/status", alert, 200) + expAlert := rule.newAlertAPI(*rule.alerts[0]) + if !reflect.DeepEqual(alert, expAlert) { + t.Errorf("expected %v is equal to %v", alert, expAlert) + } + }) + t.Run("/api/v1/group/1/status", func(t *testing.T) { + getResp(ts.URL+"/api/v1/group/1/status", nil, 404) + }) + t.Run("/api/v1/unknown-group/0/status", func(t *testing.T) { + getResp(ts.URL+"/api/v1/unknown-group/0/status", nil, 404) + }) + t.Run("/", func(t *testing.T) { + getResp(ts.URL, nil, 200) + }) +}