mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-02 09:10:40 +01:00
69d1893f4c
* lib/discovery/consul: update services on the watcher's start Previously, watcher's start was only initing goroutines for discovery but not waiting for the first iteration to end. It means first Consul discovery wasn't returning discovered targets until the next iteration. The change makes the watcher's start blocking until we get first discovery iteration done and all registries updated. Signed-off-by: hagen1778 <roman@victoriametrics.com> * vmalert: remove workarounds for consul SD Now when consul SD lib properly updates services on the first start, we don't need workarounds in vmalert. Signed-off-by: hagen1778 <roman@victoriametrics.com> * lib/discovery/consul: update after review Signed-off-by: hagen1778 <roman@victoriametrics.com> * wip Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
302 lines
7.1 KiB
Go
302 lines
7.1 KiB
Go
package notifier
|
|
|
|
import (
|
|
"fmt"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os"
|
|
"sync"
|
|
"testing"
|
|
)
|
|
|
|
func TestConfigWatcherReload(t *testing.T) {
|
|
f, err := ioutil.TempFile("", "")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer func() { _ = os.Remove(f.Name()) }()
|
|
|
|
writeToFile(t, f.Name(), `
|
|
static_configs:
|
|
- targets:
|
|
- localhost:9093
|
|
- localhost:9094
|
|
`)
|
|
cw, err := newWatcher(f.Name(), nil)
|
|
if err != nil {
|
|
t.Fatalf("failed to start config watcher: %s", err)
|
|
}
|
|
defer cw.mustStop()
|
|
ns := cw.notifiers()
|
|
if len(ns) != 2 {
|
|
t.Fatalf("expected to have 2 notifiers; got %d %#v", len(ns), ns)
|
|
}
|
|
|
|
f2, err := ioutil.TempFile("", "")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer func() { _ = os.Remove(f2.Name()) }()
|
|
|
|
writeToFile(t, f2.Name(), `
|
|
static_configs:
|
|
- targets:
|
|
- 127.0.0.1:9093
|
|
`)
|
|
checkErr(t, cw.reload(f2.Name()))
|
|
|
|
ns = cw.notifiers()
|
|
if len(ns) != 1 {
|
|
t.Fatalf("expected to have 1 notifier; got %d", len(ns))
|
|
}
|
|
expAddr := "http://127.0.0.1:9093/api/v2/alerts"
|
|
if ns[0].Addr() != expAddr {
|
|
t.Fatalf("expected to get %q; got %q instead", expAddr, ns[0].Addr())
|
|
}
|
|
}
|
|
|
|
func TestConfigWatcherStart(t *testing.T) {
|
|
consulSDServer := newFakeConsulServer()
|
|
defer consulSDServer.Close()
|
|
|
|
consulSDFile, err := ioutil.TempFile("", "")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer func() { _ = os.Remove(consulSDFile.Name()) }()
|
|
|
|
writeToFile(t, consulSDFile.Name(), fmt.Sprintf(`
|
|
scheme: https
|
|
path_prefix: proxy
|
|
consul_sd_configs:
|
|
- server: %s
|
|
services:
|
|
- alertmanager
|
|
`, consulSDServer.URL))
|
|
|
|
cw, err := newWatcher(consulSDFile.Name(), nil)
|
|
if err != nil {
|
|
t.Fatalf("failed to start config watcher: %s", err)
|
|
}
|
|
defer cw.mustStop()
|
|
|
|
if len(cw.notifiers()) != 2 {
|
|
t.Fatalf("expected to get 2 notifiers; got %d", len(cw.notifiers()))
|
|
}
|
|
|
|
expAddr1 := fmt.Sprintf("https://%s/proxy/api/v2/alerts", fakeConsulService1)
|
|
expAddr2 := fmt.Sprintf("https://%s/proxy/api/v2/alerts", fakeConsulService2)
|
|
|
|
n1, n2 := cw.notifiers()[0], cw.notifiers()[1]
|
|
if n1.Addr() != expAddr1 {
|
|
t.Fatalf("exp address %q; got %q", expAddr1, n1.Addr())
|
|
}
|
|
if n2.Addr() != expAddr2 {
|
|
t.Fatalf("exp address %q; got %q", expAddr2, n2.Addr())
|
|
}
|
|
}
|
|
|
|
// TestConfigWatcherReloadConcurrent supposed to test concurrent
|
|
// execution of configuration update.
|
|
// Should be executed with -race flag
|
|
func TestConfigWatcherReloadConcurrent(t *testing.T) {
|
|
consulSDServer1 := newFakeConsulServer()
|
|
defer consulSDServer1.Close()
|
|
consulSDServer2 := newFakeConsulServer()
|
|
defer consulSDServer2.Close()
|
|
|
|
consulSDFile, err := ioutil.TempFile("", "")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer func() { _ = os.Remove(consulSDFile.Name()) }()
|
|
|
|
writeToFile(t, consulSDFile.Name(), fmt.Sprintf(`
|
|
consul_sd_configs:
|
|
- server: %s
|
|
services:
|
|
- alertmanager
|
|
- server: %s
|
|
services:
|
|
- consul
|
|
`, consulSDServer1.URL, consulSDServer2.URL))
|
|
|
|
staticAndConsulSDFile, err := ioutil.TempFile("", "")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer func() { _ = os.Remove(staticAndConsulSDFile.Name()) }()
|
|
|
|
writeToFile(t, staticAndConsulSDFile.Name(), fmt.Sprintf(`
|
|
static_configs:
|
|
- targets:
|
|
- localhost:9093
|
|
- localhost:9095
|
|
consul_sd_configs:
|
|
- server: %s
|
|
services:
|
|
- alertmanager
|
|
- server: %s
|
|
services:
|
|
- consul
|
|
`, consulSDServer1.URL, consulSDServer2.URL))
|
|
|
|
paths := []string{
|
|
staticAndConsulSDFile.Name(),
|
|
consulSDFile.Name(),
|
|
"testdata/static.good.yaml",
|
|
"unknownFields.bad.yaml",
|
|
}
|
|
|
|
cw, err := newWatcher(paths[0], nil)
|
|
if err != nil {
|
|
t.Fatalf("failed to start config watcher: %s", err)
|
|
}
|
|
defer cw.mustStop()
|
|
|
|
const workers = 500
|
|
const iterations = 10
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(workers)
|
|
for i := 0; i < workers; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for i := 0; i < iterations; i++ {
|
|
rnd := rand.Intn(len(paths))
|
|
_ = cw.reload(paths[rnd]) // update can fail and this is expected
|
|
_ = cw.notifiers()
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func writeToFile(t *testing.T, file, b string) {
|
|
t.Helper()
|
|
checkErr(t, ioutil.WriteFile(file, []byte(b), 0644))
|
|
}
|
|
|
|
func checkErr(t *testing.T, err error) {
|
|
t.Helper()
|
|
if err != nil {
|
|
t.Fatalf("unexpected err: %s", err)
|
|
}
|
|
}
|
|
|
|
const (
|
|
fakeConsulService1 = "127.0.0.1:9093"
|
|
fakeConsulService2 = "127.0.0.1:9095"
|
|
)
|
|
|
|
func newFakeConsulServer() *httptest.Server {
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/v1/agent/self", func(rw http.ResponseWriter, _ *http.Request) {
|
|
rw.Write([]byte(`{"Config": {"Datacenter": "dc1"}}`))
|
|
})
|
|
mux.HandleFunc("/v1/catalog/services", func(rw http.ResponseWriter, _ *http.Request) {
|
|
rw.Header().Set("X-Consul-Index", "1")
|
|
rw.Write([]byte(`{
|
|
"alertmanager": [
|
|
"alertmanager",
|
|
"__scheme__=http"
|
|
]
|
|
}`))
|
|
})
|
|
mux.HandleFunc("/v1/health/service/alertmanager", func(rw http.ResponseWriter, _ *http.Request) {
|
|
rw.Header().Set("X-Consul-Index", "1")
|
|
rw.Write([]byte(`
|
|
[
|
|
{
|
|
"Node": {
|
|
"ID": "e8e3629a-3f50-9d6e-aaf8-f173b5b05c72",
|
|
"Node": "machine",
|
|
"Address": "127.0.0.1",
|
|
"Datacenter": "dc1",
|
|
"TaggedAddresses": {
|
|
"lan": "127.0.0.1",
|
|
"lan_ipv4": "127.0.0.1",
|
|
"wan": "127.0.0.1",
|
|
"wan_ipv4": "127.0.0.1"
|
|
},
|
|
"Meta": {
|
|
"consul-network-segment": ""
|
|
},
|
|
"CreateIndex": 13,
|
|
"ModifyIndex": 14
|
|
},
|
|
"Service": {
|
|
"ID": "am1",
|
|
"Service": "alertmanager",
|
|
"Tags": [
|
|
"alertmanager",
|
|
"__scheme__=http"
|
|
],
|
|
"Address": "",
|
|
"Meta": null,
|
|
"Port": 9093,
|
|
"Weights": {
|
|
"Passing": 1,
|
|
"Warning": 1
|
|
},
|
|
"EnableTagOverride": false,
|
|
"Proxy": {
|
|
"Mode": "",
|
|
"MeshGateway": {},
|
|
"Expose": {}
|
|
},
|
|
"Connect": {},
|
|
"CreateIndex": 16,
|
|
"ModifyIndex": 16
|
|
}
|
|
},
|
|
{
|
|
"Node": {
|
|
"ID": "e8e3629a-3f50-9d6e-aaf8-f173b5b05c72",
|
|
"Node": "machine",
|
|
"Address": "127.0.0.1",
|
|
"Datacenter": "dc1",
|
|
"TaggedAddresses": {
|
|
"lan": "127.0.0.1",
|
|
"lan_ipv4": "127.0.0.1",
|
|
"wan": "127.0.0.1",
|
|
"wan_ipv4": "127.0.0.1"
|
|
},
|
|
"Meta": {
|
|
"consul-network-segment": ""
|
|
},
|
|
"CreateIndex": 13,
|
|
"ModifyIndex": 14
|
|
},
|
|
"Service": {
|
|
"ID": "am2",
|
|
"Service": "alertmanager",
|
|
"Tags": [
|
|
"alertmanager",
|
|
"bad-node"
|
|
],
|
|
"Address": "",
|
|
"Meta": null,
|
|
"Port": 9095,
|
|
"Weights": {
|
|
"Passing": 1,
|
|
"Warning": 1
|
|
},
|
|
"EnableTagOverride": false,
|
|
"Proxy": {
|
|
"Mode": "",
|
|
"MeshGateway": {},
|
|
"Expose": {}
|
|
},
|
|
"Connect": {},
|
|
"CreateIndex": 15,
|
|
"ModifyIndex": 15
|
|
}
|
|
}
|
|
]`))
|
|
})
|
|
|
|
return httptest.NewServer(mux)
|
|
}
|