diff --git a/README.md b/README.md index 361883f47..1fb365097 100644 --- a/README.md +++ b/README.md @@ -265,6 +265,52 @@ VictoriaMetrics also supports [importing data in Prometheus exposition format](# See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be used as drop-in replacement for Prometheus. +## How to send data from DataDog agent + +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD]() via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v1/series` path. + +Run DataDog agent with `DD_DD_URL=http://victoriametrics-host:8428/datadog` environment variable in order to write data to VictoriaMetrics at `victoriametrics-host` host. Another option is to set `dd_url` param at [DataDog agent configuration file](https://docs.datadoghq.com/agent/guide/agent-configuration-files/) to `http://victoriametrics-host:8428/datadog`. + +Example on how to send data to VictoriaMetrics via DataDog "submit metrics" API from command line: + +```bash +echo ' +{ + "series": [ + { + "host": "test.example.com", + "interval": 20, + "metric": "system.load.1", + "points": [[ + 0, + 0.5 + ]], + "tags": [ + "environment:test" + ], + "type": "rate" + } + ] +} +' | curl -X POST --data-binary @- http://localhost:8428/datadog/api/v1/series +``` + +The imported data can be read via [export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format): + +```bash +curl http://localhost:8428/api/v1/export -d 'match[]=system.load.1' +``` + +This command should return the following output if everything is OK: + +``` +{"metric":{"__name__":"system.load.1","environment":"test","host":"test.example.com"},"values":[0.5],"timestamps":[1632833641000]} +``` + +Extra labels may be added to all the written time series by passing `extra_label=name=value` query args. +For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. + + ## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) Use `http://:8428` url instead of InfluxDB url in agents' configs. @@ -790,6 +836,7 @@ The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv Time series data can be imported via any supported ingestion protocol: * [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). See [these docs](#prometheus-setup) for details. +* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details. * InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. * Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details. * OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details. diff --git a/app/vmagent/README.md b/app/vmagent/README.md index d630d6f34..feb9cb092 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -21,6 +21,7 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did See [Quick Start](#quick-start) for details. * Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details. * Accepts data via all ingestion protocols supported by VictoriaMetrics: + * DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent). * InfluxDB line protocol via `http://:8429/write`. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf). * Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-opentsdb-compatible-agents). diff --git a/app/vmagent/datadog/request_handler.go b/app/vmagent/datadog/request_handler.go new file mode 100644 index 000000000..7f98ec955 --- /dev/null +++ b/app/vmagent/datadog/request_handler.go @@ -0,0 +1,99 @@ +package datadog + +import ( + "fmt" + "net/http" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadog"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadog"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`) +) + +// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request. +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } + return writeconcurrencylimiter.Do(func() error { + ce := req.Header.Get("Content-Encoding") + return parser.ParseStream(req.Body, ce, func(series []parser.Series) error { + return insertRows(at, series, extraLabels) + }) + }) +} + +func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error { + ctx := common.GetPushCtx() + defer common.PutPushCtx(ctx) + + rowsTotal := 0 + tssDst := ctx.WriteRequest.Timeseries[:0] + labels := ctx.Labels[:0] + samples := ctx.Samples[:0] + for i := range series { + ss := &series[i] + rowsTotal += len(ss.Points) + labelsLen := len(labels) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: ss.Metric, + }) + labels = append(labels, prompbmarshal.Label{ + Name: "host", + Value: ss.Host, + }) + for _, tag := range ss.Tags { + n := strings.IndexByte(tag, ':') + if n < 0 { + return fmt.Errorf("cannot find ':' in tag %q", tag) + } + name := tag[:n] + value := tag[n+1:] + if name == "host" { + name = "exported_host" + } + labels = append(labels, prompbmarshal.Label{ + Name: name, + Value: value, + }) + } + labels = append(labels, extraLabels...) + samplesLen := len(samples) + for _, pt := range ss.Points { + samples = append(samples, prompbmarshal.Sample{ + Timestamp: pt.Timestamp(), + Value: pt.Value(), + }) + } + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[samplesLen:], + }) + } + ctx.WriteRequest.Timeseries = tssDst + ctx.Labels = labels + ctx.Samples = samples + remotewrite.PushWithAuthToken(at, &ctx.WriteRequest) + rowsInserted.Add(rowsTotal) + if at != nil { + rowsTenantInserted.Get(at).Add(rowsTotal) + } + rowsPerInsert.Update(float64(rowsTotal)) + return nil +} diff --git a/app/vmagent/main.go b/app/vmagent/main.go index d7d81ac96..7b2e02ace 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -11,6 +11,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadog" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native" @@ -224,6 +225,36 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { influxQueryRequests.Inc() influxutils.WriteDatabaseNames(w) return true + case "/datadog/api/v1/series": + datadogWriteRequests.Inc() + if err := datadog.InsertHandlerForHTTP(nil, r); err != nil { + datadogWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "/datadog/api/v1/validate": + datadogValidateRequests.Inc() + // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, `{"valid":true}`) + return true + case "/datadog/api/v1/check_run": + datadogCheckRunRequests.Inc() + // See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "/datadog/intake/": + datadogIntakeRequests.Inc() + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, `{}`) + return true case "/targets": promscrapeTargetsRequests.Inc() promscrape.WriteHumanReadableTargetsStatus(w, r) @@ -330,6 +361,35 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri influxQueryRequests.Inc() influxutils.WriteDatabaseNames(w) return true + case "datadog/api/v1/series": + datadogWriteRequests.Inc() + if err := datadog.InsertHandlerForHTTP(at, r); err != nil { + datadogWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "datadog/api/v1/validate": + datadogValidateRequests.Inc() + // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, `{"valid":true}`) + return true + case "datadog/api/v1/check_run": + datadogCheckRunRequests.Inc() + // See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "datadog/intake/": + datadogIntakeRequests.Inc() + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, `{}`) + return true default: httpserver.Errorf(w, r, "unsupported multitenant path suffix: %q", p.Suffix) return true @@ -352,10 +412,17 @@ var ( nativeimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`) nativeimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`) - influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`) - influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`) + influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/write", protocol="influx"}`) + influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/influx/write", protocol="influx"}`) - influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/query", protocol="influx"}`) + influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`) + + datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) + datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) + + datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`) + datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`) + datadogIntakeRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/intake/", protocol="datadog"}`) promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`) promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/targets"}`) diff --git a/app/vminsert/datadog/request_handler.go b/app/vminsert/datadog/request_handler.go new file mode 100644 index 000000000..5ae809a7c --- /dev/null +++ b/app/vminsert/datadog/request_handler.go @@ -0,0 +1,97 @@ +package datadog + +import ( + "fmt" + "net/http" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadog"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadog"}`) +) + +// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request. +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +func InsertHandlerForHTTP(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } + return writeconcurrencylimiter.Do(func() error { + ce := req.Header.Get("Content-Encoding") + err := parser.ParseStream(req.Body, ce, func(series []parser.Series) error { + return insertRows(series, extraLabels) + }) + if err != nil { + return fmt.Errorf("headers: %q; err: %w", req.Header, err) + } + return nil + }) +} + +func insertRows(series []parser.Series, extraLabels []prompbmarshal.Label) error { + ctx := common.GetInsertCtx() + defer common.PutInsertCtx(ctx) + + rowsLen := 0 + for i := range series { + rowsLen += len(series[i].Points) + } + ctx.Reset(rowsLen) + rowsTotal := 0 + hasRelabeling := relabel.HasRelabeling() + for i := range series { + ss := &series[i] + rowsTotal += len(ss.Points) + ctx.Labels = ctx.Labels[:0] + ctx.AddLabel("", ss.Metric) + ctx.AddLabel("host", ss.Host) + for _, tag := range ss.Tags { + n := strings.IndexByte(tag, ':') + if n < 0 { + return fmt.Errorf("cannot find ':' in tag %q", tag) + } + name := tag[:n] + value := tag[n+1:] + if name == "host" { + name = "exported_host" + } + ctx.AddLabel(name, value) + } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } + if hasRelabeling { + ctx.ApplyRelabeling() + } + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + ctx.SortLabelsIfNeeded() + var metricNameRaw []byte + var err error + for _, pt := range ss.Points { + timestamp := pt.Timestamp() + value := pt.Value() + metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, timestamp, value) + if err != nil { + return err + } + } + } + rowsInserted.Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) + return ctx.FlushBufs() +} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 88238fa06..dabe0ecea 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -9,6 +9,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native" @@ -155,6 +156,36 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { influxQueryRequests.Inc() influxutils.WriteDatabaseNames(w) return true + case "/datadog/api/v1/series": + datadogWriteRequests.Inc() + if err := datadog.InsertHandlerForHTTP(r); err != nil { + datadogWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "/datadog/api/v1/validate": + datadogValidateRequests.Inc() + // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, `{"valid":true}`) + return true + case "/datadog/api/v1/check_run": + datadogCheckRunRequests.Inc() + // See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "/datadog/intake/": + datadogIntakeRequests.Inc() + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, `{}`) + return true case "/prometheus/targets", "/targets": promscrapeTargetsRequests.Inc() promscrape.WriteHumanReadableTargetsStatus(w, r) @@ -204,10 +235,17 @@ var ( nativeimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`) nativeimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`) - influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`) - influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`) + influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/influx/write", protocol="influx"}`) + influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/influx/write", protocol="influx"}`) - influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/query", protocol="influx"}`) + influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/influx/query", protocol="influx"}`) + + datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) + datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) + + datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`) + datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`) + datadogIntakeRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/intake/", protocol="datadog"}`) promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`) promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/targets"}`) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index bdfe3cfe3..a94efb7d3 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,7 @@ sort: 15 ## tip +* FEATURE: add ability to accept metrics from [DataDog agent](https://docs.datadoghq.com/agent/) and [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent). This option simplifies the migration path from DataDog to VictoriaMetrics. See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/206). * FEATURE: vmagent [enterprise](https://victoriametrics.com/enterprise.html): add support for data reading from [Apache Kafka](https://kafka.apache.org/). * FEATURE: calculate quantiles in the same way as Prometheus does in such functions as [quantile_over_time](https://docs.victoriametrics.com/MetricsQL.html#quantile_over_time) and [quantile](https://docs.victoriametrics.com/MetricsQL.html#quantile). Previously results from VictoriaMetrics could be slightly different than results from Prometheus. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612) issues. * FEATURE: add `rollup_scrape_interval(m[d])` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html), which returns `min`, `max` and `avg` values for the interval between samples for `m` on the given lookbehind window `d`. diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index ee295a768..f8cc57b16 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -191,12 +191,11 @@ It is recommended setting up alerts in [vmalert](https://docs.victoriametrics.co - `` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`, where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`. - `` may have the following values: - - `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) - - `influx/write` and `influx/api/v2/write` - for inserting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). - - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). - This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. - See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details. - - `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below). + - `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). + - `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. + - `influx/write` and `influx/api/v2/write` - for inserting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. + - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details. + - `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` at `vmselect` (see below). - `prometheus/api/v1/import/native` - for importing data obtained via `api/v1/export/native` on `vmselect` (see below). - `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details. - `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details. diff --git a/docs/README.md b/docs/README.md index 361883f47..1fb365097 100644 --- a/docs/README.md +++ b/docs/README.md @@ -265,6 +265,52 @@ VictoriaMetrics also supports [importing data in Prometheus exposition format](# See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be used as drop-in replacement for Prometheus. +## How to send data from DataDog agent + +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD]() via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v1/series` path. + +Run DataDog agent with `DD_DD_URL=http://victoriametrics-host:8428/datadog` environment variable in order to write data to VictoriaMetrics at `victoriametrics-host` host. Another option is to set `dd_url` param at [DataDog agent configuration file](https://docs.datadoghq.com/agent/guide/agent-configuration-files/) to `http://victoriametrics-host:8428/datadog`. + +Example on how to send data to VictoriaMetrics via DataDog "submit metrics" API from command line: + +```bash +echo ' +{ + "series": [ + { + "host": "test.example.com", + "interval": 20, + "metric": "system.load.1", + "points": [[ + 0, + 0.5 + ]], + "tags": [ + "environment:test" + ], + "type": "rate" + } + ] +} +' | curl -X POST --data-binary @- http://localhost:8428/datadog/api/v1/series +``` + +The imported data can be read via [export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format): + +```bash +curl http://localhost:8428/api/v1/export -d 'match[]=system.load.1' +``` + +This command should return the following output if everything is OK: + +``` +{"metric":{"__name__":"system.load.1","environment":"test","host":"test.example.com"},"values":[0.5],"timestamps":[1632833641000]} +``` + +Extra labels may be added to all the written time series by passing `extra_label=name=value` query args. +For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. + + ## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) Use `http://:8428` url instead of InfluxDB url in agents' configs. @@ -790,6 +836,7 @@ The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv Time series data can be imported via any supported ingestion protocol: * [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). See [these docs](#prometheus-setup) for details. +* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details. * InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. * Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details. * OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details. diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 106c5ca66..eff296f7b 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -269,6 +269,52 @@ VictoriaMetrics also supports [importing data in Prometheus exposition format](# See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be used as drop-in replacement for Prometheus. +## How to send data from DataDog agent + +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD]() via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v1/series` path. + +Run DataDog agent with `DD_DD_URL=http://victoriametrics-host:8428/datadog` environment variable in order to write data to VictoriaMetrics at `victoriametrics-host` host. Another option is to set `dd_url` param at [DataDog agent configuration file](https://docs.datadoghq.com/agent/guide/agent-configuration-files/) to `http://victoriametrics-host:8428/datadog`. + +Example on how to send data to VictoriaMetrics via DataDog "submit metrics" API from command line: + +```bash +echo ' +{ + "series": [ + { + "host": "test.example.com", + "interval": 20, + "metric": "system.load.1", + "points": [[ + 0, + 0.5 + ]], + "tags": [ + "environment:test" + ], + "type": "rate" + } + ] +} +' | curl -X POST --data-binary @- http://localhost:8428/datadog/api/v1/series +``` + +The imported data can be read via [export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format): + +```bash +curl http://localhost:8428/api/v1/export -d 'match[]=system.load.1' +``` + +This command should return the following output if everything is OK: + +``` +{"metric":{"__name__":"system.load.1","environment":"test","host":"test.example.com"},"values":[0.5],"timestamps":[1632833641000]} +``` + +Extra labels may be added to all the written time series by passing `extra_label=name=value` query args. +For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. + + ## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) Use `http://:8428` url instead of InfluxDB url in agents' configs. @@ -794,6 +840,7 @@ The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv Time series data can be imported via any supported ingestion protocol: * [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). See [these docs](#prometheus-setup) for details. +* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details. * InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. * Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details. * OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details. diff --git a/docs/vmagent.md b/docs/vmagent.md index 66b5398b3..3e1426fa7 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -25,6 +25,7 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did See [Quick Start](#quick-start) for details. * Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details. * Accepts data via all ingestion protocols supported by VictoriaMetrics: + * DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent). * InfluxDB line protocol via `http://:8429/write`. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf). * Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-opentsdb-compatible-agents). diff --git a/lib/protoparser/common/gzip_reader.go b/lib/protoparser/common/compress_reader.go similarity index 55% rename from lib/protoparser/common/gzip_reader.go rename to lib/protoparser/common/compress_reader.go index 8830a8335..a44dda0e9 100644 --- a/lib/protoparser/common/gzip_reader.go +++ b/lib/protoparser/common/compress_reader.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/zlib" ) // GetGzipReader returns new gzip reader from the pool. @@ -29,3 +30,24 @@ func PutGzipReader(zr *gzip.Reader) { } var gzipReaderPool sync.Pool + +// GetZlibReader returns zlib reader. +func GetZlibReader(r io.Reader) (io.ReadCloser, error) { + v := zlibReaderPool.Get() + if v == nil { + return zlib.NewReader(r) + } + zr := v.(io.ReadCloser) + if err := zr.(zlib.Resetter).Reset(r, nil); err != nil { + return nil, err + } + return zr, nil +} + +// PutZlibReader returns back zlib reader obtained via GetZlibReader. +func PutZlibReader(zr io.ReadCloser) { + _ = zr.Close() + zlibReaderPool.Put(zr) +} + +var zlibReaderPool sync.Pool diff --git a/lib/protoparser/datadog/parser.go b/lib/protoparser/datadog/parser.go new file mode 100644 index 000000000..ad539f293 --- /dev/null +++ b/lib/protoparser/datadog/parser.go @@ -0,0 +1,73 @@ +package datadog + +import ( + "encoding/json" + "fmt" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +// Request represents DataDog POST request to /api/v1/series +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +type Request struct { + Series []Series `json:"series"` +} + +func (req *Request) reset() { + req.Series = req.Series[:0] +} + +// Unmarshal unmarshals DataDog /api/v1/series request body from b to req. +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +// +// b shouldn't be modified when req is in use. +func (req *Request) Unmarshal(b []byte) error { + req.reset() + if err := json.Unmarshal(b, req); err != nil { + return fmt.Errorf("cannot unmarshal %q: %w", b, err) + } + // Set missing timestamps to the current time. + currentTimestamp := float64(fasttime.UnixTimestamp()) + series := req.Series + for i := range series { + points := series[i].Points + for j := range points { + if points[j][0] <= 0 { + points[j][0] = currentTimestamp + } + } + } + return nil +} + +// Series represents a series item from DataDog POST request to /api/v1/series +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +type Series struct { + Host string `json:"host"` + + // Do not decode Interval, since it isn't used by VictoriaMetrics + // Interval int64 `json:"interval"` + + Metric string `json:"metric"` + Points []Point `json:"points"` + Tags []string `json:"tags"` + + // Do not decode Type, since it isn't used by VictoriaMetrics + // Type string `json:"type"` +} + +// Point represents a point from DataDog POST request to /api/v1/series +type Point [2]float64 + +// Timestamp returns timestamp in milliseconds from the given pt. +func (pt *Point) Timestamp() int64 { + return int64(pt[0] * 1000) +} + +// Value returns value from the given pt. +func (pt *Point) Value() float64 { + return pt[1] +} diff --git a/lib/protoparser/datadog/parser_test.go b/lib/protoparser/datadog/parser_test.go new file mode 100644 index 000000000..4cacf9721 --- /dev/null +++ b/lib/protoparser/datadog/parser_test.go @@ -0,0 +1,66 @@ +package datadog + +import ( + "reflect" + "testing" +) + +func TestRequestUnmarshalFailure(t *testing.T) { + f := func(s string) { + t.Helper() + var req Request + if err := req.Unmarshal([]byte(s)); err == nil { + t.Fatalf("expecting non-nil error for Unmarshal(%q)", s) + } + } + f("") + f("foobar") + f(`{"series":123`) + f(`1234`) + f(`[]`) +} + +func TestRequestUnmarshalSuccess(t *testing.T) { + f := func(s string, reqExpected *Request) { + t.Helper() + var req Request + if err := req.Unmarshal([]byte(s)); err != nil { + t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err) + } + if !reflect.DeepEqual(&req, reqExpected) { + t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &req, reqExpected) + } + } + f("{}", &Request{}) + f(` +{ + "series": [ + { + "host": "test.example.com", + "interval": 20, + "metric": "system.load.1", + "points": [[ + 1575317847, + 0.5 + ]], + "tags": [ + "environment:test" + ], + "type": "rate" + } + ] +} +`, &Request{ + Series: []Series{{ + Host: "test.example.com", + Metric: "system.load.1", + Points: []Point{{ + 1575317847, + 0.5, + }}, + Tags: []string{ + "environment:test", + }, + }}, + }) +} diff --git a/lib/protoparser/datadog/parser_timing_test.go b/lib/protoparser/datadog/parser_timing_test.go new file mode 100644 index 000000000..f3c2b568a --- /dev/null +++ b/lib/protoparser/datadog/parser_timing_test.go @@ -0,0 +1,39 @@ +package datadog + +import ( + "fmt" + "testing" +) + +func BenchmarkRequestUnmarshal(b *testing.B) { + reqBody := []byte(`{ + "series": [ + { + "host": "test.example.com", + "interval": 20, + "metric": "system.load.1", + "points": [ + 1575317847, + 0.5 + ], + "tags": [ + "environment:test" + ], + "type": "rate" + } + ] +}`) + b.SetBytes(int64(len(reqBody))) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var req Request + for pb.Next() { + if err := req.Unmarshal(reqBody); err != nil { + panic(fmt.Errorf("unexpected error: %w", err)) + } + if len(req.Series) != 1 { + panic(fmt.Errorf("unexpected number of series unmarshaled: got %d; want 4", len(req.Series))) + } + } + }) +} diff --git a/lib/protoparser/datadog/streamparser.go b/lib/protoparser/datadog/streamparser.go new file mode 100644 index 000000000..ace0165b0 --- /dev/null +++ b/lib/protoparser/datadog/streamparser.go @@ -0,0 +1,138 @@ +package datadog + +import ( + "bufio" + "fmt" + "io" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/metrics" +) + +// The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +var maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series") + +// ParseStream parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request. +// +// callback shouldn't hold series after returning. +func ParseStream(r io.Reader, contentEncoding string, callback func(series []Series) error) error { + switch contentEncoding { + case "gzip": + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped DataDog data: %w", err) + } + defer common.PutGzipReader(zr) + r = zr + case "deflate": + zlr, err := common.GetZlibReader(r) + if err != nil { + return fmt.Errorf("cannot read deflated DataDog data: %w", err) + } + defer common.PutZlibReader(zlr) + r = zlr + } + ctx := getPushCtx(r) + defer putPushCtx(ctx) + if err := ctx.Read(); err != nil { + return err + } + req := getRequest() + defer putRequest(req) + if err := req.Unmarshal(ctx.reqBuf.B); err != nil { + unmarshalErrors.Inc() + return fmt.Errorf("cannot unmarshal DataDog POST request with size %d bytes: %s", len(ctx.reqBuf.B), err) + } + rows := 0 + series := req.Series + for i := range series { + rows += len(series[i].Points) + } + rowsRead.Add(rows) + + if err := callback(series); err != nil { + return fmt.Errorf("error when processing imported data: %w", err) + } + return nil +} + +type pushCtx struct { + br *bufio.Reader + reqBuf bytesutil.ByteBuffer +} + +func (ctx *pushCtx) reset() { + ctx.br.Reset(nil) + ctx.reqBuf.Reset() +} + +func (ctx *pushCtx) Read() error { + readCalls.Inc() + lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1) + startTime := fasttime.UnixTimestamp() + reqLen, err := ctx.reqBuf.ReadFrom(lr) + if err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + } + if reqLen > int64(maxInsertRequestSize.N) { + readErrors.Inc() + return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) + } + return nil +} + +var ( + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadog"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadog"}`) + rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadog"}`) + unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadog"}`) +) + +func getPushCtx(r io.Reader) *pushCtx { + select { + case ctx := <-pushCtxPoolCh: + ctx.br.Reset(r) + return ctx + default: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) + ctx.br.Reset(r) + return ctx + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), + } + } +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + select { + case pushCtxPoolCh <- ctx: + default: + pushCtxPool.Put(ctx) + } +} + +var pushCtxPool sync.Pool +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) + +func getRequest() *Request { + v := requestPool.Get() + if v == nil { + return &Request{} + } + return v.(*Request) +} + +func putRequest(req *Request) { + requestPool.Put(req) +} + +var requestPool sync.Pool