diff --git a/README.md b/README.md index 93c42e4c6..d107238ca 100644 --- a/README.md +++ b/README.md @@ -847,6 +847,74 @@ The `/api/v1/export` endpoint should return the following response: Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args. For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. +## How to send data from NewRelic agent + +VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) +at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. +NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format). + +NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`. +It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud. + +To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example: +```console +COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra +``` + +### NewRelic agent data mapping + +As example, lets create `newrelic.json` file with the following content: +```json +[ + { + "Events":[ + { + "eventType":"SystemSample", + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125 + } + ] + } + ] +``` + +Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics: + +```console +curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk +``` + +If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics +in vmui via query `{__name__!=""}`: +```console +system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 +system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 +system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 +system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 +system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 +system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 +system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 +system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 +system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 +``` + +The fields in `newrelic.json` are transformed in the following way: +1. `eventType` filed is used as prefix for all metrics in the object; +2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; +3. the rest fields with numeric values will be used as metrics; +4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, +current time is used. + + ## Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 32c11614b..c9bcd6f22 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -18,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/newrelic" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentelemetry" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp" @@ -319,6 +320,29 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusOK) return true + case "/newrelic/api/v1": + newrelicCheckRequest.Inc() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "/newrelic/api/v1/inventory/deltas": + newrelicInventoryRequests.Inc() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) + return true + case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + newrelicWriteRequests.Inc() + if err := newrelic.InsertHandlerForHTTP(nil, r); err != nil { + newrelicWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true case "/datadog/api/v1/series": datadogWriteRequests.Inc() if err := datadog.InsertHandlerForHTTP(nil, r); err != nil { @@ -519,6 +543,29 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri } w.WriteHeader(http.StatusOK) return true + case "/newrelic/api/v1": + newrelicCheckRequest.Inc() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "/newrelic/api/v1/inventory/deltas": + newrelicInventoryRequests.Inc() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) + return true + case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + newrelicWriteRequests.Inc() + if err := newrelic.InsertHandlerForHTTP(at, r); err != nil { + newrelicWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true case "datadog/api/v1/series": datadogWriteRequests.Inc() if err := datadog.InsertHandlerForHTTP(at, r); err != nil { @@ -591,6 +638,12 @@ var ( opentelemetryPushRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) opentelemetryPushErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) + newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + + newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic/inventory/deltas", protocol="newrelic"}`) + newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic", protocol="newrelic"}`) + promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`) promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/service-discovery"}`) diff --git a/app/vmagent/newrelic/request_handler.go b/app/vmagent/newrelic/request_handler.go new file mode 100644 index 000000000..378476405 --- /dev/null +++ b/app/vmagent/newrelic/request_handler.go @@ -0,0 +1,79 @@ +package newrelic + +import ( + "net/http" + + "github.com/VictoriaMetrics/metrics" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="newrelic"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="newrelic"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="newrelic"}`) +) + +// InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request. +func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } + ce := req.Header.Get("Content-Encoding") + isGzip := ce == "gzip" + return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error { + return insertRows(at, series, extraLabels) + }) +} + +func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error { + ctx := common.GetPushCtx() + defer common.PutPushCtx(ctx) + + rowsTotal := 0 + tssDst := ctx.WriteRequest.Timeseries[:0] + labels := ctx.Labels[:0] + samples := ctx.Samples[:0] + for i := range rows { + r := &rows[i] + labelsLen := len(labels) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: r.Metric, + }) + for j := range r.Tags { + tag := &r.Tags[j] + labels = append(labels, prompbmarshal.Label{ + Name: tag.Key, + Value: tag.Value, + }) + } + samples = append(samples, prompbmarshal.Sample{ + Value: r.Value, + Timestamp: r.Timestamp, + }) + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[len(samples)-1:], + }) + labels = append(labels, extraLabels...) + } + ctx.WriteRequest.Timeseries = tssDst + ctx.Labels = labels + ctx.Samples = samples + remotewrite.Push(at, &ctx.WriteRequest) + rowsInserted.Add(len(rows)) + if at != nil { + rowsTenantInserted.Get(at).Add(rowsTotal) + } + rowsPerInsert.Update(float64(len(rows))) + return nil +} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index ee86f0eb5..61c1c30f5 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -9,12 +9,15 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/metrics" + vminsertCommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/newrelic" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentelemetry" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp" @@ -36,7 +39,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" - "github.com/VictoriaMetrics/metrics" ) var ( @@ -220,6 +222,29 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusOK) return true + case "/newrelic/api/v1": + newrelicCheckRequest.Inc() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "/newrelic/api/v1/inventory/deltas": + newrelicInventoryRequests.Inc() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) + return true + case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + newrelicWriteRequests.Inc() + if err := newrelic.InsertHandlerForHTTP(r); err != nil { + newrelicWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true case "/datadog/api/v1/series": datadogWriteRequests.Inc() if err := datadog.InsertHandlerForHTTP(r); err != nil { @@ -357,6 +382,12 @@ var ( opentelemetryPushRequests = metrics.NewCounter(`vm_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) opentelemetryPushErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) + newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + + newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic/inventory/deltas", protocol="newrelic"}`) + newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic", protocol="newrelic"}`) + promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`) promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vm_http_requests_total{path="/service-discovery"}`) diff --git a/app/vminsert/newrelic/request_handler.go b/app/vminsert/newrelic/request_handler.go new file mode 100644 index 000000000..ae8530d56 --- /dev/null +++ b/app/vminsert/newrelic/request_handler.go @@ -0,0 +1,67 @@ +package newrelic + +import ( + "net/http" + + "github.com/VictoriaMetrics/metrics" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream" +) + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="newrelic"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="newrelic"}`) +) + +// InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request. +func InsertHandlerForHTTP(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } + ce := req.Header.Get("Content-Encoding") + isGzip := ce == "gzip" + return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error { + return insertRows(series, extraLabels) + }) +} + +func insertRows(rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error { + ctx := common.GetInsertCtx() + defer common.PutInsertCtx(ctx) + + ctx.Reset(len(rows)) + hasRelabeling := relabel.HasRelabeling() + for i := range rows { + r := &rows[i] + ctx.Labels = ctx.Labels[:0] + ctx.AddLabel("", r.Metric) + for j := range r.Tags { + tag := &r.Tags[j] + ctx.AddLabel(tag.Key, tag.Value) + } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } + if hasRelabeling { + ctx.ApplyRelabeling() + } + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + ctx.SortLabelsIfNeeded() + if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { + return err + } + } + rowsInserted.Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) + return ctx.FlushBufs() +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f56af1602..dfbfc72dc 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -28,6 +28,7 @@ The sandbox cluster installation is running under the constant load generated by ## tip +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support data ingestion from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-newrelic-agent), [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712). * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-filestream.disableFadvise` command-line flag, which can be used for disabling `fadvise` syscall during backup upload to the remote storage. By default `vmbackup` uses `fadvise` syscall in order to prevent from eviction of recently accessed data from the [OS page cache](https://en.wikipedia.org/wiki/Page_cache) when backing up large files. Sometimes the `fadvise` syscall may take significant amounts of CPU when the backup is performed with large value of `-concurrency` command-line flag on systems with big number of CPU cores. In this case it is better to manually disable `fadvise` syscall by passing `-filestream.disableFadvise` command-line flag to `vmbackup`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120) for details. * FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): account for `vmauth` component for alerts `ServiceDown` and `TooManyRestarts`. diff --git a/docs/README.md b/docs/README.md index 142aa9f9c..7b740e68e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -850,6 +850,74 @@ The `/api/v1/export` endpoint should return the following response: Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args. For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. +## How to send data from NewRelic agent + +VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) +at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. +NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format). + +NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`. +It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud. + +To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example: +```console +COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra +``` + +### NewRelic agent data mapping + +As example, lets create `newrelic.json` file with the following content: +```json +[ + { + "Events":[ + { + "eventType":"SystemSample", + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125 + } + ] + } + ] +``` + +Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics: + +```console +curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk +``` + +If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics +in vmui via query `{__name__!=""}`: +```console +system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 +system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 +system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 +system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 +system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 +system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 +system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 +system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 +system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 +``` + +The fields in `newrelic.json` are transformed in the following way: +1. `eventType` filed is used as prefix for all metrics in the object; +2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; +3. the rest fields with numeric values will be used as metrics; +4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, +current time is used. + + ## Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index ee66d9ed5..d0f8da9ae 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -858,6 +858,74 @@ The `/api/v1/export` endpoint should return the following response: Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args. For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. +## How to send data from NewRelic agent + +VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) +at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. +NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format). + +NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`. +It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud. + +To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example: +```console +COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra +``` + +### NewRelic agent data mapping + +As example, lets create `newrelic.json` file with the following content: +```json +[ + { + "Events":[ + { + "eventType":"SystemSample", + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125 + } + ] + } + ] +``` + +Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics: + +```console +curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk +``` + +If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics +in vmui via query `{__name__!=""}`: +```console +system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 +system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 +system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 +system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 +system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 +system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 +system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 +system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 +system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 +``` + +The fields in `newrelic.json` are transformed in the following way: +1. `eventType` filed is used as prefix for all metrics in the object; +2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; +3. the rest fields with numeric values will be used as metrics; +4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, +current time is used. + + ## Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): diff --git a/lib/protoparser/newrelic/parser.go b/lib/protoparser/newrelic/parser.go new file mode 100644 index 000000000..e6901949f --- /dev/null +++ b/lib/protoparser/newrelic/parser.go @@ -0,0 +1,246 @@ +package newrelic + +import ( + "fmt" + "sync" + "unicode" + + "github.com/valyala/fastjson" + "github.com/valyala/fastjson/fastfloat" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" +) + +var baseEventKeys = map[string]struct{}{ + "timestamp": {}, "eventType": {}, +} + +type tagsBuffer struct { + tags []Tag +} + +var tagsPool = sync.Pool{ + New: func() interface{} { + return &tagsBuffer{tags: make([]Tag, 0)} + }, +} + +// NewRelic agent sends next struct to the collector +// MetricPost entity item for the HTTP post to be sent to the ingest service. +// type MetricPost struct { +// ExternalKeys []string `json:"ExternalKeys,omitempty"` +// EntityID uint64 `json:"EntityID,omitempty"` +// IsAgent bool `json:"IsAgent"` +// Events []json.RawMessage `json:"Events"` +// // Entity ID of the reporting agent, which will = EntityID when IsAgent == true. +// // The field is required in the backend for host metadata matching of the remote entities +// ReportingAgentID uint64 `json:"ReportingAgentID,omitempty"` +// } +// We are using only Events field because it contains all needed metrics + +// Events represents Metrics collected from NewRelic MetricPost request +// https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events +type Events struct { + Metrics []Metric +} + +// Unmarshal takes fastjson.Value and collects Metrics +func (e *Events) Unmarshal(v []*fastjson.Value) error { + for _, value := range v { + events := value.Get("Events") + if events == nil { + return fmt.Errorf("got empty Events array from request") + } + eventsArr, err := events.Array() + if err != nil { + return fmt.Errorf("error collect events: %s", err) + } + + for _, event := range eventsArr { + metricData, err := event.Object() + if err != nil { + return fmt.Errorf("error get metric data: %s", err) + } + var m Metric + metrics, err := m.unmarshal(metricData) + if err != nil { + return fmt.Errorf("error collect metrics from Newrelic json: %s", err) + } + e.Metrics = append(e.Metrics, metrics...) + } + } + + return nil +} + +// Metric represents VictoriaMetrics metrics +type Metric struct { + Timestamp int64 + Tags []Tag + Metric string + Value float64 +} + +func (m *Metric) unmarshal(o *fastjson.Object) ([]Metric, error) { + m.reset() + + tgsBuffer := tagsPool.Get().(*tagsBuffer) + defer func() { + tgsBuffer.tags = tgsBuffer.tags[:0] + tagsPool.Put(tgsBuffer) + }() + + metrics := make([]Metric, 0, o.Len()) + rawTs := o.Get("timestamp") + if rawTs != nil { + ts, err := getFloat64(rawTs) + if err != nil { + return nil, fmt.Errorf("invalid `timestamp` in %s: %w", o, err) + } + m.Timestamp = int64(ts * 1e3) + } else { + // Allow missing timestamp. It should be automatically populated + // with the current time by the caller. + m.Timestamp = 0 + } + + eventType := o.Get("eventType") + if eventType == nil { + return nil, fmt.Errorf("error get eventType from Events object: %s", o) + } + prefix := bytesutil.ToUnsafeString(eventType.GetStringBytes()) + prefix = camelToSnakeCase(prefix) + + o.Visit(func(key []byte, v *fastjson.Value) { + + k := bytesutil.ToUnsafeString(key) + // skip base event keys which should have been parsed before this + if _, ok := baseEventKeys[k]; ok { + return + } + + switch v.Type() { + case fastjson.TypeString: + // this is label-value pair + value := v.Get() + if value == nil { + logger.Errorf("failed to get label value from NewRelic json: %s", v) + return + } + name := camelToSnakeCase(k) + val := bytesutil.ToUnsafeString(value.GetStringBytes()) + tgsBuffer.tags = append(tgsBuffer.tags, Tag{Key: name, Value: val}) + case fastjson.TypeNumber: + // this is metric name with value + metricName := camelToSnakeCase(k) + if prefix != "" { + metricName = fmt.Sprintf("%s_%s", prefix, metricName) + } + f, err := getFloat64(v) + if err != nil { + logger.Errorf("failed to get value for NewRelic metric %q: %w", k, err) + return + } + metrics = append(metrics, Metric{Metric: metricName, Value: f}) + default: + // unknown type + logger.Errorf("got unsupported NewRelic json %s field type: %s", v, v.Type()) + return + } + }) + + for i := range metrics { + metrics[i].Timestamp = m.Timestamp + metrics[i].Tags = tgsBuffer.tags + } + + return metrics, nil +} + +func (m *Metric) reset() { + m.Timestamp = 0 + m.Tags = nil + m.Metric = "" + m.Value = 0 +} + +// Tag is an NewRelic tag. +type Tag struct { + Key string + Value string +} + +func camelToSnakeCase(str string) string { + str = promrelabel.SanitizeLabelName(str) + length := len(str) + snakeCase := make([]byte, 0, length*2) + tokens := make([]byte, 0, length) + var allTokensUpper bool + + flush := func(tokens []byte) { + for _, c := range tokens { + snakeCase = append(snakeCase, byte(unicode.ToLower(rune(c)))) + } + } + + for i := 0; i < length; i++ { + char := str[i] + if unicode.IsUpper(rune(char)) { + switch { + case len(tokens) == 0: + allTokensUpper = true + tokens = append(tokens, char) + case allTokensUpper: + tokens = append(tokens, char) + default: + flush(tokens) + snakeCase = append(snakeCase, '_') + tokens = tokens[:0] + tokens = append(tokens, char) + allTokensUpper = true + } + continue + } + + switch { + case len(tokens) == 1: + tokens = append(tokens, char) + allTokensUpper = false + case allTokensUpper: + tail := tokens[:len(tokens)-1] + last := tokens[len(tokens)-1:] + flush(tail) + snakeCase = append(snakeCase, '_') + tokens = tokens[:0] + tokens = append(tokens, last...) + tokens = append(tokens, char) + allTokensUpper = false + default: + tokens = append(tokens, char) + } + } + + if len(tokens) > 0 { + flush(tokens) + } + s := bytesutil.ToUnsafeString(snakeCase) + return s +} + +func getFloat64(v *fastjson.Value) (float64, error) { + switch v.Type() { + case fastjson.TypeNumber: + return v.Float64() + case fastjson.TypeString: + vStr, _ := v.StringBytes() + vFloat, err := fastfloat.Parse(bytesutil.ToUnsafeString(vStr)) + if err != nil { + return 0, fmt.Errorf("cannot parse value %q: %w", vStr, err) + } + return vFloat, nil + default: + return 0, fmt.Errorf("value doesn't contain float64; it contains %s", v.Type()) + } +} diff --git a/lib/protoparser/newrelic/parser_test.go b/lib/protoparser/newrelic/parser_test.go new file mode 100644 index 000000000..2daded2dd --- /dev/null +++ b/lib/protoparser/newrelic/parser_test.go @@ -0,0 +1,174 @@ +package newrelic + +import ( + "reflect" + "strings" + "testing" + + "github.com/valyala/fastjson" +) + +func TestEvents_Unmarshal(t *testing.T) { + tests := []struct { + name string + metrics []Metric + json string + wantErr bool + }{ + { + name: "empty json", + metrics: []Metric{}, + json: "", + wantErr: true, + }, + { + name: "json with correct data", + metrics: []Metric{ + { + Timestamp: 1690286061000, + Tags: []Tag{ + {Key: "entity_key", Value: "macbook-pro.local"}, + {Key: "dc", Value: "1"}, + }, + Metric: "system_sample_disk_writes_per_second", + Value: 0, + }, + { + Timestamp: 1690286061000, + Tags: []Tag{ + {Key: "entity_key", Value: "macbook-pro.local"}, + {Key: "dc", Value: "1"}, + }, + Metric: "system_sample_uptime", + Value: 762376, + }, + }, + json: `[ + { + "EntityID":28257883748326179, + "IsAgent":true, + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + "entityKey":"macbook-pro.local", + "dc": "1", + "diskWritesPerSecond":0, + "uptime":762376 + } + ], + "ReportingAgentID":28257883748326179 + } + ]`, + wantErr: false, + }, + { + name: "empty array in json", + metrics: []Metric{}, + json: `[]`, + wantErr: false, + }, + { + name: "empty events in json", + metrics: []Metric{}, + json: `[ + { + "EntityID":28257883748326179, + "IsAgent":true, + "Events":[], + "ReportingAgentID":28257883748326179 + } + ]`, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &Events{Metrics: []Metric{}} + + value, err := fastjson.Parse(tt.json) + if (err != nil) != tt.wantErr { + t.Errorf("cannot parse json error: %s", err) + } + + if value != nil { + v, err := value.Array() + if err != nil { + t.Errorf("cannot get array from json") + } + if err := e.Unmarshal(v); (err != nil) != tt.wantErr { + t.Errorf("Unmarshal() error = %v, wantErr %v", err, tt.wantErr) + } + if !reflect.DeepEqual(e.Metrics, tt.metrics) { + t.Errorf("got metrics => %v; expected = %v", e.Metrics, tt.metrics) + } + } + }) + } +} + +func Test_camelToSnakeCase(t *testing.T) { + tests := []struct { + name string + str string + want string + }{ + { + name: "empty string", + str: "", + want: "", + }, + { + name: "lowercase all chars", + str: "somenewstring", + want: "somenewstring", + }, + { + name: "first letter uppercase", + str: "Teststring", + want: "teststring", + }, + { + name: "two uppercase letters", + str: "TestString", + want: "test_string", + }, + { + name: "first and last uppercase letters", + str: "TeststrinG", + want: "teststrin_g", + }, + { + name: "three letters uppercase", + str: "TestStrinG", + want: "test_strin_g", + }, + { + name: "has many upper case letters", + str: "ProgressIOTime", + want: "progress_io_time", + }, + { + name: "last all uppercase letters", + str: "ProgressTSDB", + want: "progress_tsdb", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := camelToSnakeCase(tt.str); got != tt.want { + t.Errorf("camelToSnakeCase() = %v, want %v", got, tt.want) + } + }) + } +} + +func BenchmarkCameToSnake(b *testing.B) { + b.ReportAllocs() + str := strings.Repeat("ProgressIOTime", 20) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + camelToSnakeCase(str) + } + }) +} diff --git a/lib/protoparser/newrelic/parser_timing_test.go b/lib/protoparser/newrelic/parser_timing_test.go new file mode 100644 index 000000000..94e168a66 --- /dev/null +++ b/lib/protoparser/newrelic/parser_timing_test.go @@ -0,0 +1,77 @@ +package newrelic + +import ( + "testing" + + "github.com/valyala/fastjson" +) + +func BenchmarkRequestUnmarshal(b *testing.B) { + reqBody := `[ + { + "EntityID":28257883748326179, + "IsAgent":true, + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125, + "memoryTotalBytes":17179869184, + "memoryFreeBytes":3782705152, + "memoryUsedBytes":13397164032, + "memoryFreePercent":22.01824188232422, + "memoryUsedPercent":77.98175811767578, + "memoryCachedBytes":0, + "memorySlabBytes":0, + "memorySharedBytes":0, + "memoryKernelFree":89587712, + "swapTotalBytes":7516192768, + "swapFreeBytes":1737293824, + "swapUsedBytes":5778898944, + "diskUsedBytes":0, + "diskUsedPercent":0, + "diskFreeBytes":0, + "diskFreePercent":0, + "diskTotalBytes":0, + "diskUtilizationPercent":0, + "diskReadUtilizationPercent":0, + "diskWriteUtilizationPercent":0, + "diskReadsPerSecond":0, + "diskWritesPerSecond":0, + "uptime":762376 + } + ], + "ReportingAgentID":28257883748326179 + } + ]` + b.SetBytes(int64(len(reqBody))) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + value, err := fastjson.Parse(reqBody) + if err != nil { + b.Errorf("cannot parse json error: %s", err) + } + v, err := value.Array() + if err != nil { + b.Errorf("cannot get array from json") + } + for pb.Next() { + e := &Events{Metrics: []Metric{}} + if err := e.Unmarshal(v); err != nil { + b.Errorf("Unmarshal() error = %v", err) + } + if len(e.Metrics) == 0 { + b.Errorf("metrics should have at least one element") + } + } + }) +} diff --git a/lib/protoparser/newrelic/stream/push_context.go b/lib/protoparser/newrelic/stream/push_context.go new file mode 100644 index 000000000..4dbb7707c --- /dev/null +++ b/lib/protoparser/newrelic/stream/push_context.go @@ -0,0 +1,80 @@ +package stream + +import ( + "bufio" + "fmt" + "io" + "sync" + + "github.com/VictoriaMetrics/metrics" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" +) + +var ( + maxInsertRequestSize = flagutil.NewBytes("newrelic.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single NewRelic POST request to /infra/v2/metrics/events/bulk") +) + +var ( + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="newrelic"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="newrelic"}`) + unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`) +) + +var pushCtxPool sync.Pool +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) + +type pushCtx struct { + br *bufio.Reader + reqBuf bytesutil.ByteBuffer +} + +func (ctx *pushCtx) Read() error { + readCalls.Inc() + lr := io.LimitReader(ctx.br, maxInsertRequestSize.N+1) + startTime := fasttime.UnixTimestamp() + reqLen, err := ctx.reqBuf.ReadFrom(lr) + if err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + } + if reqLen > maxInsertRequestSize.N { + readErrors.Inc() + return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) + } + return nil +} + +func (ctx *pushCtx) reset() { + ctx.br.Reset(nil) + ctx.reqBuf.Reset() +} + +func getPushCtx(r io.Reader) *pushCtx { + select { + case ctx := <-pushCtxPoolCh: + ctx.br.Reset(r) + return ctx + default: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) + ctx.br.Reset(r) + return ctx + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), + } + } +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + select { + case pushCtxPoolCh <- ctx: + default: + pushCtxPool.Put(ctx) + } +} diff --git a/lib/protoparser/newrelic/stream/streamparser.go b/lib/protoparser/newrelic/stream/streamparser.go new file mode 100644 index 000000000..880212444 --- /dev/null +++ b/lib/protoparser/newrelic/stream/streamparser.go @@ -0,0 +1,73 @@ +package stream + +import ( + "fmt" + "io" + + "github.com/valyala/fastjson" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" +) + +var parserPool fastjson.ParserPool + +// Parse parses NewRelic POST request for newrelic/infra/v2/metrics/events/bulk from reader and calls callback for the parsed request. +// +// callback shouldn't hold series after returning. +func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) error) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + + if isGzip { + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped Newrelic agent data: %w", err) + } + defer common.PutGzipReader(zr) + r = zr + } + + ctx := getPushCtx(r) + defer putPushCtx(ctx) + if err := ctx.Read(); err != nil { + return err + } + + p := parserPool.Get() + defer parserPool.Put(p) + + v, err := p.ParseBytes(ctx.reqBuf.B) + if err != nil { + return fmt.Errorf("cannot parse NewRelic POST request with size %d bytes: %w", len(ctx.reqBuf.B), err) + } + + metricsPost, err := v.Array() + if err != nil { + return fmt.Errorf("cannot fetch data from Newrelic POST request: %w", err) + } + + var events newrelic.Events + + if err := events.Unmarshal(metricsPost); err != nil { + unmarshalErrors.Inc() + return fmt.Errorf("cannot unmarshal NewRelic POST request: %w", err) + } + + // Fill in missing timestamps + currentTimestamp := int64(fasttime.UnixTimestamp()) + for i := range events.Metrics { + m := &events.Metrics[i] + if m.Timestamp == 0 { + m.Timestamp = currentTimestamp * 1e3 + } + } + + if err := callback(events.Metrics); err != nil { + return fmt.Errorf("error when processing imported data: %w", err) + } + return nil +}