mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
app/{vminsert,vmagent}: add ability to ingest data via DataDog "submit metrics" API
See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/206
This commit is contained in:
parent
a64155d91e
commit
91b3c601bc
47
README.md
47
README.md
@ -265,6 +265,52 @@ VictoriaMetrics also supports [importing data in Prometheus exposition format](#
|
||||
See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be used as drop-in replacement for Prometheus.
|
||||
|
||||
|
||||
## How to send data from DataDog agent
|
||||
|
||||
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD]() via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v1/series` path.
|
||||
|
||||
Run DataDog agent with `DD_DD_URL=http://victoriametrics-host:8428/datadog` environment variable in order to write data to VictoriaMetrics at `victoriametrics-host` host. Another option is to set `dd_url` param at [DataDog agent configuration file](https://docs.datadoghq.com/agent/guide/agent-configuration-files/) to `http://victoriametrics-host:8428/datadog`.
|
||||
|
||||
Example on how to send data to VictoriaMetrics via DataDog "submit metrics" API from command line:
|
||||
|
||||
```bash
|
||||
echo '
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"host": "test.example.com",
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"points": [[
|
||||
0,
|
||||
0.5
|
||||
]],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}
|
||||
' | curl -X POST --data-binary @- http://localhost:8428/datadog/api/v1/series
|
||||
```
|
||||
|
||||
The imported data can be read via [export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format):
|
||||
|
||||
```bash
|
||||
curl http://localhost:8428/api/v1/export -d 'match[]=system.load.1'
|
||||
```
|
||||
|
||||
This command should return the following output if everything is OK:
|
||||
|
||||
```
|
||||
{"metric":{"__name__":"system.load.1","environment":"test","host":"test.example.com"},"values":[0.5],"timestamps":[1632833641000]}
|
||||
```
|
||||
|
||||
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
|
||||
For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
|
||||
|
||||
|
||||
## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
|
||||
|
||||
Use `http://<victoriametric-addr>:8428` url instead of InfluxDB url in agents' configs.
|
||||
@ -790,6 +836,7 @@ The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv
|
||||
Time series data can be imported via any supported ingestion protocol:
|
||||
|
||||
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). See [these docs](#prometheus-setup) for details.
|
||||
* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details.
|
||||
* InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
|
||||
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
|
||||
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
|
||||
|
@ -21,6 +21,7 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did
|
||||
See [Quick Start](#quick-start) for details.
|
||||
* Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details.
|
||||
* Accepts data via all ingestion protocols supported by VictoriaMetrics:
|
||||
* DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent).
|
||||
* InfluxDB line protocol via `http://<vmagent>:8429/write`. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
|
||||
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
|
||||
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-opentsdb-compatible-agents).
|
||||
|
99
app/vmagent/datadog/request_handler.go
Normal file
99
app/vmagent/datadog/request_handler.go
Normal file
@ -0,0 +1,99 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadog"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadog"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`)
|
||||
)
|
||||
|
||||
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
ce := req.Header.Get("Content-Encoding")
|
||||
return parser.ParseStream(req.Body, ce, func(series []parser.Series) error {
|
||||
return insertRows(at, series, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
rowsTotal := 0
|
||||
tssDst := ctx.WriteRequest.Timeseries[:0]
|
||||
labels := ctx.Labels[:0]
|
||||
samples := ctx.Samples[:0]
|
||||
for i := range series {
|
||||
ss := &series[i]
|
||||
rowsTotal += len(ss.Points)
|
||||
labelsLen := len(labels)
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "__name__",
|
||||
Value: ss.Metric,
|
||||
})
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "host",
|
||||
Value: ss.Host,
|
||||
})
|
||||
for _, tag := range ss.Tags {
|
||||
n := strings.IndexByte(tag, ':')
|
||||
if n < 0 {
|
||||
return fmt.Errorf("cannot find ':' in tag %q", tag)
|
||||
}
|
||||
name := tag[:n]
|
||||
value := tag[n+1:]
|
||||
if name == "host" {
|
||||
name = "exported_host"
|
||||
}
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: name,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
labels = append(labels, extraLabels...)
|
||||
samplesLen := len(samples)
|
||||
for _, pt := range ss.Points {
|
||||
samples = append(samples, prompbmarshal.Sample{
|
||||
Timestamp: pt.Timestamp(),
|
||||
Value: pt.Value(),
|
||||
})
|
||||
}
|
||||
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
||||
Labels: labels[labelsLen:],
|
||||
Samples: samples[samplesLen:],
|
||||
})
|
||||
}
|
||||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(rowsTotal)
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
}
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return nil
|
||||
}
|
@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadog"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
|
||||
@ -224,6 +225,36 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
influxQueryRequests.Inc()
|
||||
influxutils.WriteDatabaseNames(w)
|
||||
return true
|
||||
case "/datadog/api/v1/series":
|
||||
datadogWriteRequests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
|
||||
datadogWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "/datadog/api/v1/validate":
|
||||
datadogValidateRequests.Inc()
|
||||
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
fmt.Fprintf(w, `{"valid":true}`)
|
||||
return true
|
||||
case "/datadog/api/v1/check_run":
|
||||
datadogCheckRunRequests.Inc()
|
||||
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "/datadog/intake/":
|
||||
datadogIntakeRequests.Inc()
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
fmt.Fprintf(w, `{}`)
|
||||
return true
|
||||
case "/targets":
|
||||
promscrapeTargetsRequests.Inc()
|
||||
promscrape.WriteHumanReadableTargetsStatus(w, r)
|
||||
@ -330,6 +361,35 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
|
||||
influxQueryRequests.Inc()
|
||||
influxutils.WriteDatabaseNames(w)
|
||||
return true
|
||||
case "datadog/api/v1/series":
|
||||
datadogWriteRequests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "datadog/api/v1/validate":
|
||||
datadogValidateRequests.Inc()
|
||||
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
fmt.Fprintf(w, `{"valid":true}`)
|
||||
return true
|
||||
case "datadog/api/v1/check_run":
|
||||
datadogCheckRunRequests.Inc()
|
||||
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "datadog/intake/":
|
||||
datadogIntakeRequests.Inc()
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
fmt.Fprintf(w, `{}`)
|
||||
return true
|
||||
default:
|
||||
httpserver.Errorf(w, r, "unsupported multitenant path suffix: %q", p.Suffix)
|
||||
return true
|
||||
@ -352,10 +412,17 @@ var (
|
||||
nativeimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`)
|
||||
nativeimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`)
|
||||
|
||||
influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`)
|
||||
influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`)
|
||||
influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/write", protocol="influx"}`)
|
||||
influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/influx/write", protocol="influx"}`)
|
||||
|
||||
influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/query", protocol="influx"}`)
|
||||
influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`)
|
||||
|
||||
datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
|
||||
datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
|
||||
|
||||
datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
|
||||
datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)
|
||||
datadogIntakeRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/intake/", protocol="datadog"}`)
|
||||
|
||||
promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`)
|
||||
promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/targets"}`)
|
||||
|
97
app/vminsert/datadog/request_handler.go
Normal file
97
app/vminsert/datadog/request_handler.go
Normal file
@ -0,0 +1,97 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadog"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadog"}`)
|
||||
)
|
||||
|
||||
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
func InsertHandlerForHTTP(req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
ce := req.Header.Get("Content-Encoding")
|
||||
err := parser.ParseStream(req.Body, ce, func(series []parser.Series) error {
|
||||
return insertRows(series, extraLabels)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("headers: %q; err: %w", req.Header, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(series []parser.Series, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetInsertCtx()
|
||||
defer common.PutInsertCtx(ctx)
|
||||
|
||||
rowsLen := 0
|
||||
for i := range series {
|
||||
rowsLen += len(series[i].Points)
|
||||
}
|
||||
ctx.Reset(rowsLen)
|
||||
rowsTotal := 0
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range series {
|
||||
ss := &series[i]
|
||||
rowsTotal += len(ss.Points)
|
||||
ctx.Labels = ctx.Labels[:0]
|
||||
ctx.AddLabel("", ss.Metric)
|
||||
ctx.AddLabel("host", ss.Host)
|
||||
for _, tag := range ss.Tags {
|
||||
n := strings.IndexByte(tag, ':')
|
||||
if n < 0 {
|
||||
return fmt.Errorf("cannot find ':' in tag %q", tag)
|
||||
}
|
||||
name := tag[:n]
|
||||
value := tag[n+1:]
|
||||
if name == "host" {
|
||||
name = "exported_host"
|
||||
}
|
||||
ctx.AddLabel(name, value)
|
||||
}
|
||||
for j := range extraLabels {
|
||||
label := &extraLabels[j]
|
||||
ctx.AddLabel(label.Name, label.Value)
|
||||
}
|
||||
if hasRelabeling {
|
||||
ctx.ApplyRelabeling()
|
||||
}
|
||||
if len(ctx.Labels) == 0 {
|
||||
// Skip metric without labels.
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
var metricNameRaw []byte
|
||||
var err error
|
||||
for _, pt := range ss.Points {
|
||||
timestamp := pt.Timestamp()
|
||||
value := pt.Value()
|
||||
metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, timestamp, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
rowsInserted.Add(rowsTotal)
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return ctx.FlushBufs()
|
||||
}
|
@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
|
||||
@ -155,6 +156,36 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
influxQueryRequests.Inc()
|
||||
influxutils.WriteDatabaseNames(w)
|
||||
return true
|
||||
case "/datadog/api/v1/series":
|
||||
datadogWriteRequests.Inc()
|
||||
if err := datadog.InsertHandlerForHTTP(r); err != nil {
|
||||
datadogWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "/datadog/api/v1/validate":
|
||||
datadogValidateRequests.Inc()
|
||||
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
fmt.Fprintf(w, `{"valid":true}`)
|
||||
return true
|
||||
case "/datadog/api/v1/check_run":
|
||||
datadogCheckRunRequests.Inc()
|
||||
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
w.WriteHeader(202)
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
case "/datadog/intake/":
|
||||
datadogIntakeRequests.Inc()
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
fmt.Fprintf(w, `{}`)
|
||||
return true
|
||||
case "/prometheus/targets", "/targets":
|
||||
promscrapeTargetsRequests.Inc()
|
||||
promscrape.WriteHumanReadableTargetsStatus(w, r)
|
||||
@ -204,10 +235,17 @@ var (
|
||||
nativeimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`)
|
||||
nativeimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`)
|
||||
|
||||
influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`)
|
||||
influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`)
|
||||
influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/influx/write", protocol="influx"}`)
|
||||
influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/influx/write", protocol="influx"}`)
|
||||
|
||||
influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/query", protocol="influx"}`)
|
||||
influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/influx/query", protocol="influx"}`)
|
||||
|
||||
datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
|
||||
datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
|
||||
|
||||
datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
|
||||
datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)
|
||||
datadogIntakeRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/intake/", protocol="datadog"}`)
|
||||
|
||||
promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`)
|
||||
promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/targets"}`)
|
||||
|
@ -6,6 +6,7 @@ sort: 15
|
||||
|
||||
## tip
|
||||
|
||||
* FEATURE: add ability to accept metrics from [DataDog agent](https://docs.datadoghq.com/agent/) and [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent). This option simplifies the migration path from DataDog to VictoriaMetrics. See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/206).
|
||||
* FEATURE: vmagent [enterprise](https://victoriametrics.com/enterprise.html): add support for data reading from [Apache Kafka](https://kafka.apache.org/).
|
||||
* FEATURE: calculate quantiles in the same way as Prometheus does in such functions as [quantile_over_time](https://docs.victoriametrics.com/MetricsQL.html#quantile_over_time) and [quantile](https://docs.victoriametrics.com/MetricsQL.html#quantile). Previously results from VictoriaMetrics could be slightly different than results from Prometheus. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612) issues.
|
||||
* FEATURE: add `rollup_scrape_interval(m[d])` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html), which returns `min`, `max` and `avg` values for the interval between samples for `m` on the given lookbehind window `d`.
|
||||
|
@ -191,12 +191,11 @@ It is recommended setting up alerts in [vmalert](https://docs.victoriametrics.co
|
||||
- `<accountID>` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`,
|
||||
where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`.
|
||||
- `<suffix>` may have the following values:
|
||||
- `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
|
||||
- `influx/write` and `influx/api/v2/write` - for inserting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/).
|
||||
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html).
|
||||
This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag.
|
||||
See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
|
||||
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below).
|
||||
- `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
|
||||
- `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
|
||||
- `influx/write` and `influx/api/v2/write` - for inserting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
|
||||
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
|
||||
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` at `vmselect` (see below).
|
||||
- `prometheus/api/v1/import/native` - for importing data obtained via `api/v1/export/native` on `vmselect` (see below).
|
||||
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details.
|
||||
- `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
|
@ -265,6 +265,52 @@ VictoriaMetrics also supports [importing data in Prometheus exposition format](#
|
||||
See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be used as drop-in replacement for Prometheus.
|
||||
|
||||
|
||||
## How to send data from DataDog agent
|
||||
|
||||
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD]() via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v1/series` path.
|
||||
|
||||
Run DataDog agent with `DD_DD_URL=http://victoriametrics-host:8428/datadog` environment variable in order to write data to VictoriaMetrics at `victoriametrics-host` host. Another option is to set `dd_url` param at [DataDog agent configuration file](https://docs.datadoghq.com/agent/guide/agent-configuration-files/) to `http://victoriametrics-host:8428/datadog`.
|
||||
|
||||
Example on how to send data to VictoriaMetrics via DataDog "submit metrics" API from command line:
|
||||
|
||||
```bash
|
||||
echo '
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"host": "test.example.com",
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"points": [[
|
||||
0,
|
||||
0.5
|
||||
]],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}
|
||||
' | curl -X POST --data-binary @- http://localhost:8428/datadog/api/v1/series
|
||||
```
|
||||
|
||||
The imported data can be read via [export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format):
|
||||
|
||||
```bash
|
||||
curl http://localhost:8428/api/v1/export -d 'match[]=system.load.1'
|
||||
```
|
||||
|
||||
This command should return the following output if everything is OK:
|
||||
|
||||
```
|
||||
{"metric":{"__name__":"system.load.1","environment":"test","host":"test.example.com"},"values":[0.5],"timestamps":[1632833641000]}
|
||||
```
|
||||
|
||||
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
|
||||
For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
|
||||
|
||||
|
||||
## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
|
||||
|
||||
Use `http://<victoriametric-addr>:8428` url instead of InfluxDB url in agents' configs.
|
||||
@ -790,6 +836,7 @@ The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv
|
||||
Time series data can be imported via any supported ingestion protocol:
|
||||
|
||||
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). See [these docs](#prometheus-setup) for details.
|
||||
* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details.
|
||||
* InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
|
||||
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
|
||||
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
|
||||
|
@ -269,6 +269,52 @@ VictoriaMetrics also supports [importing data in Prometheus exposition format](#
|
||||
See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be used as drop-in replacement for Prometheus.
|
||||
|
||||
|
||||
## How to send data from DataDog agent
|
||||
|
||||
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD]() via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v1/series` path.
|
||||
|
||||
Run DataDog agent with `DD_DD_URL=http://victoriametrics-host:8428/datadog` environment variable in order to write data to VictoriaMetrics at `victoriametrics-host` host. Another option is to set `dd_url` param at [DataDog agent configuration file](https://docs.datadoghq.com/agent/guide/agent-configuration-files/) to `http://victoriametrics-host:8428/datadog`.
|
||||
|
||||
Example on how to send data to VictoriaMetrics via DataDog "submit metrics" API from command line:
|
||||
|
||||
```bash
|
||||
echo '
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"host": "test.example.com",
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"points": [[
|
||||
0,
|
||||
0.5
|
||||
]],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}
|
||||
' | curl -X POST --data-binary @- http://localhost:8428/datadog/api/v1/series
|
||||
```
|
||||
|
||||
The imported data can be read via [export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format):
|
||||
|
||||
```bash
|
||||
curl http://localhost:8428/api/v1/export -d 'match[]=system.load.1'
|
||||
```
|
||||
|
||||
This command should return the following output if everything is OK:
|
||||
|
||||
```
|
||||
{"metric":{"__name__":"system.load.1","environment":"test","host":"test.example.com"},"values":[0.5],"timestamps":[1632833641000]}
|
||||
```
|
||||
|
||||
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
|
||||
For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
|
||||
|
||||
|
||||
## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
|
||||
|
||||
Use `http://<victoriametric-addr>:8428` url instead of InfluxDB url in agents' configs.
|
||||
@ -794,6 +840,7 @@ The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv
|
||||
Time series data can be imported via any supported ingestion protocol:
|
||||
|
||||
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). See [these docs](#prometheus-setup) for details.
|
||||
* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details.
|
||||
* InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
|
||||
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
|
||||
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
|
||||
|
@ -25,6 +25,7 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did
|
||||
See [Quick Start](#quick-start) for details.
|
||||
* Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details.
|
||||
* Accepts data via all ingestion protocols supported by VictoriaMetrics:
|
||||
* DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent).
|
||||
* InfluxDB line protocol via `http://<vmagent>:8429/write`. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
|
||||
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
|
||||
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-opentsdb-compatible-agents).
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/klauspost/compress/gzip"
|
||||
"github.com/klauspost/compress/zlib"
|
||||
)
|
||||
|
||||
// GetGzipReader returns new gzip reader from the pool.
|
||||
@ -29,3 +30,24 @@ func PutGzipReader(zr *gzip.Reader) {
|
||||
}
|
||||
|
||||
var gzipReaderPool sync.Pool
|
||||
|
||||
// GetZlibReader returns zlib reader.
|
||||
func GetZlibReader(r io.Reader) (io.ReadCloser, error) {
|
||||
v := zlibReaderPool.Get()
|
||||
if v == nil {
|
||||
return zlib.NewReader(r)
|
||||
}
|
||||
zr := v.(io.ReadCloser)
|
||||
if err := zr.(zlib.Resetter).Reset(r, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return zr, nil
|
||||
}
|
||||
|
||||
// PutZlibReader returns back zlib reader obtained via GetZlibReader.
|
||||
func PutZlibReader(zr io.ReadCloser) {
|
||||
_ = zr.Close()
|
||||
zlibReaderPool.Put(zr)
|
||||
}
|
||||
|
||||
var zlibReaderPool sync.Pool
|
73
lib/protoparser/datadog/parser.go
Normal file
73
lib/protoparser/datadog/parser.go
Normal file
@ -0,0 +1,73 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// Request represents DataDog POST request to /api/v1/series
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
type Request struct {
|
||||
Series []Series `json:"series"`
|
||||
}
|
||||
|
||||
func (req *Request) reset() {
|
||||
req.Series = req.Series[:0]
|
||||
}
|
||||
|
||||
// Unmarshal unmarshals DataDog /api/v1/series request body from b to req.
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
//
|
||||
// b shouldn't be modified when req is in use.
|
||||
func (req *Request) Unmarshal(b []byte) error {
|
||||
req.reset()
|
||||
if err := json.Unmarshal(b, req); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal %q: %w", b, err)
|
||||
}
|
||||
// Set missing timestamps to the current time.
|
||||
currentTimestamp := float64(fasttime.UnixTimestamp())
|
||||
series := req.Series
|
||||
for i := range series {
|
||||
points := series[i].Points
|
||||
for j := range points {
|
||||
if points[j][0] <= 0 {
|
||||
points[j][0] = currentTimestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Series represents a series item from DataDog POST request to /api/v1/series
|
||||
//
|
||||
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
type Series struct {
|
||||
Host string `json:"host"`
|
||||
|
||||
// Do not decode Interval, since it isn't used by VictoriaMetrics
|
||||
// Interval int64 `json:"interval"`
|
||||
|
||||
Metric string `json:"metric"`
|
||||
Points []Point `json:"points"`
|
||||
Tags []string `json:"tags"`
|
||||
|
||||
// Do not decode Type, since it isn't used by VictoriaMetrics
|
||||
// Type string `json:"type"`
|
||||
}
|
||||
|
||||
// Point represents a point from DataDog POST request to /api/v1/series
|
||||
type Point [2]float64
|
||||
|
||||
// Timestamp returns timestamp in milliseconds from the given pt.
|
||||
func (pt *Point) Timestamp() int64 {
|
||||
return int64(pt[0] * 1000)
|
||||
}
|
||||
|
||||
// Value returns value from the given pt.
|
||||
func (pt *Point) Value() float64 {
|
||||
return pt[1]
|
||||
}
|
66
lib/protoparser/datadog/parser_test.go
Normal file
66
lib/protoparser/datadog/parser_test.go
Normal file
@ -0,0 +1,66 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRequestUnmarshalFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
var req Request
|
||||
if err := req.Unmarshal([]byte(s)); err == nil {
|
||||
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
|
||||
}
|
||||
}
|
||||
f("")
|
||||
f("foobar")
|
||||
f(`{"series":123`)
|
||||
f(`1234`)
|
||||
f(`[]`)
|
||||
}
|
||||
|
||||
func TestRequestUnmarshalSuccess(t *testing.T) {
|
||||
f := func(s string, reqExpected *Request) {
|
||||
t.Helper()
|
||||
var req Request
|
||||
if err := req.Unmarshal([]byte(s)); err != nil {
|
||||
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
|
||||
}
|
||||
if !reflect.DeepEqual(&req, reqExpected) {
|
||||
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &req, reqExpected)
|
||||
}
|
||||
}
|
||||
f("{}", &Request{})
|
||||
f(`
|
||||
{
|
||||
"series": [
|
||||
{
|
||||
"host": "test.example.com",
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"points": [[
|
||||
1575317847,
|
||||
0.5
|
||||
]],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}
|
||||
`, &Request{
|
||||
Series: []Series{{
|
||||
Host: "test.example.com",
|
||||
Metric: "system.load.1",
|
||||
Points: []Point{{
|
||||
1575317847,
|
||||
0.5,
|
||||
}},
|
||||
Tags: []string{
|
||||
"environment:test",
|
||||
},
|
||||
}},
|
||||
})
|
||||
}
|
39
lib/protoparser/datadog/parser_timing_test.go
Normal file
39
lib/protoparser/datadog/parser_timing_test.go
Normal file
@ -0,0 +1,39 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func BenchmarkRequestUnmarshal(b *testing.B) {
|
||||
reqBody := []byte(`{
|
||||
"series": [
|
||||
{
|
||||
"host": "test.example.com",
|
||||
"interval": 20,
|
||||
"metric": "system.load.1",
|
||||
"points": [
|
||||
1575317847,
|
||||
0.5
|
||||
],
|
||||
"tags": [
|
||||
"environment:test"
|
||||
],
|
||||
"type": "rate"
|
||||
}
|
||||
]
|
||||
}`)
|
||||
b.SetBytes(int64(len(reqBody)))
|
||||
b.ReportAllocs()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var req Request
|
||||
for pb.Next() {
|
||||
if err := req.Unmarshal(reqBody); err != nil {
|
||||
panic(fmt.Errorf("unexpected error: %w", err))
|
||||
}
|
||||
if len(req.Series) != 1 {
|
||||
panic(fmt.Errorf("unexpected number of series unmarshaled: got %d; want 4", len(req.Series)))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
138
lib/protoparser/datadog/streamparser.go
Normal file
138
lib/protoparser/datadog/streamparser.go
Normal file
@ -0,0 +1,138 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
|
||||
var maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series")
|
||||
|
||||
// ParseStream parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
|
||||
//
|
||||
// callback shouldn't hold series after returning.
|
||||
func ParseStream(r io.Reader, contentEncoding string, callback func(series []Series) error) error {
|
||||
switch contentEncoding {
|
||||
case "gzip":
|
||||
zr, err := common.GetGzipReader(r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read gzipped DataDog data: %w", err)
|
||||
}
|
||||
defer common.PutGzipReader(zr)
|
||||
r = zr
|
||||
case "deflate":
|
||||
zlr, err := common.GetZlibReader(r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read deflated DataDog data: %w", err)
|
||||
}
|
||||
defer common.PutZlibReader(zlr)
|
||||
r = zlr
|
||||
}
|
||||
ctx := getPushCtx(r)
|
||||
defer putPushCtx(ctx)
|
||||
if err := ctx.Read(); err != nil {
|
||||
return err
|
||||
}
|
||||
req := getRequest()
|
||||
defer putRequest(req)
|
||||
if err := req.Unmarshal(ctx.reqBuf.B); err != nil {
|
||||
unmarshalErrors.Inc()
|
||||
return fmt.Errorf("cannot unmarshal DataDog POST request with size %d bytes: %s", len(ctx.reqBuf.B), err)
|
||||
}
|
||||
rows := 0
|
||||
series := req.Series
|
||||
for i := range series {
|
||||
rows += len(series[i].Points)
|
||||
}
|
||||
rowsRead.Add(rows)
|
||||
|
||||
if err := callback(series); err != nil {
|
||||
return fmt.Errorf("error when processing imported data: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type pushCtx struct {
|
||||
br *bufio.Reader
|
||||
reqBuf bytesutil.ByteBuffer
|
||||
}
|
||||
|
||||
func (ctx *pushCtx) reset() {
|
||||
ctx.br.Reset(nil)
|
||||
ctx.reqBuf.Reset()
|
||||
}
|
||||
|
||||
func (ctx *pushCtx) Read() error {
|
||||
readCalls.Inc()
|
||||
lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1)
|
||||
startTime := fasttime.UnixTimestamp()
|
||||
reqLen, err := ctx.reqBuf.ReadFrom(lr)
|
||||
if err != nil {
|
||||
readErrors.Inc()
|
||||
return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
|
||||
}
|
||||
if reqLen > int64(maxInsertRequestSize.N) {
|
||||
readErrors.Inc()
|
||||
return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadog"}`)
|
||||
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadog"}`)
|
||||
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadog"}`)
|
||||
unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadog"}`)
|
||||
)
|
||||
|
||||
func getPushCtx(r io.Reader) *pushCtx {
|
||||
select {
|
||||
case ctx := <-pushCtxPoolCh:
|
||||
ctx.br.Reset(r)
|
||||
return ctx
|
||||
default:
|
||||
if v := pushCtxPool.Get(); v != nil {
|
||||
ctx := v.(*pushCtx)
|
||||
ctx.br.Reset(r)
|
||||
return ctx
|
||||
}
|
||||
return &pushCtx{
|
||||
br: bufio.NewReaderSize(r, 64*1024),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func putPushCtx(ctx *pushCtx) {
|
||||
ctx.reset()
|
||||
select {
|
||||
case pushCtxPoolCh <- ctx:
|
||||
default:
|
||||
pushCtxPool.Put(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
var pushCtxPool sync.Pool
|
||||
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
|
||||
|
||||
func getRequest() *Request {
|
||||
v := requestPool.Get()
|
||||
if v == nil {
|
||||
return &Request{}
|
||||
}
|
||||
return v.(*Request)
|
||||
}
|
||||
|
||||
func putRequest(req *Request) {
|
||||
requestPool.Put(req)
|
||||
}
|
||||
|
||||
var requestPool sync.Pool
|
Loading…
Reference in New Issue
Block a user