add datadog /api/v2/series and /api/beta/sketches support (#5094)

Co-authored-by: Andrew Chubatiuk <andrew.chubatiuk@motional.com>
Co-authored-by: Nikolay <https://github.com/f41gh7>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
This commit is contained in:
Andrii Chubatiuk 2023-11-28 15:52:29 +02:00 committed by GitHub
parent 2291958648
commit 543f218fe9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 3445 additions and 349 deletions

View File

@ -8,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
@ -20,7 +19,7 @@ var (
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series, /api/v2/series, /api/beta/sketches request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
@ -28,66 +27,23 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
ce := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, ce, func(series []datadog.Series) error {
return insertRows(at, series, extraLabels)
})
return stream.Parse(
req, func(series prompbmarshal.TimeSeries) error {
series.Labels = append(series.Labels, extraLabels...)
return insertRows(at, series)
},
)
}
func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmarshal.Label) error {
func insertRows(at *auth.Token, series prompbmarshal.TimeSeries) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
rowsTotal := 0
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range series {
ss := &series[i]
rowsTotal += len(ss.Points)
labelsLen := len(labels)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: ss.Metric,
})
if ss.Host != "" {
labels = append(labels, prompbmarshal.Label{
Name: "host",
Value: ss.Host,
})
}
if ss.Device != "" {
labels = append(labels, prompbmarshal.Label{
Name: "device",
Value: ss.Device,
})
}
for _, tag := range ss.Tags {
name, value := datadog.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
labels = append(labels, prompbmarshal.Label{
Name: name,
Value: value,
})
}
labels = append(labels, extraLabels...)
samplesLen := len(samples)
for _, pt := range ss.Points {
samples = append(samples, prompbmarshal.Sample{
Timestamp: pt.Timestamp(),
Value: pt.Value(),
})
}
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
rowsTotal := len(series.Samples)
ctx.WriteRequest.Timeseries = []prompbmarshal.TimeSeries{series}
ctx.Labels = series.Labels
ctx.Samples = series.Samples
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}

View File

@ -343,9 +343,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/api/v1/series":
datadogWriteRequests.Inc()
datadogWriteSeriesV1Requests.Inc()
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
datadogWriteErrors.Inc()
datadogWriteSeriesV1Errors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
@ -354,6 +354,27 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/api/v2/series":
datadogWriteSeriesV2Requests.Inc()
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
datadogWriteSeriesV2Errors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"errors":[]}`)
return true
case "/datadog/api/beta/sketches":
datadogWriteSketchesBetaRequests.Inc()
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
datadogWriteSketchesBetaErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(202)
return true
case "/datadog/api/v1/validate":
datadogValidateRequests.Inc()
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
@ -566,9 +587,9 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/api/v1/series":
datadogWriteRequests.Inc()
datadogWriteSeriesV1Requests.Inc()
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
datadogWriteErrors.Inc()
datadogWriteSeriesV1Errors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
@ -576,6 +597,26 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/api/v2/series":
datadogWriteSeriesV2Requests.Inc()
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
datadogWriteSeriesV2Errors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
w.WriteHeader(202)
fmt.Fprintf(w, `{"errors":[]}`)
return true
case "datadog/api/beta/sketches":
datadogWriteSketchesBetaRequests.Inc()
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
datadogWriteSketchesBetaErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(202)
return true
case "datadog/api/v1/validate":
datadogValidateRequests.Inc()
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
@ -626,8 +667,14 @@ var (
influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`)
datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteSeriesV1Requests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteSeriesV1Errors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteSeriesV2Requests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`)
datadogWriteSeriesV2Errors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`)
datadogWriteSketchesBetaRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/beta/sketches", protocol="datadog"}`)
datadogWriteSketchesBetaErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/beta/sketches", protocol="datadog"}`)
datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)

View File

