From 58d459e8a8c5212f79e943e78beface5cac2251c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 25 Nov 2022 16:40:23 -0800 Subject: [PATCH] app/{vminsert,vmagent}: follow-up after 53a63c6c4c5c970c17c2b8232cb3b8b7f952f416 Extend /api/v1/import/prometheus with the support for Pushgateway way of specifying additional labels. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1415 --- README.md | 25 +++- app/vmagent/main.go | 62 +++------- app/vmagent/pushgateway/request_handler.go | 127 -------------------- app/vminsert/main.go | 19 +-- docs/CHANGELOG.md | 1 + docs/Cluster-VictoriaMetrics.md | 2 +- docs/Single-server-VictoriaMetrics.md | 25 +++- lib/protoparser/common/extra_labels.go | 63 +++++++++- lib/protoparser/common/extra_labels_test.go | 91 ++++++++++++++ 9 files changed, 221 insertions(+), 194 deletions(-) delete mode 100644 app/vmagent/pushgateway/request_handler.go create mode 100644 lib/protoparser/common/extra_labels_test.go diff --git a/README.md b/README.md index 558f17c6b..85c15a2e7 100644 --- a/README.md +++ b/README.md @@ -1048,7 +1048,8 @@ Time series data can be imported into VictoriaMetrics via any supported data ing * `/api/v1/import/native` for importing data obtained from [/api/v1/export/native](#how-to-export-data-in-native-format). See [these docs](#how-to-import-data-in-native-format) for details. * `/api/v1/import/csv` for importing arbitrary CSV data. See [these docs](#how-to-import-csv-data) for details. -* `/api/v1/import/prometheus` for importing data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details. +* `/api/v1/import/prometheus` for importing data in Prometheus exposition format and in [Pushgateway format](https://github.com/prometheus/pushgateway#url). + See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details. ### How to import data in JSON line format @@ -1153,9 +1154,11 @@ Note that it could be required to flush response cache after importing historica ### How to import data in Prometheus exposition format -VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) -and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md) -via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics: +VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format), +in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md) +and in [Pushgateway format](https://github.com/prometheus/pushgateway#url) via `/api/v1/import/prometheus` path. + +For example, the following command imports a single line in Prometheus exposition format into VictoriaMetrics:
@@ -1181,6 +1184,16 @@ It should return something like the following: {"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]} ``` +The following command imports a single metric via [Pushgateway format](https://github.com/prometheus/pushgateway#url) with `{job="my_app",instance="host123"}` labels: + +
+ +```console +curl -d 'metric{label="abc"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus/metrics/job/my_app/instance/host123' +``` + +
+ Pass `Content-Encoding: gzip` HTTP request header to `/api/v1/import/prometheus` for importing gzipped data:
@@ -1192,8 +1205,8 @@ curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428
-Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. -For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. +Extra labels may be added to all the imported metrics either via [Pushgateway format](https://github.com/prometheus/pushgateway#url) +or by passing `extra_label=name=value` query args. For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. If timestamp is missing in ` ` Prometheus exposition format line, then the current timestamp is used during data ingestion. It can be overridden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`. diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 4cdb91f13..4209603b1 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -20,7 +20,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/pushgateway" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" @@ -218,6 +217,16 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } path := strings.Replace(r.URL.Path, "//", "/", -1) + if strings.HasPrefix(path, "/prometheus/api/v1/import/prometheus") || strings.HasPrefix(path, "/api/v1/import/prometheus") { + prometheusimportRequests.Inc() + if err := prometheusimport.InsertHandler(nil, r); err != nil { + prometheusimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + } if strings.HasPrefix(path, "datadog/") { // Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent. // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670 @@ -251,15 +260,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusNoContent) return true - case "/prometheus/api/v1/import/prometheus", "/api/v1/import/prometheus": - prometheusimportRequests.Inc() - if err := prometheusimport.InsertHandler(nil, r); err != nil { - prometheusimportErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true case "/prometheus/api/v1/import/native", "/api/v1/import/native": nativeimportRequests.Inc() if err := native.InsertHandler(nil, r); err != nil { @@ -388,16 +388,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { staticServer.ServeHTTP(w, r) return true } - if strings.HasPrefix(r.URL.Path, "/api/v1/pushgateway") { - pushgatewayRequests.Inc() - if err := pushgateway.InsertHandler(nil, r); err != nil { - pushgatewayErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true - } if remotewrite.MultitenancyEnabled() { return processMultitenantRequest(w, r, path) } @@ -420,6 +410,16 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri httpserver.Errorf(w, r, "cannot obtain auth token: %s", err) return true } + if strings.HasPrefix(p.Suffix, "prometheus/api/v1/import/prometheus") { + prometheusimportRequests.Inc() + if err := prometheusimport.InsertHandler(at, r); err != nil { + prometheusimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + } if strings.HasPrefix(p.Suffix, "datadog/") { // Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent. // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670 @@ -453,15 +453,6 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri } w.WriteHeader(http.StatusNoContent) return true - case "prometheus/api/v1/import/prometheus": - prometheusimportRequests.Inc() - if err := prometheusimport.InsertHandler(at, r); err != nil { - prometheusimportErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true case "prometheus/api/v1/import/native": nativeimportRequests.Inc() if err := native.InsertHandler(at, r); err != nil { @@ -519,16 +510,6 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri fmt.Fprintf(w, `{}`) return true default: - if strings.HasPrefix(r.URL.Path, "/api/v1/pushgateway") { - pushgatewayRequests.Inc() - if err := pushgateway.InsertHandler(nil, r); err != nil { - pushgatewayErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true - } httpserver.Errorf(w, r, "unsupported multitenant path suffix: %q", p.Suffix) return true } @@ -547,9 +528,6 @@ var ( prometheusimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`) prometheusimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`) - pushgatewayRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/pushgateway", protocol="pushgateway"}`) - pushgatewayErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/pushgateway", protocol="pushgateway"}`) - nativeimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`) nativeimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`) diff --git a/app/vmagent/pushgateway/request_handler.go b/app/vmagent/pushgateway/request_handler.go deleted file mode 100644 index f841d7ec1..000000000 --- a/app/vmagent/pushgateway/request_handler.go +++ /dev/null @@ -1,127 +0,0 @@ -package pushgateway - -import ( - "fmt" - "net/http" - "strings" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" - "github.com/VictoriaMetrics/metrics" -) - -var ( - rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="pushgateway"}`) - rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="pushgateway"}`) - rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="pushgateway"}`) -) - -// InsertHandler processes `/api/v1/pushgateway` request. -func InsertHandler(at *auth.Token, req *http.Request) error { - // Pushgateway endpoint is of the style: /metrics/job/{//} - // Source:https://github.com/prometheus/pushgateway#url - pushgatewayPath := strings.TrimSuffix(strings.Replace(req.URL.Path, "/api/v1/pushgateway", "", 1), "/") - pathLabels, err := extractLabelsFromPath(pushgatewayPath) - if err != nil { - return err - } - extraLabels, err := parserCommon.GetExtraLabels(req) - if err != nil { - return err - } - defaultTimestamp, err := parserCommon.GetTimestamp(req) - if err != nil { - return err - } - return writeconcurrencylimiter.Do(func() error { - isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { - return insertRows(at, rows, append(pathLabels, extraLabels...)) - }, nil) - }) -} - -func extractLabelsFromPath(pushgatewayPath string) ([]prompbmarshal.Label, error) { - // Parsing Pushgateway path which is of the style: /metrics/job/{//} - // With an arbitrary number of // pairs - // Source:https://github.com/prometheus/pushgateway#url - var result []prompbmarshal.Label - if !strings.HasPrefix(pushgatewayPath, "/metrics/job/") { - return nil, fmt.Errorf("pushgateway endpoint format is incorrect. Expected /metrics/job/{//}, got %q ", pushgatewayPath) - } - labelsString := strings.Replace(pushgatewayPath, "/metrics/job/", "", 1) - labelsSlice := strings.Split(labelsString, "/") - if len(labelsSlice) == 1 && labelsSlice[0] == "" { - return nil, fmt.Errorf("pushgateway path has to contain a job name after /job/. Expected /metrics/job/{//}, got %q ", pushgatewayPath) - } - - //The first value that comes after /metrics/job/JOB_NAME gives origin to a label with key "job" and value "JOB_NAME" - result = append(result, prompbmarshal.Label{ - Name: "job", - Value: labelsSlice[0], - }) - - // We expect the number of items to be odd. - // The first item is the job label and after that is key/value pairs - if len(labelsSlice)%2 == 0 { - return nil, fmt.Errorf("number of label key/pair passed via pushgateway endpoint format does not match") - } - - // We start at 1, since index 0 was the job label value, and we jump every 2 - first item is the key, second is the value. - for i := 1; i < len(labelsSlice); i = i + 2 { - result = append(result, prompbmarshal.Label{ - Name: labelsSlice[i], - Value: labelsSlice[i+1], - }) - } - return result, nil -} - -func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { - ctx := common.GetPushCtx() - defer common.PutPushCtx(ctx) - - tssDst := ctx.WriteRequest.Timeseries[:0] - labels := ctx.Labels[:0] - samples := ctx.Samples[:0] - for i := range rows { - r := &rows[i] - labelsLen := len(labels) - labels = append(labels, prompbmarshal.Label{ - Name: "__name__", - Value: r.Metric, - }) - for j := range r.Tags { - tag := &r.Tags[j] - labels = append(labels, prompbmarshal.Label{ - Name: tag.Key, - Value: tag.Value, - }) - } - labels = append(labels, extraLabels...) - samples = append(samples, prompbmarshal.Sample{ - Value: r.Value, - Timestamp: r.Timestamp, - }) - tssDst = append(tssDst, prompbmarshal.TimeSeries{ - Labels: labels[labelsLen:], - Samples: samples[len(samples)-1:], - }) - } - ctx.WriteRequest.Timeseries = tssDst - ctx.Labels = labels - ctx.Samples = samples - remotewrite.Push(at, &ctx.WriteRequest) - rowsInserted.Add(len(rows)) - if at != nil { - rowsTenantInserted.Get(at).Add(len(rows)) - } - rowsPerInsert.Update(float64(len(rows))) - return nil -} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 05c9fd85e..dcfb8e21f 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -120,6 +120,16 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { staticServer.ServeHTTP(w, r) return true } + if strings.HasPrefix(path, "/prometheus/api/v1/import/prometheus") || strings.HasPrefix(path, "/api/v1/import/prometheus") { + prometheusimportRequests.Inc() + if err := prometheusimport.InsertHandler(r); err != nil { + prometheusimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + } if strings.HasPrefix(path, "/datadog/") { // Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent. // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670 @@ -153,15 +163,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusNoContent) return true - case "/prometheus/api/v1/import/prometheus", "/api/v1/import/prometheus": - prometheusimportRequests.Inc() - if err := prometheusimport.InsertHandler(r); err != nil { - prometheusimportErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true case "/prometheus/api/v1/import/native", "/api/v1/import/native": nativeimportRequests.Inc() if err := native.InsertHandler(r); err != nil { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 8470958c8..9763d9123 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* FEATURE: add support for [Pushgateway data import format](https://github.com/prometheus/pushgateway#url) via `/api/v1/import/prometheus` url. See [these docs](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1415). Thanks to @PerGon for [the intial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3360). * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add `http://:8481/admin/tenants` API endpoint for returning a list of registered tenants. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) for details. * FEATURE: [VictoriaMetrics enterprise](https://docs.victoriametrics.com/enterprise.html): add `-storageNode.filter` command-line flag for filtering the [discovered vmstorage nodes](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) with arbitrary regular expressions. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3353). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): allow using numeric values with `K`, `Ki`, `M`, `Mi`, `G`, `Gi`, `T` and `Ti` suffixes inside MetricsQL queries. For example `8Ki` equals to `8*1024`, while `8.2M` equals to `8.2*1000*1000`. diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 3108109ff..05e0b341d 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -292,7 +292,7 @@ See [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html - `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` at `vmselect` (see below). - `prometheus/api/v1/import/native` - for importing data obtained via `api/v1/export/native` on `vmselect` (see below). - `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details. - - `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details. + - `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). This endpoint also supports [Pushgateway protocol](https://github.com/prometheus/pushgateway#url). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details. - URLs for [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): `http://:8481/select//prometheus/`, where: - `` is an arbitrary number identifying data namespace for the query (aka tenant) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index bb7578f89..7a9908aab 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -1052,7 +1052,8 @@ Time series data can be imported into VictoriaMetrics via any supported data ing * `/api/v1/import/native` for importing data obtained from [/api/v1/export/native](#how-to-export-data-in-native-format). See [these docs](#how-to-import-data-in-native-format) for details. * `/api/v1/import/csv` for importing arbitrary CSV data. See [these docs](#how-to-import-csv-data) for details. -* `/api/v1/import/prometheus` for importing data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details. +* `/api/v1/import/prometheus` for importing data in Prometheus exposition format and in [Pushgateway format](https://github.com/prometheus/pushgateway#url). + See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details. ### How to import data in JSON line format @@ -1157,9 +1158,11 @@ Note that it could be required to flush response cache after importing historica ### How to import data in Prometheus exposition format -VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) -and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md) -via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics: +VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format), +in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md) +and in [Pushgateway format](https://github.com/prometheus/pushgateway#url) via `/api/v1/import/prometheus` path. + +For example, the following command imports a single line in Prometheus exposition format into VictoriaMetrics:
@@ -1185,6 +1188,16 @@ It should return something like the following: {"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]} ``` +The following command imports a single metric via [Pushgateway format](https://github.com/prometheus/pushgateway#url) with `{job="my_app",instance="host123"}` labels: + +
+ +```console +curl -d 'metric{label="abc"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus/metrics/job/my_app/instance/host123' +``` + +
+ Pass `Content-Encoding: gzip` HTTP request header to `/api/v1/import/prometheus` for importing gzipped data:
@@ -1196,8 +1209,8 @@ curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428
-Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. -For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. +Extra labels may be added to all the imported metrics either via [Pushgateway format](https://github.com/prometheus/pushgateway#url) +or by passing `extra_label=name=value` query args. For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. If timestamp is missing in ` ` Prometheus exposition format line, then the current timestamp is used during data ingestion. It can be overridden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`. diff --git a/lib/protoparser/common/extra_labels.go b/lib/protoparser/common/extra_labels.go index 18e40f065..6f9e9255c 100644 --- a/lib/protoparser/common/extra_labels.go +++ b/lib/protoparser/common/extra_labels.go @@ -1,6 +1,7 @@ package common import ( + "encoding/base64" "fmt" "net/http" "strings" @@ -9,18 +10,74 @@ import ( ) // GetExtraLabels extracts name:value labels from `extra_label=name=value` query args from req. +// +// It also extracts Pushgateways-compatible extra labels from req.URL.Path +// according to https://github.com/prometheus/pushgateway#url . func GetExtraLabels(req *http.Request) ([]prompbmarshal.Label, error) { + labels, err := getPushgatewayLabels(req.URL.Path) + if err != nil { + return nil, fmt.Errorf("cannot parse pushgateway-style labels from %q: %w", req.URL.Path, err) + } q := req.URL.Query() - var result []prompbmarshal.Label for _, label := range q["extra_label"] { tmp := strings.SplitN(label, "=", 2) if len(tmp) != 2 { return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", label) } - result = append(result, prompbmarshal.Label{ + labels = append(labels, prompbmarshal.Label{ Name: tmp[0], Value: tmp[1], }) } - return result, nil + return labels, nil +} + +func getPushgatewayLabels(path string) ([]prompbmarshal.Label, error) { + n := strings.Index(path, "/metrics/job") + if n < 0 { + return nil, nil + } + s := path[n+len("/metrics/"):] + if !strings.HasPrefix(s, "job/") && !strings.HasPrefix(s, "job@base64/") { + return nil, nil + } + labelsCount := (strings.Count(s, "/") + 1) / 2 + labels := make([]prompbmarshal.Label, 0, labelsCount) + for len(s) > 0 { + n := strings.IndexByte(s, '/') + if n < 0 { + return nil, fmt.Errorf("missing value for label %q", s) + } + name := s[:n] + s = s[n+1:] + isBase64 := strings.HasSuffix(name, "@base64") + if isBase64 { + name = name[:len(name)-len("@base64")] + } + var value string + n = strings.IndexByte(s, '/') + if n < 0 { + value = s + s = "" + } else { + value = s[:n] + s = s[n+1:] + } + if isBase64 { + data, err := base64.URLEncoding.DecodeString(value) + if err != nil { + return nil, fmt.Errorf("cannot base64-decode value=%q for label=%q: %w", value, name, err) + } + value = string(data) + } + if len(value) == 0 { + // Skip labels with empty values + continue + } + labels = append(labels, prompbmarshal.Label{ + Name: name, + Value: value, + }) + } + return labels, nil } diff --git a/lib/protoparser/common/extra_labels_test.go b/lib/protoparser/common/extra_labels_test.go new file mode 100644 index 000000000..960470b24 --- /dev/null +++ b/lib/protoparser/common/extra_labels_test.go @@ -0,0 +1,91 @@ +package common + +import ( + "fmt" + "net/http" + "sort" + "strings" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestGetExtraLabelsSuccess(t *testing.T) { + f := func(requestURI, expectedLabels string) { + t.Helper() + fullURL := "http://fobar" + requestURI + req, err := http.NewRequest("GET", fullURL, nil) + if err != nil { + t.Fatalf("cannot parse %q: %s", fullURL, err) + } + extraLabels, err := GetExtraLabels(req) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + labelsStr := getLabelsString(extraLabels) + if labelsStr != expectedLabels { + t.Fatalf("unexpected labels;\ngot\n%s\nwant\n%s", labelsStr, expectedLabels) + } + } + f("", `{}`) + f("/foo/bar", `{}`) + f("/foo?extra_label=foo=bar", `{foo="bar"}`) + f("/foo?extra_label=a=x&extra_label=b=y", `{a="x",b="y"}`) + f("/metrics/job/foo", `{job="foo"}`) + f("/metrics/job/foo?extra_label=a=b", `{a="b",job="foo"}`) + f("/metrics/job/foo/b/bcd?extra_label=a=b&extra_label=qwe=rty", `{a="b",b="bcd",job="foo",qwe="rty"}`) + f("/metrics/job/titan/name/%CE%A0%CF%81%CE%BF%CE%BC%CE%B7%CE%B8%CE%B5%CF%8D%CF%82", `{job="titan",name="Προμηθεύς"}`) + f("/metrics/job/titan/name@base64/zqDPgc6_zrzOt864zrXPjc-C", `{job="titan",name="Προμηθεύς"}`) +} + +func TestGetPushgatewayLabelsSuccess(t *testing.T) { + f := func(path, expectedLabels string) { + t.Helper() + labels, err := getPushgatewayLabels(path) + if err != nil { + t.Fatalf("unexpected error in getPushgatewayLabels(%q): %s", path, err) + } + labelsStr := getLabelsString(labels) + if labelsStr != expectedLabels { + t.Fatalf("unexpected labels returned from getPushgatewayLabels(%q);\ngot\n%s\nwant\n%s", path, labelsStr, expectedLabels) + } + } + f("", "{}") + f("/foo/bar", "{}") + f("/metrics/foo/bar", "{}") + f("/metrics/job", "{}") + f("/metrics/job@base64", "{}") + f("/metrics/job/", "{}") + f("/metrics/job/foo", `{job="foo"}`) + f("/foo/metrics/job/foo", `{job="foo"}`) + f("/api/v1/import/prometheus/metrics/job/foo", `{job="foo"}`) + f("/foo/metrics/job@base64/Zm9v", `{job="foo"}`) + f("/foo/metrics/job/x/a/foo/aaa/bar", `{a="foo",aaa="bar",job="x"}`) + f("/foo/metrics/job/x/a@base64/Zm9v", `{a="foo",job="x"}`) +} + +func TestGetPushgatewayLabelsFailure(t *testing.T) { + f := func(path string) { + t.Helper() + labels, err := getPushgatewayLabels(path) + if err == nil { + labelsStr := getLabelsString(labels) + t.Fatalf("expecting non-nil error for getPushgatewayLabels(%q); got labels %s", path, labelsStr) + } + } + // missing bar value + f("/metrics/job/foo/bar") + // invalid base64 encoding for job + f("/metrics/job@base64/#$%") + // invalid base64 encoding for non-job label + f("/metrics/job/foo/bar@base64/#$%") +} + +func getLabelsString(labels []prompbmarshal.Label) string { + a := make([]string, len(labels)) + for i, label := range labels { + a[i] = fmt.Sprintf("%s=%q", label.Name, label.Value) + } + sort.Strings(a) + return "{" + strings.Join(a, ",") + "}" +}