diff --git a/README.md b/README.md index 9923e9081..e38769a28 100644 --- a/README.md +++ b/README.md @@ -509,8 +509,9 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) -via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path. +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/), [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) and +[DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` or via "sketches" API at `/datadog/api/beta/sketches`. ### Sending metrics to VictoriaMetrics @@ -536,11 +537,11 @@ add the following line: dd_url: http://victoriametrics:8428/datadog ``` -[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data, +[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept DataDog metrics format. Depending on where vmagent will forward data, pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats. -### Sending metrics to Datadog and VictoriaMetrics - +### Sending metrics to DataDog and VictoriaMetrics + DataDog allows configuring [Dual Shipping](https://docs.datadoghq.com/agent/guide/dual-shipping/) for metrics sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `additional_endpoints`. 
@@ -564,6 +565,19 @@ additional_endpoints: - apikey ``` +### Send metrics via Serverless DataDog plugin + +Disable logs (logs ingestion is not supported by VictoriaMetrics) and set a custom endpoint in `serverless.yaml`: + +``` +custom: + datadog: + enableDDLogs: false # Disable DD logs, since they are not supported by VictoriaMetrics + apiKey: fakekey # Set any key, otherwise plugin fails +provider: + environment: + DD_DD_URL: <>/datadog # VictoriaMetrics endpoint for DataDog +``` ### Send via cURL diff --git a/app/vmagent/datadogsketches/request_handler.go b/app/vmagent/datadogsketches/request_handler.go index 42a8665ff..cb7c9cb3d 100644 --- a/app/vmagent/datadogsketches/request_handler.go +++ b/app/vmagent/datadogsketches/request_handler.go @@ -41,19 +41,15 @@ func insertRows(at *auth.Token, sketches []*datadogsketches.Sketch, extraLabels tssDst := ctx.WriteRequest.Timeseries[:0] labels := ctx.Labels[:0] samples := ctx.Samples[:0] - for i := range sketches { - sketch := sketches[i] - metrics := sketch.ToHistogram() - rowsTotal += sketch.RowsCount() - for m := range metrics { - metric := metrics[m] + for _, sketch := range sketches { + ms := sketch.ToSummary() + for _, m := range ms { labelsLen := len(labels) labels = append(labels, prompbmarshal.Label{ Name: "__name__", - Value: metric.Name, + Value: m.Name, }) - for l := range metric.Labels { - label := metric.Labels[l] + for _, label := range m.Labels { labels = append(labels, prompbmarshal.Label{ Name: label.Name, Value: label.Value, @@ -71,13 +67,13 @@ func insertRows(at *auth.Token, sketches []*datadogsketches.Sketch, extraLabels } labels = append(labels, extraLabels...) 
samplesLen := len(samples) - for p := range metric.Points { - point := metric.Points[p] + for _, p := range m.Points { samples = append(samples, prompbmarshal.Sample{ - Timestamp: sketch.Dogsketches[p].Ts * 1000, - Value: point, + Timestamp: p.Timestamp, + Value: p.Value, }) } + rowsTotal += len(m.Points) tssDst = append(tssDst, prompbmarshal.TimeSeries{ Labels: labels[labelsLen:], Samples: samples[samplesLen:], diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 913742265..8e77afea2 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -613,6 +613,15 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true + case "datadog/api/beta/sketches": + datadogsketchesWriteRequests.Inc() + if err := datadogsketches.InsertHandlerForHTTP(at, r); err != nil { + datadogsketchesWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(202) + return true case "datadog/api/v1/validate": datadogValidateRequests.Inc() // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key diff --git a/app/vminsert/datadogsketches/request_handler.go b/app/vminsert/datadogsketches/request_handler.go index b00f885aa..15973fbd0 100644 --- a/app/vminsert/datadogsketches/request_handler.go +++ b/app/vminsert/datadogsketches/request_handler.go @@ -35,23 +35,18 @@ func insertRows(sketches []*datadogsketches.Sketch, extraLabels []prompbmarshal. 
defer common.PutInsertCtx(ctx) rowsLen := 0 - for i := range sketches { - sketch := sketches[i] + for _, sketch := range sketches { rowsLen += sketch.RowsCount() } ctx.Reset(rowsLen) rowsTotal := 0 hasRelabeling := relabel.HasRelabeling() - for i := range sketches { - sketch := sketches[i] - metrics := sketch.ToHistogram() - rowsTotal += sketch.RowsCount() - for m := range metrics { - metric := metrics[m] + for _, sketch := range sketches { + ms := sketch.ToSummary() + for _, m := range ms { ctx.Labels = ctx.Labels[:0] - ctx.AddLabel("", metric.Name) - for l := range metric.Labels { - label := metric.Labels[l] + ctx.AddLabel("", m.Name) + for _, label := range m.Labels { ctx.AddLabel(label.Name, label.Value) } for _, tag := range sketch.Tags { @@ -75,14 +70,13 @@ func insertRows(sketches []*datadogsketches.Sketch, extraLabels []prompbmarshal. ctx.SortLabelsIfNeeded() var metricNameRaw []byte var err error - for p := range metric.Points { - value := metric.Points[p] - timestamp := sketch.Dogsketches[p].Ts * 1000 - metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, timestamp, value) + for _, p := range m.Points { + metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, p.Timestamp, p.Value) if err != nil { return err } } + rowsTotal += len(m.Points) } } rowsInserted.Add(rowsTotal) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 559be0396..72b65af3d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -29,6 +29,7 @@ The sandbox cluster installation is running under the constant load generated by ## tip * FEATURE: all VictoriaMetrics components: add support for TLS client certificate verification at `-httpListenAddr` (aka [mTLS](https://en.wikipedia.org/wiki/Mutual_authentication)). See [these docs](https://docs.victoriametrics.com/#mtls-protection). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5458). 
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com): add support for data ingestion via [DataDog lambda extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) aka `/api/beta/sketches` endpoint. See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3091). Thanks to @AndrewChubatiuk for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5584). * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add `-disableReroutingOnUnavailable` command-line flag, which can be used for reducing resource usage spikes at `vmstorage` nodes during rolling restart. Thanks to @Muxa1L for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5713). * FEATURE: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683): add `Targets scraped/s` stat panel showing the number of targets scraped by the vmagent per-second. * FEATURE: [dashboards/all](https://grafana.com/orgs/victoriametrics): add new panel `CPU spent on GC`. It should help identifying cases when too much CPU is spent on garbage collection, and advice users on how this can be addressed. diff --git a/docs/README.md b/docs/README.md index b9d892952..4f2fc1f45 100644 --- a/docs/README.md +++ b/docs/README.md @@ -512,8 +512,9 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) -via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path. 
+VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/), [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) and +[DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` or via "sketches" API at `/datadog/api/beta/sketches`. ### Sending metrics to VictoriaMetrics @@ -539,11 +540,11 @@ add the following line: dd_url: http://victoriametrics:8428/datadog ``` -[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data, +[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept DataDog metrics format. Depending on where vmagent will forward data, pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats. -### Sending metrics to Datadog and VictoriaMetrics - +### Sending metrics to DataDog and VictoriaMetrics + DataDog allows configuring [Dual Shipping](https://docs.datadoghq.com/agent/guide/dual-shipping/) for metrics sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `additional_endpoints`. 
@@ -567,6 +568,19 @@ additional_endpoints: - apikey ``` +### Send metrics via Serverless DataDog plugin + +Disable logs (logs ingestion is not supported by VictoriaMetrics) and set a custom endpoint in `serverless.yaml`: + +``` +custom: + datadog: + enableDDLogs: false # Disable DD logs, since they are not supported by VictoriaMetrics + apiKey: fakekey # Set any key, otherwise plugin fails +provider: + environment: + DD_DD_URL: <>/datadog # VictoriaMetrics endpoint for DataDog +``` ### Send via cURL diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 4c88059e9..e45d0c71d 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -520,8 +520,9 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/), [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) or [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) -via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path or via "sketches" API at `/datadog/api/beta/sketches`. +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/), [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) and +[DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` or via "sketches" API at `/datadog/api/beta/sketches`. ### Sending metrics to VictoriaMetrics @@ -551,7 +552,7 @@ dd_url: http://victoriametrics:8428/datadog pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats. 
### Sending metrics to DataDog and VictoriaMetrics - + DataDog allows configuring [Dual Shipping](https://docs.datadoghq.com/agent/guide/dual-shipping/) for metrics sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `additional_endpoints`. @@ -575,10 +576,10 @@ additional_endpoints: - apikey ``` +### Send metrics via Serverless DataDog plugin -### Send via Serverless DataDog plugin +Disable logs (logs ingestion is not supported by VictoriaMetrics) and set a custom endpoint in `serverless.yaml`: -Disable logs (logs ingestion is not supported by Victoria Metrics) and set a custom endpoint in serverless.yaml ``` custom: datadog: @@ -586,7 +587,7 @@ custom: apiKey: fakekey # Set any key, otherwise plugin fails provider: environment: - DD_DD_URL: <>/datadog # Victoria Metrics endpoint for DataDog + DD_DD_URL: <>/datadog # VictoriaMetrics endpoint for DataDog ``` ### Send via cURL diff --git a/go.mod b/go.mod index a17325da1..712de6a25 100644 --- a/go.mod +++ b/go.mod @@ -37,8 +37,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 ) -require github.com/stretchr/testify v1.8.4 - require ( cloud.google.com/go v0.112.0 // indirect cloud.google.com/go/compute v1.23.4 // indirect @@ -104,6 +102,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/rivo/uniseg v0.4.6 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/stretchr/testify v1.8.4 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect go.opencensus.io v0.24.0 // indirect diff --git a/lib/protoparser/datadogsketches/parser.go b/lib/protoparser/datadogsketches/parser.go index b04dd4616..87609d1bb 100644 --- a/lib/protoparser/datadogsketches/parser.go +++ b/lib/protoparser/datadogsketches/parser.go @@ -9,6 +9,7 @@ import ( ) var ( + // TODO: @AndrewChubatiuk, please provide a permalink for the original source code where these constants were extracted epsillon = 1.0 / 128 gamma 
= 1 + 2*epsillon gammaLn = math.Log(gamma) @@ -17,7 +18,16 @@ var ( quantiles = []float64{0.5, 0.75, 0.9, 0.95, 0.99} ) -type label struct { +var quantilesStr = func() []string { + a := make([]string, len(quantiles)) + for i, q := range quantiles { + a[i] = strconv.FormatFloat(q, 'g', 3, 64) + } + return a +}() + +// Label is a single label for Metric +type Label struct { Name string Value string } @@ -25,8 +35,14 @@ type label struct { // Metric stores metrics extracted from sketches type Metric struct { Name string - Labels []label - Points []float64 + Labels []Label + Points []Point +} + +// Point stores a single point extracted from sketches +type Point struct { + Value float64 + Timestamp int64 } // SketchPayload stores sketches extracted from /api/beta/sketches endpoint @@ -34,13 +50,15 @@ type Metric struct { // message SketchPayload { // repeated Sketch sketches = 1 // } +// +// See https://github.com/DataDog/agent-payload/blob/38db68d9641c8a0bd2e1eac53b9d54793448850f/proto/metrics/agent_payload.proto#L90 type SketchPayload struct { Sketches []*Sketch } -// UnmarshalProtobuf decodes byte array to SketchPayload struct +// UnmarshalProtobuf decodes src to SketchPayload struct func (sp *SketchPayload) UnmarshalProtobuf(src []byte) (err error) { - sp.Sketches = sp.Sketches[:0] + sp.Sketches = nil var fc easyproto.FieldContext for len(src) > 0 { src, err = fc.NextField(src) @@ -51,13 +69,13 @@ func (sp *SketchPayload) UnmarshalProtobuf(src []byte) (err error) { case 1: data, ok := fc.MessageData() if !ok { - return fmt.Errorf("cannot read SketchPayload sketches data") + return fmt.Errorf("cannot read Sketch data") } - sp.Sketches = append(sp.Sketches, &Sketch{}) - s := sp.Sketches[len(sp.Sketches)-1] + var s Sketch if err := s.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal sketch: %w", err) + return fmt.Errorf("cannot unmarshal Sketch: %w", err) } + sp.Sketches = append(sp.Sketches, &s) } } return nil @@ -71,6 +89,8 @@ func (sp 
*SketchPayload) UnmarshalProtobuf(src []byte) (err error) { // repeated string tags = 4; // repeated Dogsketch dogsketches = 7 // } +// +// See https://github.com/DataDog/agent-payload/blob/38db68d9641c8a0bd2e1eac53b9d54793448850f/proto/metrics/agent_payload.proto#L91 type Sketch struct { Metric string Host string @@ -78,12 +98,12 @@ type Sketch struct { Dogsketches []*Dogsketch } -// unmarshalProtobuf decodes byte array to Sketch struct +// unmarshalProtobuf decodes src to Sketch struct func (s *Sketch) unmarshalProtobuf(src []byte) (err error) { s.Metric = "" s.Host = "" - s.Tags = s.Tags[:0] - s.Dogsketches = s.Dogsketches[:0] + s.Tags = nil + s.Dogsketches = nil var fc easyproto.FieldContext for len(src) > 0 { @@ -95,78 +115,87 @@ func (s *Sketch) unmarshalProtobuf(src []byte) (err error) { case 1: metric, ok := fc.String() if !ok { - return fmt.Errorf("cannot read Sketch metric") + return fmt.Errorf("cannot read metric") } s.Metric = metric case 2: host, ok := fc.String() if !ok { - return fmt.Errorf("cannot read Sketch host") + return fmt.Errorf("cannot read host") } s.Host = host case 4: tag, ok := fc.String() if !ok { - return fmt.Errorf("cannot read Sketch tag") + return fmt.Errorf("cannot read tag") } s.Tags = append(s.Tags, tag) case 7: data, ok := fc.MessageData() if !ok { - return fmt.Errorf("cannot read Sketch dogsketches data") + return fmt.Errorf("cannot read Dogsketch data") } - s.Dogsketches = append(s.Dogsketches, &Dogsketch{}) - d := s.Dogsketches[len(s.Dogsketches)-1] + var d Dogsketch if err := d.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal dogsketch: %w", err) + return fmt.Errorf("cannot unmarshal Dogsketch: %w", err) } + s.Dogsketches = append(s.Dogsketches, &d) } } return nil } -// RowsCount calculates generated rows num from sketch +// RowsCount returns the number of samples s generates. 
func (s *Sketch) RowsCount() int { - return (len(quantiles) + len(s.extractAggr())) * len(s.Dogsketches) + // The sketch contains len(quantiles) plus *_sum and *_count metrics + // per each Dogsketch in s.Dogsketches. + return (len(quantiles) + 2) * len(s.Dogsketches) } -func (s *Sketch) extractAggr() []*Metric { - return []*Metric{ - { - Name: s.Metric + "_sum", - Labels: []label{}, - Points: make([]float64, len(s.Dogsketches)), - }, { - Name: s.Metric + "_count", - Labels: []label{}, - Points: make([]float64, len(s.Dogsketches)), - }, - } -} - -// ToHistogram generates histogram metrics -func (s *Sketch) ToHistogram() []*Metric { +// ToSummary generates Prometheus summary from the given s. +func (s *Sketch) ToSummary() []*Metric { + metrics := make([]*Metric, len(quantiles)+2) dogsketches := s.Dogsketches - aggr := s.extractAggr() - metrics := make([]*Metric, len(quantiles)) - for q := range quantiles { - quantile := quantiles[q] - metrics[q] = &Metric{ - Name: s.Metric, - Labels: []label{{ - Name: "quantile", - Value: strconv.FormatFloat(quantile, 'g', 3, 64), - }}, - Points: make([]float64, len(dogsketches)), + + sumPoints := make([]Point, len(dogsketches)) + countPoints := make([]Point, len(dogsketches)) + metrics[len(metrics)-2] = &Metric{ + Name: s.Metric + "_sum", + Points: sumPoints, + } + metrics[len(metrics)-1] = &Metric{ + Name: s.Metric + "_count", + Points: countPoints, + } + + for i, q := range quantiles { + points := make([]Point, len(dogsketches)) + for j, d := range dogsketches { + timestamp := d.Ts * 1000 + points[j] = Point{ + Timestamp: timestamp, + Value: d.valueForQuantile(q), + } + sumPoints[j] = Point{ + Timestamp: timestamp, + Value: d.Sum, + } + countPoints[j] = Point{ + Timestamp: timestamp, + Value: float64(d.Cnt), + } } - for d := range dogsketches { - dogsketch := dogsketches[d] - aggr[0].Points[d] = dogsketch.Sum - aggr[1].Points[d] = float64(dogsketch.Cnt) - metrics[q].Points[d] = dogsketch.pointForQuantile(quantile) + metrics[i] = 
&Metric{ + Name: s.Metric, + Labels: []Label{{ + Name: "quantile", + Value: quantilesStr[i], + }}, + Points: points, } } - return append(metrics, aggr...) + + return metrics } // Dogsketch proto struct @@ -180,6 +209,8 @@ func (s *Sketch) ToHistogram() []*Metric { // repeated sint32 k = 7; // repeated uint32 n = 8; // } +// +// See https://github.com/DataDog/agent-payload/blob/38db68d9641c8a0bd2e1eac53b9d54793448850f/proto/metrics/agent_payload.proto#L104 type Dogsketch struct { Ts int64 Cnt int64 @@ -190,91 +221,102 @@ type Dogsketch struct { N []uint32 } -// unmarshalProtobuf decodes byte array to Dogsketch struct +// unmarshalProtobuf decodes src to Dogsketch struct func (d *Dogsketch) unmarshalProtobuf(src []byte) (err error) { d.Ts = 0 d.Cnt = 0 d.Min = 0.0 d.Max = 0.0 d.Sum = 0.0 - d.K = d.K[:0] - d.N = d.N[:0] + d.K = nil + d.N = nil + var fc easyproto.FieldContext for len(src) > 0 { src, err = fc.NextField(src) if err != nil { - return fmt.Errorf("cannot read next field in Dogsketch message, %w", err) + return fmt.Errorf("cannot read next field in Dogsketch message: %w", err) } switch fc.FieldNum { case 1: ts, ok := fc.Int64() if !ok { - return fmt.Errorf("cannot read Dogsketch timestamp") + return fmt.Errorf("cannot read timestamp") } d.Ts = ts case 2: cnt, ok := fc.Int64() if !ok { - return fmt.Errorf("cannot read Dogsketch count") + return fmt.Errorf("cannot read count") } d.Cnt = cnt case 3: min, ok := fc.Double() if !ok { - return fmt.Errorf("cannot read Dogsketch min") + return fmt.Errorf("cannot read min") } d.Min = min case 4: max, ok := fc.Double() if !ok { - return fmt.Errorf("cannot read Dogsketch max") + return fmt.Errorf("cannot read max") } d.Max = max case 6: sum, ok := fc.Double() if !ok { - return fmt.Errorf("cannot read Dogsketch sum") + return fmt.Errorf("cannot read sum") } d.Sum = sum case 7: var ok bool d.K, ok = fc.UnpackSint32s(d.K) if !ok { - return fmt.Errorf("cannot read Dogsketch k") + return fmt.Errorf("cannot read k") } case 8: 
var ok bool d.N, ok = fc.UnpackUint32s(d.N) if !ok { - return fmt.Errorf("cannot read Dogsketch n") + return fmt.Errorf("cannot read n") } } } return nil } -func (d *Dogsketch) pointForQuantile(quantile float64) float64 { +func (d *Dogsketch) valueForQuantile(q float64) float64 { switch { case d.Cnt == 0: return 0 - case quantile <= 0: + case q <= 0: return d.Min - case quantile >= 1: + case q >= 1: return d.Max } - rank := quantile * float64(d.Cnt-1) - nLen := len(d.N) - for cnt, i := 0.0, 0; i < nLen; i++ { - cnt += float64(d.N[i]) + ns := d.N + ks := d.K + if len(ns) != len(ks) { + // Avoid index out of range panic in the loop below. + return 0 + } + + rank := q * float64(d.Cnt-1) + cnt := float64(0) + for i, n := range ns { + cnt += float64(n) if cnt <= rank { continue } - weight := (cnt - rank) / float64(d.N[i]) - vLow := f64(d.K[i]) + weight := (cnt - rank) / float64(n) + vLow := f64(ks[i]) vHigh := vLow * gamma switch i { - case nLen: + // TODO: I'm unsure this code is correct. i cannot equal len(ns) in this loop. + // @AndrewChubatiuk, please add a permalink to the original source code, which was used + // for writing this code, in the comments to this function. + case len(ns): vHigh = d.Max case 0: vLow = d.Min @@ -288,6 +330,8 @@ func f64(k int32) float64 { switch { case k < 0: return -f64(-k) + // TODO: I'm unsure this logic is correct, since k can be smaller than math.MinInt16 and bigger than math.MaxInt16 + // @AndrewChubatiuk, please add a permalink to the original source code, which was used for writing this code. 
case k == math.MaxInt16 || k == math.MinInt16: return math.Inf(int(k)) case k == 0: diff --git a/lib/protoparser/datadogsketches/parser_test.go b/lib/protoparser/datadogsketches/parser_test.go index ac52efb98..94c3bf0f1 100644 --- a/lib/protoparser/datadogsketches/parser_test.go +++ b/lib/protoparser/datadogsketches/parser_test.go @@ -1,16 +1,17 @@ package datadogsketches import ( + "math" "testing" - - "github.com/stretchr/testify/assert" ) func TestPointsForQuantile(t *testing.T) { - f := func(d *Dogsketch, quantile float64, expected float64) { + f := func(d *Dogsketch, q float64, vExpected float64) { t.Helper() - point := d.pointForQuantile(quantile) - assert.InDelta(t, point, expected, 0.5) + v := d.valueForQuantile(q) + if math.Abs(v-vExpected) > 0.4 { + t.Fatalf("unexpected value; got %v; want %v", v, vExpected) + } } sketches := &Dogsketch{ Min: 8.0, diff --git a/lib/protoparser/datadogsketches/stream/streamparser.go b/lib/protoparser/datadogsketches/stream/streamparser.go index 177741663..c81cee1e5 100644 --- a/lib/protoparser/datadogsketches/stream/streamparser.go +++ b/lib/protoparser/datadogsketches/stream/streamparser.go @@ -20,7 +20,6 @@ import ( // // callback shouldn't hold series after returning. 
func Parse(r io.Reader, contentEncoding string, callback func(series []*datadogsketches.Sketch) error) error { - var err error wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -50,18 +49,17 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []*datadogs req := getRequest() defer putRequest(req) - err = req.UnmarshalProtobuf(ctx.reqBuf.B) - if err != nil { + if err := req.UnmarshalProtobuf(ctx.reqBuf.B); err != nil { unmarshalErrors.Inc() - return fmt.Errorf("cannot unmarshal DataDog request with size %d bytes: %w", len(ctx.reqBuf.B), err) + return fmt.Errorf("cannot unmarshal DataDog Sketches request with size %d bytes: %w", len(ctx.reqBuf.B), err) } rows := 0 sketches := req.Sketches - for i := range sketches { - rows += len(sketches[i].Dogsketches) + for _, sketch := range sketches { + rows += sketch.RowsCount() if *datadogutils.SanitizeMetricName { - sketches[i].Metric = datadogutils.SanitizeName(sketches[i].Metric) + sketch.Metric = datadogutils.SanitizeName(sketch.Metric) } } rowsRead.Add(rows) diff --git a/lib/protoparser/datadogv1/stream/streamparser.go b/lib/protoparser/datadogv1/stream/streamparser.go index d1ce9f7d4..961645f04 100644 --- a/lib/protoparser/datadogv1/stream/streamparser.go +++ b/lib/protoparser/datadogv1/stream/streamparser.go @@ -40,6 +40,7 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []datadogv1 defer common.PutZlibReader(zlr) r = zlr } + ctx := getPushCtx(r) defer putPushCtx(ctx) if err := ctx.Read(); err != nil { @@ -47,10 +48,12 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []datadogv1 } req := getRequest() defer putRequest(req) + if err := req.Unmarshal(ctx.reqBuf.B); err != nil { unmarshalErrors.Inc() return fmt.Errorf("cannot unmarshal DataDog POST request with size %d bytes: %w", len(ctx.reqBuf.B), err) } + rows := 0 series := req.Series for i := range series {