diff --git a/README.md b/README.md index eb5ef83e8..24ee73182 100644 --- a/README.md +++ b/README.md @@ -260,6 +260,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la * [static_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config) * [file_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) * [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config) +* [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) * [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config) In the future other `*_sd_config` types will be supported. diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 520bd428c..71b952103 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -133,6 +133,9 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh See [these docs](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) for details. * `kubernetes_sd_configs` - for scraping targets in Kubernetes (k8s). See [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config) for details. +* `ec2_sd_configs` - for scraping targets in Amazone EC2. + See [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) for details. + `vmagent` doesn't support `role_arn` config param yet. * `gce_sd_configs` - for scraping targets in Google Compute Engine (GCE). See [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config) for details. `vmagent` provides the following additional functionality `gce_sd_config`: @@ -143,7 +146,6 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh The following service discovery mechanisms will be added to `vmagent` soon: -* [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) * [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) * [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index eb5ef83e8..24ee73182 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -260,6 +260,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la * [static_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config) * [file_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) * [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config) +* [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) * [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config) In the future other `*_sd_config` types will be supported. diff --git a/docs/vmagent.md b/docs/vmagent.md index 520bd428c..71b952103 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -133,6 +133,9 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh See [these docs](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) for details. * `kubernetes_sd_configs` - for scraping targets in Kubernetes (k8s). See [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config) for details. +* `ec2_sd_configs` - for scraping targets in Amazone EC2. + See [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) for details. + `vmagent` doesn't support `role_arn` config param yet. * `gce_sd_configs` - for scraping targets in Google Compute Engine (GCE). See [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config) for details. `vmagent` provides the following additional functionality `gce_sd_config`: @@ -143,7 +146,6 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh The following service discovery mechanisms will be added to `vmagent` soon: -* [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) * [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) * [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config) diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 1a7939c58..0b18d388f 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes" "gopkg.in/yaml.v2" @@ -61,6 +62,7 @@ type ScrapeConfig struct { StaticConfigs []StaticConfig `yaml:"static_configs"` FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"` KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs"` + EC2SDConfigs []ec2.SDConfig `yaml:"ec2_sd_configs"` GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs"` RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs"` MetricRelabelConfigs []promrelabel.RelabelConfig `yaml:"metric_relabel_configs"` @@ -149,6 +151,14 @@ func (cfg *Config) kubernetesSDConfigsCount() int { return n } +func (cfg *Config) ec2SDConfigsCount() int { + n := 0 + for i := range cfg.ScrapeConfigs { + n += len(cfg.ScrapeConfigs[i].EC2SDConfigs) + } + return n +} + func (cfg *Config) gceSDConfigsCount() int { n := 0 for i := range cfg.ScrapeConfigs { @@ -178,6 +188,19 @@ func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork { return dst } +// getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg. +func (cfg *Config) getEC2SDScrapeWork() []ScrapeWork { + var dst []ScrapeWork + for i := range cfg.ScrapeConfigs { + sc := &cfg.ScrapeConfigs[i] + for j := range sc.EC2SDConfigs { + sdc := &sc.EC2SDConfigs[j] + dst = appendEC2ScrapeWork(dst, sdc, sc.swc) + } + } + return dst +} + // getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg. func (cfg *Config) getGCESDScrapeWork() []ScrapeWork { var dst []ScrapeWork @@ -323,6 +346,15 @@ func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, base return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config") } +func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) []ScrapeWork { + targetLabels, err := ec2.GetLabels(sdc) + if err != nil { + logger.Errorf("error when discovering ec2 nodes for `job_name` %q: %s; skipping it", swc.jobName, err) + return dst + } + return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config") +} + func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) []ScrapeWork { targetLabels, err := gce.GetLabels(sdc) if err != nil { diff --git a/lib/promscrape/discovery/ec2/api.go b/lib/promscrape/discovery/ec2/api.go new file mode 100644 index 000000000..ffc69ddeb --- /dev/null +++ b/lib/promscrape/discovery/ec2/api.go @@ -0,0 +1,218 @@ +package ec2 + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + "strings" + "sync" + "time" +) + +type apiConfig struct { + endpoint string + region string + accessKey string + secretKey string + filters string + port int +} + +func getAPIConfig(sdc *SDConfig) (*apiConfig, error) { + apiConfigMapLock.Lock() + defer apiConfigMapLock.Unlock() + + if !hasAPIConfigMapCleaner { + hasAPIConfigMapCleaner = true + go apiConfigMapCleaner() + } + + e := apiConfigMap[sdc] + if e != nil { + e.lastAccessTime = time.Now() + return e.cfg, nil + } + cfg, err := newAPIConfig(sdc) + if err != nil { + return nil, err + } + apiConfigMap[sdc] = &apiConfigMapEntry{ + cfg: cfg, + lastAccessTime: time.Now(), + } + return cfg, nil +} + +func apiConfigMapCleaner() { + tc := time.NewTicker(15 * time.Minute) + for currentTime := range tc.C { + apiConfigMapLock.Lock() + for k, e := range apiConfigMap { + if currentTime.Sub(e.lastAccessTime) > 10*time.Minute { + delete(apiConfigMap, k) + } + } + apiConfigMapLock.Unlock() + } +} + +type apiConfigMapEntry struct { + cfg *apiConfig + lastAccessTime time.Time +} + +var ( + apiConfigMap = make(map[*SDConfig]*apiConfigMapEntry) + apiConfigMapLock sync.Mutex + hasAPIConfigMapCleaner bool +) + +func newAPIConfig(sdc *SDConfig) (*apiConfig, error) { + region := sdc.Region + if len(region) == 0 { + r, err := getDefaultRegion() + if err != nil { + return nil, fmt.Errorf("cannot determine default ec2 region; probably, `region` param in `ec2_sd_configs` is missing; the error: %s", err) + } + region = r + } + accessKey := sdc.AccessKey + if len(accessKey) == 0 { + accessKey = os.Getenv("AWS_ACCESS_KEY_ID") + if len(accessKey) == 0 { + return nil, fmt.Errorf("missing `access_key` in AWS_ACCESS_KEY_ID env var; probably, `access_key` must be set in `ec2_sd_config`?") + } + } + secretKey := sdc.SecretKey + if len(secretKey) == 0 { + secretKey = os.Getenv("AWS_SECRET_ACCESS_KEY") + if len(secretKey) == 0 { + return nil, fmt.Errorf("miising `secret_key` in AWS_SECRET_ACCESS_KEY env var; probably, `secret_key` must be set in `ec2_sd_config`?") + } + } + filters := getFiltersQueryString(sdc.Filters) + port := 80 + if sdc.Port != nil { + port = *sdc.Port + } + return &apiConfig{ + endpoint: sdc.Endpoint, + region: region, + accessKey: accessKey, + secretKey: secretKey, + filters: filters, + port: port, + }, nil +} + +func getFiltersQueryString(filters []Filter) string { + // See how to build filters query string at examples at https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstances.html + var args []string + for i, f := range filters { + args = append(args, fmt.Sprintf("Filter.%d.Name=%s", i+1, url.QueryEscape(f.Name))) + for j, v := range f.Values { + args = append(args, fmt.Sprintf("Filter.%d.Value.%d=%s", i+1, j+1, url.QueryEscape(v))) + } + } + return strings.Join(args, "&") +} + +func getDefaultRegion() (string, error) { + data, err := getMetadataByPath("dynamic/instance-identity/document") + if err != nil { + return "", err + } + var id IdentityDocument + if err := json.Unmarshal(data, &id); err != nil { + return "", fmt.Errorf("cannot parse identity document: %s", err) + } + return id.Region, nil +} + +// IdentityDocument is identity document returned from AWS metadata server. +// +// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html +type IdentityDocument struct { + Region string +} + +func getMetadataByPath(apiPath string) ([]byte, error) { + // See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html + + // Obtain session token + sessionTokenURL := "http://169.254.169.254/latest/api/token" + req, err := http.NewRequest("PUT", sessionTokenURL, nil) + if err != nil { + return nil, fmt.Errorf("cannot create request for IMDSv2 session token at url %q: %s", sessionTokenURL, err) + } + req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "60") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("cannot obtain IMDSv2 session token from %q; probably, `region` is missing in `ec2_sd_config`; error: %s", sessionTokenURL, err) + } + token, err := readResponseBody(resp, sessionTokenURL) + if err != nil { + return nil, fmt.Errorf("cannot read IMDSv2 session token from %q; probably, `region` is missing in `ec2_sd_config`; error: %s", sessionTokenURL, err) + } + + // Use session token in the request. + apiURL := "http://169.254.169.254/latest/" + apiPath + req, err = http.NewRequest("GET", apiURL, nil) + if err != nil { + return nil, fmt.Errorf("cannot create request to %q: %s", apiURL, err) + } + req.Header.Set("X-aws-ec2-metadata-token", string(token)) + resp, err = http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("cannot obtain response for %q: %s", apiURL, err) + } + return readResponseBody(resp, apiURL) +} + +func getAPIResponse(cfg *apiConfig, action, nextPageToken string) ([]byte, error) { + // See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/Query-Requests.html + endpoint := fmt.Sprintf("https://ec2.%s.amazonaws.com/", cfg.region) + if len(cfg.endpoint) > 0 { + endpoint = cfg.endpoint + // endpoint may contain only hostname. Convert it to proper url then. + if !strings.Contains(endpoint, "://") { + endpoint = "https://" + endpoint + } + if !strings.HasSuffix(endpoint, "/") { + endpoint += "/" + } + } + apiURL := fmt.Sprintf("%s?Action=%s", endpoint, url.QueryEscape(action)) + if len(cfg.filters) > 0 { + apiURL += "&" + cfg.filters + } + if len(nextPageToken) > 0 { + apiURL += fmt.Sprintf("&NextToken=%s", url.QueryEscape(nextPageToken)) + } + apiURL += "&Version=2013-10-15" + req, err := newSignedRequest(apiURL, "ec2", cfg.region, cfg.accessKey, cfg.secretKey) + if err != nil { + return nil, fmt.Errorf("cannot create signed request: %s", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("cannot perform http request to %q: %s", apiURL, err) + } + return readResponseBody(resp, apiURL) +} + +func readResponseBody(resp *http.Response, apiURL string) ([]byte, error) { + data, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("cannot read response from %q: %s", apiURL, err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code for %q; got %d; want %d; response body: %q", + apiURL, resp.StatusCode, http.StatusOK, data) + } + return data, nil +} diff --git a/lib/promscrape/discovery/ec2/ec2.go b/lib/promscrape/discovery/ec2/ec2.go new file mode 100644 index 000000000..f49902b5b --- /dev/null +++ b/lib/promscrape/discovery/ec2/ec2.go @@ -0,0 +1,44 @@ +package ec2 + +import ( + "fmt" +) + +// SDConfig represents service discovery config for ec2. +// +// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config +type SDConfig struct { + Region string `yaml:"region"` + Endpoint string `yaml:"endpoint"` + AccessKey string `yaml:"access_key"` + SecretKey string `yaml:"secret_key"` + Profile string `yaml:"profile"` + // TODO: add support for RoleARN + // RoleARN string `yaml:"role_arn"` + // RefreshInterval time.Duration `yaml:"refresh_interval"` + // refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option. + Port *int `yaml:"port"` + Filters []Filter `yaml:"filters"` +} + +// Filter is ec2 filter. +// +// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstances.html +// and https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Filter.html +type Filter struct { + Name string `yaml:"name"` + Values []string `yaml:"values"` +} + +// GetLabels returns ec2 labels according to sdc. +func GetLabels(sdc *SDConfig) ([]map[string]string, error) { + cfg, err := getAPIConfig(sdc) + if err != nil { + return nil, fmt.Errorf("cannot get API config: %s", err) + } + ms, err := getInstancesLabels(cfg) + if err != nil { + return nil, fmt.Errorf("error when fetching instances data from EC2: %s", err) + } + return ms, nil +} diff --git a/lib/promscrape/discovery/ec2/instance.go b/lib/promscrape/discovery/ec2/instance.go new file mode 100644 index 000000000..6483860a7 --- /dev/null +++ b/lib/promscrape/discovery/ec2/instance.go @@ -0,0 +1,177 @@ +package ec2 + +import ( + "encoding/xml" + "fmt" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" +) + +// getInstancesLabels returns labels for ec2 instances obtained from the given cfg +func getInstancesLabels(cfg *apiConfig) ([]map[string]string, error) { + rs, err := getReservations(cfg) + if err != nil { + return nil, err + } + var ms []map[string]string + for _, r := range rs { + for _, inst := range r.InstanceSet.Items { + ms = inst.appendTargetLabels(ms, r.OwnerID, cfg.port) + } + } + return ms, nil +} + +func getReservations(cfg *apiConfig) ([]Reservation, error) { + // See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstances.html + action := "DescribeInstances" + var rs []Reservation + pageToken := "" + for { + data, err := getAPIResponse(cfg, action, pageToken) + if err != nil { + return nil, fmt.Errorf("cannot obtain instances: %s", err) + } + ir, err := parseInstancesResponse(data) + if err != nil { + return nil, fmt.Errorf("cannot parse instance list: %s", err) + } + rs = append(rs, ir.ReservationSet.Items...) + if len(ir.NextPageToken) == 0 { + return rs, nil + } + pageToken = ir.NextPageToken + } +} + +// InstancesResponse represents response to https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstances.html +type InstancesResponse struct { + ReservationSet ReservationSet `xml:"reservationSet"` + NextPageToken string `xml:"nextToken"` +} + +// ReservationSet represetns ReservationSet from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstances.html +type ReservationSet struct { + Items []Reservation `xml:"item"` +} + +// Reservation represents Reservation from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Reservation.html +type Reservation struct { + OwnerID string `xml:"ownerId"` + InstanceSet InstanceSet `xml:"instancesSet"` +} + +// InstanceSet represents InstanceSet from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Reservation.html +type InstanceSet struct { + Items []Instance `xml:"item"` +} + +// Instance represents Instance from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Instance.html +type Instance struct { + PrivateIPAddress string `xml:"privateIpAddress"` + Architecture string `xml:"architecture"` + Placement Placement `xml:"placement"` + ID string `xml:"instanceId"` + Lifecycle string `xml:"instanceLifecycle"` + State InstanceState `xml:"instanceState"` + Type string `xml:"instanceType"` + Platform string `xml:"platform"` + SubnetID string `xml:"subnetId"` + PrivateDNSName string `xml:"privateDnsName"` + PublicDNSName string `xml:"dnsName"` + PublicIPAddress string `xml:"ipAddress"` + VPCID string `xml:"vpcId"` + NetworkInterfaceSet NetworkInterfaceSet `xml:"networkInterfaceSet"` + TagSet TagSet `xml:"tagSet"` +} + +// Placement represents Placement from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Placement.html +type Placement struct { + AvailabilityZone string `xml:"availabilityZone"` +} + +// InstanceState represents InstanceState from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_InstanceState.html +type InstanceState struct { + Name string `xml:"name"` +} + +// NetworkInterfaceSet represents NetworkInterfaceSet from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Instance.html +type NetworkInterfaceSet struct { + Items []NetworkInterface `xml:"item"` +} + +// NetworkInterface represents NetworkInterface from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_InstanceNetworkInterface.html +type NetworkInterface struct { + SubnetID string `xml:"subnetId"` +} + +// TagSet represents TagSet from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Instance.html +type TagSet struct { + Items []Tag `xml:"item"` +} + +// Tag represents Tag from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Tag.html +type Tag struct { + Key string `xml:"key"` + Value string `xml:"value"` +} + +func parseInstancesResponse(data []byte) (*InstancesResponse, error) { + var v InstancesResponse + if err := xml.Unmarshal(data, &v); err != nil { + return nil, fmt.Errorf("cannot unmarshal InstancesResponse from %q: %s", data, err) + } + return &v, nil +} + +func (inst *Instance) appendTargetLabels(ms []map[string]string, ownerID string, port int) []map[string]string { + if len(inst.PrivateIPAddress) == 0 { + // Cannot scrape instance without private IP address + return ms + } + addr := discoveryutils.JoinHostPort(inst.PrivateIPAddress, port) + m := map[string]string{ + "__address__": addr, + "__meta_ec2_architecture": inst.Architecture, + "__meta_ec2_availability_zone": inst.Placement.AvailabilityZone, + "__meta_ec2_instance_id": inst.ID, + "__meta_ec2_instance_lifecycle": inst.Lifecycle, + "__meta_ec2_instance_state": inst.State.Name, + "__meta_ec2_instance_type": inst.Type, + "__meta_ec2_owner_id": ownerID, + "__meta_ec2_platform": inst.Platform, + "__meta_ec2_primary_subnet_id": inst.SubnetID, + "__meta_ec2_private_dns_name": inst.PrivateDNSName, + "__meta_ec2_private_ip": inst.PrivateIPAddress, + "__meta_ec2_public_dns_name": inst.PublicDNSName, + "__meta_ec2_public_ip": inst.PublicIPAddress, + "__meta_ec2_vpc_id": inst.VPCID, + } + if len(inst.VPCID) > 0 { + // Deduplicate VPC Subnet IDs maintaining the order of the network interfaces returned by EC2. + subnets := make([]string, 0, len(inst.NetworkInterfaceSet.Items)) + seenSubnets := make(map[string]bool, len(inst.NetworkInterfaceSet.Items)) + for _, ni := range inst.NetworkInterfaceSet.Items { + if len(ni.SubnetID) == 0 { + continue + } + if !seenSubnets[ni.SubnetID] { + seenSubnets[ni.SubnetID] = true + subnets = append(subnets, ni.SubnetID) + } + } + // We surround the separated list with the separator as well. This way regular expressions + // in relabeling rules don't have to consider tag positions. + m["__meta_ec2_subnet_id"] = "," + strings.Join(subnets, ",") + "," + } + for _, t := range inst.TagSet.Items { + if len(t.Key) == 0 || len(t.Value) == 0 { + continue + } + name := discoveryutils.SanitizeLabelName(t.Key) + m["__meta_ec2_tag_"+name] = t.Value + } + ms = append(ms, m) + return ms +} diff --git a/lib/promscrape/discovery/ec2/instance_test.go b/lib/promscrape/discovery/ec2/instance_test.go new file mode 100644 index 000000000..3c1f736de --- /dev/null +++ b/lib/promscrape/discovery/ec2/instance_test.go @@ -0,0 +1,219 @@ +package ec2 + +import ( + "reflect" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" +) + +func TestParseInstancesResponse(t *testing.T) { + data := ` + + 98667f8e-7fb6-441b-a612-41c6268c6399 + + + r-05534f81f74ea7036 + 793614593844 + + + + i-0e730b692d9c15460 + ami-0eb89db7593b5d434 + + 16 + running + + ip-172-31-11-152.eu-west-2.compute.internal + ec2-3-8-232-141.eu-west-2.compute.amazonaws.com + + my-laptop + 0 + + t2.micro + 2020-04-27T09:19:26.000Z + + eu-west-2c + + default + + + disabled + + subnet-57044c3e + vpc-f1eaad99 + 172.31.11.152 + 3.8.232.141 + true + + + sg-05d74e4e8551bd020 + launch-wizard-1 + + + x86_64 + ebs + /dev/sda1 + + + /dev/sda1 + + vol-0153ef24058482522 + attached + 2020-04-27T09:19:27.000Z + true + + + + hvm + + + + foo + bar + + + xen + + + eni-01d7b338ea037a60b + subnet-57044c3e + vpc-f1eaad99 + + 793614593844 + in-use + 02:3b:63:46:13:9a + 172.31.11.152 + ip-172-31-11-152.eu-west-2.compute.internal + true + + + sg-05d74e4e8551bd020 + launch-wizard-1 + + + + eni-attach-030cc2cdffe745682 + 0 + attached + 2020-04-27T09:19:26.000Z + true + + + 3.8.232.141 + ec2-3-8-232-141.eu-west-2.compute.amazonaws.com + amazon + + + + 172.31.11.152 + ip-172-31-11-152.eu-west-2.compute.internal + true + + 3.8.232.141 + ec2-3-8-232-141.eu-west-2.compute.amazonaws.com + amazon + + + + + + false + spot + windows + + + + + +` + ir, err := parseInstancesResponse([]byte(data)) + if err != nil { + t.Fatalf("unexpected error when parsing data: %s", err) + } + irExpected := &InstancesResponse{ + ReservationSet: ReservationSet{ + Items: []Reservation{ + { + OwnerID: "793614593844", + InstanceSet: InstanceSet{ + Items: []Instance{ + { + PrivateIPAddress: "172.31.11.152", + Architecture: "x86_64", + Placement: Placement{ + AvailabilityZone: "eu-west-2c", + }, + ID: "i-0e730b692d9c15460", + Lifecycle: "spot", + State: InstanceState{ + Name: "running", + }, + Type: "t2.micro", + Platform: "windows", + SubnetID: "subnet-57044c3e", + PrivateDNSName: "ip-172-31-11-152.eu-west-2.compute.internal", + PublicDNSName: "ec2-3-8-232-141.eu-west-2.compute.amazonaws.com", + PublicIPAddress: "3.8.232.141", + VPCID: "vpc-f1eaad99", + NetworkInterfaceSet: NetworkInterfaceSet{ + Items: []NetworkInterface{ + { + SubnetID: "subnet-57044c3e", + }, + }, + }, + TagSet: TagSet{ + Items: []Tag{ + { + Key: "foo", + Value: "bar", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + if !reflect.DeepEqual(ir, irExpected) { + t.Fatalf("unexpected InstancesResponse parsed;\ngot\n%+v\nwant\n%+v", ir, irExpected) + } + + rs := ir.ReservationSet.Items[0] + ownerID := rs.OwnerID + port := 423 + inst := rs.InstanceSet.Items[0] + labelss := inst.appendTargetLabels(nil, ownerID, port) + var sortedLabelss [][]prompbmarshal.Label + for _, labels := range labelss { + sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels)) + } + expectedLabels := [][]prompbmarshal.Label{ + discoveryutils.GetSortedLabels(map[string]string{ + "__address__": "172.31.11.152:423", + "__meta_ec2_architecture": "x86_64", + "__meta_ec2_availability_zone": "eu-west-2c", + "__meta_ec2_instance_id": "i-0e730b692d9c15460", + "__meta_ec2_instance_lifecycle": "spot", + "__meta_ec2_instance_state": "running", + "__meta_ec2_instance_type": "t2.micro", + "__meta_ec2_owner_id": "793614593844", + "__meta_ec2_platform": "windows", + "__meta_ec2_primary_subnet_id": "subnet-57044c3e", + "__meta_ec2_private_dns_name": "ip-172-31-11-152.eu-west-2.compute.internal", + "__meta_ec2_private_ip": "172.31.11.152", + "__meta_ec2_public_dns_name": "ec2-3-8-232-141.eu-west-2.compute.amazonaws.com", + "__meta_ec2_public_ip": "3.8.232.141", + "__meta_ec2_subnet_id": ",subnet-57044c3e,", + "__meta_ec2_tag_foo": "bar", + "__meta_ec2_vpc_id": "vpc-f1eaad99", + }), + } + if !reflect.DeepEqual(sortedLabelss, expectedLabels) { + t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabels) + } +} diff --git a/lib/promscrape/discovery/ec2/sign.go b/lib/promscrape/discovery/ec2/sign.go new file mode 100644 index 000000000..8f0244313 --- /dev/null +++ b/lib/promscrape/discovery/ec2/sign.go @@ -0,0 +1,99 @@ +package ec2 + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +// newSignedRequest signed request for apiURL according to aws signature algorithm. +// +// See the algorithm at https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html +func newSignedRequest(apiURL, service, region, accessKey, secretKey string) (*http.Request, error) { + t := time.Now().UTC() + return newSignedRequestWithTime(apiURL, service, region, accessKey, secretKey, t) +} + +func newSignedRequestWithTime(apiURL, service, region, accessKey, secretKey string, t time.Time) (*http.Request, error) { + uri, err := url.Parse(apiURL) + if err != nil { + return nil, fmt.Errorf("cannot parse %q: %s", apiURL, err) + } + + // Create canonicalRequest + amzdate := t.Format("20060102T150405Z") + datestamp := t.Format("20060102") + canonicalURL := uri.Path + canonicalQS := uri.Query().Encode() + canonicalHeaders := fmt.Sprintf("host:%s\nx-amz-date:%s\n", uri.Host, amzdate) + signedHeaders := "host;x-amz-date" + payloadHash := hashHex("") + tmp := []string{ + "GET", + canonicalURL, + canonicalQS, + canonicalHeaders, + signedHeaders, + payloadHash, + } + canonicalRequest := strings.Join(tmp, "\n") + + // Create stringToSign + algorithm := "AWS4-HMAC-SHA256" + credentialScope := fmt.Sprintf("%s/%s/%s/aws4_request", datestamp, region, service) + tmp = []string{ + algorithm, + amzdate, + credentialScope, + hashHex(canonicalRequest), + } + stringToSign := strings.Join(tmp, "\n") + + // Calculate the signature + signingKey := getSignatureKey(secretKey, datestamp, region, service) + signature := hmacHex(signingKey, stringToSign) + + // Calculate autheader + authHeader := fmt.Sprintf("%s Credential=%s/%s, SignedHeaders=%s, Signature=%s", algorithm, accessKey, credentialScope, signedHeaders, signature) + + req, err := http.NewRequest("GET", apiURL, nil) + if err != nil { + return nil, fmt.Errorf("cannot create request from %q: %s", apiURL, err) + } + req.Header.Set("x-amz-date", amzdate) + req.Header.Set("Authorization", authHeader) + return req, nil +} + +func getSignatureKey(key, datestamp, region, service string) string { + kDate := hmacBin("AWS4"+key, datestamp) + kRegion := hmacBin(kDate, region) + kService := hmacBin(kRegion, service) + return hmacBin(kService, "aws4_request") +} + +func hashHex(s string) string { + h := sha256.Sum256([]byte(s)) + return hex.EncodeToString(h[:]) +} + +func hmacHex(key, data string) string { + digest := hmacBin(key, data) + return hex.EncodeToString([]byte(digest)) +} + +func hmacBin(key, data string) string { + h := hmac.New(sha256.New, []byte(key)) + _, err := h.Write([]byte(data)) + if err != nil { + logger.Panicf("BUG: unexpected error when writing to hmac: %s", err) + } + return string(h.Sum(nil)) +} diff --git a/lib/promscrape/discovery/ec2/sign_test.go b/lib/promscrape/discovery/ec2/sign_test.go new file mode 100644 index 000000000..6a1152012 --- /dev/null +++ b/lib/promscrape/discovery/ec2/sign_test.go @@ -0,0 +1,27 @@ +package ec2 + +import ( + "testing" + "time" +) + +func TestNewSignedRequest(t *testing.T) { + f := func(apiURL string, authHeaderExpected string) { + t.Helper() + service := "ec2" + region := "us-east-1" + accessKey := "fake-access-key" + secretKey := "foobar" + ct := time.Unix(0, 0).UTC() + req, err := newSignedRequestWithTime(apiURL, service, region, accessKey, secretKey, ct) + if err != nil { + t.Fatalf("error in newSignedRequest: %s", err) + } + authHeader := req.Header.Get("Authorization") + if authHeader != authHeaderExpected { + t.Fatalf("unexpected auth header;\ngot\n%s\nwant\n%s", authHeader, authHeaderExpected) + } + } + f("https://ec2.amazonaws.com/?Action=DescribeRegions&Version=2013-10-15", + "AWS4-HMAC-SHA256 Credential=fake-access-key/19700101/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-date, Signature=79dc8f54719a4c11edcd5811824a071361b3514172a3f5c903b7e279dfa6a710") +} diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 32d96fbb0..ab45097d1 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -22,6 +22,9 @@ var ( kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+ "This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+ "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details") + ec2SDCheckInterval = flag.Duration("promscrape.ec2SDCheckInterval", time.Minute, "Interval for checking for changes in ec2. "+ + "This works only if `ec2_sd_configs` is configured in '-promscrape.config' file. "+ + "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config for details") gceSDCheckInterval = flag.Duration("promscrape.gceSDCheckInterval", time.Minute, "Interval for checking for changes in gce. "+ "This works only if `gce_sd_configs` is configured in '-promscrape.config' file. "+ "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config for details") @@ -93,6 +96,11 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) runKubernetesSDScrapers(cfg, pushData, stopCh) }() wg.Add(1) + go func() { + defer wg.Done() + runEC2SDScrapers(cfg, pushData, stopCh) + }() + wg.Add(1) go func() { defer wg.Done() runGCESDScrapers(cfg, pushData, stopCh) @@ -202,6 +210,51 @@ var ( kubernetesSDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="kubernetes_sd"}`) ) +func runEC2SDScrapers(cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { + if cfg.ec2SDConfigsCount() == 0 { + return + } + sws := cfg.getEC2SDScrapeWork() + ticker := time.NewTicker(*ec2SDCheckInterval) + defer ticker.Stop() + mustStop := false + for !mustStop { + localStopCh := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func(sws []ScrapeWork) { + defer wg.Done() + logger.Infof("starting %d scrapers for `ec2_sd_config` targets", len(sws)) + ec2SDTargets.Set(uint64(len(sws))) + runScrapeWorkers(sws, pushData, localStopCh) + ec2SDTargets.Set(0) + logger.Infof("stopped all the %d scrapers for `ec2_sd_config` targets", len(sws)) + }(sws) + waitForChans: + select { + case <-ticker.C: + swsNew := cfg.getEC2SDScrapeWork() + if equalStaticConfigForScrapeWorks(swsNew, sws) { + // Nothing changed, continue waiting for updated scrape work + goto waitForChans + } + logger.Infof("restarting scrapers for changed `ec2_sd_config` targets") + sws = swsNew + case <-stopCh: + mustStop = true + } + + close(localStopCh) + wg.Wait() + ec2SDReloads.Inc() + } +} + +var ( + ec2SDTargets = metrics.NewCounter(`vm_promscrape_targets{type="ec2_sd"}`) + ec2SDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="ec2_sd"}`) +) + func runGCESDScrapers(cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) { if cfg.gceSDConfigsCount() == 0 { return