app/{vminsert,vmagent}: preliminary support for /api/v2/series ingestion from new versions of DataDog Agent

This commit adds only JSON support - https://docs.datadoghq.com/api/latest/metrics/#submit-metrics ,
while recent versions of DataDog Agent send data to /api/v2/series in undocumented Protobuf format.
The support for this format will be added later.

Thanks to @AndrewChubatiuk for the initial implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5094

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451
This commit is contained in:
Aliaksandr Valialkin 2023-12-21 18:29:10 +02:00
parent 4837616df6
commit 62a105d9e9
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
23 changed files with 870 additions and 179 deletions

View File

@ -352,7 +352,8 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details.
- `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). This endpoint also supports [Pushgateway protocol](https://github.com/prometheus/pushgateway#url). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.
- `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry).
- `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
- `datadog/api/v1/series` - for ingesting data with DataDog submit metrics API v1. See [these docs](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) for details.
- `datadog/api/v2/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
- `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
- `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details.
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
@ -1541,7 +1542,7 @@ Below is the output for `/path/to/vmstorage -help`:
-loggerWarnsPerSecondLimit int
Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
-maxConcurrentInserts int
The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 32)
The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration.
-memory.allowedBytes size
Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)

View File

@ -1,4 +1,4 @@
package datadog
package datadogv1
import (
"net/http"
@ -8,33 +8,32 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadog"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadog"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`)
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadogv1"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadogv1"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadogv1"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
ce := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, ce, func(series []datadog.Series) error {
return stream.Parse(req.Body, ce, func(series []datadogv1.Series) error {
return insertRows(at, series, extraLabels)
})
}
func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmarshal.Label) error {
func insertRows(at *auth.Token, series []datadogv1.Series, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -63,7 +62,7 @@ func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmar
})
}
for _, tag := range ss.Tags {
name, value := datadog.SplitTag(tag)
name, value := datadogutils.SplitTag(tag)
if name == "host" {
name = "exported_host"
}

View File

@ -0,0 +1,96 @@
package datadogv2
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadogv2"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadogv2"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadogv2"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v2/series request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
ct := req.Header.Get("Content-Type")
ce := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, ce, ct, func(series []datadogv2.Series) error {
return insertRows(at, series, extraLabels)
})
}
func insertRows(at *auth.Token, series []datadogv2.Series, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
rowsTotal := 0
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range series {
ss := &series[i]
rowsTotal += len(ss.Points)
labelsLen := len(labels)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: ss.Metric,
})
for _, rs := range ss.Resources {
labels = append(labels, prompbmarshal.Label{
Name: rs.Type,
Value: rs.Name,
})
}
for _, tag := range ss.Tags {
name, value := datadogutils.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
labels = append(labels, prompbmarshal.Label{
Name: name,
Value: value,
})
}
labels = append(labels, extraLabels...)
samplesLen := len(samples)
for _, pt := range ss.Points {
samples = append(samples, prompbmarshal.Sample{
Timestamp: pt.Timestamp * 1000,
Value: pt.Value,
})
}
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(rowsTotal)
if at != nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View File

@ -26,15 +26,6 @@ func InsertHandler(r io.Reader) error {
})
}
// InsertHandlerForReader processes remote write for graphite plaintext protocol.
//
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandlerForReader(at *auth.Token, r io.Reader, isGzipped bool) error {
return stream.Parse(r, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows)
})
}
func insertRows(at *auth.Token, rows []parser.Row) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

View File

@ -12,7 +12,8 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadogv1"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
@ -343,9 +344,20 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/api/v1/series":
datadogWriteRequests.Inc()
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
datadogWriteErrors.Inc()
datadogv1WriteRequests.Inc()
if err := datadogv1.InsertHandlerForHTTP(nil, r); err != nil {
datadogv1WriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/api/v2/series":
datadogv2WriteRequests.Inc()
if err := datadogv2.InsertHandlerForHTTP(nil, r); err != nil {
datadogv2WriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
@ -566,9 +578,19 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/api/v1/series":
datadogWriteRequests.Inc()
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
datadogWriteErrors.Inc()
datadogv1WriteRequests.Inc()
if err := datadogv1.InsertHandlerForHTTP(at, r); err != nil {
datadogv1WriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/api/v2/series":
datadogv2WriteRequests.Inc()
if err := datadogv2.InsertHandlerForHTTP(at, r); err != nil {
datadogv2WriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
@ -626,8 +648,11 @@ var (
influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`)
datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogv1WriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogv1WriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogv2WriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`)
datadogv2WriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`)
datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)

