lib/promscrape/discovery/kuma: follow-up for 317fef95f9

- Do not generate __meta_server label, since it is unavailable in Prometheus.
- Add a link to https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs to docs/CHANGELOG.md,
  so users could click it and read the docs without the need to search the corresponding docs.
- Remove kumaTarget struct, since it is easier generating labels for discovered targets
  directly from the response returned by Kuma. This simplifies the code.
- Store the generated labels for discovered targets inside atomic.Value. This allows reading them
  from concurrent goroutines without the need to use mutex.
- Use synchronouse requests to Kuma instead of long polling, since there is a little sense
  in the long polling when the Kuma server may return 304 Not Modified response every -promscrape.kumaSDCheckInterval.
- Remove -promscrape.kuma.waitTime command-line flag, since it is no longer needed when long polling isn't used.
- Set default value for -promscrape.kumaSDCheckInterval to 30s in order to be consistent with Prometheus.
- Remove unnecessary indirections for string literals, which are used only once, in order to improve code readability.
- Remove unused fields from discoveryRequest and discoveryResponse.
- Update tests.
- Document why fetch_timeout and refresh_interval options are missing in kuma_sd_config.
- Add docs to discoveryutils.RequestCallback and discoveryutils.ResponseCallback,
  since these are public types.

Side notes: it is weird that Prometheus implementation for kuma_sd_configs sets `instance` label,
since usually this label is set by the Prometheus itself to __address__ after the relabeling phase.
See https://www.robustperception.io/life-of-a-label/

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3389

See https://github.com/prometheus/prometheus/issues/7919
and https://github.com/prometheus/prometheus/pull/8844
as a reference implementation in Prometheus
This commit is contained in:
Aliaksandr Valialkin 2023-02-22 17:05:49 -08:00
parent b7d13c3478
commit 1b70238dca
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
12 changed files with 297 additions and 468 deletions

View File

@ -1372,9 +1372,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-promscrape.kubernetesSDCheckInterval duration
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
-promscrape.kumaSDCheckInterval duration
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 1m0s)
-promscrape.kuma.waitTime duration
Wait time used by Kuma service discovery. Default value is used if not set
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 30s)
-promscrape.maxDroppedTargets int
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
-promscrape.maxResponseHeadersSize size

View File

