mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
Revert "add datadog /api/v2/series and /api/beta/sketches support (#5094)"
This reverts commit d6b4c8e4ef
.
Reason for revert: https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5094#issuecomment-1839789080
This commit is contained in:
parent
61db92cdc7
commit
559e4db512
@ -8,6 +8,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
@ -19,7 +20,7 @@ var (
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`)
|
||||
)
|
||||
|
||||
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series, /api/v2/series, /api/beta/sketches request.
|
||||
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
|
||||
@ -27,23 +28,66 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return stream.Parse(
|
||||
req, func(series prompbmarshal.TimeSeries) error {
|
||||
series.Labels = append(series.Labels, extraLabels...)
|
||||
return insertRows(at, series)
|
||||
},
|
||||
)
|
||||
ce := req.Header.Get("Content-Encoding")
|
||||
return stream.Parse(req.Body, ce, func(series []datadog.Series) error {
|
||||
return insertRows(at, series, extraLabels)
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, series prompbmarshal.TimeSeries) error {
|
||||
func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
rowsTotal := len(series.Samples)
|
||||
|
||||
ctx.WriteRequest.Timeseries = []prompbmarshal.TimeSeries{series}
|
||||
ctx.Labels = series.Labels
|
||||
ctx.Samples = series.Samples
|
||||
rowsTotal := 0
|
||||
tssDst := ctx.WriteRequest.Timeseries[:0]
|
||||
labels := ctx.Labels[:0]
|
||||
samples := ctx.Samples[:0]
|
||||
for i := range series {
|
||||
ss := &series[i]
|
||||
rowsTotal += len(ss.Points)
|
||||
labelsLen := len(labels)
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "__name__",
|
||||
Value: ss.Metric,
|
||||
})
|
||||
if ss.Host != "" {
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "host",
|
||||
Value: ss.Host,
|
||||
})
|
||||
}
|
||||
if ss.Device != "" {
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "device",
|
||||
Value: ss.Device,
|
||||
})
|
||||
}
|
||||
for _, tag := range ss.Tags {
|
||||
name, value := datadog.SplitTag(tag)
|
||||
if name == "host" {
|
||||
name = "exported_host"
|
||||
}
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: name,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
labels = append(labels, extraLabels...)
|
||||
samplesLen := len(samples)
|
||||
for _, pt := range ss.Points {
|
||||
samples = append(samples, prompbmarshal.Sample{
|
||||
Timestamp: pt.Timestamp(),
|
||||
Value: pt.Value(),
|
||||
})
|
||||
}
|
||||
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
||||
Labels: labels[labelsLen:],
|
||||
Samples: samples[samplesLen:],
|
||||
})
|
||||
}
|
||||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
|
||||
return remotewrite.ErrQueueFullHTTPRetry
|
||||
}
|
||||
|
@ -343,9 +343,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "/datadog/api/v1/series":
|
||||
datadogWriteSeriesV1Requests.Inc()
|
||||
datadogWriteRequests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
|
||||
datadogWriteSeriesV1Errors.Inc()
|
||||
datadogWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
@ -354,27 +354,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "/datadog/api/v2/series":
|
||||
datadogWriteSeriesV2Requests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
|
||||
datadogWriteSeriesV2Errors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"errors":[]}`)
|
||||
return true
|
||||
case "/datadog/api/beta/sketches":
|
||||
datadogWriteSketchesBetaRequests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
|
||||
datadogWriteSketchesBetaErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(202)
|
||||
return true
|
||||
case "/datadog/api/v1/validate":
|
||||
datadogValidateRequests.Inc()
|
||||
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
|
||||
@ -587,9 +566,9 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "datadog/api/v1/series":
|
||||
datadogWriteSeriesV1Requests.Inc()
|
||||
datadogWriteRequests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogWriteSeriesV1Errors.Inc()
|
||||
datadogWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
@ -597,26 +576,6 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "datadog/api/v2/series":
|
||||
datadogWriteSeriesV2Requests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogWriteSeriesV2Errors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"errors":[]}`)
|
||||
return true
|
||||
case "datadog/api/beta/sketches":
|
||||
datadogWriteSketchesBetaRequests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogWriteSketchesBetaErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(202)
|
||||
return true
|
||||
case "datadog/api/v1/validate":
|
||||
datadogValidateRequests.Inc()
|
||||
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
|
||||
@ -667,14 +626,8 @@ var (
|
||||
|
||||
influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`)
|
||||
|
||||
datadogWriteSeriesV1Requests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
|
||||
datadogWriteSeriesV1Errors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
|
||||
|
||||
datadogWriteSeriesV2Requests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`)
|
||||
datadogWriteSeriesV2Errors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`)
|
||||
|
||||
datadogWriteSketchesBetaRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/beta/sketches", protocol="datadog"}`)
|
||||
datadogWriteSketchesBetaErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/beta/sketches", protocol="datadog"}`)
|
||||
datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
|
||||
datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
|
||||
|
||||
datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
|
||||
datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
@ -20,7 +21,7 @@ var (
|
||||
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadog"}`)
|
||||
)
|
||||
|
||||
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series, /api/v2/series, /api/v1/sketches, /api/beta/sketches request.
|
||||
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
|
||||
@ -28,44 +29,62 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return stream.Parse(
|
||||
req, func(series prompbmarshal.TimeSeries) error {
|
||||
series.Labels = append(series.Labels, extraLabels...)
|
||||
return insertRows(at, series)
|
||||
},
|
||||
)
|
||||
ce := req.Header.Get("Content-Encoding")
|
||||
return stream.Parse(req.Body, ce, func(series []parser.Series) error {
|
||||
return insertRows(at, series, extraLabels)
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, series prompbmarshal.TimeSeries) error {
|
||||
func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := netstorage.GetInsertCtx()
|
||||
defer netstorage.PutInsertCtx(ctx)
|
||||
|
||||
ctx.Reset()
|
||||
rowsTotal := 0
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
rowsTotal := len(series.Samples)
|
||||
|
||||
for i := range series {
|
||||
ss := &series[i]
|
||||
rowsTotal += len(ss.Points)
|
||||
ctx.Labels = ctx.Labels[:0]
|
||||
for l := range series.Labels {
|
||||
ctx.AddLabel(series.Labels[l].Name, series.Labels[l].Value)
|
||||
ctx.AddLabel("", ss.Metric)
|
||||
if ss.Host != "" {
|
||||
ctx.AddLabel("host", ss.Host)
|
||||
}
|
||||
if ss.Device != "" {
|
||||
ctx.AddLabel("device", ss.Device)
|
||||
}
|
||||
for _, tag := range ss.Tags {
|
||||
name, value := parser.SplitTag(tag)
|
||||
if name == "host" {
|
||||
name = "exported_host"
|
||||
}
|
||||
ctx.AddLabel(name, value)
|
||||
}
|
||||
for j := range extraLabels {
|
||||
label := &extraLabels[j]
|
||||
ctx.AddLabel(label.Name, label.Value)
|
||||
}
|
||||
if hasRelabeling {
|
||||
ctx.ApplyRelabeling()
|
||||
}
|
||||
if len(ctx.Labels) == 0 {
|
||||
return nil
|
||||
// Skip metric without labels.
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
atLocal := ctx.GetLocalAuthToken(at)
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
|
||||
for _, sample := range series.Samples {
|
||||
err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, sample.Timestamp, sample.Value)
|
||||
if err != nil {
|
||||
for _, pt := range ss.Points {
|
||||
timestamp := pt.Timestamp()
|
||||
value := pt.Value()
|
||||
if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
perTenantRows[*atLocal] = rowsTotal
|
||||
perTenantRows[*atLocal] += len(ss.Points)
|
||||
}
|
||||
rowsInserted.Add(rowsTotal)
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
|
@ -325,9 +325,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "datadog/api/v1/series":
|
||||
datadogWriteSeriesV1Requests.Inc()
|
||||
datadogWriteRequests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogWriteSeriesV1Errors.Inc()
|
||||
datadogWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
@ -336,27 +336,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "datadog/api/v2/series":
|
||||
datadogWriteSeriesV2Requests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogWriteSeriesV2Errors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"errors":[]}`)
|
||||
return true
|
||||
case "datadog/api/beta/sketches":
|
||||
datadogWriteSketchesBetaRequests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogWriteSketchesBetaErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(202)
|
||||
return true
|
||||
case "datadog/api/v1/validate":
|
||||
datadogValidateRequests.Inc()
|
||||
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
|
||||
@ -424,14 +403,8 @@ var (
|
||||
newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/newrelic/inventory/deltas", protocol="newrelic"}`)
|
||||
newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/newrelic", protocol="newrelic"}`)
|
||||
|
||||
datadogWriteSeriesV1Requests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
|
||||
datadogWriteSeriesV1Errors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
|
||||
|
||||
datadogWriteSeriesV2Requests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v2/series", protocol="datadog"}`)
|
||||
datadogWriteSeriesV2Errors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/datadog/api/v2/series", protocol="datadog"}`)
|
||||
|
||||
datadogWriteSketchesBetaRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/beta/sketches", protocol="datadog"}`)
|
||||
datadogWriteSketchesBetaErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/datadog/api/beta/sketches", protocol="datadog"}`)
|
||||
datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
|
||||
datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
|
||||
|
||||
datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/validate", protocol="datadog"}`)
|
||||
datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/check_run", protocol="datadog"}`)
|
||||
|
@ -363,8 +363,7 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr
|
||||
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details.
|
||||
- `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). This endpoint also supports [Pushgateway protocol](https://github.com/prometheus/pushgateway#url). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
- `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry).
|
||||
- `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API v1](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
|
||||
- `datadog/api/v2/series` - for ingesting data with [DataDog submit metrics API v2](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
|
||||
- `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
|
||||
- `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
|
||||
- `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details.
|
||||
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
|
||||
@ -897,7 +896,7 @@ Below is the output for `/path/to/vminsert -help`:
|
||||
-csvTrimTimestamp duration
|
||||
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
|
||||
-datadog.maxInsertRequestSize size
|
||||
The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches
|
||||
The maximum size in bytes of a single DataDog POST request to /api/v1/series
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-datadog.sanitizeMetricName
|
||||
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)
|
||||
|
@ -510,15 +510,10 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
|
||||
|
||||
## How to send data from DataDog agent
|
||||
|
||||
VictoriaMetrics accepts data in the following protocols:
|
||||
* [DataDog agent](https://docs.datadoghq.com/agent/)
|
||||
* [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
|
||||
* [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/)
|
||||
|
||||
Via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at the following path:
|
||||
* `/datadog/api/v1/series`
|
||||
* `/datadog/api/v2/series`
|
||||
* `/datadog/api/beta/sketches`
|
||||
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/)
|
||||
or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
|
||||
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics)
|
||||
at `/datadog/api/v1/series` path.
|
||||
|
||||
### Sending metrics to VictoriaMetrics
|
||||
|
||||
@ -590,19 +585,6 @@ additional_endpoints:
|
||||
|
||||
</div>
|
||||
|
||||
### Send via Serverless DataDog plugin
|
||||
|
||||
Disable logs (logs ingestion is not supported by Victoria Metrics) and set a custom endpoint in serverless.yaml
|
||||
```
|
||||
custom:
|
||||
datadog:
|
||||
enableDDLogs: false # Disabled not supported DD logs
|
||||
apiKey: fakekey # Set any key, otherwise plugin fails
|
||||
provider:
|
||||
environment:
|
||||
DD_DD_URL: <<vm-url>>/datadog # Victoria Metrics endpoint for DataDog
|
||||
```
|
||||
|
||||
### Send via cURL
|
||||
|
||||
See how to send data to VictoriaMetrics via
|
||||
@ -2585,7 +2567,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
||||
-csvTrimTimestamp duration
|
||||
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
|
||||
-datadog.maxInsertRequestSize size
|
||||
The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches
|
||||
The maximum size in bytes of a single DataDog POST request to /api/v1/series
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-datadog.sanitizeMetricName
|
||||
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)
|
||||
|
@ -518,15 +518,10 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
|
||||
|
||||
## How to send data from DataDog agent
|
||||
|
||||
VictoriaMetrics accepts data in the following protocols:
|
||||
* [DataDog agent](https://docs.datadoghq.com/agent/)
|
||||
* [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
|
||||
* [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/)
|
||||
|
||||
Via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at the following path:
|
||||
* `/datadog/api/v1/series`
|
||||
* `/datadog/api/v2/series`
|
||||
* `/datadog/api/beta/sketches`
|
||||
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/)
|
||||
or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
|
||||
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics)
|
||||
at `/datadog/api/v1/series` path.
|
||||
|
||||
### Sending metrics to VictoriaMetrics
|
||||
|
||||
@ -598,19 +593,6 @@ additional_endpoints:
|
||||
|
||||
</div>
|
||||
|
||||
### Send via Serverless DataDog plugin
|
||||
|
||||
Disable logs (logs ingestion is not supported by Victoria Metrics) and set a custom endpoint in serverless.yaml
|
||||
```
|
||||
custom:
|
||||
datadog:
|
||||
enableDDLogs: false # Disabled not supported DD logs
|
||||
apiKey: fakekey # Set any key, otherwise plugin fails
|
||||
provider:
|
||||
environment:
|
||||
DD_DD_URL: <<vm-url>>/datadog # Victoria Metrics endpoint for DataDog
|
||||
```
|
||||
|
||||
### Send via cURL
|
||||
|
||||
See how to send data to VictoriaMetrics via
|
||||
@ -2593,7 +2575,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
||||
-csvTrimTimestamp duration
|
||||
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
|
||||
-datadog.maxInsertRequestSize size
|
||||
The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches
|
||||
The maximum size in bytes of a single DataDog POST request to /api/v1/series
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-datadog.sanitizeMetricName
|
||||
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)
|
||||
|
@ -529,86 +529,6 @@ echo '
|
||||
|
||||
</div>
|
||||
|
||||
### /datadog/api/v2/series
|
||||
|
||||
**Imports data in DataDog format into VictoriaMetrics**
|
||||
|
||||
Single-node VictoriaMetrics:
|
||||
<div class="with-copy" markdown="1">
|
||||
|
||||
```console
|
||||
echo '
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"resources": [
|
||||
{
|
||||
"name": "test.example.com",
|
||||
"type": "host"
|
||||
}
|
||||
],
|
||||
"points": [
|
||||
{
|
||||
"timestamp": 1699152159,
|
||||
"value": 0
|
||||
},
|
||||
{
|
||||
"timestamp": 1699152160,
|
||||
"value": 0.5
|
||||
}
|
||||
],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}
|
||||
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:8428/datadog/api/v2/series
|
||||
```
|
||||
|
||||
</div>
|
||||
|
||||
Cluster version of VictoriaMetrics:
|
||||
<div class="with-copy" markdown="1">
|
||||
|
||||
```console
|
||||
echo '
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"resources": [
|
||||
{
|
||||
"name": "test.example.com",
|
||||
"type": "host"
|
||||
}
|
||||
],
|
||||
"points": [
|
||||
{
|
||||
"timestamp": 1699152159,
|
||||
"value": 0
|
||||
},
|
||||
{
|
||||
"timestamp": 1699152160,
|
||||
"value": 0.5
|
||||
}
|
||||
],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}
|
||||
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://<vminsert>:8480/insert/0/datadog/api/v2/series
|
||||
```
|
||||
|
||||
</div>
|
||||
|
||||
Additional information:
|
||||
|
||||
* [How to send data from datadog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent)
|
||||
|
@ -1482,7 +1482,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
||||
-csvTrimTimestamp duration
|
||||
Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
|
||||
-datadog.maxInsertRequestSize size
|
||||
The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches
|
||||
The maximum size in bytes of a single DataDog POST request to /api/v1/series
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-datadog.sanitizeMetricName
|
||||
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)
|
||||
|
@ -1,102 +0,0 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
|
||||
)
|
||||
|
||||
// Request represents /api/v1/series request
|
||||
type Request struct {
|
||||
Series []series `json:"series"`
|
||||
}
|
||||
|
||||
// Unmarshal decodes byte array to series v1 Request struct
|
||||
func (r *Request) Unmarshal(b []byte) error {
|
||||
return json.Unmarshal(b, r)
|
||||
}
|
||||
|
||||
// Extract iterates fn execution over all timeseries from series v1 Request
|
||||
func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn func(string) string) error {
|
||||
currentTimestamp := int64(fasttime.UnixTimestamp())
|
||||
for i := range r.Series {
|
||||
s := r.Series[i]
|
||||
samples := make([]prompbmarshal.Sample, 0, len(s.Points))
|
||||
for j := range s.Points {
|
||||
p := s.Points[j]
|
||||
ts, val := p[0], p[1]
|
||||
if ts <= 0 {
|
||||
ts = float64(currentTimestamp)
|
||||
}
|
||||
samples[j] = prompbmarshal.Sample{
|
||||
Timestamp: int64(ts * 1000),
|
||||
Value: val,
|
||||
}
|
||||
}
|
||||
ts := prompbmarshal.TimeSeries{
|
||||
Samples: samples,
|
||||
Labels: s.getLabels(sanitizeFn),
|
||||
}
|
||||
if err := fn(ts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// series represents a series item from DataDog POST request to /api/v1/series
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
type series struct {
|
||||
Metric string `json:"metric"`
|
||||
Host string `json:"host"`
|
||||
|
||||
// The device field does not appear in the datadog docs, but datadog-agent does use it.
|
||||
// Datadog agent (v7 at least), removes the tag "device" and adds it as its own field. Why? That I don't know!
|
||||
// https://github.com/DataDog/datadog-agent/blob/0ada7a97fed6727838a6f4d9c87123d2aafde735/pkg/metrics/series.go#L84-L105
|
||||
Device string `json:"device"`
|
||||
|
||||
// Do not decode Interval, since it isn't used by VictoriaMetrics
|
||||
// Interval int64 `json:"interval"`
|
||||
|
||||
Points []point `json:"points"`
|
||||
Tags []string `json:"tags"`
|
||||
|
||||
// Do not decode type, since it isn't used by VictoriaMetrics
|
||||
// Type string `json:"type"`
|
||||
}
|
||||
|
||||
func (s *series) getLabels(sanitizeFn func(string) string) []prompbmarshal.Label {
|
||||
labels := []prompbmarshal.Label{{
|
||||
Name: "__name__",
|
||||
Value: sanitizeFn(s.Metric),
|
||||
}}
|
||||
if s.Host != "" {
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "host",
|
||||
Value: s.Host,
|
||||
})
|
||||
}
|
||||
if s.Device != "" {
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "device",
|
||||
Value: s.Device,
|
||||
})
|
||||
}
|
||||
for _, tag := range s.Tags {
|
||||
name, value := datadog.SplitTag(tag)
|
||||
if name == "host" {
|
||||
name = "exported_host"
|
||||
}
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: sanitizeFn(name),
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
// point represents a point from DataDog POST request to /api/v1/series
|
||||
type point [2]float64
|
@ -1,71 +0,0 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRequestUnmarshalFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
req := new(Request)
|
||||
if err := req.Unmarshal([]byte(s)); err == nil {
|
||||
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
|
||||
}
|
||||
}
|
||||
f("")
|
||||
f("foobar")
|
||||
f(`{"series":123`)
|
||||
f(`1234`)
|
||||
f(`[]`)
|
||||
}
|
||||
|
||||
func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) {
|
||||
t.Helper()
|
||||
req := new(Request)
|
||||
if err := req.Unmarshal(s); err != nil {
|
||||
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
|
||||
}
|
||||
if !reflect.DeepEqual(req, reqExpected) {
|
||||
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestUnmarshalSuccess(t *testing.T) {
|
||||
unmarshalRequestValidator(
|
||||
t, []byte("{}"), new(Request),
|
||||
)
|
||||
unmarshalRequestValidator(t, []byte(`
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"host": "test.example.com",
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"device": "/dev/sda",
|
||||
"points": [[
|
||||
1575317847,
|
||||
0.5
|
||||
]],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}
|
||||
`), &Request{
|
||||
Series: []series{{
|
||||
Host: "test.example.com",
|
||||
Metric: "system.load.1",
|
||||
Device: "/dev/sda",
|
||||
Points: []point{{
|
||||
1575317847,
|
||||
0.5,
|
||||
}},
|
||||
Tags: []string{
|
||||
"environment:test",
|
||||
},
|
||||
}},
|
||||
})
|
||||
}
|
@ -1,89 +0,0 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
|
||||
)
|
||||
|
||||
// Request represents a sketches item from DataDog POST request to /api/v2/series
|
||||
type Request struct {
|
||||
Series []series `json:"series"`
|
||||
}
|
||||
|
||||
// Unmarshal decodes byte array to series v2 Request struct
|
||||
func (r *Request) Unmarshal(b []byte) error {
|
||||
return json.Unmarshal(b, r)
|
||||
}
|
||||
|
||||
// Extract iterates fn execution over all timeseries from series v2 request
|
||||
func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn func(string) string) error {
|
||||
for i := range r.Series {
|
||||
s := r.Series[i]
|
||||
samples := make([]prompbmarshal.Sample, 0, len(s.Points))
|
||||
for j := range s.Points {
|
||||
p := s.Points[j]
|
||||
samples = append(samples, prompbmarshal.Sample{
|
||||
Timestamp: p.Timestamp * 1000,
|
||||
Value: p.Value,
|
||||
})
|
||||
}
|
||||
ts := prompbmarshal.TimeSeries{
|
||||
Samples: samples,
|
||||
Labels: s.getLabels(sanitizeFn),
|
||||
}
|
||||
if err := fn(ts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// series represents a series item from DataDog POST request to /api/v2/series
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
type series struct {
|
||||
Metric string `json:"metric"`
|
||||
Resources []resource `json:"resources"`
|
||||
Points []point `json:"points"`
|
||||
Tags []string `json:"tags"`
|
||||
|
||||
// Do not decode Type, since it isn't used by VictoriaMetrics
|
||||
// Type string `json:"type"`
|
||||
}
|
||||
|
||||
type resource struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
func (s *series) getLabels(sanitizeFn func(string) string) []prompbmarshal.Label {
|
||||
labels := []prompbmarshal.Label{{
|
||||
Name: "__name__",
|
||||
Value: sanitizeFn(s.Metric),
|
||||
}}
|
||||
for _, res := range s.Resources {
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: sanitizeFn(res.Type),
|
||||
Value: res.Name,
|
||||
})
|
||||
}
|
||||
for _, tag := range s.Tags {
|
||||
name, value := datadog.SplitTag(tag)
|
||||
if name == "host" {
|
||||
name = "exported_host"
|
||||
}
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: sanitizeFn(name),
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
// point represents a point from DataDog POST request to /api/v2/series
|
||||
type point struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Value float64 `json:"value"`
|
||||
}
|
@ -1,83 +0,0 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRequestUnmarshalFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
req := new(Request)
|
||||
if err := req.Unmarshal([]byte(s)); err == nil {
|
||||
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
|
||||
}
|
||||
}
|
||||
f("")
|
||||
f("foobar")
|
||||
f(`{"series":123`)
|
||||
f(`1234`)
|
||||
f(`[]`)
|
||||
}
|
||||
|
||||
func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) {
|
||||
t.Helper()
|
||||
req := new(Request)
|
||||
if err := req.Unmarshal(s); err != nil {
|
||||
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
|
||||
}
|
||||
if !reflect.DeepEqual(req, reqExpected) {
|
||||
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestUnmarshalSuccess(t *testing.T) {
|
||||
unmarshalRequestValidator(
|
||||
t, []byte("{}"), new(Request),
|
||||
)
|
||||
unmarshalRequestValidator(t, []byte(`
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"resources": [
|
||||
{
|
||||
"name": "test.example.com",
|
||||
"type": "host"
|
||||
}, {
|
||||
"name": "/dev/sda",
|
||||
"type": "device"
|
||||
}
|
||||
],
|
||||
"points": [{
|
||||
"timestamp": 1575317847,
|
||||
"value": 0.5
|
||||
}],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}
|
||||
`), &Request{
|
||||
Series: []series{{
|
||||
Metric: "system.load.1",
|
||||
Resources: []resource{{
|
||||
Name: "test.example.com",
|
||||
Type: "host",
|
||||
}, {
|
||||
Name: "/dev/sda",
|
||||
Type: "device",
|
||||
}},
|
||||
Points: []point{{
|
||||
Timestamp: 1575317847,
|
||||
Value: 0.5,
|
||||
}},
|
||||
Tags: []string{
|
||||
"environment:test",
|
||||
},
|
||||
}},
|
||||
})
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func BenchmarkRequestUnmarshal(b *testing.B) {
|
||||
reqBody := []byte(`{
|
||||
"series": [
|
||||
{
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"resources": [{
|
||||
"name": "test.example.com",
|
||||
"type": "host"
|
||||
}],
|
||||
"points": [
|
||||
{
|
||||
"timestamp": 1575317847,
|
||||
"value": 0.5
|
||||
}
|
||||
],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}`)
|
||||
b.SetBytes(int64(len(reqBody)))
|
||||
b.ReportAllocs()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
req := new(Request)
|
||||
for pb.Next() {
|
||||
if err := req.Unmarshal(reqBody); err != nil {
|
||||
panic(fmt.Errorf("unexpected error: %w", err))
|
||||
}
|
||||
if len(req.Series) != 1 {
|
||||
panic(fmt.Errorf("unexpected number of series unmarshaled: got %d; want 4", len(req.Series)))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
# Datadog proto files
|
||||
|
||||
Content copied from https://github.com/DataDog/agent-payload/blob/master/proto/metrics/agent_payload.proto
|
||||
|
||||
## Requirements
|
||||
- protoc binary [link](http://google.github.io/proto-lens/installing-protoc.html)
|
||||
- golang-proto-gen[link](https://developers.google.com/protocol-buffers/docs/reference/go-generated)
|
||||
- custom marshaller [link](https://github.com/planetscale/vtprotobuf)
|
||||
|
||||
## Modifications
|
||||
|
||||
Original proto files were modified:
|
||||
1) changed package name for `package beta`.
|
||||
2) changed import paths - changed directory names.
|
||||
3) changed go_package for `./pb`.
|
||||
|
||||
|
||||
## How to generate pbs
|
||||
|
||||
run command:
|
||||
```bash
|
||||
export GOBIN=~/go/bin protoc
|
||||
protoc -I=. --go_out=./lib/protoparser/datadog/api/sketches/beta --go-vtproto_out=./lib/protoparser/datadog/api/sketches/beta --plugin protoc-gen-go-vtproto="$GOBIN/protoc-gen-go-vtproto" --go-vtproto_opt=features=unmarshal lib/protoparser/datadog/api/sketches/beta/proto/*.proto
|
||||
```
|
||||
|
||||
Generated code will be at `lib/protoparser/datadog/api/sketches/beta/pb`
|
||||
|
||||
manually edit it:
|
||||
|
||||
1) remove all external imports
|
||||
2) remove all unneeded methods
|
||||
3) replace `unknownFields` with `unknownFields []byte`
|
@ -1,95 +0,0 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/sketches/beta/pb"
|
||||
)
|
||||
|
||||
// Request represents a sketches item from DataDog POST request to /api/beta/sketches
|
||||
type Request struct {
|
||||
*pb.SketchPayload
|
||||
}
|
||||
|
||||
// Unmarshal is a wrapper around SketchesPayload Unmarshal method which decodes byte array to SketchPayload struct
|
||||
func (r *Request) Unmarshal(b []byte) error {
|
||||
if r.SketchPayload == nil {
|
||||
r.SketchPayload = new(pb.SketchPayload)
|
||||
}
|
||||
return r.SketchPayload.UnmarshalVT(b)
|
||||
}
|
||||
|
||||
// Extract iterates fn function execution over all timeseries from a sketch payload
|
||||
func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn func(string) string) error {
|
||||
var err error
|
||||
for _, sketch := range r.SketchPayload.Sketches {
|
||||
sketchSeries := make([]prompbmarshal.TimeSeries, 5)
|
||||
for _, point := range sketch.Dogsketches {
|
||||
timestamp := int64(point.Ts * 1000)
|
||||
updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{
|
||||
"max": point.Max,
|
||||
"min": point.Min,
|
||||
"cnt": float64(point.Cnt),
|
||||
"avg": point.Avg,
|
||||
"sum": point.Sum,
|
||||
})
|
||||
}
|
||||
for _, point := range sketch.Distributions {
|
||||
timestamp := int64(point.Ts * 1000)
|
||||
updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{
|
||||
"max": point.Max,
|
||||
"min": point.Min,
|
||||
"cnt": float64(point.Cnt),
|
||||
"avg": point.Avg,
|
||||
"sum": point.Sum,
|
||||
})
|
||||
}
|
||||
labels := getLabels(sketch, sanitizeFn)
|
||||
for i := range sketchSeries {
|
||||
sketchSeries[i].Labels = append(sketchSeries[i].Labels, labels...)
|
||||
if err = fn(sketchSeries[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getLabels(sketch *pb.SketchPayload_Sketch, sanitizeFn func(string) string) []prompbmarshal.Label {
|
||||
labels := []prompbmarshal.Label{}
|
||||
if sketch.Host != "" {
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "host",
|
||||
Value: sketch.Host,
|
||||
})
|
||||
}
|
||||
for _, tag := range sketch.Tags {
|
||||
name, value := datadog.SplitTag(tag)
|
||||
if name == "host" {
|
||||
name = "exported_host"
|
||||
}
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: sanitizeFn(name),
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func updateSeries(series []prompbmarshal.TimeSeries, metric string, timestamp int64, values map[string]float64) {
|
||||
index := 0
|
||||
for suffix, value := range values {
|
||||
s := series[index]
|
||||
s.Samples = append(s.Samples, prompbmarshal.Sample{
|
||||
Timestamp: timestamp,
|
||||
Value: value,
|
||||
})
|
||||
if len(s.Labels) == 0 {
|
||||
s.Labels = append(s.Labels, prompbmarshal.Label{
|
||||
Name: "",
|
||||
Value: metric + "_" + suffix,
|
||||
})
|
||||
}
|
||||
index++
|
||||
}
|
||||
}
|
@ -1,159 +0,0 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.29.0
|
||||
// protoc v3.15.8
|
||||
// source: lib/protoparser/datadog/api/sketches/beta/proto/sketches.proto
|
||||
|
||||
package pb
|
||||
|
||||
type MetricPayload_MetricType int32
|
||||
|
||||
const (
|
||||
MetricPayload_UNSPECIFIED MetricPayload_MetricType = 0
|
||||
MetricPayload_COUNT MetricPayload_MetricType = 1
|
||||
MetricPayload_RATE MetricPayload_MetricType = 2
|
||||
MetricPayload_GAUGE MetricPayload_MetricType = 3
|
||||
)
|
||||
|
||||
// Enum value maps for MetricPayload_MetricType.
|
||||
var (
|
||||
MetricPayload_MetricType_name = map[int32]string{
|
||||
0: "UNSPECIFIED",
|
||||
1: "COUNT",
|
||||
2: "RATE",
|
||||
3: "GAUGE",
|
||||
}
|
||||
MetricPayload_MetricType_value = map[string]int32{
|
||||
"UNSPECIFIED": 0,
|
||||
"COUNT": 1,
|
||||
"RATE": 2,
|
||||
"GAUGE": 3,
|
||||
}
|
||||
)
|
||||
|
||||
func (x MetricPayload_MetricType) Enum() *MetricPayload_MetricType {
|
||||
p := new(MetricPayload_MetricType)
|
||||
*p = x
|
||||
return p
|
||||
}
|
||||
|
||||
type CommonMetadata struct {
|
||||
unknownFields []byte
|
||||
|
||||
AgentVersion string `protobuf:"bytes,1,opt,name=agent_version,json=agentVersion,proto3" json:"agent_version,omitempty"`
|
||||
Timezone string `protobuf:"bytes,2,opt,name=timezone,proto3" json:"timezone,omitempty"`
|
||||
CurrentEpoch float64 `protobuf:"fixed64,3,opt,name=current_epoch,json=currentEpoch,proto3" json:"current_epoch,omitempty"`
|
||||
InternalIp string `protobuf:"bytes,4,opt,name=internal_ip,json=internalIp,proto3" json:"internal_ip,omitempty"`
|
||||
PublicIp string `protobuf:"bytes,5,opt,name=public_ip,json=publicIp,proto3" json:"public_ip,omitempty"`
|
||||
ApiKey string `protobuf:"bytes,6,opt,name=api_key,json=apiKey,proto3" json:"api_key,omitempty"`
|
||||
}
|
||||
|
||||
type MetricPayload struct {
|
||||
unknownFields []byte
|
||||
|
||||
Series []*MetricPayload_MetricSeries `protobuf:"bytes,1,rep,name=series,proto3" json:"series,omitempty"`
|
||||
}
|
||||
|
||||
type EventsPayload struct {
|
||||
unknownFields []byte
|
||||
|
||||
Events []*EventsPayload_Event `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"`
|
||||
Metadata *CommonMetadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
type SketchPayload struct {
|
||||
unknownFields []byte
|
||||
|
||||
Sketches []*SketchPayload_Sketch `protobuf:"bytes,1,rep,name=sketches,proto3" json:"sketches,omitempty"`
|
||||
Metadata *CommonMetadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
type MetricPayload_MetricPoint struct {
|
||||
unknownFields []byte
|
||||
|
||||
// metric value
|
||||
Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
|
||||
// timestamp for this value in seconds since the UNIX epoch
|
||||
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
}
|
||||
|
||||
type MetricPayload_Resource struct {
|
||||
unknownFields []byte
|
||||
|
||||
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
|
||||
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
|
||||
}
|
||||
|
||||
type MetricPayload_MetricSeries struct {
|
||||
unknownFields []byte
|
||||
|
||||
// Resources this series applies to; include at least
|
||||
// { type="host", name=<hostname> }
|
||||
Resources []*MetricPayload_Resource `protobuf:"bytes,1,rep,name=resources,proto3" json:"resources,omitempty"`
|
||||
// metric name
|
||||
Metric string `protobuf:"bytes,2,opt,name=metric,proto3" json:"metric,omitempty"`
|
||||
// tags for this metric
|
||||
Tags []string `protobuf:"bytes,3,rep,name=tags,proto3" json:"tags,omitempty"`
|
||||
// data points for this metric
|
||||
Points []*MetricPayload_MetricPoint `protobuf:"bytes,4,rep,name=points,proto3" json:"points,omitempty"`
|
||||
// type of metric
|
||||
Type MetricPayload_MetricType `protobuf:"varint,5,opt,name=type,proto3,enum=beta.MetricPayload_MetricType" json:"type,omitempty"`
|
||||
// metric unit name
|
||||
Unit string `protobuf:"bytes,6,opt,name=unit,proto3" json:"unit,omitempty"`
|
||||
// source of this metric (check name, etc.)
|
||||
SourceTypeName string `protobuf:"bytes,7,opt,name=source_type_name,json=sourceTypeName,proto3" json:"source_type_name,omitempty"`
|
||||
// interval, in seconds, between samples of this metric
|
||||
Interval int64 `protobuf:"varint,8,opt,name=interval,proto3" json:"interval,omitempty"`
|
||||
}
|
||||
|
||||
type EventsPayload_Event struct {
|
||||
unknownFields []byte
|
||||
|
||||
Title string `protobuf:"bytes,1,opt,name=title,proto3" json:"title,omitempty"`
|
||||
Text string `protobuf:"bytes,2,opt,name=text,proto3" json:"text,omitempty"`
|
||||
Ts int64 `protobuf:"varint,3,opt,name=ts,proto3" json:"ts,omitempty"`
|
||||
Priority string `protobuf:"bytes,4,opt,name=priority,proto3" json:"priority,omitempty"`
|
||||
Host string `protobuf:"bytes,5,opt,name=host,proto3" json:"host,omitempty"`
|
||||
Tags []string `protobuf:"bytes,6,rep,name=tags,proto3" json:"tags,omitempty"`
|
||||
AlertType string `protobuf:"bytes,7,opt,name=alert_type,json=alertType,proto3" json:"alert_type,omitempty"`
|
||||
AggregationKey string `protobuf:"bytes,8,opt,name=aggregation_key,json=aggregationKey,proto3" json:"aggregation_key,omitempty"`
|
||||
SourceTypeName string `protobuf:"bytes,9,opt,name=source_type_name,json=sourceTypeName,proto3" json:"source_type_name,omitempty"`
|
||||
}
|
||||
|
||||
type SketchPayload_Sketch struct {
|
||||
unknownFields []byte
|
||||
|
||||
Metric string `protobuf:"bytes,1,opt,name=metric,proto3" json:"metric,omitempty"`
|
||||
Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"`
|
||||
Distributions []*SketchPayload_Sketch_Distribution `protobuf:"bytes,3,rep,name=distributions,proto3" json:"distributions,omitempty"`
|
||||
Tags []string `protobuf:"bytes,4,rep,name=tags,proto3" json:"tags,omitempty"`
|
||||
Dogsketches []*SketchPayload_Sketch_Dogsketch `protobuf:"bytes,7,rep,name=dogsketches,proto3" json:"dogsketches,omitempty"`
|
||||
}
|
||||
|
||||
type SketchPayload_Sketch_Distribution struct {
|
||||
unknownFields []byte
|
||||
|
||||
Ts int64 `protobuf:"varint,1,opt,name=ts,proto3" json:"ts,omitempty"`
|
||||
Cnt int64 `protobuf:"varint,2,opt,name=cnt,proto3" json:"cnt,omitempty"`
|
||||
Min float64 `protobuf:"fixed64,3,opt,name=min,proto3" json:"min,omitempty"`
|
||||
Max float64 `protobuf:"fixed64,4,opt,name=max,proto3" json:"max,omitempty"`
|
||||
Avg float64 `protobuf:"fixed64,5,opt,name=avg,proto3" json:"avg,omitempty"`
|
||||
Sum float64 `protobuf:"fixed64,6,opt,name=sum,proto3" json:"sum,omitempty"`
|
||||
V []float64 `protobuf:"fixed64,7,rep,packed,name=v,proto3" json:"v,omitempty"`
|
||||
G []uint32 `protobuf:"varint,8,rep,packed,name=g,proto3" json:"g,omitempty"`
|
||||
Delta []uint32 `protobuf:"varint,9,rep,packed,name=delta,proto3" json:"delta,omitempty"`
|
||||
Buf []float64 `protobuf:"fixed64,10,rep,packed,name=buf,proto3" json:"buf,omitempty"`
|
||||
}
|
||||
|
||||
type SketchPayload_Sketch_Dogsketch struct {
|
||||
unknownFields []byte
|
||||
|
||||
Ts int64 `protobuf:"varint,1,opt,name=ts,proto3" json:"ts,omitempty"`
|
||||
Cnt int64 `protobuf:"varint,2,opt,name=cnt,proto3" json:"cnt,omitempty"`
|
||||
Min float64 `protobuf:"fixed64,3,opt,name=min,proto3" json:"min,omitempty"`
|
||||
Max float64 `protobuf:"fixed64,4,opt,name=max,proto3" json:"max,omitempty"`
|
||||
Avg float64 `protobuf:"fixed64,5,opt,name=avg,proto3" json:"avg,omitempty"`
|
||||
Sum float64 `protobuf:"fixed64,6,opt,name=sum,proto3" json:"sum,omitempty"`
|
||||
K []int32 `protobuf:"zigzag32,7,rep,packed,name=k,proto3" json:"k,omitempty"`
|
||||
N []uint32 `protobuf:"varint,8,rep,packed,name=n,proto3" json:"n,omitempty"`
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -1,112 +0,0 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package beta;
|
||||
|
||||
option go_package = "./pb";
|
||||
|
||||
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
|
||||
|
||||
message CommonMetadata {
|
||||
string agent_version = 1;
|
||||
string timezone = 2;
|
||||
double current_epoch = 3;
|
||||
string internal_ip = 4;
|
||||
string public_ip = 5;
|
||||
string api_key = 6;
|
||||
}
|
||||
|
||||
message MetricPayload {
|
||||
enum MetricType {
|
||||
UNSPECIFIED = 0;
|
||||
COUNT = 1;
|
||||
RATE = 2;
|
||||
GAUGE = 3;
|
||||
}
|
||||
|
||||
message MetricPoint {
|
||||
// metric value
|
||||
double value = 1;
|
||||
// timestamp for this value in seconds since the UNIX epoch
|
||||
int64 timestamp = 2;
|
||||
}
|
||||
|
||||
message Resource {
|
||||
string type = 1;
|
||||
string name = 2;
|
||||
}
|
||||
|
||||
message MetricSeries {
|
||||
// Resources this series applies to; include at least
|
||||
// { type="host", name=<hostname> }
|
||||
repeated Resource resources = 1;
|
||||
// metric name
|
||||
string metric = 2;
|
||||
// tags for this metric
|
||||
repeated string tags = 3;
|
||||
// data points for this metric
|
||||
repeated MetricPoint points = 4;
|
||||
// type of metric
|
||||
MetricType type = 5;
|
||||
// metric unit name
|
||||
string unit = 6;
|
||||
// source of this metric (check name, etc.)
|
||||
string source_type_name = 7;
|
||||
// interval, in seconds, between samples of this metric
|
||||
int64 interval = 8;
|
||||
|
||||
reserved 9;
|
||||
}
|
||||
repeated MetricSeries series = 1;
|
||||
}
|
||||
|
||||
message EventsPayload {
|
||||
message Event {
|
||||
string title = 1;
|
||||
string text = 2;
|
||||
int64 ts = 3;
|
||||
string priority = 4;
|
||||
string host = 5;
|
||||
repeated string tags = 6;
|
||||
string alert_type = 7;
|
||||
string aggregation_key = 8;
|
||||
string source_type_name = 9;
|
||||
}
|
||||
repeated Event events = 1;
|
||||
CommonMetadata metadata = 2;
|
||||
}
|
||||
|
||||
message SketchPayload {
|
||||
message Sketch {
|
||||
message Distribution {
|
||||
int64 ts = 1;
|
||||
int64 cnt = 2;
|
||||
double min = 3;
|
||||
double max = 4;
|
||||
double avg = 5;
|
||||
double sum = 6;
|
||||
repeated double v = 7;
|
||||
repeated uint32 g = 8;
|
||||
repeated uint32 delta = 9;
|
||||
repeated double buf = 10;
|
||||
}
|
||||
message Dogsketch {
|
||||
int64 ts = 1;
|
||||
int64 cnt = 2;
|
||||
double min = 3;
|
||||
double max = 4;
|
||||
double avg = 5;
|
||||
double sum = 6;
|
||||
repeated sint32 k = 7;
|
||||
repeated uint32 n = 8;
|
||||
}
|
||||
string metric = 1;
|
||||
string host = 2;
|
||||
repeated Distribution distributions = 3 [(gogoproto.nullable) = false];
|
||||
repeated string tags = 4;
|
||||
reserved 5, 6;
|
||||
reserved "distributionsK", "distributionsC";
|
||||
repeated Dogsketch dogsketches = 7 [(gogoproto.nullable) = false];
|
||||
}
|
||||
repeated Sketch sketches = 1 [(gogoproto.nullable) = false];
|
||||
CommonMetadata metadata = 2 [(gogoproto.nullable) = false];
|
||||
}
|
@ -1,9 +1,11 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// SplitTag splits DataDog tag into tag name and value.
|
||||
@ -18,10 +20,98 @@ func SplitTag(tag string) (string, string) {
|
||||
return tag[:n], tag[n+1:]
|
||||
}
|
||||
|
||||
// Request represents DataDog submit metrics request
|
||||
// Request represents DataDog POST request to /api/v1/series
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
type Request interface {
|
||||
Extract(func(prompbmarshal.TimeSeries) error, func(string) string) error
|
||||
Unmarshal([]byte) error
|
||||
type Request struct {
|
||||
Series []Series `json:"series"`
|
||||
}
|
||||
|
||||
func (req *Request) reset() {
|
||||
// recursively reset all the fields in req in order to avoid field value
|
||||
// re-use in json.Unmarshal() when the corresponding field is missing
|
||||
// in the unmarshaled JSON.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432
|
||||
series := req.Series
|
||||
for i := range series {
|
||||
series[i].reset()
|
||||
}
|
||||
req.Series = series[:0]
|
||||
}
|
||||
|
||||
// Unmarshal unmarshals DataDog /api/v1/series request body from b to req.
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
//
|
||||
// b shouldn't be modified when req is in use.
|
||||
func (req *Request) Unmarshal(b []byte) error {
|
||||
req.reset()
|
||||
if err := json.Unmarshal(b, req); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal %q: %w", b, err)
|
||||
}
|
||||
// Set missing timestamps to the current time.
|
||||
currentTimestamp := float64(fasttime.UnixTimestamp())
|
||||
series := req.Series
|
||||
for i := range series {
|
||||
points := series[i].Points
|
||||
for j := range points {
|
||||
if points[j][0] <= 0 {
|
||||
points[j][0] = currentTimestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Series represents a series item from DataDog POST request to /api/v1/series
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
type Series struct {
|
||||
Metric string `json:"metric"`
|
||||
Host string `json:"host"`
|
||||
|
||||
// The device field does not appear in the datadog docs, but datadog-agent does use it.
|
||||
// Datadog agent (v7 at least), removes the tag "device" and adds it as its own field. Why? That I don't know!
|
||||
// https://github.com/DataDog/datadog-agent/blob/0ada7a97fed6727838a6f4d9c87123d2aafde735/pkg/metrics/series.go#L84-L105
|
||||
Device string `json:"device"`
|
||||
|
||||
// Do not decode Interval, since it isn't used by VictoriaMetrics
|
||||
// Interval int64 `json:"interval"`
|
||||
|
||||
Points []Point `json:"points"`
|
||||
Tags []string `json:"tags"`
|
||||
|
||||
// Do not decode Type, since it isn't used by VictoriaMetrics
|
||||
// Type string `json:"type"`
|
||||
}
|
||||
|
||||
func (s *Series) reset() {
|
||||
s.Metric = ""
|
||||
s.Host = ""
|
||||
s.Device = ""
|
||||
|
||||
points := s.Points
|
||||
for i := range points {
|
||||
points[i] = Point{}
|
||||
}
|
||||
s.Points = points[:0]
|
||||
|
||||
tags := s.Tags
|
||||
for i := range tags {
|
||||
tags[i] = ""
|
||||
}
|
||||
s.Tags = tags[:0]
|
||||
}
|
||||
|
||||
// Point represents a point from DataDog POST request to /api/v1/series
|
||||
type Point [2]float64
|
||||
|
||||
// Timestamp returns timestamp in milliseconds from the given pt.
|
||||
func (pt *Point) Timestamp() int64 {
|
||||
return int64(pt[0] * 1000)
|
||||
}
|
||||
|
||||
// Value returns value from the given pt.
|
||||
func (pt *Point) Value() float64 {
|
||||
return pt[1]
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@ -20,3 +21,102 @@ func TestSplitTag(t *testing.T) {
|
||||
f("foo:bar", "foo", "bar")
|
||||
f(":bar", "", "bar")
|
||||
}
|
||||
|
||||
func TestRequestUnmarshalMissingHost(t *testing.T) {
|
||||
// This tests https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432
|
||||
req := Request{
|
||||
Series: []Series{{
|
||||
Host: "prev-host",
|
||||
Device: "prev-device",
|
||||
}},
|
||||
}
|
||||
data := `
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"metric": "system.load.1",
|
||||
"points": [[
|
||||
1575317847,
|
||||
0.5
|
||||
]]
|
||||
}
|
||||
]
|
||||
}`
|
||||
if err := req.Unmarshal([]byte(data)); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
reqExpected := Request{
|
||||
Series: []Series{{
|
||||
Metric: "system.load.1",
|
||||
Points: []Point{{
|
||||
1575317847,
|
||||
0.5,
|
||||
}},
|
||||
}},
|
||||
}
|
||||
if !reflect.DeepEqual(&req, &reqExpected) {
|
||||
t.Fatalf("unexpected request parsed;\ngot\n%+v\nwant\n%+v", req, reqExpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestUnmarshalFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
var req Request
|
||||
if err := req.Unmarshal([]byte(s)); err == nil {
|
||||
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
|
||||
}
|
||||
}
|
||||
f("")
|
||||
f("foobar")
|
||||
f(`{"series":123`)
|
||||
f(`1234`)
|
||||
f(`[]`)
|
||||
}
|
||||
|
||||
func TestRequestUnmarshalSuccess(t *testing.T) {
|
||||
f := func(s string, reqExpected *Request) {
|
||||
t.Helper()
|
||||
var req Request
|
||||
if err := req.Unmarshal([]byte(s)); err != nil {
|
||||
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
|
||||
}
|
||||
if !reflect.DeepEqual(&req, reqExpected) {
|
||||
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &req, reqExpected)
|
||||
}
|
||||
}
|
||||
f("{}", &Request{})
|
||||
f(`
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"host": "test.example.com",
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"device": "/dev/sda",
|
||||
"points": [[
|
||||
1575317847,
|
||||
0.5
|
||||
]],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}
|
||||
`, &Request{
|
||||
Series: []Series{{
|
||||
Host: "test.example.com",
|
||||
Metric: "system.load.1",
|
||||
Device: "/dev/sda",
|
||||
Points: []Point{{
|
||||
1575317847,
|
||||
0.5,
|
||||
}},
|
||||
Tags: []string{
|
||||
"environment:test",
|
||||
},
|
||||
}},
|
||||
})
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ func BenchmarkRequestUnmarshal(b *testing.B) {
|
||||
b.SetBytes(int64(len(reqBody)))
|
||||
b.ReportAllocs()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
req := new(Request)
|
||||
var req Request
|
||||
for pb.Next() {
|
||||
if err := req.Unmarshal(reqBody); err != nil {
|
||||
panic(fmt.Errorf("unexpected error: %w", err))
|
@ -5,7 +5,6 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"sync"
|
||||
|
||||
@ -13,19 +12,15 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
|
||||
apiSeriesV1 "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/series/v1"
|
||||
apiSeriesV2 "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/series/v2"
|
||||
apiSketchesBeta "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/sketches/beta"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
// The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches")
|
||||
maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series")
|
||||
|
||||
// If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply:
|
||||
// https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
|
||||
@ -36,15 +31,13 @@ var (
|
||||
"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics")
|
||||
)
|
||||
|
||||
// Parse parses DataDog POST request for /api/v1/series, /api/v2/series, /api/beta/sketches from reader and calls callback for the parsed request.
|
||||
// Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
|
||||
//
|
||||
// callback shouldn't hold series after returning.
|
||||
func Parse(req *http.Request, callback func(prompbmarshal.TimeSeries) error) error {
|
||||
var r io.Reader
|
||||
wcr := writeconcurrencylimiter.GetReader(req.Body)
|
||||
func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error {
|
||||
wcr := writeconcurrencylimiter.GetReader(r)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
r = wcr
|
||||
contentEncoding := req.Header.Get("Content-Encoding")
|
||||
|
||||
switch contentEncoding {
|
||||
case "gzip":
|
||||
@ -67,56 +60,25 @@ func Parse(req *http.Request, callback func(prompbmarshal.TimeSeries) error) err
|
||||
if err := ctx.Read(); err != nil {
|
||||
return err
|
||||
}
|
||||
apiVersion := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${version}")
|
||||
apiKind := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${kind}")
|
||||
|
||||
ddReq := getRequest()
|
||||
defer putRequest(ddReq)
|
||||
|
||||
switch apiKind {
|
||||
case "series":
|
||||
switch apiVersion {
|
||||
case "v1":
|
||||
ddReq = new(apiSeriesV1.Request)
|
||||
case "v2":
|
||||
ddReq = new(apiSeriesV2.Request)
|
||||
default:
|
||||
return fmt.Errorf(
|
||||
"API version %q of Datadog series endpoint is not supported",
|
||||
apiVersion,
|
||||
)
|
||||
}
|
||||
case "sketches":
|
||||
switch apiVersion {
|
||||
case "beta":
|
||||
ddReq = new(apiSketchesBeta.Request)
|
||||
default:
|
||||
return fmt.Errorf(
|
||||
"API version %q of Datadog sketches endpoint is not supported",
|
||||
apiVersion,
|
||||
)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf(
|
||||
"API kind %q of Datadog API is not supported",
|
||||
apiKind,
|
||||
)
|
||||
}
|
||||
|
||||
if err := ddReq.Unmarshal(ctx.reqBuf.B); err != nil {
|
||||
req := getRequest()
|
||||
defer putRequest(req)
|
||||
if err := req.Unmarshal(ctx.reqBuf.B); err != nil {
|
||||
unmarshalErrors.Inc()
|
||||
return fmt.Errorf("cannot unmarshal DataDog POST request with size %d bytes: %w", len(ctx.reqBuf.B), err)
|
||||
}
|
||||
|
||||
cb := func(series prompbmarshal.TimeSeries) error {
|
||||
rowsRead.Add(len(series.Samples))
|
||||
return callback(series)
|
||||
rows := 0
|
||||
series := req.Series
|
||||
for i := range series {
|
||||
rows += len(series[i].Points)
|
||||
if *sanitizeMetricName {
|
||||
series[i].Metric = sanitizeName(series[i].Metric)
|
||||
}
|
||||
}
|
||||
rowsRead.Add(rows)
|
||||
|
||||
if err := ddReq.Extract(cb, sanitizeName(*sanitizeMetricName)); err != nil {
|
||||
if err := callback(series); err != nil {
|
||||
return fmt.Errorf("error when processing imported data: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -182,15 +144,15 @@ func putPushCtx(ctx *pushCtx) {
|
||||
var pushCtxPool sync.Pool
|
||||
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
|
||||
|
||||
func getRequest() datadog.Request {
|
||||
func getRequest() *datadog.Request {
|
||||
v := requestPool.Get()
|
||||
if v == nil {
|
||||
return nil
|
||||
return &datadog.Request{}
|
||||
}
|
||||
return v.(datadog.Request)
|
||||
return v.(*datadog.Request)
|
||||
}
|
||||
|
||||
func putRequest(req datadog.Request) {
|
||||
func putRequest(req *datadog.Request) {
|
||||
requestPool.Put(req)
|
||||
}
|
||||
|
||||
@ -199,16 +161,9 @@ var requestPool sync.Pool
|
||||
// sanitizeName performs DataDog-compatible sanitizing for metric names
|
||||
//
|
||||
// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
|
||||
func sanitizeName(sanitize bool) func(string) string {
|
||||
if sanitize {
|
||||
return func(name string) string {
|
||||
func sanitizeName(name string) string {
|
||||
return namesSanitizer.Transform(name)
|
||||
}
|
||||
}
|
||||
return func(name string) string {
|
||||
return name
|
||||
}
|
||||
}
|
||||
|
||||
var namesSanitizer = bytesutil.NewFastStringTransformer(func(s string) string {
|
||||
s = unsupportedDatadogChars.ReplaceAllString(s, "_")
|
||||
@ -221,5 +176,4 @@ var (
|
||||
unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`)
|
||||
multiUnderscores = regexp.MustCompile(`_+`)
|
||||
underscoresWithDots = regexp.MustCompile(`_?\._?`)
|
||||
insertApisVersionRegex = regexp.MustCompile(`.*/api/(?P<version>[\w]+)/(?P<kind>[\w]+)`)
|
||||
)
|
||||
|
@ -7,7 +7,7 @@ import (
|
||||
func TestSanitizeName(t *testing.T) {
|
||||
f := func(s, resultExpected string) {
|
||||
t.Helper()
|
||||
result := sanitizeName(true)(s)
|
||||
result := sanitizeName(s)
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result for sanitizeName(%q); got\n%q\nwant\n%q", s, result, resultExpected)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user