VictoriaMetrics/app/vmalert/notifier/config_watcher_test.go
Roman Khavronenko 2a3a62dc41
vmalert: support configuration file for notifiers (#2127)
vmalert: support configuration file for notifiers

* vmalert notifiers now can be configured via file
see https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file
* add support of Consul service discovery for notifiers config
see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1947
* add UI section for currently loaded/discovered notifiers
* deprecate `-rule.configCheckInterval` in favour of `-configCheckInterval`
* add ability to suppress logs for duplicated targets for notifiers discovery
* change behaviour of `vmalert_alerts_send_errors_total` - it now accounts
for failed alerts, not HTTP calls.
2022-02-02 23:42:25 +02:00

308 lines
7.3 KiB
Go

package notifier
import (
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
)
func TestConfigWatcherReload(t *testing.T) {
f, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(f.Name()) }()
writeToFile(t, f.Name(), `
static_configs:
- targets:
- localhost:9093
- localhost:9094
`)
cw, err := newWatcher(f.Name(), nil)
if err != nil {
t.Fatalf("failed to start config watcher: %s", err)
}
ns := cw.notifiers()
if len(ns) != 2 {
t.Fatalf("expected to have 2 notifiers; got %d %#v", len(ns), ns)
}
f2, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(f2.Name()) }()
writeToFile(t, f2.Name(), `
static_configs:
- targets:
- 127.0.0.1:9093
`)
checkErr(t, cw.reload(f2.Name()))
ns = cw.notifiers()
if len(ns) != 1 {
t.Fatalf("expected to have 1 notifier; got %d", len(ns))
}
expAddr := "http://127.0.0.1:9093/api/v2/alerts"
if ns[0].Addr() != expAddr {
t.Fatalf("expected to get %q; got %q instead", expAddr, ns[0].Addr())
}
}
func TestConfigWatcherStart(t *testing.T) {
consulSDServer := newFakeConsulServer()
defer consulSDServer.Close()
consulSDFile, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(consulSDFile.Name()) }()
writeToFile(t, consulSDFile.Name(), fmt.Sprintf(`
scheme: https
path_prefix: proxy
consul_sd_configs:
- server: %s
services:
- alertmanager
`, consulSDServer.URL))
prevCheckInterval := *consul.SDCheckInterval
defer func() { *consul.SDCheckInterval = prevCheckInterval }()
*consul.SDCheckInterval = time.Millisecond * 100
cw, err := newWatcher(consulSDFile.Name(), nil)
if err != nil {
t.Fatalf("failed to start config watcher: %s", err)
}
time.Sleep(*consul.SDCheckInterval * 2)
if len(cw.notifiers()) != 2 {
t.Fatalf("expected to get 2 notifiers; got %d", len(cw.notifiers()))
}
expAddr1 := fmt.Sprintf("https://%s/proxy/api/v2/alerts", fakeConsulService1)
expAddr2 := fmt.Sprintf("https://%s/proxy/api/v2/alerts", fakeConsulService2)
n1, n2 := cw.notifiers()[0], cw.notifiers()[1]
if n1.Addr() != expAddr1 {
t.Fatalf("exp address %q; got %q", expAddr1, n1.Addr())
}
if n2.Addr() != expAddr2 {
t.Fatalf("exp address %q; got %q", expAddr2, n2.Addr())
}
}
// TestConfigWatcherReloadConcurrent supposed to test concurrent
// execution of configuration update.
// Should be executed with -race flag
func TestConfigWatcherReloadConcurrent(t *testing.T) {
consulSDServer1 := newFakeConsulServer()
defer consulSDServer1.Close()
consulSDServer2 := newFakeConsulServer()
defer consulSDServer2.Close()
consulSDFile, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(consulSDFile.Name()) }()
writeToFile(t, consulSDFile.Name(), fmt.Sprintf(`
consul_sd_configs:
- server: %s
services:
- alertmanager
- server: %s
services:
- consul
`, consulSDServer1.URL, consulSDServer2.URL))
staticAndConsulSDFile, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(staticAndConsulSDFile.Name()) }()
writeToFile(t, staticAndConsulSDFile.Name(), fmt.Sprintf(`
static_configs:
- targets:
- localhost:9093
- localhost:9095
consul_sd_configs:
- server: %s
services:
- alertmanager
- server: %s
services:
- consul
`, consulSDServer1.URL, consulSDServer2.URL))
paths := []string{
staticAndConsulSDFile.Name(),
consulSDFile.Name(),
"testdata/static.good.yaml",
"unknownFields.bad.yaml",
}
cw, err := newWatcher(paths[0], nil)
if err != nil {
t.Fatalf("failed to start config watcher: %s", err)
}
const workers = 500
const iterations = 10
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
rnd := rand.Intn(len(paths))
_ = cw.reload(paths[rnd]) // update can fail and this is expected
_ = cw.notifiers()
}
}()
}
wg.Wait()
}
func writeToFile(t *testing.T, file, b string) {
t.Helper()
checkErr(t, ioutil.WriteFile(file, []byte(b), 0644))
}
func checkErr(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
}
const (
fakeConsulService1 = "127.0.0.1:9093"
fakeConsulService2 = "127.0.0.1:9095"
)
func newFakeConsulServer() *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc("/v1/agent/self", func(rw http.ResponseWriter, _ *http.Request) {
rw.Write([]byte(`{"Config": {"Datacenter": "dc1"}}`))
})
mux.HandleFunc("/v1/catalog/services", func(rw http.ResponseWriter, _ *http.Request) {
rw.Header().Set("X-Consul-Index", "1")
rw.Write([]byte(`{
"alertmanager": [
"alertmanager",
"__scheme__=http"
]
}`))
})
mux.HandleFunc("/v1/health/service/alertmanager", func(rw http.ResponseWriter, _ *http.Request) {
rw.Header().Set("X-Consul-Index", "1")
rw.Write([]byte(`
[
{
"Node": {
"ID": "e8e3629a-3f50-9d6e-aaf8-f173b5b05c72",
"Node": "machine",
"Address": "127.0.0.1",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "127.0.0.1",
"lan_ipv4": "127.0.0.1",
"wan": "127.0.0.1",
"wan_ipv4": "127.0.0.1"
},
"Meta": {
"consul-network-segment": ""
},
"CreateIndex": 13,
"ModifyIndex": 14
},
"Service": {
"ID": "am1",
"Service": "alertmanager",
"Tags": [
"alertmanager",
"__scheme__=http"
],
"Address": "",
"Meta": null,
"Port": 9093,
"Weights": {
"Passing": 1,
"Warning": 1
},
"EnableTagOverride": false,
"Proxy": {
"Mode": "",
"MeshGateway": {},
"Expose": {}
},
"Connect": {},
"CreateIndex": 16,
"ModifyIndex": 16
}
},
{
"Node": {
"ID": "e8e3629a-3f50-9d6e-aaf8-f173b5b05c72",
"Node": "machine",
"Address": "127.0.0.1",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "127.0.0.1",
"lan_ipv4": "127.0.0.1",
"wan": "127.0.0.1",
"wan_ipv4": "127.0.0.1"
},
"Meta": {
"consul-network-segment": ""
},
"CreateIndex": 13,
"ModifyIndex": 14
},
"Service": {
"ID": "am2",
"Service": "alertmanager",
"Tags": [
"alertmanager",
"bad-node"
],
"Address": "",
"Meta": null,
"Port": 9095,
"Weights": {
"Passing": 1,
"Warning": 1
},
"EnableTagOverride": false,
"Proxy": {
"Mode": "",
"MeshGateway": {},
"Expose": {}
},
"Connect": {},
"CreateIndex": 15,
"ModifyIndex": 15
}
}
]`))
})
return httptest.NewServer(mux)
}