@ -19,7 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* SECURITY: upgrade Go builder from Go1.20.0 to Go1.20.1. See [the list of issues addressed in Go1.20.1](https://github.com/golang/go/issues?q=milestone%3AGo1.20.1+label%3ACherryPickApproved).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to VictoriaMetrics located in another datacenter or availability zone. This also allows reducing disk IO under high load when `vmagent` starts queuing the collected data to disk when the remote storage is temporarily unavailable or cannot keep up with the data ingestion rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `kuma_sd_config` for [Kuma](http://kuma.io/) Control Plane targets discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3389).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kuma](http://kuma.io/) Control Plane targets discovery aka [kuma_sd_configs](https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3389).
* FEATURE: [vmgateway](https://docs.victoriametrics.com/vmgateway.html): add the ability to verify JWT signature via [JWKS endpoint](https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-key-sets). See [these docs](https://docs.victoriametrics.com/vmgateway.html#using-jwks-endpoint-for-jwt-signature-verification).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add the ability to limit the number of concurrent requests on a per-user basis via `-maxConcurrentPerUserRequests` command-line flag and via `max_concurrent_requests` config option. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3346) and [these docs](https://docs.victoriametrics.com/vmauth.html#concurrency-limiting).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): automatically retry failing `GET` requests on all [the configured backends](https://docs.victoriametrics.com/vmauth.html#load-balancing). Previously the backend error has been immediately returned to the client without retrying the request on the remaining backends.

View File

@ -2339,6 +2339,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
How frequently to reload the full state from Kubernetes API server (default 30m0s)
-promscrape.kubernetesSDCheckInterval duration
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
-promscrape.kumaSDCheckInterval duration
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 30s)
-promscrape.maxDroppedTargets int
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
-promscrape.maxResponseHeadersSize size

View File

@ -109,6 +109,7 @@ Case studies:
* [Brandwatch](https://docs.victoriametrics.com/CaseStudies.html#brandwatch)
* [CERN](https://docs.victoriametrics.com/CaseStudies.html#cern)
* [COLOPL](https://docs.victoriametrics.com/CaseStudies.html#colopl)
* [Dig Security](https://docs.victoriametrics.com/CaseStudies.html#dig-security)
* [Fly.io](https://docs.victoriametrics.com/CaseStudies.html#flyio)
* [German Research Center for Artificial Intelligence](https://docs.victoriametrics.com/CaseStudies.html#german-research-center-for-artificial-intelligence)
* [Grammarly](https://docs.victoriametrics.com/CaseStudies.html#grammarly)
@ -2341,6 +2342,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
How frequently to reload the full state from Kubernetes API server (default 30m0s)
-promscrape.kubernetesSDCheckInterval duration
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
-promscrape.kumaSDCheckInterval duration
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 30s)
-promscrape.maxDroppedTargets int
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
-promscrape.maxResponseHeadersSize size

View File

@ -974,12 +974,10 @@ to one of the targets returned by the http service.
The following meta labels are available on discovered targets during [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling):
* `__meta_server`: the URL of Kuma Control Plane's MADS xDS server from which the target was extracted
* `__meta_kuma_mesh`: the name of the mesh
* `__meta_kuma_service`: the name of the service associated with the proxy
* `__meta_kuma_dataplane`: the name of the proxy
* `__meta_kuma_service`: the name of the service associated with the proxy
* `__meta_kuma_label_<label_name>`: each label of target given from Kuma Control Plane
* `__metrics_path__`: the path by which the service metrics are scraped
## nomad_sd_configs

View File

@ -1373,12 +1373,10 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Interval for checking for changes in http endpoint service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#http_sd_configs for details (default 1m0s)
-promscrape.kubernetes.apiServerTimeout duration
How frequently to reload the full state from Kubernetes API server (default 30m0s)
-promscrape.kumaSDCheckInterval duration
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 1m0s)
-promscrape.kuma.waitTime duration
Wait time used by Kuma service discovery. Default value is used if not set
-promscrape.kubernetesSDCheckInterval duration
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
-promscrape.kumaSDCheckInterval duration
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 30s)
-promscrape.maxDroppedTargets int
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
-promscrape.maxResponseHeadersSize size

View File

@ -4,30 +4,34 @@ import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
)
var configMap = discoveryutils.NewConfigMap()
type apiConfig struct {
client *discoveryutils.Client
path string
client *discoveryutils.Client
apiPath string
// labels contains the latest discovered labels.
labels atomic.Value
cancel context.CancelFunc
wg sync.WaitGroup
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex // protects targets
targets []kumaTarget
latestVersion string
latestNonce string
@ -35,16 +39,6 @@ type apiConfig struct {
parseErrors *metrics.Counter
}
const (
discoveryNode = "victoria-metrics"
xdsApiVersion = "v3"
xdsRequestType = "discovery"
xdsResourceType = "monitoringassignments"
xdsResourceTypeUrl = "type.googleapis.com/kuma.observability.v1.MonitoringAssignment"
)
var waitTime = flag.Duration("promscrape.kuma.waitTime", 0, "Wait time used by Kuma service discovery. Default value is used if not set")
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
if err != nil {
@ -54,16 +48,14 @@ func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
apiServer, apiPath, err := getAPIServerPath(sdc.Server)
if err != nil {
return nil, fmt.Errorf("cannot parse server %q: %w", sdc.Server, err)
}
ac, err := sdc.HTTPClientConfig.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse auth config: %w", err)
}
parsedURL, err := url.Parse(sdc.Server)
if err != nil {
return nil, fmt.Errorf("cannot parse kuma_sd server URL: %w", err)
}
apiServer := fmt.Sprintf("%s://%s", parsedURL.Scheme, parsedURL.Host)
proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
@ -73,135 +65,205 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
}
apiPath := path.Join(
parsedURL.RequestURI(),
xdsApiVersion,
xdsRequestType+":"+xdsResourceType,
)
cfg := &apiConfig{
client: client,
path: apiPath,
client: client,
apiPath: apiPath,
fetchErrors: metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_discovery_kuma_errors_total{type="fetch",url=%q}`, sdc.Server)),
parseErrors: metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_discovery_kuma_errors_total{type="parse",url=%q}`, sdc.Server)),
}
// initialize targets synchronously and start updating them in background
cfg.startWatcher()
ctx, cancel := context.WithCancel(context.Background())
cfg.cancel = cancel
// Initialize targets synchronously and then start updating them in background.
// The synchronous targets' update is needed for returning non-empty list of targets
// just after the initialization.
if err := cfg.updateTargetsLabels(ctx); err != nil {
return nil, fmt.Errorf("cannot discover Kuma targets: %w", err)
}
cfg.wg.Add(1)
go func() {
defer cfg.wg.Done()
cfg.runTargetsWatcher(ctx)
}()
return cfg, nil
}
func (cfg *apiConfig) startWatcher() func() {
ctx, cancel := context.WithCancel(context.Background())
cfg.cancel = cancel
// blocking initial targets update
if err := cfg.updateTargets(ctx); err != nil {
logger.Errorf("there were errors when discovering kuma targets, so preserving the previous targets. error: %v", err)
func getAPIServerPath(serverURL string) (string, string, error) {
if serverURL == "" {
return "", "", fmt.Errorf("missing servier url")
}
// start updating targets with a long polling in background
cfg.wg.Add(1)
go func() {
ticker := time.NewTicker(*SDCheckInterval)
defer cfg.wg.Done()
defer ticker.Stop()
for {
select {
case <-ticker.C:
// we are constantly waiting for targets updates in long polling requests
err := cfg.updateTargets(ctx)
if err != nil {
logger.Errorf("there were errors when discovering kuma targets, so preserving the previous targets. error: %v", err)
}
case <-ctx.Done():
return
}
}
}()
return cancel
if !strings.Contains(serverURL, "://") {
serverURL = "http://" + serverURL
}
psu, err := url.Parse(serverURL)
if err != nil {
return "", "", fmt.Errorf("cannot parse server url=%q: %w", serverURL, err)
}
apiServer := fmt.Sprintf("%s://%s", psu.Scheme, psu.Host)
apiPath := psu.Path
if !strings.HasSuffix(apiPath, "/") {
apiPath += "/"
}
apiPath += "v3/discovery:monitoringassignments"
if psu.RawQuery != "" {
apiPath += "?" + psu.RawQuery
}
return apiServer, apiPath, nil
}
func (cfg *apiConfig) stopWatcher() {
func (cfg *apiConfig) runTargetsWatcher(ctx context.Context) {
ticker := time.NewTicker(*SDCheckInterval)
defer ticker.Stop()
doneCh := ctx.Done()
for {
select {
case <-ticker.C:
if err := cfg.updateTargetsLabels(ctx); err != nil {
logger.Errorf("there was an error when discovering Kuma targets, so preserving the previous targets; error: %s", err)
}
case <-doneCh:
return
}
}
}
func (cfg *apiConfig) mustStop() {
cfg.client.Stop()
cfg.cancel()
cfg.wg.Wait()
}
func (cfg *apiConfig) getTargets() ([]kumaTarget, error) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
return cfg.targets, nil
}
func (cfg *apiConfig) updateTargets(ctx context.Context) error {
requestBody, err := json.Marshal(discoveryRequest{
VersionInfo: cfg.latestVersion,
Node: discoveryRequestNode{Id: discoveryNode},
TypeUrl: xdsResourceTypeUrl,
func (cfg *apiConfig) updateTargetsLabels(ctx context.Context) error {
dReq := &discoveryRequest{
VersionInfo: cfg.latestVersion,
Node: discoveryRequestNode{
ID: "vmagent",
},
TypeURL: "type.googleapis.com/kuma.observability.v1.MonitoringAssignment",
ResponseNonce: cfg.latestNonce,
})
}
requestBody, err := json.Marshal(dReq)
if err != nil {
return fmt.Errorf("cannot marshal request body for kuma_sd api: %w", err)
logger.Panicf("BUG: cannot marshal Kuma discovery request: %s", err)
}
var statusCode int
data, err := cfg.client.GetBlockingAPIResponseWithParamsCtx(
ctx,
cfg.path,
func(request *http.Request) {
request.Method = http.MethodPost
request.Body = io.NopCloser(bytes.NewReader(requestBody))
// set max duration for long polling request
query := request.URL.Query()
query.Add("fetch-timeout", cfg.getWaitTime().String())
request.URL.RawQuery = query.Encode()
request.Header.Set("Accept", "application/json")
request.Header.Set("Content-Type", "application/json")
},
func(response *http.Response) {
statusCode = response.StatusCode
},
)
if statusCode == http.StatusNotModified {
return nil
updateRequestFunc := func(req *http.Request) {
req.Method = "POST"
req.Body = io.NopCloser(bytes.NewReader(requestBody))
req.Header.Set("Accept", "application/json")
req.Header.Set("Content-Type", "application/json")
}
notModified := false
inspectResponseFunc := func(resp *http.Response) {
if resp.StatusCode == http.StatusNotModified {
// Override status code, so GetBlockingAPIResponseWithParamsCtx() returns nil error.
resp.StatusCode = http.StatusOK
notModified = true
}
}
data, err := cfg.client.GetBlockingAPIResponseWithParamsCtx(ctx, cfg.apiPath, updateRequestFunc, inspectResponseFunc)
if err != nil {
cfg.fetchErrors.Inc()
return fmt.Errorf("cannot read kuma_sd api response: %w", err)
return fmt.Errorf("error when reading Kuma discovery response: %w", err)
}
if notModified {
// The targets weren't modified, so nothing to update.
return nil
}
response, err := parseDiscoveryResponse(data)
// Parse response
labels, versionInfo, nonce, err := parseTargetsLabels(data)
if err != nil {
cfg.parseErrors.Inc()
return fmt.Errorf("cannot parse kuma_sd api response: %w", err)
return fmt.Errorf("cannot parse Kuma discovery response received from %q: %w", cfg.client.APIServer(), err)
}
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.targets = parseKumaTargets(response)
cfg.latestVersion = response.VersionInfo
cfg.latestNonce = response.Nonce
cfg.labels.Store(&labels)
cfg.latestVersion = versionInfo
cfg.latestNonce = nonce
return nil
}
func (cfg *apiConfig) getWaitTime() time.Duration {
d := discoveryutils.BlockingClientReadTimeout
// Reduce wait time to avoid timeouts (request execution time should be less than the read timeout)
d -= d / 8
if *waitTime > time.Second && *waitTime < d {
d = *waitTime
func parseTargetsLabels(data []byte) ([]*promutils.Labels, string, string, error) {
var dResp discoveryResponse
if err := json.Unmarshal(data, &dResp); err != nil {
return nil, "", "", err
}
return d
return dResp.getTargetsLabels(), dResp.VersionInfo, dResp.Nonce, nil
}
func (cfg *apiConfig) mustStop() {
cfg.stopWatcher()
cfg.client.Stop()
func (dr *discoveryResponse) getTargetsLabels() []*promutils.Labels {
var ms []*promutils.Labels
for _, r := range dr.Resources {
for _, t := range r.Targets {
m := promutils.NewLabels(8 + len(r.Labels) + len(t.Labels))
m.Add("instance", t.Name)
m.Add("__address__", t.Address)
m.Add("__scheme__", t.Scheme)
m.Add("__metrics_path__", t.MetricsPath)
m.Add("__meta_kuma_dataplane", t.Name)
m.Add("__meta_kuma_mesh", r.Mesh)
m.Add("__meta_kuma_service", r.Service)
addLabels(m, r.Labels)
addLabels(m, t.Labels)
// Remove possible duplicate labels after addLabels() calls above
m.RemoveDuplicates()
ms = append(ms, m)
}
}
return ms
}
func addLabels(dst *promutils.Labels, src map[string]string) {
bb := bbPool.Get()
b := bb.B
for k, v := range src {
b = append(b[:0], "__meta_kuma_label_"...)
b = append(b, discoveryutils.SanitizeLabelName(k)...)
labelName := bytesutil.InternBytes(b)
dst.Add(labelName, v)
}
bb.B = b
bbPool.Put(bb)
}
var bbPool bytesutil.ByteBufferPool
// discoveryRequest represent xDS-requests for Kuma Service Mesh
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryrequest
type discoveryRequest struct {
VersionInfo string `json:"version_info"`
Node discoveryRequestNode `json:"node"`
ResourceNames []string `json:"resource_names"`
TypeURL string `json:"type_url"`
ResponseNonce string `json:"response_nonce"`
}
type discoveryRequestNode struct {
ID string `json:"id"`
}
// discoveryResponse represent xDS-requests for Kuma Service Mesh
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryresponse
type discoveryResponse struct {
VersionInfo string `json:"version_info"`
Resources []struct {
Mesh string `json:"mesh"`
Service string `json:"service"`
Targets []struct {
Name string `json:"name"`
Scheme string `json:"scheme"`
Address string `json:"address"`
MetricsPath string `json:"metrics_path"`
Labels map[string]string `json:"labels"`
} `json:"targets"`
Labels map[string]string `json:"labels"`
} `json:"resources"`
Nonce string `json:"nonce"`
}

View File

@ -1,80 +1,61 @@
package kuma
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
func Test_buildAPIPath(t *testing.T) {
type args struct {
server string
func TestGetAPIServerPathSuccess(t *testing.T) {
f := func(server, expectedAPIServer, expectedAPIPath string) {
t.Helper()
apiServer, apiPath, err := getAPIServerPath(server)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if apiServer != expectedAPIServer {
t.Fatalf("unexpected API server; got %q; want %q", apiServer, expectedAPIServer)
}
if apiPath != expectedAPIPath {
t.Fatalf("unexpected API path; got %q; want %q", apiPath, expectedAPIPath)
}
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "get api path ok",
args: args{server: "http://localhost:5676"},
want: "/v3/discovery:monitoringassignments",
},
{
name: "get api path incorrect server URL",
args: args{server: ":"},
wantErr: true,
},
{
name: "get api path incorrect server URL err",
args: args{server: "api"},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sdConf := &SDConfig{
Server: tt.args.server,
HTTPClientConfig: promauth.HTTPClientConfig{},
ProxyClientConfig: promauth.ProxyClientConfig{},
}
apiConf, err := getAPIConfig(sdConf, ".")
if tt.wantErr {
if err == nil {
t.Errorf("buildAPIPath() error = %v, wantErr %v", err, tt.wantErr)
}
return
}
if apiConf.path != tt.want {
t.Errorf("buildAPIPath() got = %v, want = %v", apiConf.path, tt.want)
}
sdConf.MustStop()
})
}
// url without path
f("http://localhost:5676", "http://localhost:5676", "/v3/discovery:monitoringassignments")
// url with path
f("http://localhost:5676/", "http://localhost:5676", "/v3/discovery:monitoringassignments")
f("https://foo.bar:1234/a/b", "https://foo.bar:1234", "/a/b/v3/discovery:monitoringassignments")
// url with query args
f("https://foo.bar:1234/a/b?c=d&arg2=value2", "https://foo.bar:1234", "/a/b/v3/discovery:monitoringassignments?c=d&arg2=value2")
// missing scheme
f("foo.bar", "http://foo.bar", "/v3/discovery:monitoringassignments")
f("foo.bar:1234/a/b", "http://foo.bar:1234", "/a/b/v3/discovery:monitoringassignments")
f("foo.bar:1234/a/b?c=d&arg2=value2", "http://foo.bar:1234", "/a/b/v3/discovery:monitoringassignments?c=d&arg2=value2")
}
func Test_parseAPIResponse(t *testing.T) {
type args struct {
data []byte
func TestGetAPIConfigFailure(t *testing.T) {
f := func(server string) {
t.Helper()
sdc := &SDConfig{
Server: server,
}
cfg, err := getAPIConfig(sdc, ".")
if err == nil {
t.Fatalf("expecting non-nil error")
}
if cfg != nil {
t.Fatalf("expecting nil cfg; got %v", cfg)
}
}
tests := []struct {
name string
args args
want []kumaTarget
wantErr bool
}{
// empty server url
f("")
// invalid server url
f(":")
}
{
name: "parse ok",
args: args{
data: []byte(`{
func TestParseTargetsLabels(t *testing.T) {
data := `{
"version_info":"5dc9a5dd-2091-4426-a886-dfdc24fc99d7",
"resources":[
{
@ -100,77 +81,55 @@ func Test_parseAPIResponse(t *testing.T) {
"targets":[
{
"name":"app",
"scheme":"http",
"scheme":"https",
"address":"127.0.0.1:5671",
"metrics_path":"/metrics",
"metrics_path":"/metrics/abc",
"labels":{ "kuma_io_protocol":"http" }
}
]
}
],
"type_url":"type.googleapis.com/kuma.observability.v1.MonitoringAssignment"
}`),
},
want: []kumaTarget{
{
Mesh: "default",
Service: "redis",
DataPlane: "redis",
Instance: "redis",
Scheme: "http",
Address: "127.0.0.1:5670",
MetricsPath: "/metrics",
Labels: map[string]string{"kuma_io_protocol": "tcp", "test": "test1"},
},
{
Mesh: "default",
Service: "app",
DataPlane: "app",
Instance: "app",
Scheme: "http",
Address: "127.0.0.1:5671",
MetricsPath: "/metrics",
Labels: map[string]string{"kuma_io_protocol": "http", "test": "test2"},
},
},
},
{
name: "parse err",
args: args{data: []byte(`[]`)},
wantErr: true,
},
{
name: "api version err",
args: args{
data: []byte(`{
"resources":[
{
"@type":"type.googleapis.com/kuma.observability.v2.MonitoringAssignment",
"mesh":"default",
"service":"redis",
"targets":[]
}
],
"type_url":"type.googleapis.com/kuma.observability.v2.MonitoringAssignment"
}`),
},
wantErr: true,
},
"type_url":"type.googleapis.com/kuma.observability.v1.MonitoringAssignment",
"nonce": "foobar"
}`
labelss, versionInfo, nonce, err := parseTargetsLabels([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resp, err := parseDiscoveryResponse(tt.args.data)
if tt.wantErr {
if err == nil {
t.Errorf("parseDiscoveryResponse() error = %v, wantErr %v", err, tt.wantErr)
}
return
}
got := parseKumaTargets(resp)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseDiscoveryResponse() got = %v, want %v", got, tt.want)
}
})
expectedLabelss := []*promutils.Labels{
promutils.NewLabelsFromMap(map[string]string{
"__address__": "127.0.0.1:5670",
"__meta_kuma_dataplane": "redis",
"__meta_kuma_label_kuma_io_protocol": "tcp",
"__meta_kuma_label_test": "test1",
"__meta_kuma_mesh": "default",
"__meta_kuma_service": "redis",
"__metrics_path__": "/metrics",
"__scheme__": "http",
"instance": "redis",
}),
promutils.NewLabelsFromMap(map[string]string{
"__address__": "127.0.0.1:5671",
"__meta_kuma_dataplane": "app",
"__meta_kuma_label_kuma_io_protocol": "http",
"__meta_kuma_label_test": "test2",
"__meta_kuma_mesh": "default",
"__meta_kuma_service": "app",
"__metrics_path__": "/metrics/abc",
"__scheme__": "https",
"instance": "app",
}),
}
discoveryutils.TestEqualLabelss(t, labelss, expectedLabelss)
expectedVersionInfo := "5dc9a5dd-2091-4426-a886-dfdc24fc99d7"
if versionInfo != expectedVersionInfo {
t.Fatalf("unexpected versionInfo; got %q; want %q", versionInfo, expectedVersionInfo)
}
expectedNonce := "foobar"
if nonce != expectedNonce {
t.Fatalf("unexpected nonce; got %q; want %q", nonce, expectedNonce)
}
}

View File

@ -11,7 +11,7 @@ import (
)
// SDCheckInterval defines interval for targets refresh.
var SDCheckInterval = flag.Duration("promscrape.kumaSDCheckInterval", time.Minute, "Interval for checking for changes in kuma service discovery. "+
var SDCheckInterval = flag.Duration("promscrape.kumaSDCheckInterval", 30*time.Second, "Interval for checking for changes in kuma service discovery. "+
"This works only if kuma_sd_configs is configured in '-promscrape.config' file. "+
"See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details")
@ -19,22 +19,17 @@ var SDCheckInterval = flag.Duration("promscrape.kumaSDCheckInterval", time.Minut
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kuma_sd_config
type SDConfig struct {
Server string `yaml:"server"`
Server string `yaml:"server"`
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
ProxyURL *proxy.URL `yaml:"proxy_url,omitempty"`
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
}
type kumaTarget struct {
Mesh string `json:"mesh"`
ControlPlane string `json:"controlplane"`
Service string `json:"service"`
DataPlane string `json:"dataplane"`
Instance string `json:"instance"`
Scheme string `json:"scheme"`
Address string `json:"address"`
MetricsPath string `json:"metrics_path"`
Labels map[string]string `json:"labels"`
// fetch_timeout isn't used, so it isn't defined.
// FetchTimeout time.Duration `yaml:"fetch_timeout,omitempty"`
// refresh_interval is obtained from `-promscrape.kumaSDCheckInterval` command-line option.
// RefreshInterval time.Duration `yaml:"refresh_interval,omitempty"`
}
// GetLabels returns kuma service discovery labels according to sdc.
@ -43,11 +38,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
if err != nil {
return nil, fmt.Errorf("cannot get API config for kuma_sd: %w", err)
}
targets, err := cfg.getTargets()
if err != nil {
return nil, err
}
return kumaTargetsToLabels(targets, sdc.Server), nil
v := cfg.labels.Load()
pLabels := v.(*[]*promutils.Labels)
return *pLabels, nil
}
// MustStop stops further usage for sdc.
@ -59,55 +52,3 @@ func (sdc *SDConfig) MustStop() {
cfg.mustStop()
}
}
func kumaTargetsToLabels(src []kumaTarget, sourceURL string) []*promutils.Labels {
ms := make([]*promutils.Labels, 0, len(src))
for _, target := range src {
m := promutils.NewLabels(8 + len(target.Labels))
m.Add("instance", target.Instance)
m.Add("__address__", target.Address)
m.Add("__scheme__", target.Scheme)
m.Add("__metrics_path__", target.MetricsPath)
m.Add("__meta_server", sourceURL)
m.Add("__meta_kuma_mesh", target.Mesh)
m.Add("__meta_kuma_service", target.Service)
m.Add("__meta_kuma_dataplane", target.DataPlane)
for k, v := range target.Labels {
m.Add("__meta_kuma_label_"+k, v)
}
m.RemoveDuplicates()
ms = append(ms, m)
}
return ms
}
func parseKumaTargets(response discoveryResponse) []kumaTarget {
result := make([]kumaTarget, 0, len(response.Resources))
for _, resource := range response.Resources {
for _, target := range resource.Targets {
labels := make(map[string]string)
for label, value := range resource.Labels {
labels[label] = value
}
for label, value := range target.Labels {
labels[label] = value
}
result = append(result, kumaTarget{
Mesh: resource.Mesh,
ControlPlane: response.ControlPlane.Identifier,
Service: resource.Service,
DataPlane: target.Name,
Instance: target.Name,
Scheme: target.Scheme,
Address: target.Address,
MetricsPath: target.MetricsPath,
Labels: labels,
})
}
}
return result
}

View File

@ -1,79 +0,0 @@
package kuma
import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
func Test_kumaTargetsToLabels(t *testing.T) {
type args struct {
src []kumaTarget
}
tests := []struct {
name string
args args
want []*promutils.Labels
}{
{
name: "convert to labels ok",
args: args{
src: []kumaTarget{
{
Mesh: "default",
Service: "redis",
DataPlane: "redis",
Instance: "redis",
Scheme: "http",
Address: "127.0.0.1:5670",
MetricsPath: "/metrics",
Labels: map[string]string{"kuma_io_protocol": "tcp", "kuma_io_service": "redis"},
},
{
Mesh: "default",
Service: "app",
DataPlane: "app",
Instance: "app",
Scheme: "http",
Address: "127.0.0.1:5671",
MetricsPath: "/vm/metrics",
Labels: map[string]string{"kuma_io_protocol": "http", "kuma_io_service": "app"},
},
},
},
want: []*promutils.Labels{
promutils.NewLabelsFromMap(map[string]string{
"instance": "redis",
"__address__": "127.0.0.1:5670",
"__scheme__": "http",
"__metrics_path__": "/metrics",
"__meta_server": "http://localhost:5676",
"__meta_kuma_mesh": "default",
"__meta_kuma_service": "redis",
"__meta_kuma_dataplane": "redis",
"__meta_kuma_label_kuma_io_protocol": "tcp",
"__meta_kuma_label_kuma_io_service": "redis",
}),
promutils.NewLabelsFromMap(map[string]string{
"instance": "app",
"__address__": "127.0.0.1:5671",
"__scheme__": "http",
"__metrics_path__": "/vm/metrics",
"__meta_server": "http://localhost:5676",
"__meta_kuma_mesh": "default",
"__meta_kuma_service": "app",
"__meta_kuma_dataplane": "app",
"__meta_kuma_label_kuma_io_protocol": "http",
"__meta_kuma_label_kuma_io_service": "app",
}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := kumaTargetsToLabels(tt.args.src, "http://localhost:5676")
discoveryutils.TestEqualLabelss(t, got, tt.want)
})
}
}

View File

@ -1,56 +0,0 @@
package kuma
import (
"encoding/json"
"fmt"
)
// discoveryRequest represent xDS-requests for Kuma Service Mesh
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryrequest
type discoveryRequest struct {
VersionInfo string `json:"version_info,omitempty"`
Node discoveryRequestNode `json:"node,omitempty"`
ResourceNames []string `json:"resource_names,omitempty"`
TypeUrl string `json:"type_url,omitempty"`
ResponseNonce string `json:"response_nonce,omitempty"`
}
type discoveryRequestNode struct {
Id string `json:"id,omitempty"`
}
// discoveryResponse represent xDS-requests for Kuma Service Mesh
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryresponse
type discoveryResponse struct {
VersionInfo string `json:"version_info,omitempty"`
Resources []struct {
Mesh string `json:"mesh,omitempty"`
Service string `json:"service,omitempty"`
Targets []struct {
Name string `json:"name,omitempty"`
Scheme string `json:"scheme,omitempty"`
Address string `json:"address,omitempty"`
MetricsPath string `json:"metrics_path,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
} `json:"targets,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
} `json:"resources,omitempty"`
TypeUrl string `json:"type_url,omitempty"`
Nonce string `json:"nonce,omitempty"`
ControlPlane struct {
Identifier string `json:"identifier,omitempty"`
} `json:"control_plane,omitempty"`
}
func parseDiscoveryResponse(data []byte) (discoveryResponse, error) {
response := discoveryResponse{}
err := json.Unmarshal(data, &response)
if err != nil {
return discoveryResponse{}, fmt.Errorf("cannot parse kuma_sd api response, err: %w", err)
}
if response.TypeUrl != xdsResourceTypeUrl {
return discoveryResponse{}, fmt.Errorf("unexpected type_url in kuma_sd api response, expected: %s, got: %s", xdsResourceTypeUrl, response.TypeUrl)
}
return response, nil
}

View File

@ -42,8 +42,11 @@ const (
DefaultClientReadTimeout = time.Minute
)
type RequestCallback func(request *http.Request)
type ResponseCallback func(response *http.Response)
// RequestCallback is called on the request before sending the request to the server.
type RequestCallback func(req *http.Request)
// ResponseCallback is called on the response before validating and returning the response to the caller.
type ResponseCallback func(resp *http.Response)
func concurrencyLimitChInit() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrency)