diff --git a/README.md b/README.md index 8ac1bfec8..e342debd2 100644 --- a/README.md +++ b/README.md @@ -507,15 +507,10 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data in the following protocols: -* [DataDog agent](https://docs.datadoghq.com/agent/) -* [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) -* [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) - -Via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at the following path: -* `/datadog/api/v1/series` -* `/datadog/api/v2/series` -* `/datadog/api/beta/sketches` +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) +or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) +at `/datadog/api/v1/series` path. ### Sending metrics to VictoriaMetrics @@ -587,19 +582,6 @@ additional_endpoints: -### Send via Serverless DataDog plugin - -Disable logs (logs ingestion is not supported by Victoria Metrics) and set a custom endpoint in serverless.yaml -``` -custom: - datadog: - enableDDLogs: false # Disabled not supported DD logs - apiKey: fakekey # Set any key, otherwise plugin fails -provider: - environment: - DD_DD_URL: <>/datadog # Victoria Metrics endpoint for DataDog -``` - ### Send via cURL See how to send data to VictoriaMetrics via @@ -2582,7 +2564,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -csvTrimTimestamp duration Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms) -datadog.maxInsertRequestSize size - The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches + The maximum size in bytes of a single DataDog POST request to /api/v1/series Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864) -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) diff --git a/app/vmagent/datadog/request_handler.go b/app/vmagent/datadog/request_handler.go index 80f41ca95..4cdfe1093 100644 --- a/app/vmagent/datadog/request_handler.go +++ b/app/vmagent/datadog/request_handler.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" @@ -19,7 +20,7 @@ var ( rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`) ) -// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series, /api/v2/series, /api/beta/sketches request. +// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request. // // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { @@ -27,23 +28,66 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { if err != nil { return err } - return stream.Parse( - req, func(series prompbmarshal.TimeSeries) error { - series.Labels = append(series.Labels, extraLabels...) - return insertRows(at, series) - }, - ) + ce := req.Header.Get("Content-Encoding") + return stream.Parse(req.Body, ce, func(series []datadog.Series) error { + return insertRows(at, series, extraLabels) + }) } -func insertRows(at *auth.Token, series prompbmarshal.TimeSeries) error { +func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) - rowsTotal := len(series.Samples) - - ctx.WriteRequest.Timeseries = []prompbmarshal.TimeSeries{series} - ctx.Labels = series.Labels - ctx.Samples = series.Samples + rowsTotal := 0 + tssDst := ctx.WriteRequest.Timeseries[:0] + labels := ctx.Labels[:0] + samples := ctx.Samples[:0] + for i := range series { + ss := &series[i] + rowsTotal += len(ss.Points) + labelsLen := len(labels) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: ss.Metric, + }) + if ss.Host != "" { + labels = append(labels, prompbmarshal.Label{ + Name: "host", + Value: ss.Host, + }) + } + if ss.Device != "" { + labels = append(labels, prompbmarshal.Label{ + Name: "device", + Value: ss.Device, + }) + } + for _, tag := range ss.Tags { + name, value := datadog.SplitTag(tag) + if name == "host" { + name = "exported_host" + } + labels = append(labels, prompbmarshal.Label{ + Name: name, + Value: value, + }) + } + labels = append(labels, extraLabels...) + samplesLen := len(samples) + for _, pt := range ss.Points { + samples = append(samples, prompbmarshal.Sample{ + Timestamp: pt.Timestamp(), + Value: pt.Value(), + }) + } + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[samplesLen:], + }) + } + ctx.WriteRequest.Timeseries = tssDst + ctx.Labels = labels + ctx.Samples = samples if !remotewrite.TryPush(at, &ctx.WriteRequest) { return remotewrite.ErrQueueFullHTTPRetry } diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 6fd91f63e..f05c3ecf5 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -343,9 +343,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { fmt.Fprintf(w, `{"status":"ok"}`) return true case "/datadog/api/v1/series": - datadogWriteSeriesV1Requests.Inc() + datadogWriteRequests.Inc() if err := datadog.InsertHandlerForHTTP(nil, r); err != nil { - datadogWriteSeriesV1Errors.Inc() + datadogWriteErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true } @@ -354,27 +354,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/datadog/api/v2/series": - datadogWriteSeriesV2Requests.Inc() - if err := datadog.InsertHandlerForHTTP(nil, r); err != nil { - datadogWriteSeriesV2Errors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(202) - fmt.Fprintf(w, `{"errors":[]}`) - return true - case "/datadog/api/beta/sketches": - datadogWriteSketchesBetaRequests.Inc() - if err := datadog.InsertHandlerForHTTP(nil, r); err != nil { - datadogWriteSketchesBetaErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(202) - return true case "/datadog/api/v1/validate": datadogValidateRequests.Inc() // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key @@ -587,9 +566,9 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri fmt.Fprintf(w, `{"status":"ok"}`) return true case "datadog/api/v1/series": - datadogWriteSeriesV1Requests.Inc() + datadogWriteRequests.Inc() if err := datadog.InsertHandlerForHTTP(at, r); err != nil { - datadogWriteSeriesV1Errors.Inc() + datadogWriteErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true } @@ -597,26 +576,6 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "datadog/api/v2/series": - datadogWriteSeriesV2Requests.Inc() - if err := datadog.InsertHandlerForHTTP(at, r); err != nil { - datadogWriteSeriesV2Errors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics - w.WriteHeader(202) - fmt.Fprintf(w, `{"errors":[]}`) - return true - case "datadog/api/beta/sketches": - datadogWriteSketchesBetaRequests.Inc() - if err := datadog.InsertHandlerForHTTP(at, r); err != nil { - datadogWriteSketchesBetaErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(202) - return true case "datadog/api/v1/validate": datadogValidateRequests.Inc() // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key @@ -667,14 +626,8 @@ var ( influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`) - datadogWriteSeriesV1Requests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) - datadogWriteSeriesV1Errors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) - - datadogWriteSeriesV2Requests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`) - datadogWriteSeriesV2Errors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`) - - datadogWriteSketchesBetaRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/beta/sketches", protocol="datadog"}`) - datadogWriteSketchesBetaErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/beta/sketches", protocol="datadog"}`) + datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) + datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`) datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`) diff --git a/app/vminsert/datadog/request_handler.go b/app/vminsert/datadog/request_handler.go index 14c13bdf4..9717b7ef5 100644 --- a/app/vminsert/datadog/request_handler.go +++ b/app/vminsert/datadog/request_handler.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream" "github.com/VictoriaMetrics/metrics" ) @@ -16,7 +17,7 @@ var ( rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadog"}`) ) -// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series, /api/v2/series, /api/v1/sketches, /api/beta/sketches request. +// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request. // // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics func InsertHandlerForHTTP(req *http.Request) error { @@ -24,38 +25,62 @@ func InsertHandlerForHTTP(req *http.Request) error { if err != nil { return err } - return stream.Parse( - req, func(series prompbmarshal.TimeSeries) error { - series.Labels = append(series.Labels, extraLabels...) - return insertRows(series) - }, - ) + ce := req.Header.Get("Content-Encoding") + return stream.Parse(req.Body, ce, func(series []parser.Series) error { + return insertRows(series, extraLabels) + }) } -func insertRows(series prompbmarshal.TimeSeries) error { +func insertRows(series []parser.Series, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) + rowsLen := 0 + for i := range series { + rowsLen += len(series[i].Points) + } + ctx.Reset(rowsLen) + rowsTotal := 0 hasRelabeling := relabel.HasRelabeling() - rowsTotal := len(series.Samples) - - ctx.Reset(rowsTotal) - ctx.Labels = ctx.Labels[:0] - for l := range series.Labels { - ctx.AddLabel(series.Labels[l].Name, series.Labels[l].Value) - } - if hasRelabeling { - ctx.ApplyRelabeling() - } - if len(ctx.Labels) == 0 { - return nil - } - ctx.SortLabelsIfNeeded() - for _, sample := range series.Samples { - if _, err := ctx.WriteDataPointExt( - []byte{}, ctx.Labels, sample.Timestamp, sample.Value, - ); err != nil { - return err + for i := range series { + ss := &series[i] + rowsTotal += len(ss.Points) + ctx.Labels = ctx.Labels[:0] + ctx.AddLabel("", ss.Metric) + if ss.Host != "" { + ctx.AddLabel("host", ss.Host) + } + if ss.Device != "" { + ctx.AddLabel("device", ss.Device) + } + for _, tag := range ss.Tags { + name, value := parser.SplitTag(tag) + if name == "host" { + name = "exported_host" + } + ctx.AddLabel(name, value) + } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } + if hasRelabeling { + ctx.ApplyRelabeling() + } + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + ctx.SortLabelsIfNeeded() + var metricNameRaw []byte + var err error + for _, pt := range ss.Points { + timestamp := pt.Timestamp() + value := pt.Value() + metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, timestamp, value) + if err != nil { + return err + } } } rowsInserted.Add(rowsTotal) diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 43a9ea645..a53f8216d 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -246,9 +246,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { fmt.Fprintf(w, `{"status":"ok"}`) return true case "/datadog/api/v1/series": - datadogWriteSeriesV1Requests.Inc() + datadogWriteRequests.Inc() if err := datadog.InsertHandlerForHTTP(r); err != nil { - datadogWriteSeriesV1Errors.Inc() + datadogWriteErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true } @@ -257,27 +257,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/datadog/api/v2/series": - datadogWriteSeriesV2Requests.Inc() - if err := datadog.InsertHandlerForHTTP(r); err != nil { - datadogWriteSeriesV2Errors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(202) - fmt.Fprintf(w, `{"errors":[]}`) - return true - case "/datadog/api/beta/sketches": - datadogWriteSketchesBetaRequests.Inc() - if err := datadog.InsertHandlerForHTTP(r); err != nil { - datadogWriteSketchesBetaErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(202) - return true case "/datadog/api/v1/validate": datadogValidateRequests.Inc() // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key @@ -392,14 +371,8 @@ var ( influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/influx/query", protocol="influx"}`) - datadogWriteSeriesV1Requests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) - datadogWriteSeriesV1Errors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) - - datadogWriteSeriesV2Requests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`) - datadogWriteSeriesV2Errors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`) - - datadogWriteSketchesBetaRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/beta/sketches", protocol="datadog"}`) - datadogWriteSketchesBetaErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/beta/sketches", protocol="datadog"}`) + datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) + datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`) datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`) diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 0a4ebaaaa..949f65e4b 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -363,8 +363,7 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr - `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details. - `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). This endpoint also supports [Pushgateway protocol](https://github.com/prometheus/pushgateway#url). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details. - `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry). - - `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API v1](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. - - `datadog/api/v2/series` - for ingesting data with [DataDog submit metrics API v2](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. + - `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. - `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. - `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details. - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details. @@ -897,7 +896,7 @@ Below is the output for `/path/to/vminsert -help`: -csvTrimTimestamp duration Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms) -datadog.maxInsertRequestSize size - The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches + The maximum size in bytes of a single DataDog POST request to /api/v1/series Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864) -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) diff --git a/docs/README.md b/docs/README.md index 16a884c09..0cdc16ddd 100644 --- a/docs/README.md +++ b/docs/README.md @@ -510,15 +510,10 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data in the following protocols: -* [DataDog agent](https://docs.datadoghq.com/agent/) -* [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) -* [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) - -Via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at the following path: -* `/datadog/api/v1/series` -* `/datadog/api/v2/series` -* `/datadog/api/beta/sketches` +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) +or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) +at `/datadog/api/v1/series` path. ### Sending metrics to VictoriaMetrics @@ -590,19 +585,6 @@ additional_endpoints: -### Send via Serverless DataDog plugin - -Disable logs (logs ingestion is not supported by Victoria Metrics) and set a custom endpoint in serverless.yaml -``` -custom: - datadog: - enableDDLogs: false # Disabled not supported DD logs - apiKey: fakekey # Set any key, otherwise plugin fails -provider: - environment: - DD_DD_URL: <>/datadog # Victoria Metrics endpoint for DataDog -``` - ### Send via cURL See how to send data to VictoriaMetrics via @@ -2585,7 +2567,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -csvTrimTimestamp duration Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms) -datadog.maxInsertRequestSize size - The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches + The maximum size in bytes of a single DataDog POST request to /api/v1/series Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864) -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index d11d9f385..715d2c9c9 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -518,15 +518,10 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data in the following protocols: -* [DataDog agent](https://docs.datadoghq.com/agent/) -* [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) -* [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) - -Via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at the following path: -* `/datadog/api/v1/series` -* `/datadog/api/v2/series` -* `/datadog/api/beta/sketches` +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) +or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) +at `/datadog/api/v1/series` path. ### Sending metrics to VictoriaMetrics @@ -598,19 +593,6 @@ additional_endpoints: -### Send via Serverless DataDog plugin - -Disable logs (logs ingestion is not supported by Victoria Metrics) and set a custom endpoint in serverless.yaml -``` -custom: - datadog: - enableDDLogs: false # Disabled not supported DD logs - apiKey: fakekey # Set any key, otherwise plugin fails -provider: - environment: - DD_DD_URL: <>/datadog # Victoria Metrics endpoint for DataDog -``` - ### Send via cURL See how to send data to VictoriaMetrics via @@ -2593,7 +2575,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -csvTrimTimestamp duration Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms) -datadog.maxInsertRequestSize size - The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches + The maximum size in bytes of a single DataDog POST request to /api/v1/series Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864) -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) diff --git a/docs/url-examples.md b/docs/url-examples.md index 82994e887..bc1a9983e 100644 --- a/docs/url-examples.md +++ b/docs/url-examples.md @@ -529,86 +529,6 @@ echo ' -### /datadog/api/v2/series - -**Imports data in DataDog format into VictoriaMetrics** - -Single-node VictoriaMetrics: -
- -```console -echo ' -{ - "series": [ - { - "interval": 20, - "metric": "system.load.1", - "resources": [ - { - "name": "test.example.com", - "type": "host" - } - ], - "points": [ - { - "timestamp": 1699152159, - "value": 0 - }, - { - "timestamp": 1699152160, - "value": 0.5 - } - ], - "tags": [ - "environment:test" - ], - "type": "rate" - } - ] -} -' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:8428/datadog/api/v2/series -``` - -
- -Cluster version of VictoriaMetrics: -
- -```console -echo ' -{ - "series": [ - { - "interval": 20, - "metric": "system.load.1", - "resources": [ - { - "name": "test.example.com", - "type": "host" - } - ], - "points": [ - { - "timestamp": 1699152159, - "value": 0 - }, - { - "timestamp": 1699152160, - "value": 0.5 - } - ], - "tags": [ - "environment:test" - ], - "type": "rate" - } - ] -} -' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://:8480/insert/0/datadog/api/v2/series -``` - -
- Additional information: * [How to send data from datadog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) diff --git a/docs/vmagent.md b/docs/vmagent.md index 0ffc5d77f..99291b318 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -1482,7 +1482,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -csvTrimTimestamp duration Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms) -datadog.maxInsertRequestSize size - The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches + The maximum size in bytes of a single DataDog POST request to /api/v1/series Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864) -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) diff --git a/lib/protoparser/datadog/api/series/v1/api.go b/lib/protoparser/datadog/api/series/v1/api.go deleted file mode 100644 index 135b91193..000000000 --- a/lib/protoparser/datadog/api/series/v1/api.go +++ /dev/null @@ -1,102 +0,0 @@ -package datadog - -import ( - "encoding/json" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" -) - -// Request represents /api/v1/series request -type Request struct { - Series []series `json:"series"` -} - -// Unmarshal decodes byte array to series v1 Request struct -func (r *Request) Unmarshal(b []byte) error { - return json.Unmarshal(b, r) -} - -// Extract iterates fn execution over all timeseries from series v1 Request -func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn func(string) string) error { - currentTimestamp := int64(fasttime.UnixTimestamp()) - for i := range r.Series { - s := r.Series[i] - samples := make([]prompbmarshal.Sample, 0, len(s.Points)) - for j := range s.Points { - p := s.Points[j] - ts, val := p[0], p[1] - if ts <= 0 { - ts = float64(currentTimestamp) - } - samples[j] = prompbmarshal.Sample{ - Timestamp: int64(ts * 1000), - Value: val, - } - } - ts := prompbmarshal.TimeSeries{ - Samples: samples, - Labels: s.getLabels(sanitizeFn), - } - if err := fn(ts); err != nil { - return err - } - } - return nil -} - -// series represents a series item from DataDog POST request to /api/v1/series -// -// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics -type series struct { - Metric string `json:"metric"` - Host string `json:"host"` - - // The device field does not appear in the datadog docs, but datadog-agent does use it. - // Datadog agent (v7 at least), removes the tag "device" and adds it as its own field. Why? That I don't know! - // https://github.com/DataDog/datadog-agent/blob/0ada7a97fed6727838a6f4d9c87123d2aafde735/pkg/metrics/series.go#L84-L105 - Device string `json:"device"` - - // Do not decode Interval, since it isn't used by VictoriaMetrics - // Interval int64 `json:"interval"` - - Points []point `json:"points"` - Tags []string `json:"tags"` - - // Do not decode type, since it isn't used by VictoriaMetrics - // Type string `json:"type"` -} - -func (s *series) getLabels(sanitizeFn func(string) string) []prompbmarshal.Label { - labels := []prompbmarshal.Label{{ - Name: "__name__", - Value: sanitizeFn(s.Metric), - }} - if s.Host != "" { - labels = append(labels, prompbmarshal.Label{ - Name: "host", - Value: s.Host, - }) - } - if s.Device != "" { - labels = append(labels, prompbmarshal.Label{ - Name: "device", - Value: s.Device, - }) - } - for _, tag := range s.Tags { - name, value := datadog.SplitTag(tag) - if name == "host" { - name = "exported_host" - } - labels = append(labels, prompbmarshal.Label{ - Name: sanitizeFn(name), - Value: value, - }) - } - return labels -} - -// point represents a point from DataDog POST request to /api/v1/series -type point [2]float64 diff --git a/lib/protoparser/datadog/api/series/v1/api_test.go b/lib/protoparser/datadog/api/series/v1/api_test.go deleted file mode 100644 index 7573ccd42..000000000 --- a/lib/protoparser/datadog/api/series/v1/api_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package datadog - -import ( - "reflect" - "testing" -) - -func TestRequestUnmarshalFailure(t *testing.T) { - f := func(s string) { - t.Helper() - req := new(Request) - if err := req.Unmarshal([]byte(s)); err == nil { - t.Fatalf("expecting non-nil error for Unmarshal(%q)", s) - } - } - f("") - f("foobar") - f(`{"series":123`) - f(`1234`) - f(`[]`) -} - -func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) { - t.Helper() - req := new(Request) - if err := req.Unmarshal(s); err != nil { - t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err) - } - if !reflect.DeepEqual(req, reqExpected) { - t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected) - } -} - -func TestRequestUnmarshalSuccess(t *testing.T) { - unmarshalRequestValidator( - t, []byte("{}"), new(Request), - ) - unmarshalRequestValidator(t, []byte(` -{ - "series": [ - { - "host": "test.example.com", - "interval": 20, - "metric": "system.load.1", - "device": "/dev/sda", - "points": [[ - 1575317847, - 0.5 - ]], - "tags": [ - "environment:test" - ], - "type": "rate" - } - ] -} -`), &Request{ - Series: []series{{ - Host: "test.example.com", - Metric: "system.load.1", - Device: "/dev/sda", - Points: []point{{ - 1575317847, - 0.5, - }}, - Tags: []string{ - "environment:test", - }, - }}, - }) -} diff --git a/lib/protoparser/datadog/api/series/v2/api.go b/lib/protoparser/datadog/api/series/v2/api.go deleted file mode 100644 index de058fd54..000000000 --- a/lib/protoparser/datadog/api/series/v2/api.go +++ /dev/null @@ -1,89 +0,0 @@ -package datadog - -import ( - "encoding/json" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" -) - -// Request represents a sketches item from DataDog POST request to /api/v2/series -type Request struct { - Series []series `json:"series"` -} - -// Unmarshal decodes byte array to series v2 Request struct -func (r *Request) Unmarshal(b []byte) error { - return json.Unmarshal(b, r) -} - -// Extract iterates fn execution over all timeseries from series v2 request -func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn func(string) string) error { - for i := range r.Series { - s := r.Series[i] - samples := make([]prompbmarshal.Sample, 0, len(s.Points)) - for j := range s.Points { - p := s.Points[j] - samples = append(samples, prompbmarshal.Sample{ - Timestamp: p.Timestamp * 1000, - Value: p.Value, - }) - } - ts := prompbmarshal.TimeSeries{ - Samples: samples, - Labels: s.getLabels(sanitizeFn), - } - if err := fn(ts); err != nil { - return err - } - } - return nil -} - -// series represents a series item from DataDog POST request to /api/v2/series -// -// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics -type series struct { - Metric string `json:"metric"` - Resources []resource `json:"resources"` - Points []point `json:"points"` - Tags []string `json:"tags"` - - // Do not decode Type, since it isn't used by VictoriaMetrics - // Type string `json:"type"` -} - -type resource struct { - Name string `json:"name"` - Type string `json:"type"` -} - -func (s *series) getLabels(sanitizeFn func(string) string) []prompbmarshal.Label { - labels := []prompbmarshal.Label{{ - Name: "__name__", - Value: sanitizeFn(s.Metric), - }} - for _, res := range s.Resources { - labels = append(labels, prompbmarshal.Label{ - Name: sanitizeFn(res.Type), - Value: res.Name, - }) - } - for _, tag := range s.Tags { - name, value := datadog.SplitTag(tag) - if name == "host" { - name = "exported_host" - } - labels = append(labels, prompbmarshal.Label{ - Name: sanitizeFn(name), - Value: value, - }) - } - return labels -} - -// point represents a point from DataDog POST request to /api/v2/series -type point struct { - Timestamp int64 `json:"timestamp"` - Value float64 `json:"value"` -} diff --git a/lib/protoparser/datadog/api/series/v2/api_test.go b/lib/protoparser/datadog/api/series/v2/api_test.go deleted file mode 100644 index a40a4776a..000000000 --- a/lib/protoparser/datadog/api/series/v2/api_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package datadog - -import ( - "reflect" - "testing" -) - -func TestRequestUnmarshalFailure(t *testing.T) { - f := func(s string) { - t.Helper() - req := new(Request) - if err := req.Unmarshal([]byte(s)); err == nil { - t.Fatalf("expecting non-nil error for Unmarshal(%q)", s) - } - } - f("") - f("foobar") - f(`{"series":123`) - f(`1234`) - f(`[]`) -} - -func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) { - t.Helper() - req := new(Request) - if err := req.Unmarshal(s); err != nil { - t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err) - } - if !reflect.DeepEqual(req, reqExpected) { - t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected) - } -} - -func TestRequestUnmarshalSuccess(t *testing.T) { - unmarshalRequestValidator( - t, []byte("{}"), new(Request), - ) - unmarshalRequestValidator(t, []byte(` -{ - "series": [ - { - "interval": 20, - "metric": "system.load.1", - "resources": [ - { - "name": "test.example.com", - "type": "host" - }, { - "name": "/dev/sda", - "type": "device" - } - ], - "points": [{ - "timestamp": 1575317847, - "value": 0.5 - }], - "tags": [ - "environment:test" - ], - "type": "rate" - } - ] -} -`), &Request{ - Series: []series{{ - Metric: "system.load.1", - Resources: []resource{{ - Name: "test.example.com", - Type: "host", - }, { - Name: "/dev/sda", - Type: "device", - }}, - Points: []point{{ - Timestamp: 1575317847, - Value: 0.5, - }}, - Tags: []string{ - "environment:test", - }, - }}, - }) -} diff --git a/lib/protoparser/datadog/api/series/v2/api_timing_test.go b/lib/protoparser/datadog/api/series/v2/api_timing_test.go deleted file mode 100644 index e86448ad0..000000000 --- a/lib/protoparser/datadog/api/series/v2/api_timing_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package datadog - -import ( - "fmt" - "testing" -) - -func BenchmarkRequestUnmarshal(b *testing.B) { - reqBody := []byte(`{ - "series": [ - { - "interval": 20, - "metric": "system.load.1", - "resources": [{ - "name": "test.example.com", - "type": "host" - }], - "points": [ - { - "timestamp": 1575317847, - "value": 0.5 - } - ], - "tags": [ - "environment:test" - ], - "type": "rate" - } - ] -}`) - b.SetBytes(int64(len(reqBody))) - b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { - req := new(Request) - for pb.Next() { - if err := req.Unmarshal(reqBody); err != nil { - panic(fmt.Errorf("unexpected error: %w", err)) - } - if len(req.Series) != 1 { - panic(fmt.Errorf("unexpected number of series unmarshaled: got %d; want 4", len(req.Series))) - } - } - }) -} diff --git a/lib/protoparser/datadog/api/sketches/beta/README.md b/lib/protoparser/datadog/api/sketches/beta/README.md deleted file mode 100644 index 8ac8b117c..000000000 --- a/lib/protoparser/datadog/api/sketches/beta/README.md +++ /dev/null @@ -1,32 +0,0 @@ -# Datadog proto files - -Content copied from https://github.com/DataDog/agent-payload/blob/master/proto/metrics/agent_payload.proto - -## Requirements -- protoc binary [link](http://google.github.io/proto-lens/installing-protoc.html) -- golang-proto-gen[link](https://developers.google.com/protocol-buffers/docs/reference/go-generated) -- custom marshaller [link](https://github.com/planetscale/vtprotobuf) - -## Modifications - - Original proto files were modified: -1) changed package name for `package beta`. -2) changed import paths - changed directory names. -3) changed go_package for `./pb`. - - -## How to generate pbs - - run command: - ```bash -export GOBIN=~/go/bin protoc -protoc -I=. --go_out=./lib/protoparser/datadog/api/sketches/beta --go-vtproto_out=./lib/protoparser/datadog/api/sketches/beta --plugin protoc-gen-go-vtproto="$GOBIN/protoc-gen-go-vtproto" --go-vtproto_opt=features=unmarshal lib/protoparser/datadog/api/sketches/beta/proto/*.proto - ``` - - Generated code will be at `lib/protoparser/datadog/api/sketches/beta/pb` - - manually edit it: - -1) remove all external imports -2) remove all unneeded methods -3) replace `unknownFields` with `unknownFields []byte` diff --git a/lib/protoparser/datadog/api/sketches/beta/api.go b/lib/protoparser/datadog/api/sketches/beta/api.go deleted file mode 100644 index 5577a6c9c..000000000 --- a/lib/protoparser/datadog/api/sketches/beta/api.go +++ /dev/null @@ -1,95 +0,0 @@ -package datadog - -import ( - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/sketches/beta/pb" -) - -// Request represents a sketches item from DataDog POST request to /api/beta/sketches -type Request struct { - *pb.SketchPayload -} - -// Unmarshal is a wrapper around SketchesPayload Unmarshal method which decodes byte array to SketchPayload struct -func (r *Request) Unmarshal(b []byte) error { - if r.SketchPayload == nil { - r.SketchPayload = new(pb.SketchPayload) - } - return r.SketchPayload.UnmarshalVT(b) -} - -// Extract iterates fn function execution over all timeseries from a sketch payload -func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn func(string) string) error { - var err error - for _, sketch := range r.SketchPayload.Sketches { - sketchSeries := make([]prompbmarshal.TimeSeries, 5) - for _, point := range sketch.Dogsketches { - timestamp := int64(point.Ts * 1000) - updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{ - "max": point.Max, - "min": point.Min, - "cnt": float64(point.Cnt), - "avg": point.Avg, - "sum": point.Sum, - }) - } - for _, point := range sketch.Distributions { - timestamp := int64(point.Ts * 1000) - updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{ - "max": point.Max, - "min": point.Min, - "cnt": float64(point.Cnt), - "avg": point.Avg, - "sum": point.Sum, - }) - } - labels := getLabels(sketch, sanitizeFn) - for i := range sketchSeries { - sketchSeries[i].Labels = append(sketchSeries[i].Labels, labels...) - if err = fn(sketchSeries[i]); err != nil { - return err - } - } - } - return nil -} - -func getLabels(sketch *pb.SketchPayload_Sketch, sanitizeFn func(string) string) []prompbmarshal.Label { - labels := []prompbmarshal.Label{} - if sketch.Host != "" { - labels = append(labels, prompbmarshal.Label{ - Name: "host", - Value: sketch.Host, - }) - } - for _, tag := range sketch.Tags { - name, value := datadog.SplitTag(tag) - if name == "host" { - name = "exported_host" - } - labels = append(labels, prompbmarshal.Label{ - Name: sanitizeFn(name), - Value: value, - }) - } - return labels -} - -func updateSeries(series []prompbmarshal.TimeSeries, metric string, timestamp int64, values map[string]float64) { - index := 0 - for suffix, value := range values { - s := series[index] - s.Samples = append(s.Samples, prompbmarshal.Sample{ - Timestamp: timestamp, - Value: value, - }) - if len(s.Labels) == 0 { - s.Labels = append(s.Labels, prompbmarshal.Label{ - Name: "", - Value: metric + "_" + suffix, - }) - } - index++ - } -} diff --git a/lib/protoparser/datadog/api/sketches/beta/pb/sketches.pb.go b/lib/protoparser/datadog/api/sketches/beta/pb/sketches.pb.go deleted file mode 100644 index f567d6a9b..000000000 --- a/lib/protoparser/datadog/api/sketches/beta/pb/sketches.pb.go +++ /dev/null @@ -1,159 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.29.0 -// protoc v3.15.8 -// source: lib/protoparser/datadog/api/sketches/beta/proto/sketches.proto - -package pb - -type MetricPayload_MetricType int32 - -const ( - MetricPayload_UNSPECIFIED MetricPayload_MetricType = 0 - MetricPayload_COUNT MetricPayload_MetricType = 1 - MetricPayload_RATE MetricPayload_MetricType = 2 - MetricPayload_GAUGE MetricPayload_MetricType = 3 -) - -// Enum value maps for MetricPayload_MetricType. -var ( - MetricPayload_MetricType_name = map[int32]string{ - 0: "UNSPECIFIED", - 1: "COUNT", - 2: "RATE", - 3: "GAUGE", - } - MetricPayload_MetricType_value = map[string]int32{ - "UNSPECIFIED": 0, - "COUNT": 1, - "RATE": 2, - "GAUGE": 3, - } -) - -func (x MetricPayload_MetricType) Enum() *MetricPayload_MetricType { - p := new(MetricPayload_MetricType) - *p = x - return p -} - -type CommonMetadata struct { - unknownFields []byte - - AgentVersion string `protobuf:"bytes,1,opt,name=agent_version,json=agentVersion,proto3" json:"agent_version,omitempty"` - Timezone string `protobuf:"bytes,2,opt,name=timezone,proto3" json:"timezone,omitempty"` - CurrentEpoch float64 `protobuf:"fixed64,3,opt,name=current_epoch,json=currentEpoch,proto3" json:"current_epoch,omitempty"` - InternalIp string `protobuf:"bytes,4,opt,name=internal_ip,json=internalIp,proto3" json:"internal_ip,omitempty"` - PublicIp string `protobuf:"bytes,5,opt,name=public_ip,json=publicIp,proto3" json:"public_ip,omitempty"` - ApiKey string `protobuf:"bytes,6,opt,name=api_key,json=apiKey,proto3" json:"api_key,omitempty"` -} - -type MetricPayload struct { - unknownFields []byte - - Series []*MetricPayload_MetricSeries `protobuf:"bytes,1,rep,name=series,proto3" json:"series,omitempty"` -} - -type EventsPayload struct { - unknownFields []byte - - Events []*EventsPayload_Event `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` - Metadata *CommonMetadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"` -} - -type SketchPayload struct { - unknownFields []byte - - Sketches []*SketchPayload_Sketch `protobuf:"bytes,1,rep,name=sketches,proto3" json:"sketches,omitempty"` - Metadata *CommonMetadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"` -} - -type MetricPayload_MetricPoint struct { - unknownFields []byte - - // metric value - Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` - // timestamp for this value in seconds since the UNIX epoch - Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` -} - -type MetricPayload_Resource struct { - unknownFields []byte - - Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` - Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` -} - -type MetricPayload_MetricSeries struct { - unknownFields []byte - - // Resources this series applies to; include at least - // { type="host", name= } - Resources []*MetricPayload_Resource `protobuf:"bytes,1,rep,name=resources,proto3" json:"resources,omitempty"` - // metric name - Metric string `protobuf:"bytes,2,opt,name=metric,proto3" json:"metric,omitempty"` - // tags for this metric - Tags []string `protobuf:"bytes,3,rep,name=tags,proto3" json:"tags,omitempty"` - // data points for this metric - Points []*MetricPayload_MetricPoint `protobuf:"bytes,4,rep,name=points,proto3" json:"points,omitempty"` - // type of metric - Type MetricPayload_MetricType `protobuf:"varint,5,opt,name=type,proto3,enum=beta.MetricPayload_MetricType" json:"type,omitempty"` - // metric unit name - Unit string `protobuf:"bytes,6,opt,name=unit,proto3" json:"unit,omitempty"` - // source of this metric (check name, etc.) - SourceTypeName string `protobuf:"bytes,7,opt,name=source_type_name,json=sourceTypeName,proto3" json:"source_type_name,omitempty"` - // interval, in seconds, between samples of this metric - Interval int64 `protobuf:"varint,8,opt,name=interval,proto3" json:"interval,omitempty"` -} - -type EventsPayload_Event struct { - unknownFields []byte - - Title string `protobuf:"bytes,1,opt,name=title,proto3" json:"title,omitempty"` - Text string `protobuf:"bytes,2,opt,name=text,proto3" json:"text,omitempty"` - Ts int64 `protobuf:"varint,3,opt,name=ts,proto3" json:"ts,omitempty"` - Priority string `protobuf:"bytes,4,opt,name=priority,proto3" json:"priority,omitempty"` - Host string `protobuf:"bytes,5,opt,name=host,proto3" json:"host,omitempty"` - Tags []string `protobuf:"bytes,6,rep,name=tags,proto3" json:"tags,omitempty"` - AlertType string `protobuf:"bytes,7,opt,name=alert_type,json=alertType,proto3" json:"alert_type,omitempty"` - AggregationKey string `protobuf:"bytes,8,opt,name=aggregation_key,json=aggregationKey,proto3" json:"aggregation_key,omitempty"` - SourceTypeName string `protobuf:"bytes,9,opt,name=source_type_name,json=sourceTypeName,proto3" json:"source_type_name,omitempty"` -} - -type SketchPayload_Sketch struct { - unknownFields []byte - - Metric string `protobuf:"bytes,1,opt,name=metric,proto3" json:"metric,omitempty"` - Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"` - Distributions []*SketchPayload_Sketch_Distribution `protobuf:"bytes,3,rep,name=distributions,proto3" json:"distributions,omitempty"` - Tags []string `protobuf:"bytes,4,rep,name=tags,proto3" json:"tags,omitempty"` - Dogsketches []*SketchPayload_Sketch_Dogsketch `protobuf:"bytes,7,rep,name=dogsketches,proto3" json:"dogsketches,omitempty"` -} - -type SketchPayload_Sketch_Distribution struct { - unknownFields []byte - - Ts int64 `protobuf:"varint,1,opt,name=ts,proto3" json:"ts,omitempty"` - Cnt int64 `protobuf:"varint,2,opt,name=cnt,proto3" json:"cnt,omitempty"` - Min float64 `protobuf:"fixed64,3,opt,name=min,proto3" json:"min,omitempty"` - Max float64 `protobuf:"fixed64,4,opt,name=max,proto3" json:"max,omitempty"` - Avg float64 `protobuf:"fixed64,5,opt,name=avg,proto3" json:"avg,omitempty"` - Sum float64 `protobuf:"fixed64,6,opt,name=sum,proto3" json:"sum,omitempty"` - V []float64 `protobuf:"fixed64,7,rep,packed,name=v,proto3" json:"v,omitempty"` - G []uint32 `protobuf:"varint,8,rep,packed,name=g,proto3" json:"g,omitempty"` - Delta []uint32 `protobuf:"varint,9,rep,packed,name=delta,proto3" json:"delta,omitempty"` - Buf []float64 `protobuf:"fixed64,10,rep,packed,name=buf,proto3" json:"buf,omitempty"` -} - -type SketchPayload_Sketch_Dogsketch struct { - unknownFields []byte - - Ts int64 `protobuf:"varint,1,opt,name=ts,proto3" json:"ts,omitempty"` - Cnt int64 `protobuf:"varint,2,opt,name=cnt,proto3" json:"cnt,omitempty"` - Min float64 `protobuf:"fixed64,3,opt,name=min,proto3" json:"min,omitempty"` - Max float64 `protobuf:"fixed64,4,opt,name=max,proto3" json:"max,omitempty"` - Avg float64 `protobuf:"fixed64,5,opt,name=avg,proto3" json:"avg,omitempty"` - Sum float64 `protobuf:"fixed64,6,opt,name=sum,proto3" json:"sum,omitempty"` - K []int32 `protobuf:"zigzag32,7,rep,packed,name=k,proto3" json:"k,omitempty"` - N []uint32 `protobuf:"varint,8,rep,packed,name=n,proto3" json:"n,omitempty"` -} diff --git a/lib/protoparser/datadog/api/sketches/beta/pb/sketches_vtproto.pb.go b/lib/protoparser/datadog/api/sketches/beta/pb/sketches_vtproto.pb.go deleted file mode 100644 index e271a177f..000000000 --- a/lib/protoparser/datadog/api/sketches/beta/pb/sketches_vtproto.pb.go +++ /dev/null @@ -1,2349 +0,0 @@ -// Code generated by protoc-gen-go-vtproto. DO NOT EDIT. -// protoc-gen-go-vtproto version: v0.5.0 -// source: lib/protoparser/datadog/api/sketches/beta/proto/sketches.proto - -package pb - -import ( - binary "encoding/binary" - fmt "fmt" - io "io" - math "math" -) - -func (m *CommonMetadata) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: CommonMetadata: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: CommonMetadata: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field AgentVersion", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.AgentVersion = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Timezone", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Timezone = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 3: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field CurrentEpoch", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.CurrentEpoch = float64(math.Float64frombits(v)) - case 4: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field InternalIp", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.InternalIp = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 5: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field PublicIp", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.PublicIp = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 6: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field ApiKey", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.ApiKey = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *MetricPayload_MetricPoint) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: MetricPayload_MetricPoint: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: MetricPayload_MetricPoint: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Value = float64(math.Float64frombits(v)) - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) - } - m.Timestamp = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Timestamp |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *MetricPayload_Resource) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: MetricPayload_Resource: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: MetricPayload_Resource: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Type = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Name = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *MetricPayload_MetricSeries) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: MetricPayload_MetricSeries: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: MetricPayload_MetricSeries: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Resources", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Resources = append(m.Resources, &MetricPayload_Resource{}) - if err := m.Resources[len(m.Resources)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Metric", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Metric = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 3: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Tags", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Tags = append(m.Tags, string(dAtA[iNdEx:postIndex])) - iNdEx = postIndex - case 4: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Points", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Points = append(m.Points, &MetricPayload_MetricPoint{}) - if err := m.Points[len(m.Points)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 5: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) - } - m.Type = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Type |= MetricPayload_MetricType(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 6: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Unit = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 7: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field SourceTypeName", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.SourceTypeName = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 8: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Interval", wireType) - } - m.Interval = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Interval |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *MetricPayload) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: MetricPayload: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: MetricPayload: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Series", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Series = append(m.Series, &MetricPayload_MetricSeries{}) - if err := m.Series[len(m.Series)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *EventsPayload_Event) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: EventsPayload_Event: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: EventsPayload_Event: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Title", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Title = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Text", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Text = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Ts", wireType) - } - m.Ts = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Ts |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 4: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Priority", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Priority = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 5: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Host", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Host = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 6: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Tags", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Tags = append(m.Tags, string(dAtA[iNdEx:postIndex])) - iNdEx = postIndex - case 7: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field AlertType", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.AlertType = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 8: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field AggregationKey", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.AggregationKey = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 9: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field SourceTypeName", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.SourceTypeName = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *EventsPayload) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: EventsPayload: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: EventsPayload: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Events", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Events = append(m.Events, &EventsPayload_Event{}) - if err := m.Events[len(m.Events)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Metadata == nil { - m.Metadata = &CommonMetadata{} - } - if err := m.Metadata.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *SketchPayload_Sketch_Distribution) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: SketchPayload_Sketch_Distribution: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: SketchPayload_Sketch_Distribution: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Ts", wireType) - } - m.Ts = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Ts |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Cnt", wireType) - } - m.Cnt = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Cnt |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Min", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Min = float64(math.Float64frombits(v)) - case 4: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Max", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Max = float64(math.Float64frombits(v)) - case 5: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Avg", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Avg = float64(math.Float64frombits(v)) - case 6: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Sum", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Sum = float64(math.Float64frombits(v)) - case 7: - if wireType == 1 { - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - v2 := float64(math.Float64frombits(v)) - m.V = append(m.V, v2) - } else if wireType == 2 { - var packedLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - packedLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if packedLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + packedLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - var elementCount int - elementCount = packedLen / 8 - if elementCount != 0 && len(m.V) == 0 { - m.V = make([]float64, 0, elementCount) - } - for iNdEx < postIndex { - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - v2 := float64(math.Float64frombits(v)) - m.V = append(m.V, v2) - } - } else { - return fmt.Errorf("proto: wrong wireType = %d for field V", wireType) - } - case 8: - if wireType == 0 { - var v uint32 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.G = append(m.G, v) - } else if wireType == 2 { - var packedLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - packedLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if packedLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + packedLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - var elementCount int - var count int - for _, integer := range dAtA[iNdEx:postIndex] { - if integer < 128 { - count++ - } - } - elementCount = count - if elementCount != 0 && len(m.G) == 0 { - m.G = make([]uint32, 0, elementCount) - } - for iNdEx < postIndex { - var v uint32 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.G = append(m.G, v) - } - } else { - return fmt.Errorf("proto: wrong wireType = %d for field G", wireType) - } - case 9: - if wireType == 0 { - var v uint32 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.Delta = append(m.Delta, v) - } else if wireType == 2 { - var packedLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - packedLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if packedLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + packedLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - var elementCount int - var count int - for _, integer := range dAtA[iNdEx:postIndex] { - if integer < 128 { - count++ - } - } - elementCount = count - if elementCount != 0 && len(m.Delta) == 0 { - m.Delta = make([]uint32, 0, elementCount) - } - for iNdEx < postIndex { - var v uint32 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.Delta = append(m.Delta, v) - } - } else { - return fmt.Errorf("proto: wrong wireType = %d for field Delta", wireType) - } - case 10: - if wireType == 1 { - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - v2 := float64(math.Float64frombits(v)) - m.Buf = append(m.Buf, v2) - } else if wireType == 2 { - var packedLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - packedLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if packedLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + packedLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - var elementCount int - elementCount = packedLen / 8 - if elementCount != 0 && len(m.Buf) == 0 { - m.Buf = make([]float64, 0, elementCount) - } - for iNdEx < postIndex { - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - v2 := float64(math.Float64frombits(v)) - m.Buf = append(m.Buf, v2) - } - } else { - return fmt.Errorf("proto: wrong wireType = %d for field Buf", wireType) - } - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *SketchPayload_Sketch_Dogsketch) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: SketchPayload_Sketch_Dogsketch: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: SketchPayload_Sketch_Dogsketch: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Ts", wireType) - } - m.Ts = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Ts |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Cnt", wireType) - } - m.Cnt = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Cnt |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Min", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Min = float64(math.Float64frombits(v)) - case 4: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Max", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Max = float64(math.Float64frombits(v)) - case 5: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Avg", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Avg = float64(math.Float64frombits(v)) - case 6: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Sum", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Sum = float64(math.Float64frombits(v)) - case 7: - if wireType == 0 { - var v int32 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= int32(b&0x7F) << shift - if b < 0x80 { - break - } - } - v = int32((uint32(v) >> 1) ^ uint32(((v&1)<<31)>>31)) - m.K = append(m.K, v) - } else if wireType == 2 { - var packedLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - packedLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if packedLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + packedLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - var elementCount int - var count int - for _, integer := range dAtA[iNdEx:postIndex] { - if integer < 128 { - count++ - } - } - elementCount = count - if elementCount != 0 && len(m.K) == 0 { - m.K = make([]int32, 0, elementCount) - } - for iNdEx < postIndex { - var v int32 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= int32(b&0x7F) << shift - if b < 0x80 { - break - } - } - v = int32((uint32(v) >> 1) ^ uint32(((v&1)<<31)>>31)) - m.K = append(m.K, v) - } - } else { - return fmt.Errorf("proto: wrong wireType = %d for field K", wireType) - } - case 8: - if wireType == 0 { - var v uint32 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.N = append(m.N, v) - } else if wireType == 2 { - var packedLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - packedLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if packedLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + packedLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - var elementCount int - var count int - for _, integer := range dAtA[iNdEx:postIndex] { - if integer < 128 { - count++ - } - } - elementCount = count - if elementCount != 0 && len(m.N) == 0 { - m.N = make([]uint32, 0, elementCount) - } - for iNdEx < postIndex { - var v uint32 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.N = append(m.N, v) - } - } else { - return fmt.Errorf("proto: wrong wireType = %d for field N", wireType) - } - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *SketchPayload_Sketch) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: SketchPayload_Sketch: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: SketchPayload_Sketch: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Metric", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Metric = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Host", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Host = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 3: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Distributions", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Distributions = append(m.Distributions, &SketchPayload_Sketch_Distribution{}) - if err := m.Distributions[len(m.Distributions)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 4: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Tags", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Tags = append(m.Tags, string(dAtA[iNdEx:postIndex])) - iNdEx = postIndex - case 7: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Dogsketches", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Dogsketches = append(m.Dogsketches, &SketchPayload_Sketch_Dogsketch{}) - if err := m.Dogsketches[len(m.Dogsketches)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *SketchPayload) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: SketchPayload: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: SketchPayload: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Sketches", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Sketches = append(m.Sketches, &SketchPayload_Sketch{}) - if err := m.Sketches[len(m.Sketches)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLength - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLength - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Metadata == nil { - m.Metadata = &CommonMetadata{} - } - if err := m.Metadata.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skip(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLength - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} - -func skip(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - depth := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflow - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflow - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - case 1: - iNdEx += 8 - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflow - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if length < 0 { - return 0, ErrInvalidLength - } - iNdEx += length - case 3: - depth++ - case 4: - if depth == 0 { - return 0, ErrUnexpectedEndOfGroup - } - depth-- - case 5: - iNdEx += 4 - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - if iNdEx < 0 { - return 0, ErrInvalidLength - } - if depth == 0 { - return iNdEx, nil - } - } - return 0, io.ErrUnexpectedEOF -} - -var ( - ErrInvalidLength = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflow = fmt.Errorf("proto: integer overflow") - ErrUnexpectedEndOfGroup = fmt.Errorf("proto: unexpected end of group") -) diff --git a/lib/protoparser/datadog/api/sketches/beta/proto/sketches.proto b/lib/protoparser/datadog/api/sketches/beta/proto/sketches.proto deleted file mode 100644 index b5afd37b2..000000000 --- a/lib/protoparser/datadog/api/sketches/beta/proto/sketches.proto +++ /dev/null @@ -1,112 +0,0 @@ -syntax = "proto3"; - -package beta; - -option go_package = "./pb"; - -import "github.com/gogo/protobuf/gogoproto/gogo.proto"; - -message CommonMetadata { - string agent_version = 1; - string timezone = 2; - double current_epoch = 3; - string internal_ip = 4; - string public_ip = 5; - string api_key = 6; -} - -message MetricPayload { - enum MetricType { - UNSPECIFIED = 0; - COUNT = 1; - RATE = 2; - GAUGE = 3; - } - - message MetricPoint { - // metric value - double value = 1; - // timestamp for this value in seconds since the UNIX epoch - int64 timestamp = 2; - } - - message Resource { - string type = 1; - string name = 2; - } - - message MetricSeries { - // Resources this series applies to; include at least - // { type="host", name= } - repeated Resource resources = 1; - // metric name - string metric = 2; - // tags for this metric - repeated string tags = 3; - // data points for this metric - repeated MetricPoint points = 4; - // type of metric - MetricType type = 5; - // metric unit name - string unit = 6; - // source of this metric (check name, etc.) - string source_type_name = 7; - // interval, in seconds, between samples of this metric - int64 interval = 8; - - reserved 9; - } - repeated MetricSeries series = 1; -} - -message EventsPayload { - message Event { - string title = 1; - string text = 2; - int64 ts = 3; - string priority = 4; - string host = 5; - repeated string tags = 6; - string alert_type = 7; - string aggregation_key = 8; - string source_type_name = 9; - } - repeated Event events = 1; - CommonMetadata metadata = 2; -} - -message SketchPayload { - message Sketch { - message Distribution { - int64 ts = 1; - int64 cnt = 2; - double min = 3; - double max = 4; - double avg = 5; - double sum = 6; - repeated double v = 7; - repeated uint32 g = 8; - repeated uint32 delta = 9; - repeated double buf = 10; - } - message Dogsketch { - int64 ts = 1; - int64 cnt = 2; - double min = 3; - double max = 4; - double avg = 5; - double sum = 6; - repeated sint32 k = 7; - repeated uint32 n = 8; - } - string metric = 1; - string host = 2; - repeated Distribution distributions = 3 [(gogoproto.nullable) = false]; - repeated string tags = 4; - reserved 5, 6; - reserved "distributionsK", "distributionsC"; - repeated Dogsketch dogsketches = 7 [(gogoproto.nullable) = false]; - } - repeated Sketch sketches = 1 [(gogoproto.nullable) = false]; - CommonMetadata metadata = 2 [(gogoproto.nullable) = false]; -} diff --git a/lib/protoparser/datadog/parser.go b/lib/protoparser/datadog/parser.go index 970adc0d1..c32a9c99d 100644 --- a/lib/protoparser/datadog/parser.go +++ b/lib/protoparser/datadog/parser.go @@ -1,9 +1,11 @@ package datadog import ( + "encoding/json" + "fmt" "strings" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // SplitTag splits DataDog tag into tag name and value. @@ -18,10 +20,98 @@ func SplitTag(tag string) (string, string) { return tag[:n], tag[n+1:] } -// Request represents DataDog submit metrics request +// Request represents DataDog POST request to /api/v1/series // // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics -type Request interface { - Extract(func(prompbmarshal.TimeSeries) error, func(string) string) error - Unmarshal([]byte) error +type Request struct { + Series []Series `json:"series"` +} + +func (req *Request) reset() { + // recursively reset all the fields in req in order to avoid field value + // re-use in json.Unmarshal() when the corresponding field is missing + // in the unmarshaled JSON. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432 + series := req.Series + for i := range series { + series[i].reset() + } + req.Series = series[:0] +} + +// Unmarshal unmarshals DataDog /api/v1/series request body from b to req. +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +// +// b shouldn't be modified when req is in use. +func (req *Request) Unmarshal(b []byte) error { + req.reset() + if err := json.Unmarshal(b, req); err != nil { + return fmt.Errorf("cannot unmarshal %q: %w", b, err) + } + // Set missing timestamps to the current time. + currentTimestamp := float64(fasttime.UnixTimestamp()) + series := req.Series + for i := range series { + points := series[i].Points + for j := range points { + if points[j][0] <= 0 { + points[j][0] = currentTimestamp + } + } + } + return nil +} + +// Series represents a series item from DataDog POST request to /api/v1/series +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +type Series struct { + Metric string `json:"metric"` + Host string `json:"host"` + + // The device field does not appear in the datadog docs, but datadog-agent does use it. + // Datadog agent (v7 at least), removes the tag "device" and adds it as its own field. Why? That I don't know! + // https://github.com/DataDog/datadog-agent/blob/0ada7a97fed6727838a6f4d9c87123d2aafde735/pkg/metrics/series.go#L84-L105 + Device string `json:"device"` + + // Do not decode Interval, since it isn't used by VictoriaMetrics + // Interval int64 `json:"interval"` + + Points []Point `json:"points"` + Tags []string `json:"tags"` + + // Do not decode Type, since it isn't used by VictoriaMetrics + // Type string `json:"type"` +} + +func (s *Series) reset() { + s.Metric = "" + s.Host = "" + s.Device = "" + + points := s.Points + for i := range points { + points[i] = Point{} + } + s.Points = points[:0] + + tags := s.Tags + for i := range tags { + tags[i] = "" + } + s.Tags = tags[:0] +} + +// Point represents a point from DataDog POST request to /api/v1/series +type Point [2]float64 + +// Timestamp returns timestamp in milliseconds from the given pt. +func (pt *Point) Timestamp() int64 { + return int64(pt[0] * 1000) +} + +// Value returns value from the given pt. +func (pt *Point) Value() float64 { + return pt[1] } diff --git a/lib/protoparser/datadog/parser_test.go b/lib/protoparser/datadog/parser_test.go index 185cb8b98..5ea720331 100644 --- a/lib/protoparser/datadog/parser_test.go +++ b/lib/protoparser/datadog/parser_test.go @@ -1,6 +1,7 @@ package datadog import ( + "reflect" "testing" ) @@ -20,3 +21,102 @@ func TestSplitTag(t *testing.T) { f("foo:bar", "foo", "bar") f(":bar", "", "bar") } + +func TestRequestUnmarshalMissingHost(t *testing.T) { + // This tests https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432 + req := Request{ + Series: []Series{{ + Host: "prev-host", + Device: "prev-device", + }}, + } + data := ` +{ + "series": [ + { + "metric": "system.load.1", + "points": [[ + 1575317847, + 0.5 + ]] + } + ] +}` + if err := req.Unmarshal([]byte(data)); err != nil { + t.Fatalf("unexpected error: %s", err) + } + reqExpected := Request{ + Series: []Series{{ + Metric: "system.load.1", + Points: []Point{{ + 1575317847, + 0.5, + }}, + }}, + } + if !reflect.DeepEqual(&req, &reqExpected) { + t.Fatalf("unexpected request parsed;\ngot\n%+v\nwant\n%+v", req, reqExpected) + } +} + +func TestRequestUnmarshalFailure(t *testing.T) { + f := func(s string) { + t.Helper() + var req Request + if err := req.Unmarshal([]byte(s)); err == nil { + t.Fatalf("expecting non-nil error for Unmarshal(%q)", s) + } + } + f("") + f("foobar") + f(`{"series":123`) + f(`1234`) + f(`[]`) +} + +func TestRequestUnmarshalSuccess(t *testing.T) { + f := func(s string, reqExpected *Request) { + t.Helper() + var req Request + if err := req.Unmarshal([]byte(s)); err != nil { + t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err) + } + if !reflect.DeepEqual(&req, reqExpected) { + t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &req, reqExpected) + } + } + f("{}", &Request{}) + f(` +{ + "series": [ + { + "host": "test.example.com", + "interval": 20, + "metric": "system.load.1", + "device": "/dev/sda", + "points": [[ + 1575317847, + 0.5 + ]], + "tags": [ + "environment:test" + ], + "type": "rate" + } + ] +} +`, &Request{ + Series: []Series{{ + Host: "test.example.com", + Metric: "system.load.1", + Device: "/dev/sda", + Points: []Point{{ + 1575317847, + 0.5, + }}, + Tags: []string{ + "environment:test", + }, + }}, + }) +} diff --git a/lib/protoparser/datadog/api/series/v1/api_timing_test.go b/lib/protoparser/datadog/parser_timing_test.go similarity index 97% rename from lib/protoparser/datadog/api/series/v1/api_timing_test.go rename to lib/protoparser/datadog/parser_timing_test.go index df37abdc7..f3c2b568a 100644 --- a/lib/protoparser/datadog/api/series/v1/api_timing_test.go +++ b/lib/protoparser/datadog/parser_timing_test.go @@ -26,7 +26,7 @@ func BenchmarkRequestUnmarshal(b *testing.B) { b.SetBytes(int64(len(reqBody))) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { - req := new(Request) + var req Request for pb.Next() { if err := req.Unmarshal(reqBody); err != nil { panic(fmt.Errorf("unexpected error: %w", err)) diff --git a/lib/protoparser/datadog/stream/streamparser.go b/lib/protoparser/datadog/stream/streamparser.go index 1279829b7..d79f0b84a 100644 --- a/lib/protoparser/datadog/stream/streamparser.go +++ b/lib/protoparser/datadog/stream/streamparser.go @@ -5,7 +5,6 @@ import ( "flag" "fmt" "io" - "net/http" "regexp" "sync" @@ -13,19 +12,15 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" - apiSeriesV1 "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/series/v1" - apiSeriesV2 "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/series/v2" - apiSketchesBeta "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/sketches/beta" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) var ( // The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics - maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches") + maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series") // If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply: // https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics @@ -36,15 +31,13 @@ var ( "https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics") ) -// Parse parses DataDog POST request for /api/v1/series, /api/v2/series, /api/beta/sketches from reader and calls callback for the parsed request. +// Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request. // // callback shouldn't hold series after returning. -func Parse(req *http.Request, callback func(prompbmarshal.TimeSeries) error) error { - var r io.Reader - wcr := writeconcurrencylimiter.GetReader(req.Body) +func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error { + wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr - contentEncoding := req.Header.Get("Content-Encoding") switch contentEncoding { case "gzip": @@ -67,56 +60,25 @@ func Parse(req *http.Request, callback func(prompbmarshal.TimeSeries) error) err if err := ctx.Read(); err != nil { return err } - apiVersion := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${version}") - apiKind := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${kind}") - - ddReq := getRequest() - defer putRequest(ddReq) - - switch apiKind { - case "series": - switch apiVersion { - case "v1": - ddReq = new(apiSeriesV1.Request) - case "v2": - ddReq = new(apiSeriesV2.Request) - default: - return fmt.Errorf( - "API version %q of Datadog series endpoint is not supported", - apiVersion, - ) - } - case "sketches": - switch apiVersion { - case "beta": - ddReq = new(apiSketchesBeta.Request) - default: - return fmt.Errorf( - "API version %q of Datadog sketches endpoint is not supported", - apiVersion, - ) - } - default: - return fmt.Errorf( - "API kind %q of Datadog API is not supported", - apiKind, - ) - } - - if err := ddReq.Unmarshal(ctx.reqBuf.B); err != nil { + req := getRequest() + defer putRequest(req) + if err := req.Unmarshal(ctx.reqBuf.B); err != nil { unmarshalErrors.Inc() return fmt.Errorf("cannot unmarshal DataDog POST request with size %d bytes: %w", len(ctx.reqBuf.B), err) } - - cb := func(series prompbmarshal.TimeSeries) error { - rowsRead.Add(len(series.Samples)) - return callback(series) + rows := 0 + series := req.Series + for i := range series { + rows += len(series[i].Points) + if *sanitizeMetricName { + series[i].Metric = sanitizeName(series[i].Metric) + } } + rowsRead.Add(rows) - if err := ddReq.Extract(cb, sanitizeName(*sanitizeMetricName)); err != nil { + if err := callback(series); err != nil { return fmt.Errorf("error when processing imported data: %w", err) } - return nil } @@ -182,15 +144,15 @@ func putPushCtx(ctx *pushCtx) { var pushCtxPool sync.Pool var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) -func getRequest() datadog.Request { +func getRequest() *datadog.Request { v := requestPool.Get() if v == nil { - return nil + return &datadog.Request{} } - return v.(datadog.Request) + return v.(*datadog.Request) } -func putRequest(req datadog.Request) { +func putRequest(req *datadog.Request) { requestPool.Put(req) } @@ -199,15 +161,8 @@ var requestPool sync.Pool // sanitizeName performs DataDog-compatible sanitizing for metric names // // See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics -func sanitizeName(sanitize bool) func(string) string { - if sanitize { - return func(name string) string { - return namesSanitizer.Transform(name) - } - } - return func(name string) string { - return name - } +func sanitizeName(name string) string { + return namesSanitizer.Transform(name) } var namesSanitizer = bytesutil.NewFastStringTransformer(func(s string) string { @@ -221,5 +176,4 @@ var ( unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`) multiUnderscores = regexp.MustCompile(`_+`) underscoresWithDots = regexp.MustCompile(`_?\._?`) - insertApisVersionRegex = regexp.MustCompile(`.*/api/(?P[\w]+)/(?P[\w]+)`) ) diff --git a/lib/protoparser/datadog/stream/streamparser_test.go b/lib/protoparser/datadog/stream/streamparser_test.go index e74d6f253..19a52edf4 100644 --- a/lib/protoparser/datadog/stream/streamparser_test.go +++ b/lib/protoparser/datadog/stream/streamparser_test.go @@ -7,7 +7,7 @@ import ( func TestSanitizeName(t *testing.T) { f := func(s, resultExpected string) { t.Helper() - result := sanitizeName(true)(s) + result := sanitizeName(s) if result != resultExpected { t.Fatalf("unexpected result for sanitizeName(%q); got\n%q\nwant\n%q", s, result, resultExpected) }