@ -7,7 +7,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
"github.com/VictoriaMetrics/metrics"
)
@ -17,7 +16,7 @@ var (
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadog"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series, /api/v2/series, /api/v1/sketches, /api/beta/sketches request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(req *http.Request) error {
@ -25,64 +24,40 @@ func InsertHandlerForHTTP(req *http.Request) error {
if err != nil {
return err
}
ce := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, ce, func(series []parser.Series) error {
return insertRows(series, extraLabels)
})
return stream.Parse(
req, func(series prompbmarshal.TimeSeries) error {
series.Labels = append(series.Labels, extraLabels...)
return insertRows(series)
},
)
}
func insertRows(series []parser.Series, extraLabels []prompbmarshal.Label) error {
func insertRows(series prompbmarshal.TimeSeries) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
rowsLen := 0
for i := range series {
rowsLen += len(series[i].Points)
}
ctx.Reset(rowsLen)
rowsTotal := 0
hasRelabeling := relabel.HasRelabeling()
for i := range series {
ss := &series[i]
rowsTotal += len(ss.Points)
rowsTotal := len(series.Samples)
ctx.Reset(rowsTotal)
ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", ss.Metric)
if ss.Host != "" {
ctx.AddLabel("host", ss.Host)
}
if ss.Device != "" {
ctx.AddLabel("device", ss.Device)
}
for _, tag := range ss.Tags {
name, value := parser.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
ctx.AddLabel(name, value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
for l := range series.Labels {
ctx.AddLabel(series.Labels[l].Name, series.Labels[l].Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
return nil
}
ctx.SortLabelsIfNeeded()
var metricNameRaw []byte
var err error
for _, pt := range ss.Points {
timestamp := pt.Timestamp()
value := pt.Value()
metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, timestamp, value)
if err != nil {
for _, sample := range series.Samples {
if _, err := ctx.WriteDataPointExt(
[]byte{}, ctx.Labels, sample.Timestamp, sample.Value,
); err != nil {
return err
}
}
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()

View File

@ -246,9 +246,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/api/v1/series":
datadogWriteRequests.Inc()
datadogWriteSeriesV1Requests.Inc()
if err := datadog.InsertHandlerForHTTP(r); err != nil {
datadogWriteErrors.Inc()
datadogWriteSeriesV1Errors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
@ -257,6 +257,27 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/api/v2/series":
datadogWriteSeriesV2Requests.Inc()
if err := datadog.InsertHandlerForHTTP(r); err != nil {
datadogWriteSeriesV2Errors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"errors":[]}`)
return true
case "/datadog/api/beta/sketches":
datadogWriteSketchesBetaRequests.Inc()
if err := datadog.InsertHandlerForHTTP(r); err != nil {
datadogWriteSketchesBetaErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(202)
return true
case "/datadog/api/v1/validate":
datadogValidateRequests.Inc()
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
@ -371,8 +392,14 @@ var (
influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/influx/query", protocol="influx"}`)
datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteSeriesV1Requests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteSeriesV1Errors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteSeriesV2Requests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`)
datadogWriteSeriesV2Errors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`)
datadogWriteSketchesBetaRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/beta/sketches", protocol="datadog"}`)
datadogWriteSketchesBetaErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/beta/sketches", protocol="datadog"}`)
datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)

View File

@ -363,7 +363,8 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details.
- `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). This endpoint also supports [Pushgateway protocol](https://github.com/prometheus/pushgateway#url). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.
- `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry).
- `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
- `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API v1](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
- `datadog/api/v2/series` - for ingesting data with [DataDog submit metrics API v2](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
- `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
- `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details.
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
@ -896,7 +897,7 @@ Below is the output for `/path/to/vminsert -help`:
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-datadog.maxInsertRequestSize size
The maximum size in bytes of a single DataDog POST request to /api/v1/series
The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-datadog.sanitizeMetricName
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)

View File

@ -2566,7 +2566,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-datadog.maxInsertRequestSize size
The maximum size in bytes of a single DataDog POST request to /api/v1/series
The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-datadog.sanitizeMetricName
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)

View File

@ -518,10 +518,16 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
## How to send data from DataDog agent
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/)
or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
VictoriaMetrics accepts data from:
* [DataDog agent](https://docs.datadoghq.com/agent/)
* [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
* [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/)
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics)
at `/datadog/api/v1/series` path.
at:
* `/datadog/api/v1/series`
* `/datadog/api/v2/series`
* `/datadog/api/beta/sketches`
### Sending metrics to VictoriaMetrics
@ -554,10 +560,10 @@ dd_url: http://victoriametrics:8428/datadog
</div>
vmagent also can accept Datadog metrics format. Depending on where vmagent will forward data,
vmagent also can accept DataDog metrics format. Depending on where vmagent will forward data,
pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats.
### Sending metrics to Datadog and VictoriaMetrics
### Sending metrics to DataDog and VictoriaMetrics
DataDog allows configuring [Dual Shipping](https://docs.datadoghq.com/agent/guide/dual-shipping/) for metrics
sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `additional_endpoints`.
@ -593,6 +599,19 @@ additional_endpoints:
</div>
### Send via Serverless DataDog plugin
Disable logs (logs ingestion is not supported by Victoria Metrics) and set a custom endpoint in serverless.yaml
```
custom:
datadog:
enableDDLogs: false # Disabled not supported DD logs
apiKey: fakekey # Set any key, otherwise plugin fails
provider:
environment:
DD_DD_URL: <<vm-url>>/datadog # Victoria Metrics endpoint for DataDog
```
### Send via cURL
See how to send data to VictoriaMetrics via
@ -2574,7 +2593,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-datadog.maxInsertRequestSize size
The maximum size in bytes of a single DataDog POST request to /api/v1/series
The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-datadog.sanitizeMetricName
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)

View File

@ -529,6 +529,85 @@ echo '
</div>
### /datadog/api/v2/series
**Imports data in DataDog format into VictoriaMetrics**
Single-node VictoriaMetrics:
<div class="with-copy" markdown="1">
```console
echo '
{
"series": [
{
"interval": 20,
"metric": "system.load.1",
"resources": [
{
"name": "test.example.com",
"type": "host"
}
],
"points": [
{
"timestamp": 1699152159,
"value": 0
},
{
"timestamp": 1699152160,
"value": 0.5
}
],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:8428/datadog/api/v2/series
```
</div>
Cluster version of VictoriaMetrics:
<div class="with-copy" markdown="1">
```console
echo '
{
"series": [
{
"interval": 20,
"metric": "system.load.1",
"resources": [
{
"name": "test.example.com",
"type": "host"
}
],
"points": [
{
"timestamp": 1699152159,
"value": 0
},
{
"timestamp": 1699152160,
"value": 0.5
}
],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- 'http://<vminsert>:8480/insert/0/datadog/api/v2/series
</div>
Additional information:
* [How to send data from datadog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent)

View File

@ -1414,7 +1414,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-datadog.maxInsertRequestSize size
The maximum size in bytes of a single DataDog POST request to /api/v1/series
The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-datadog.sanitizeMetricName
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)

View File

@ -0,0 +1,102 @@
package datadog
import (
"encoding/json"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
)
// Request represents /api/v1/series request
type Request struct {
Series []series `json:"series"`
}
// Unmarshal decodes byte array to series v1 Request struct
func (r *Request) Unmarshal(b []byte) error {
return json.Unmarshal(b, r)
}
// Extract iterates fn execution over all timeseries from series v1 Request
func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn func(string) string) error {
currentTimestamp := int64(fasttime.UnixTimestamp())
for i := range r.Series {
s := r.Series[i]
samples := make([]prompbmarshal.Sample, 0, len(s.Points))
for j := range s.Points {
p := s.Points[j]
ts, val := p[0], p[1]
if ts <= 0 {
ts = float64(currentTimestamp)
}
samples[j] = prompbmarshal.Sample{
Timestamp: int64(ts * 1000),
Value: val,
}
}
ts := prompbmarshal.TimeSeries{
Samples: samples,
Labels: s.getLabels(sanitizeFn),
}
if err := fn(ts); err != nil {
return err
}
}
return nil
}
// series represents a series item from DataDog POST request to /api/v1/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type series struct {
Metric string `json:"metric"`
Host string `json:"host"`
// The device field does not appear in the datadog docs, but datadog-agent does use it.
// Datadog agent (v7 at least), removes the tag "device" and adds it as its own field. Why? That I don't know!
// https://github.com/DataDog/datadog-agent/blob/0ada7a97fed6727838a6f4d9c87123d2aafde735/pkg/metrics/series.go#L84-L105
Device string `json:"device"`
// Do not decode Interval, since it isn't used by VictoriaMetrics
// Interval int64 `json:"interval"`
Points []point `json:"points"`
Tags []string `json:"tags"`
// Do not decode type, since it isn't used by VictoriaMetrics
// Type string `json:"type"`
}
func (s *series) getLabels(sanitizeFn func(string) string) []prompbmarshal.Label {
labels := []prompbmarshal.Label{{
Name: "__name__",
Value: sanitizeFn(s.Metric),
}}
if s.Host != "" {
labels = append(labels, prompbmarshal.Label{
Name: "host",
Value: s.Host,
})
}
if s.Device != "" {
labels = append(labels, prompbmarshal.Label{
Name: "device",
Value: s.Device,
})
}
for _, tag := range s.Tags {
name, value := datadog.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
labels = append(labels, prompbmarshal.Label{
Name: sanitizeFn(name),
Value: value,
})
}
return labels
}
// point represents a point from DataDog POST request to /api/v1/series
type point [2]float64

View File

@ -0,0 +1,71 @@
package datadog
import (
"reflect"
"testing"
)
func TestRequestUnmarshalFailure(t *testing.T) {
f := func(s string) {
t.Helper()
req := new(Request)
if err := req.Unmarshal([]byte(s)); err == nil {
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
}
}
f("")
f("foobar")
f(`{"series":123`)
f(`1234`)
f(`[]`)
}
func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) {
t.Helper()
req := new(Request)
if err := req.Unmarshal(s); err != nil {
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
}
if !reflect.DeepEqual(req, reqExpected) {
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
}
}
func TestRequestUnmarshalSuccess(t *testing.T) {
unmarshalRequestValidator(
t, []byte("{}"), new(Request),
)
unmarshalRequestValidator(t, []byte(`
{
"series": [
{
"host": "test.example.com",
"interval": 20,
"metric": "system.load.1",
"device": "/dev/sda",
"points": [[
1575317847,
0.5
]],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}
`), &Request{
Series: []series{{
Host: "test.example.com",
Metric: "system.load.1",
Device: "/dev/sda",
Points: []point{{
1575317847,
0.5,
}},
Tags: []string{
"environment:test",
},
}},
})
}

View File

@ -26,7 +26,7 @@ func BenchmarkRequestUnmarshal(b *testing.B) {
b.SetBytes(int64(len(reqBody)))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var req Request
req := new(Request)
for pb.Next() {
if err := req.Unmarshal(reqBody); err != nil {
panic(fmt.Errorf("unexpected error: %w", err))

View File

@ -0,0 +1,89 @@
package datadog
import (
"encoding/json"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
)
// Request represents a sketches item from DataDog POST request to /api/v2/series
type Request struct {
Series []series `json:"series"`
}
// Unmarshal decodes byte array to series v2 Request struct
func (r *Request) Unmarshal(b []byte) error {
return json.Unmarshal(b, r)
}
// Extract iterates fn execution over all timeseries from series v2 request
func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn func(string) string) error {
for i := range r.Series {
s := r.Series[i]
samples := make([]prompbmarshal.Sample, 0, len(s.Points))
for j := range s.Points {
p := s.Points[j]
samples = append(samples, prompbmarshal.Sample{
Timestamp: p.Timestamp * 1000,
Value: p.Value,
})
}
ts := prompbmarshal.TimeSeries{
Samples: samples,
Labels: s.getLabels(sanitizeFn),
}
if err := fn(ts); err != nil {
return err
}
}
return nil
}
// series represents a series item from DataDog POST request to /api/v2/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type series struct {
Metric string `json:"metric"`
Resources []resource `json:"resources"`
Points []point `json:"points"`
Tags []string `json:"tags"`
// Do not decode Type, since it isn't used by VictoriaMetrics
// Type string `json:"type"`
}
type resource struct {
Name string `json:"name"`
Type string `json:"type"`
}
func (s *series) getLabels(sanitizeFn func(string) string) []prompbmarshal.Label {
labels := []prompbmarshal.Label{{
Name: "__name__",
Value: sanitizeFn(s.Metric),
}}
for _, res := range s.Resources {
labels = append(labels, prompbmarshal.Label{
Name: sanitizeFn(res.Type),
Value: res.Name,
})
}
for _, tag := range s.Tags {
name, value := datadog.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
labels = append(labels, prompbmarshal.Label{
Name: sanitizeFn(name),
Value: value,
})
}
return labels
}
// point represents a point from DataDog POST request to /api/v2/series
type point struct {
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
}

View File

@ -0,0 +1,83 @@
package datadog
import (
"reflect"
"testing"
)
func TestRequestUnmarshalFailure(t *testing.T) {
f := func(s string) {
t.Helper()
req := new(Request)
if err := req.Unmarshal([]byte(s)); err == nil {
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
}
}
f("")
f("foobar")
f(`{"series":123`)
f(`1234`)
f(`[]`)
}
func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) {
t.Helper()
req := new(Request)
if err := req.Unmarshal(s); err != nil {
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
}
if !reflect.DeepEqual(req, reqExpected) {
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
}
}
func TestRequestUnmarshalSuccess(t *testing.T) {
unmarshalRequestValidator(
t, []byte("{}"), new(Request),
)
unmarshalRequestValidator(t, []byte(`
{
"series": [
{
"interval": 20,
"metric": "system.load.1",
"resources": [
{
"name": "test.example.com",
"type": "host"
}, {
"name": "/dev/sda",
"type": "device"
}
],
"points": [{
"timestamp": 1575317847,
"value": 0.5
}],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}
`), &Request{
Series: []series{{
Metric: "system.load.1",
Resources: []resource{{
Name: "test.example.com",
Type: "host",
}, {
Name: "/dev/sda",
Type: "device",
}},
Points: []point{{
Timestamp: 1575317847,
Value: 0.5,
}},
Tags: []string{
"environment:test",
},
}},
})
}

View File

@ -0,0 +1,44 @@
package datadog
import (
"fmt"
"testing"
)
func BenchmarkRequestUnmarshal(b *testing.B) {
reqBody := []byte(`{
"series": [
{
"interval": 20,
"metric": "system.load.1",
"resources": [{
"name": "test.example.com",
"type": "host"
}],
"points": [
{
"timestamp": 1575317847,
"value": 0.5
}
],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}`)
b.SetBytes(int64(len(reqBody)))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
req := new(Request)
for pb.Next() {
if err := req.Unmarshal(reqBody); err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
}
if len(req.Series) != 1 {
panic(fmt.Errorf("unexpected number of series unmarshaled: got %d; want 4", len(req.Series)))
}
}
})
}

View File

@ -0,0 +1,32 @@
# Datadog proto files
Content copied from https://github.com/DataDog/agent-payload/blob/master/proto/metrics/agent_payload.proto
## Requirements
- protoc binary [link](http://google.github.io/proto-lens/installing-protoc.html)
- golang-proto-gen[link](https://developers.google.com/protocol-buffers/docs/reference/go-generated)
- custom marshaller [link](https://github.com/planetscale/vtprotobuf)
## Modifications
Original proto files were modified:
1) changed package name for `package beta`.
2) changed import paths - changed directory names.
3) changed go_package for `./pb`.
## How to generate pbs
run command:
```bash
export GOBIN=~/go/bin protoc
protoc -I=. --go_out=./lib/protoparser/datadog/api/sketches/beta --go-vtproto_out=./lib/protoparser/datadog/api/sketches/beta --plugin protoc-gen-go-vtproto="$GOBIN/protoc-gen-go-vtproto" --go-vtproto_opt=features=unmarshal lib/protoparser/datadog/api/sketches/beta/proto/*.proto
```
Generated code will be at `lib/protoparser/datadog/api/sketches/beta/pb`
manually edit it:
1) remove all external imports
2) remove all unneeded methods
3) replace `unknownFields` with `unknownFields []byte`

View File

@ -0,0 +1,95 @@
package datadog
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/sketches/beta/pb"
)
// Request represents a sketches item from DataDog POST request to /api/beta/sketches
type Request struct {
*pb.SketchPayload
}
// Unmarshal is a wrapper around SketchesPayload Unmarshal method which decodes byte array to SketchPayload struct
func (r *Request) Unmarshal(b []byte) error {
if r.SketchPayload == nil {
r.SketchPayload = new(pb.SketchPayload)
}
return r.SketchPayload.UnmarshalVT(b)
}
// Extract iterates fn function execution over all timeseries from a sketch payload
func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn func(string) string) error {
var err error
for _, sketch := range r.SketchPayload.Sketches {
sketchSeries := make([]prompbmarshal.TimeSeries, 5)
for _, point := range sketch.Dogsketches {
timestamp := int64(point.Ts * 1000)
updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{
"max": point.Max,
"min": point.Min,
"cnt": float64(point.Cnt),
"avg": point.Avg,
"sum": point.Sum,
})
}
for _, point := range sketch.Distributions {
timestamp := int64(point.Ts * 1000)
updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{
"max": point.Max,
"min": point.Min,
"cnt": float64(point.Cnt),
"avg": point.Avg,
"sum": point.Sum,
})
}
labels := getLabels(sketch, sanitizeFn)
for i := range sketchSeries {
sketchSeries[i].Labels = append(sketchSeries[i].Labels, labels...)
if err = fn(sketchSeries[i]); err != nil {
return err
}
}
}
return nil
}
func getLabels(sketch *pb.SketchPayload_Sketch, sanitizeFn func(string) string) []prompbmarshal.Label {
labels := []prompbmarshal.Label{}
if sketch.Host != "" {
labels = append(labels, prompbmarshal.Label{
Name: "host",
Value: sketch.Host,
})
}
for _, tag := range sketch.Tags {
name, value := datadog.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
labels = append(labels, prompbmarshal.Label{
Name: sanitizeFn(name),
Value: value,
})
}
return labels
}
func updateSeries(series []prompbmarshal.TimeSeries, metric string, timestamp int64, values map[string]float64) {
index := 0
for suffix, value := range values {
s := series[index]
s.Samples = append(s.Samples, prompbmarshal.Sample{
Timestamp: timestamp,
Value: value,
})
if len(s.Labels) == 0 {
s.Labels = append(s.Labels, prompbmarshal.Label{
Name: "",
Value: metric + "_" + suffix,
})
}
index++
}
}

View File

@ -0,0 +1,159 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.29.0
// protoc v3.15.8
// source: lib/protoparser/datadog/api/sketches/beta/proto/sketches.proto
package pb
type MetricPayload_MetricType int32
const (
MetricPayload_UNSPECIFIED MetricPayload_MetricType = 0
MetricPayload_COUNT MetricPayload_MetricType = 1
MetricPayload_RATE MetricPayload_MetricType = 2
MetricPayload_GAUGE MetricPayload_MetricType = 3
)
// Enum value maps for MetricPayload_MetricType.
var (
MetricPayload_MetricType_name = map[int32]string{
0: "UNSPECIFIED",
1: "COUNT",
2: "RATE",
3: "GAUGE",
}
MetricPayload_MetricType_value = map[string]int32{
"UNSPECIFIED": 0,
"COUNT": 1,
"RATE": 2,
"GAUGE": 3,
}
)
func (x MetricPayload_MetricType) Enum() *MetricPayload_MetricType {
p := new(MetricPayload_MetricType)
*p = x
return p
}
type CommonMetadata struct {
unknownFields []byte
AgentVersion string `protobuf:"bytes,1,opt,name=agent_version,json=agentVersion,proto3" json:"agent_version,omitempty"`
Timezone string `protobuf:"bytes,2,opt,name=timezone,proto3" json:"timezone,omitempty"`
CurrentEpoch float64 `protobuf:"fixed64,3,opt,name=current_epoch,json=currentEpoch,proto3" json:"current_epoch,omitempty"`
InternalIp string `protobuf:"bytes,4,opt,name=internal_ip,json=internalIp,proto3" json:"internal_ip,omitempty"`
PublicIp string `protobuf:"bytes,5,opt,name=public_ip,json=publicIp,proto3" json:"public_ip,omitempty"`
ApiKey string `protobuf:"bytes,6,opt,name=api_key,json=apiKey,proto3" json:"api_key,omitempty"`
}
type MetricPayload struct {
unknownFields []byte
Series []*MetricPayload_MetricSeries `protobuf:"bytes,1,rep,name=series,proto3" json:"series,omitempty"`
}
type EventsPayload struct {
unknownFields []byte
Events []*EventsPayload_Event `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"`
Metadata *CommonMetadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"`
}
type SketchPayload struct {
unknownFields []byte
Sketches []*SketchPayload_Sketch `protobuf:"bytes,1,rep,name=sketches,proto3" json:"sketches,omitempty"`
Metadata *CommonMetadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"`
}
type MetricPayload_MetricPoint struct {
unknownFields []byte
// metric value
Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
// timestamp for this value in seconds since the UNIX epoch
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
}
type MetricPayload_Resource struct {
unknownFields []byte
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
}
type MetricPayload_MetricSeries struct {
unknownFields []byte
// Resources this series applies to; include at least
// { type="host", name=<hostname> }
Resources []*MetricPayload_Resource `protobuf:"bytes,1,rep,name=resources,proto3" json:"resources,omitempty"`
// metric name
Metric string `protobuf:"bytes,2,opt,name=metric,proto3" json:"metric,omitempty"`
// tags for this metric
Tags []string `protobuf:"bytes,3,rep,name=tags,proto3" json:"tags,omitempty"`
// data points for this metric
Points []*MetricPayload_MetricPoint `protobuf:"bytes,4,rep,name=points,proto3" json:"points,omitempty"`
// type of metric
Type MetricPayload_MetricType `protobuf:"varint,5,opt,name=type,proto3,enum=beta.MetricPayload_MetricType" json:"type,omitempty"`
// metric unit name
Unit string `protobuf:"bytes,6,opt,name=unit,proto3" json:"unit,omitempty"`
// source of this metric (check name, etc.)
SourceTypeName string `protobuf:"bytes,7,opt,name=source_type_name,json=sourceTypeName,proto3" json:"source_type_name,omitempty"`
// interval, in seconds, between samples of this metric
Interval int64 `protobuf:"varint,8,opt,name=interval,proto3" json:"interval,omitempty"`
}
type EventsPayload_Event struct {
unknownFields []byte
Title string `protobuf:"bytes,1,opt,name=title,proto3" json:"title,omitempty"`
Text string `protobuf:"bytes,2,opt,name=text,proto3" json:"text,omitempty"`
Ts int64 `protobuf:"varint,3,opt,name=ts,proto3" json:"ts,omitempty"`
Priority string `protobuf:"bytes,4,opt,name=priority,proto3" json:"priority,omitempty"`
Host string `protobuf:"bytes,5,opt,name=host,proto3" json:"host,omitempty"`
Tags []string `protobuf:"bytes,6,rep,name=tags,proto3" json:"tags,omitempty"`
AlertType string `protobuf:"bytes,7,opt,name=alert_type,json=alertType,proto3" json:"alert_type,omitempty"`
AggregationKey string `protobuf:"bytes,8,opt,name=aggregation_key,json=aggregationKey,proto3" json:"aggregation_key,omitempty"`
SourceTypeName string `protobuf:"bytes,9,opt,name=source_type_name,json=sourceTypeName,proto3" json:"source_type_name,omitempty"`
}
type SketchPayload_Sketch struct {
unknownFields []byte
Metric string `protobuf:"bytes,1,opt,name=metric,proto3" json:"metric,omitempty"`
Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"`
Distributions []*SketchPayload_Sketch_Distribution `protobuf:"bytes,3,rep,name=distributions,proto3" json:"distributions,omitempty"`
Tags []string `protobuf:"bytes,4,rep,name=tags,proto3" json:"tags,omitempty"`
Dogsketches []*SketchPayload_Sketch_Dogsketch `protobuf:"bytes,7,rep,name=dogsketches,proto3" json:"dogsketches,omitempty"`
}
type SketchPayload_Sketch_Distribution struct {
unknownFields []byte
Ts int64 `protobuf:"varint,1,opt,name=ts,proto3" json:"ts,omitempty"`
Cnt int64 `protobuf:"varint,2,opt,name=cnt,proto3" json:"cnt,omitempty"`
Min float64 `protobuf:"fixed64,3,opt,name=min,proto3" json:"min,omitempty"`
Max float64 `protobuf:"fixed64,4,opt,name=max,proto3" json:"max,omitempty"`
Avg float64 `protobuf:"fixed64,5,opt,name=avg,proto3" json:"avg,omitempty"`
Sum float64 `protobuf:"fixed64,6,opt,name=sum,proto3" json:"sum,omitempty"`
V []float64 `protobuf:"fixed64,7,rep,packed,name=v,proto3" json:"v,omitempty"`
G []uint32 `protobuf:"varint,8,rep,packed,name=g,proto3" json:"g,omitempty"`
Delta []uint32 `protobuf:"varint,9,rep,packed,name=delta,proto3" json:"delta,omitempty"`
Buf []float64 `protobuf:"fixed64,10,rep,packed,name=buf,proto3" json:"buf,omitempty"`
}
type SketchPayload_Sketch_Dogsketch struct {
unknownFields []byte
Ts int64 `protobuf:"varint,1,opt,name=ts,proto3" json:"ts,omitempty"`
Cnt int64 `protobuf:"varint,2,opt,name=cnt,proto3" json:"cnt,omitempty"`
Min float64 `protobuf:"fixed64,3,opt,name=min,proto3" json:"min,omitempty"`
Max float64 `protobuf:"fixed64,4,opt,name=max,proto3" json:"max,omitempty"`
Avg float64 `protobuf:"fixed64,5,opt,name=avg,proto3" json:"avg,omitempty"`
Sum float64 `protobuf:"fixed64,6,opt,name=sum,proto3" json:"sum,omitempty"`
K []int32 `protobuf:"zigzag32,7,rep,packed,name=k,proto3" json:"k,omitempty"`
N []uint32 `protobuf:"varint,8,rep,packed,name=n,proto3" json:"n,omitempty"`
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,112 @@
syntax = "proto3";
package beta;
option go_package = "./pb";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
message CommonMetadata {
string agent_version = 1;
string timezone = 2;
double current_epoch = 3;
string internal_ip = 4;
string public_ip = 5;
string api_key = 6;
}
message MetricPayload {
enum MetricType {
UNSPECIFIED = 0;
COUNT = 1;
RATE = 2;
GAUGE = 3;
}
message MetricPoint {
// metric value
double value = 1;
// timestamp for this value in seconds since the UNIX epoch
int64 timestamp = 2;
}
message Resource {
string type = 1;
string name = 2;
}
message MetricSeries {
// Resources this series applies to; include at least
// { type="host", name=<hostname> }
repeated Resource resources = 1;
// metric name
string metric = 2;
// tags for this metric
repeated string tags = 3;
// data points for this metric
repeated MetricPoint points = 4;
// type of metric
MetricType type = 5;
// metric unit name
string unit = 6;
// source of this metric (check name, etc.)
string source_type_name = 7;
// interval, in seconds, between samples of this metric
int64 interval = 8;
reserved 9;
}
repeated MetricSeries series = 1;
}
message EventsPayload {
message Event {
string title = 1;
string text = 2;
int64 ts = 3;
string priority = 4;
string host = 5;
repeated string tags = 6;
string alert_type = 7;
string aggregation_key = 8;
string source_type_name = 9;
}
repeated Event events = 1;
CommonMetadata metadata = 2;
}
message SketchPayload {
message Sketch {
message Distribution {
int64 ts = 1;
int64 cnt = 2;
double min = 3;
double max = 4;
double avg = 5;
double sum = 6;
repeated double v = 7;
repeated uint32 g = 8;
repeated uint32 delta = 9;
repeated double buf = 10;
}
message Dogsketch {
int64 ts = 1;
int64 cnt = 2;
double min = 3;
double max = 4;
double avg = 5;
double sum = 6;
repeated sint32 k = 7;
repeated uint32 n = 8;
}
string metric = 1;
string host = 2;
repeated Distribution distributions = 3 [(gogoproto.nullable) = false];
repeated string tags = 4;
reserved 5, 6;
reserved "distributionsK", "distributionsC";
repeated Dogsketch dogsketches = 7 [(gogoproto.nullable) = false];
}
repeated Sketch sketches = 1 [(gogoproto.nullable) = false];
CommonMetadata metadata = 2 [(gogoproto.nullable) = false];
}

View File

@ -1,11 +1,9 @@
package datadog
import (
"encoding/json"
"fmt"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// SplitTag splits DataDog tag into tag name and value.
@ -20,98 +18,10 @@ func SplitTag(tag string) (string, string) {
return tag[:n], tag[n+1:]
}
// Request represents DataDog POST request to /api/v1/series
// Request represents DataDog submit metrics request
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Request struct {
Series []Series `json:"series"`
}
func (req *Request) reset() {
// recursively reset all the fields in req in order to avoid field value
// re-use in json.Unmarshal() when the corresponding field is missing
// in the unmarshaled JSON.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432
series := req.Series
for i := range series {
series[i].reset()
}
req.Series = series[:0]
}
// Unmarshal unmarshals DataDog /api/v1/series request body from b to req.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
//
// b shouldn't be modified when req is in use.
func (req *Request) Unmarshal(b []byte) error {
req.reset()
if err := json.Unmarshal(b, req); err != nil {
return fmt.Errorf("cannot unmarshal %q: %w", b, err)
}
// Set missing timestamps to the current time.
currentTimestamp := float64(fasttime.UnixTimestamp())
series := req.Series
for i := range series {
points := series[i].Points
for j := range points {
if points[j][0] <= 0 {
points[j][0] = currentTimestamp
}
}
}
return nil
}
// Series represents a series item from DataDog POST request to /api/v1/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Series struct {
Metric string `json:"metric"`
Host string `json:"host"`
// The device field does not appear in the datadog docs, but datadog-agent does use it.
// Datadog agent (v7 at least), removes the tag "device" and adds it as its own field. Why? That I don't know!
// https://github.com/DataDog/datadog-agent/blob/0ada7a97fed6727838a6f4d9c87123d2aafde735/pkg/metrics/series.go#L84-L105
Device string `json:"device"`
// Do not decode Interval, since it isn't used by VictoriaMetrics
// Interval int64 `json:"interval"`
Points []Point `json:"points"`
Tags []string `json:"tags"`
// Do not decode Type, since it isn't used by VictoriaMetrics
// Type string `json:"type"`
}
func (s *Series) reset() {
s.Metric = ""
s.Host = ""
s.Device = ""
points := s.Points
for i := range points {
points[i] = Point{}
}
s.Points = points[:0]
tags := s.Tags
for i := range tags {
tags[i] = ""
}
s.Tags = tags[:0]
}
// Point represents a point from DataDog POST request to /api/v1/series
type Point [2]float64
// Timestamp returns timestamp in milliseconds from the given pt.
func (pt *Point) Timestamp() int64 {
return int64(pt[0] * 1000)
}
// Value returns value from the given pt.
func (pt *Point) Value() float64 {
return pt[1]
type Request interface {
Extract(func(prompbmarshal.TimeSeries) error, func(string) string) error
Unmarshal([]byte) error
}

View File

@ -1,7 +1,6 @@
package datadog
import (
"reflect"
"testing"
)
@ -21,102 +20,3 @@ func TestSplitTag(t *testing.T) {
f("foo:bar", "foo", "bar")
f(":bar", "", "bar")
}
func TestRequestUnmarshalMissingHost(t *testing.T) {
// This tests https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432
req := Request{
Series: []Series{{
Host: "prev-host",
Device: "prev-device",
}},
}
data := `
{
"series": [
{
"metric": "system.load.1",
"points": [[
1575317847,
0.5
]]
}
]
}`
if err := req.Unmarshal([]byte(data)); err != nil {
t.Fatalf("unexpected error: %s", err)
}
reqExpected := Request{
Series: []Series{{
Metric: "system.load.1",
Points: []Point{{
1575317847,
0.5,
}},
}},
}
if !reflect.DeepEqual(&req, &reqExpected) {
t.Fatalf("unexpected request parsed;\ngot\n%+v\nwant\n%+v", req, reqExpected)
}
}
func TestRequestUnmarshalFailure(t *testing.T) {
f := func(s string) {
t.Helper()
var req Request
if err := req.Unmarshal([]byte(s)); err == nil {
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
}
}
f("")
f("foobar")
f(`{"series":123`)
f(`1234`)
f(`[]`)
}
func TestRequestUnmarshalSuccess(t *testing.T) {
f := func(s string, reqExpected *Request) {
t.Helper()
var req Request
if err := req.Unmarshal([]byte(s)); err != nil {
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
}
if !reflect.DeepEqual(&req, reqExpected) {
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &req, reqExpected)
}
}
f("{}", &Request{})
f(`
{
"series": [
{
"host": "test.example.com",
"interval": 20,
"metric": "system.load.1",
"device": "/dev/sda",
"points": [[
1575317847,
0.5
]],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}
`, &Request{
Series: []Series{{
Host: "test.example.com",
Metric: "system.load.1",
Device: "/dev/sda",
Points: []Point{{
1575317847,
0.5,
}},
Tags: []string{
"environment:test",
},
}},
})
}

View File

@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
"net/http"
"regexp"
"sync"
@ -12,15 +13,19 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
apiSeriesV1 "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/series/v1"
apiSeriesV2 "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/series/v2"
apiSketchesBeta "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/sketches/beta"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
// The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series")
maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches")
// If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply:
// https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
@ -31,13 +36,15 @@ var (
"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics")
)
// Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
// Parse parses DataDog POST request for /api/v1/series, /api/v2/series, /api/beta/sketches from reader and calls callback for the parsed request.
//
// callback shouldn't hold series after returning.
func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
func Parse(req *http.Request, callback func(prompbmarshal.TimeSeries) error) error {
var r io.Reader
wcr := writeconcurrencylimiter.GetReader(req.Body)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
contentEncoding := req.Header.Get("Content-Encoding")
switch contentEncoding {
case "gzip":
@ -60,25 +67,56 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.S
if err := ctx.Read(); err != nil {
return err
}
req := getRequest()
defer putRequest(req)
if err := req.Unmarshal(ctx.reqBuf.B); err != nil {
apiVersion := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${version}")
apiKind := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${kind}")
ddReq := getRequest()
defer putRequest(ddReq)
switch apiKind {
case "series":
switch apiVersion {
case "v1":
ddReq = new(apiSeriesV1.Request)
case "v2":
ddReq = new(apiSeriesV2.Request)
default:
return fmt.Errorf(
"API version %q of Datadog series endpoint is not supported",
apiVersion,
)
}
case "sketches":
switch apiVersion {
case "beta":
ddReq = new(apiSketchesBeta.Request)
default:
return fmt.Errorf(
"API version %q of Datadog sketches endpoint is not supported",
apiVersion,
)
}
default:
return fmt.Errorf(
"API kind %q of Datadog API is not supported",
apiKind,
)
}
if err := ddReq.Unmarshal(ctx.reqBuf.B); err != nil {
unmarshalErrors.Inc()
return fmt.Errorf("cannot unmarshal DataDog POST request with size %d bytes: %w", len(ctx.reqBuf.B), err)
}
rows := 0
series := req.Series
for i := range series {
rows += len(series[i].Points)
if *sanitizeMetricName {
series[i].Metric = sanitizeName(series[i].Metric)
}
}
rowsRead.Add(rows)
if err := callback(series); err != nil {
cb := func(series prompbmarshal.TimeSeries) error {
rowsRead.Add(len(series.Samples))
return callback(series)
}
if err := ddReq.Extract(cb, sanitizeName(*sanitizeMetricName)); err != nil {
return fmt.Errorf("error when processing imported data: %w", err)
}
return nil
}
@ -144,15 +182,15 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *datadog.Request {
func getRequest() datadog.Request {
v := requestPool.Get()
if v == nil {
return &datadog.Request{}
return nil
}
return v.(*datadog.Request)
return v.(datadog.Request)
}
func putRequest(req *datadog.Request) {
func putRequest(req datadog.Request) {
requestPool.Put(req)
}
@ -161,8 +199,15 @@ var requestPool sync.Pool
// sanitizeName performs DataDog-compatible sanitizing for metric names
//
// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
func sanitizeName(name string) string {
func sanitizeName(sanitize bool) func(string) string {
if sanitize {
return func(name string) string {
return namesSanitizer.Transform(name)
}
}
return func(name string) string {
return name
}
}
var namesSanitizer = bytesutil.NewFastStringTransformer(func(s string) string {
@ -176,4 +221,5 @@ var (
unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`)
multiUnderscores = regexp.MustCompile(`_+`)
underscoresWithDots = regexp.MustCompile(`_?\._?`)
insertApisVersionRegex = regexp.MustCompile(`.*/api/(?P<version>[\w]+)/(?P<kind>[\w]+)`)
)

View File

@ -7,7 +7,7 @@ import (
func TestSanitizeName(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()
result := sanitizeName(s)
result := sanitizeName(true)(s)
if result != resultExpected {
t.Fatalf("unexpected result for sanitizeName(%q); got\n%q\nwant\n%q", s, result, resultExpected)
}