View File

@ -1,4 +1,4 @@
package datadog
package datadogv1
import (
"net/http"
@ -8,34 +8,33 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadog"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="datadog"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadog"}`)
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadogv1"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="datadogv1"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadogv1"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
ce := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, ce, func(series []parser.Series) error {
return stream.Parse(req.Body, ce, func(series []datadogv1.Series) error {
return insertRows(at, series, extraLabels)
})
}
func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error {
func insertRows(at *auth.Token, series []datadogv1.Series, extraLabels []prompbmarshal.Label) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
@ -55,7 +54,7 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars
ctx.AddLabel("device", ss.Device)
}
for _, tag := range ss.Tags {
name, value := parser.SplitTag(tag)
name, value := datadogutils.SplitTag(tag)
if name == "host" {
name = "exported_host"
}

View File

@ -0,0 +1,91 @@
package datadogv2
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadogv2"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="datadogv2"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadogv2"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v2/series request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
ct := req.Header.Get("Content-Type")
ce := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, ce, ct, func(series []datadogv2.Series) error {
return insertRows(at, series, extraLabels)
})
}
func insertRows(at *auth.Token, series []datadogv2.Series, extraLabels []prompbmarshal.Label) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
ctx.Reset()
rowsTotal := 0
perTenantRows := make(map[auth.Token]int)
hasRelabeling := relabel.HasRelabeling()
for i := range series {
ss := &series[i]
rowsTotal += len(ss.Points)
ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", ss.Metric)
for _, rs := range ss.Resources {
ctx.AddLabel(rs.Type, rs.Name)
}
for _, tag := range ss.Tags {
name, value := datadogutils.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
ctx.AddLabel(name, value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
atLocal := ctx.GetLocalAuthToken(at)
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels)
storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
for _, pt := range ss.Points {
timestamp := pt.Timestamp * 1000
value := pt.Value
if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
return err
}
}
perTenantRows[*atLocal] += len(ss.Points)
}
rowsInserted.Add(rowsTotal)
rowsTenantInserted.MultiAdd(perTenantRows)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
}

View File

@ -15,7 +15,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv1"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
@ -325,9 +326,20 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/api/v1/series":
datadogWriteRequests.Inc()
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
datadogWriteErrors.Inc()
datadogv1WriteRequests.Inc()
if err := datadogv1.InsertHandlerForHTTP(at, r); err != nil {
datadogv1WriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/api/v2/series":
datadogv2WriteRequests.Inc()
if err := datadogv2.InsertHandlerForHTTP(at, r); err != nil {
datadogv2WriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
@ -403,8 +415,11 @@ var (
newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/newrelic/inventory/deltas", protocol="newrelic"}`)
newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/newrelic", protocol="newrelic"}`)
datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
datadogv1WriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
datadogv1WriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
datadogv2WriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v2/series", protocol="datadog"}`)
datadogv2WriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/datadog/api/v2/series", protocol="datadog"}`)
datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/validate", protocol="datadog"}`)
datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/check_run", protocol="datadog"}`)

View File

@ -28,6 +28,7 @@ The sandbox cluster installation is running under the constant load generated by
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). JSON protocol is supperted right now. Protobuf protocol will be supported later. See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags:
- `-datasource.oauth2.endpointParams` for `-datasource.url`

View File

@ -363,7 +363,8 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details.
- `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). This endpoint also supports [Pushgateway protocol](https://github.com/prometheus/pushgateway#url). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.
- `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry).
- `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
- `datadog/api/v1/series` - for ingesting data with DataDog submit metrics API v1. See [these docs](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) for details.
- `datadog/api/v2/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
- `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
- `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details.
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.

View File

@ -519,10 +519,8 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
## How to send data from DataDog agent
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/)
or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics)
at `/datadog/api/v1/series` path.
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path.
### Sending metrics to VictoriaMetrics
@ -534,12 +532,11 @@ or via [configuration file](https://docs.datadoghq.com/agent/guide/agent-configu
</p>
To configure DataDog agent via ENV variable add the following prefix:
<div class="with-copy" markdown="1">
<div class="with-copy" markdown="1">
```
DD_DD_URL=http://victoriametrics:8428/datadog
```
</div>
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@ -548,14 +545,12 @@ To configure DataDog agent via [configuration file](https://github.com/DataDog/d
add the following line:
<div class="with-copy" markdown="1">
```
dd_url: http://victoriametrics:8428/datadog
```
</div>
vmagent also can accept Datadog metrics format. Depending on where vmagent will forward data,
[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data,
pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats.
### Sending metrics to Datadog and VictoriaMetrics
@ -570,12 +565,10 @@ sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `ad
Run DataDog using the following ENV variable with VictoriaMetrics as additional metrics receiver:
<div class="with-copy" markdown="1">
```
DD_ADDITIONAL_ENDPOINTS='{\"http://victoriametrics:8428/datadog\": [\"apikey\"]}'
```
</div>
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@ -585,19 +578,16 @@ To configure DataDog Dual Shipping via [configuration file](https://docs.datadog
add the following line:
<div class="with-copy" markdown="1">
```
additional_endpoints:
"http://victoriametrics:8428/datadog":
- apikey
```
</div>
### Send via cURL
See how to send data to VictoriaMetrics via
[DataDog "submit metrics"](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) from command line.
See how to send data to VictoriaMetrics via DataDog "submit metrics" API [here](https://docs.victoriametrics.com/url-examples.html#datadogapiv2series).
The imported data can be read via [export API](https://docs.victoriametrics.com/url-examples.html#apiv1export).
@ -608,7 +598,7 @@ according to [DataDog metric naming recommendations](https://docs.datadoghq.com/
If you need accepting metric names as is without sanitizing, then pass `-datadog.sanitizeMetricName=false` command-line flag to VictoriaMetrics.
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
For example, `/datadog/api/v2/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
DataDog agent sends the [configured tags](https://docs.datadoghq.com/getting_started/tagging/) to
undocumented endpoint - `/datadog/intake`. This endpoint isn't supported by VictoriaMetrics yet.
@ -2583,7 +2573,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-datadog.maxInsertRequestSize size
The maximum size in bytes of a single DataDog POST request to /api/v1/series
The maximum size in bytes of a single DataDog POST request to /datadog/api/v2/series
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-datadog.sanitizeMetricName
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)

View File

@ -527,10 +527,8 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
## How to send data from DataDog agent
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/)
or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics)
at `/datadog/api/v1/series` path.
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path.
### Sending metrics to VictoriaMetrics
@ -542,12 +540,11 @@ or via [configuration file](https://docs.datadoghq.com/agent/guide/agent-configu
</p>
To configure DataDog agent via ENV variable add the following prefix:
<div class="with-copy" markdown="1">
<div class="with-copy" markdown="1">
```
DD_DD_URL=http://victoriametrics:8428/datadog
```
</div>
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@ -556,14 +553,12 @@ To configure DataDog agent via [configuration file](https://github.com/DataDog/d
add the following line:
<div class="with-copy" markdown="1">
```
dd_url: http://victoriametrics:8428/datadog
```
</div>
vmagent also can accept Datadog metrics format. Depending on where vmagent will forward data,
[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data,
pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats.
### Sending metrics to Datadog and VictoriaMetrics
@ -578,12 +573,10 @@ sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `ad
Run DataDog using the following ENV variable with VictoriaMetrics as additional metrics receiver:
<div class="with-copy" markdown="1">
```
DD_ADDITIONAL_ENDPOINTS='{\"http://victoriametrics:8428/datadog\": [\"apikey\"]}'
```
</div>
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@ -593,19 +586,16 @@ To configure DataDog Dual Shipping via [configuration file](https://docs.datadog
add the following line:
<div class="with-copy" markdown="1">
```
additional_endpoints:
"http://victoriametrics:8428/datadog":
- apikey
```
</div>
### Send via cURL
See how to send data to VictoriaMetrics via
[DataDog "submit metrics"](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) from command line.
See how to send data to VictoriaMetrics via DataDog "submit metrics" API [here](https://docs.victoriametrics.com/url-examples.html#datadogapiv2series).
The imported data can be read via [export API](https://docs.victoriametrics.com/url-examples.html#apiv1export).
@ -616,7 +606,7 @@ according to [DataDog metric naming recommendations](https://docs.datadoghq.com/
If you need accepting metric names as is without sanitizing, then pass `-datadog.sanitizeMetricName=false` command-line flag to VictoriaMetrics.
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
For example, `/datadog/api/v2/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
DataDog agent sends the [configured tags](https://docs.datadoghq.com/getting_started/tagging/) to
undocumented endpoint - `/datadog/intake`. This endpoint isn't supported by VictoriaMetrics yet.
@ -2591,7 +2581,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-csvTrimTimestamp duration
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-datadog.maxInsertRequestSize size
The maximum size in bytes of a single DataDog POST request to /api/v1/series
The maximum size in bytes of a single DataDog POST request to /datadog/api/v2/series
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-datadog.sanitizeMetricName
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)

View File

@ -473,7 +473,7 @@ http://vminsert:8480/insert/0/datadog
### /datadog/api/v1/series
**Imports data in DataDog format into VictoriaMetrics**
**Imports data in DataDog v1 format into VictoriaMetrics**
Single-node VictoriaMetrics:
<div class="with-copy" markdown="1">
@ -531,7 +531,79 @@ echo '
Additional information:
* [How to send data from datadog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent)
* [How to send data from DataDog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent)
* [URL format for VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
### /datadog/api/v2/series
**Imports data in [DataDog v2](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) format into VictoriaMetrics**
Single-node VictoriaMetrics:
<div class="with-copy" markdown="1">
```console
echo '
{
"series": [
{
"metric": "system.load.1",
"type": 0,
"points": [
{
"timestamp": 0,
"value": 0.7
}
],
"resources": [
{
"name": "dummyhost",
"type": "host"
}
],
"tags": ["environment:test"]
}
]
}
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:8428/datadog/api/v2/series
```
</div>
Cluster version of VictoriaMetrics:
<div class="with-copy" markdown="1">
```console
echo '
{
"series": [
{
"metric": "system.load.1",
"type": 0,
"points": [
{
"timestamp": 0,
"value": 0.7
}
],
"resources": [
{
"name": "dummyhost",
"type": "host"
}
],
"tags": ["environment:test"]
}
]
}
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- 'http://<vminsert>:8480/insert/0/datadog/api/v2/series'
```
</div>
Additional information:
* [How to send data from DataDog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent)
* [URL format for VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
### /federate

View File

@ -0,0 +1,57 @@
package datadogutils
import (
"flag"
"regexp"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)
var (
// MaxInsertRequestSize is the maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
MaxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v2/series")
// SanitizeMetricName controls sanitizing metric names ingested via DataDog protocols.
//
// If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply:
// https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
// But there's some hidden behaviour. In addition to what it states in the docs, the following is also done:
// - Consecutive underscores are replaced with just one underscore
// - Underscore immediately before or after a dot are removed
SanitizeMetricName = flag.Bool("datadog.sanitizeMetricName", true, "Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at "+
"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics")
)
// SplitTag splits DataDog tag into tag name and value.
//
// See https://docs.datadoghq.com/getting_started/tagging/#define-tags
func SplitTag(tag string) (string, string) {
n := strings.IndexByte(tag, ':')
if n < 0 {
// No tag value.
return tag, "no_label_value"
}
return tag[:n], tag[n+1:]
}
// SanitizeName performs DataDog-compatible sanitizing for metric names
//
// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
func SanitizeName(name string) string {
return namesSanitizer.Transform(name)
}
var namesSanitizer = bytesutil.NewFastStringTransformer(func(s string) string {
s = unsupportedDatadogChars.ReplaceAllString(s, "_")
s = multiUnderscores.ReplaceAllString(s, "_")
s = underscoresWithDots.ReplaceAllString(s, ".")
return s
})
var (
unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`)
multiUnderscores = regexp.MustCompile(`_+`)
underscoresWithDots = regexp.MustCompile(`_?\._?`)
)

View File

@ -1,13 +1,30 @@
package stream
package datadogutils
import (
"testing"
)
func TestSplitTag(t *testing.T) {
f := func(s, nameExpected, valueExpected string) {
t.Helper()
name, value := SplitTag(s)
if name != nameExpected {
t.Fatalf("unexpected name obtained from %q; got %q; want %q", s, name, nameExpected)
}
if value != valueExpected {
t.Fatalf("unexpected value obtained from %q; got %q; want %q", s, value, valueExpected)
}
}
f("", "", "no_label_value")
f("foo", "foo", "no_label_value")
f("foo:bar", "foo", "bar")
f(":bar", "", "bar")
}
func TestSanitizeName(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()
result := sanitizeName(s)
result := SanitizeName(s)
if result != resultExpected {
t.Fatalf("unexpected result for sanitizeName(%q); got\n%q\nwant\n%q", s, result, resultExpected)
}

View File

@ -1,28 +1,13 @@
package datadog
package datadogv1
import (
"encoding/json"
"fmt"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// SplitTag splits DataDog tag into tag name and value.
//
// See https://docs.datadoghq.com/getting_started/tagging/#define-tags
func SplitTag(tag string) (string, string) {
n := strings.IndexByte(tag, ':')
if n < 0 {
// No tag value.
return tag, "no_label_value"
}
return tag[:n], tag[n+1:]
}
// Request represents DataDog POST request to /api/v1/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Request struct {
Series []Series `json:"series"`
}
@ -41,8 +26,6 @@ func (req *Request) reset() {
// Unmarshal unmarshals DataDog /api/v1/series request body from b to req.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
//
// b shouldn't be modified when req is in use.
func (req *Request) Unmarshal(b []byte) error {
req.reset()
@ -64,8 +47,6 @@ func (req *Request) Unmarshal(b []byte) error {
}
// Series represents a series item from DataDog POST request to /api/v1/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Series struct {
Metric string `json:"metric"`
Host string `json:"host"`

View File

@ -1,27 +1,10 @@
package datadog
package datadogv1
import (
"reflect"
"testing"
)
func TestSplitTag(t *testing.T) {
f := func(s, nameExpected, valueExpected string) {
t.Helper()
name, value := SplitTag(s)
if name != nameExpected {
t.Fatalf("unexpected name obtained from %q; got %q; want %q", s, name, nameExpected)
}
if value != valueExpected {
t.Fatalf("unexpected value obtained from %q; got %q; want %q", s, value, valueExpected)
}
}
f("", "", "no_label_value")
f("foo", "foo", "no_label_value")
f("foo:bar", "foo", "bar")
f(":bar", "", "bar")
}
func TestRequestUnmarshalMissingHost(t *testing.T) {
// This tests https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432
req := Request{

View File

@ -1,4 +1,4 @@
package datadog
package datadogv1
import (
"fmt"
@ -12,10 +12,10 @@ func BenchmarkRequestUnmarshal(b *testing.B) {
"host": "test.example.com",
"interval": 20,
"metric": "system.load.1",
"points": [
"points": [[
1575317847,
0.5
],
]],
"tags": [
"environment:test"
],

View File

@ -2,39 +2,24 @@ package stream
import (
"bufio"
"flag"
"fmt"
"io"
"regexp"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
// The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series")
// If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply:
// https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
// But there's some hidden behaviour. In addition to what it states in the docs, the following is also done:
// - Consecutive underscores are replaced with just one underscore
// - Underscore immediately before or after a dot are removed
sanitizeMetricName = flag.Bool("datadog.sanitizeMetricName", true, "Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at "+
"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics")
)
// Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
//
// callback shouldn't hold series after returning.
func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error {
func Parse(r io.Reader, contentEncoding string, callback func(series []datadogv1.Series) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
@ -70,8 +55,8 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.S
series := req.Series
for i := range series {
rows += len(series[i].Points)
if *sanitizeMetricName {
series[i].Metric = sanitizeName(series[i].Metric)
if *datadogutils.SanitizeMetricName {
series[i].Metric = datadogutils.SanitizeName(series[i].Metric)
}
}
rowsRead.Add(rows)
@ -94,25 +79,25 @@ func (ctx *pushCtx) reset() {
func (ctx *pushCtx) Read() error {
readCalls.Inc()
lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1)
lr := io.LimitReader(ctx.br, int64(datadogutils.MaxInsertRequestSize.N)+1)
startTime := fasttime.UnixTimestamp()
reqLen, err := ctx.reqBuf.ReadFrom(lr)
if err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
}
if reqLen > int64(maxInsertRequestSize.N) {
if reqLen > int64(datadogutils.MaxInsertRequestSize.N) {
readErrors.Inc()
return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", maxInsertRequestSize.N)
return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", datadogutils.MaxInsertRequestSize.N)
}
return nil
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadog"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadog"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadog"}`)
unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadog"}`)
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadogv1"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadogv1"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadogv1"}`)
unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadogv1"}`)
)
func getPushCtx(r io.Reader) *pushCtx {
@ -144,36 +129,16 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *datadog.Request {
func getRequest() *datadogv1.Request {
v := requestPool.Get()
if v == nil {
return &datadog.Request{}
return &datadogv1.Request{}
}
return v.(*datadog.Request)
return v.(*datadogv1.Request)
}
func putRequest(req *datadog.Request) {
func putRequest(req *datadogv1.Request) {
requestPool.Put(req)
}
var requestPool sync.Pool
// sanitizeName performs DataDog-compatible sanitizing for metric names
//
// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
func sanitizeName(name string) string {
return namesSanitizer.Transform(name)
}
var namesSanitizer = bytesutil.NewFastStringTransformer(func(s string) string {
s = unsupportedDatadogChars.ReplaceAllString(s, "_")
s = multiUnderscores.ReplaceAllString(s, "_")
s = underscoresWithDots.ReplaceAllString(s, ".")
return s
})
var (
unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`)
multiUnderscores = regexp.MustCompile(`_+`)
underscoresWithDots = regexp.MustCompile(`_?\._?`)
)

View File

@ -0,0 +1,143 @@
package datadogv2
import (
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// Request represents DataDog POST request to /api/v2/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Request struct {
Series []Series `json:"series"`
}
func (req *Request) reset() {
// recursively reset all the fields in req in order to avoid field value
// re-use in json.Unmarshal() when the corresponding field is missing
// in the unmarshaled JSON.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432
series := req.Series
for i := range series {
series[i].reset()
}
req.Series = series[:0]
}
// UnmarshalJSON unmarshals JSON DataDog /api/v2/series request body from b to req.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
//
// b shouldn't be modified when req is in use.
func UnmarshalJSON(req *Request, b []byte) error {
req.reset()
if err := json.Unmarshal(b, req); err != nil {
return fmt.Errorf("cannot unmarshal %q: %w", b, err)
}
// Set missing timestamps to the current time.
currentTimestamp := int64(fasttime.UnixTimestamp())
series := req.Series
for i := range series {
points := series[i].Points
for j := range points {
pt := &points[j]
if pt.Timestamp <= 0 {
pt.Timestamp = currentTimestamp
}
}
}
return nil
}
// UnmarshalProtobuf unmarshals protobuf DataDog /api/v2/series request body from b to req.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
//
// b shouldn't be modified when req is in use.
func UnmarshalProtobuf(req *Request, b []byte) error {
req.reset()
_ = b
return fmt.Errorf("unimplemented")
}
// Series represents a series item from DataDog POST request to /api/v2/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Series struct {
// Do not decode Interval, since it isn't used by VictoriaMetrics
// Interval int64 `json:"interval"`
// Do not decode Metadata, since it isn't used by VictoriaMetrics
// Metadata Metadata `json:"metadata"`
// Metric is the name of the metric
Metric string `json:"metric"`
// Points points for the given metric
Points []Point `json:"points"`
Resources []Resource `json:"resources"`
// Do not decode SourceTypeName, since it isn't used by VictoriaMetrics
// SourceTypeName string `json:"source_type_name"`
Tags []string
// Do not decode Type, since it isn't used by VictoriaMetrics
// Type int `json:"type"`
// Do not decode Unit, since it isn't used by VictoriaMetrics
// Unit string
}
func (s *Series) reset() {
s.Metric = ""
points := s.Points
for i := range points {
points[i].reset()
}
s.Points = points[:0]
resources := s.Resources
for i := range resources {
resources[i].reset()
}
s.Resources = resources[:0]
tags := s.Tags
for i := range tags {
tags[i] = ""
}
s.Tags = tags[:0]
}
// Point represents a point from DataDog POST request to /api/v2/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Point struct {
// Timestamp is point timestamp in seconds
Timestamp int64 `json:"timestamp"`
// Value is point value
Value float64 `json:"value"`
}
func (pt *Point) reset() {
pt.Timestamp = 0
pt.Value = 0
}
// Resource is series resource from DataDog POST request to /api/v2/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Resource struct {
Name string `json:"name"`
Type string `json:"type"`
}
func (r *Resource) reset() {
r.Name = ""
r.Type = ""
}

View File

@ -0,0 +1,77 @@
package datadogv2
import (
"reflect"
"testing"
)
func TestRequestUnmarshalJSONFailure(t *testing.T) {
f := func(s string) {
t.Helper()
var req Request
if err := UnmarshalJSON(&req, []byte(s)); err == nil {
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
}
}
f("")
f("foobar")
f(`{"series":123`)
f(`1234`)
f(`[]`)
}
func TestRequestUnmarshalJSONSuccess(t *testing.T) {
f := func(s string, reqExpected *Request) {
t.Helper()
var req Request
if err := UnmarshalJSON(&req, []byte(s)); err != nil {
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
}
if !reflect.DeepEqual(&req, reqExpected) {
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &req, reqExpected)
}
}
f("{}", &Request{})
f(`
{
"series": [
{
"metric": "system.load.1",
"type": 0,
"points": [
{
"timestamp": 1636629071,
"value": 0.7
}
],
"resources": [
{
"name": "dummyhost",
"type": "host"
}
],
"tags": ["environment:test"]
}
]
}
`, &Request{
Series: []Series{{
Metric: "system.load.1",
Points: []Point{
{
Timestamp: 1636629071,
Value: 0.7,
},
},
Resources: []Resource{
{
Name: "dummyhost",
Type: "host",
},
},
Tags: []string{
"environment:test",
},
}},
})
}

View File

@ -0,0 +1,43 @@
package datadogv2
import (
"fmt"
"testing"
)
func BenchmarkRequestUnmarshalJSON(b *testing.B) {
reqBody := []byte(`{
"series": [
{
"metric": "system.load.1",
"type": 0,
"points": [
{
"timestamp": 1636629071,
"value": 0.7
}
],
"resources": [
{
"name": "dummyhost",
"type": "host"
}
],
"tags": ["environment:test"]
}
]
}`)
b.SetBytes(int64(len(reqBody)))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var req Request
for pb.Next() {
if err := UnmarshalJSON(&req, reqBody); err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
}
if len(req.Series) != 1 {
panic(fmt.Errorf("unexpected number of series unmarshaled: got %d; want 4", len(req.Series)))
}
}
})
}

View File

@ -0,0 +1,154 @@
package stream
import (
"bufio"
"fmt"
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
// Parse parses DataDog POST request for /api/v2/series from reader and calls callback for the parsed request.
//
// callback shouldn't hold series after returning.
func Parse(r io.Reader, contentEncoding, contentType string, callback func(series []datadogv2.Series) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
switch contentEncoding {
case "gzip":
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped DataDog data: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
case "deflate":
zlr, err := common.GetZlibReader(r)
if err != nil {
return fmt.Errorf("cannot read deflated DataDog data: %w", err)
}
defer common.PutZlibReader(zlr)
r = zlr
}
ctx := getPushCtx(r)
defer putPushCtx(ctx)
if err := ctx.Read(); err != nil {
return err
}
req := getRequest()
defer putRequest(req)
var err error
switch contentType {
case "application/x-protobuf":
err = datadogv2.UnmarshalProtobuf(req, ctx.reqBuf.B)
default:
err = datadogv2.UnmarshalJSON(req, ctx.reqBuf.B)
}
if err != nil {
unmarshalErrors.Inc()
return fmt.Errorf("cannot unmarshal DataDog %s request with size %d bytes: %w", contentType, len(ctx.reqBuf.B), err)
}
rows := 0
series := req.Series
for i := range series {
rows += len(series[i].Points)
if *datadogutils.SanitizeMetricName {
series[i].Metric = datadogutils.SanitizeName(series[i].Metric)
}
}
rowsRead.Add(rows)
if err := callback(series); err != nil {
return fmt.Errorf("error when processing imported data: %w", err)
}
return nil
}
type pushCtx struct {
br *bufio.Reader
reqBuf bytesutil.ByteBuffer
}
func (ctx *pushCtx) reset() {
ctx.br.Reset(nil)
ctx.reqBuf.Reset()
}
func (ctx *pushCtx) Read() error {
readCalls.Inc()
lr := io.LimitReader(ctx.br, int64(datadogutils.MaxInsertRequestSize.N)+1)
startTime := fasttime.UnixTimestamp()
reqLen, err := ctx.reqBuf.ReadFrom(lr)
if err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
}
if reqLen > int64(datadogutils.MaxInsertRequestSize.N) {
readErrors.Inc()
return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", datadogutils.MaxInsertRequestSize.N)
}
return nil
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadogv2"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadogv2"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadogv2"}`)
unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadogv2"}`)
)
func getPushCtx(r io.Reader) *pushCtx {
select {
case ctx := <-pushCtxPoolCh:
ctx.br.Reset(r)
return ctx
default:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
}
func putPushCtx(ctx *pushCtx) {
ctx.reset()
select {
case pushCtxPoolCh <- ctx:
default:
pushCtxPool.Put(ctx)
}
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *datadogv2.Request {
v := requestPool.Get()
if v == nil {
return &datadogv2.Request{}
}
return v.(*datadogv2.Request)
}
func putRequest(req *datadogv2.Request) {
requestPool.Put(req)
}
var requestPool sync.Pool