lib/protoparser/datadog: follow-up after 543f218fe9

* prevent /api/v1 from panic on parsing rows
* add tests for Extract function for v1 and v2 api's
* separate request types in different pools to prevent different objects mixing
* add changelog line

543f218fe9
Signed-off-by: hagen1778 <roman@victoriametrics.com>

(cherry picked from commit 98d0f81f21)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
hagen1778 2023-11-28 15:04:15 +01:00
parent d6b4c8e4ef
commit 73d18fbc7a
No known key found for this signature in database
GPG Key ID: 3BF75F3741CA9640
6 changed files with 122 additions and 52 deletions

View File

@ -30,6 +30,7 @@ The sandbox cluster installation is running under the constant load generated by
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queueing to disk when the remote storage cannot keep up with the data ingestion rate. See [these docs](https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queueing to disk when the remote storage cannot keep up with the data ingestion rate. See [these docs](https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for reading and writing samples via [Google PubSub](https://cloud.google.com/pubsub). See [these docs](https://docs.victoriametrics.com/vmagent.html#google-pubsub-integration). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for reading and writing samples via [Google PubSub](https://cloud.google.com/pubsub). See [these docs](https://docs.victoriametrics.com/vmagent.html#google-pubsub-integration).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Datadog `/api/v2/series` and `/api/beta/sketches` ingestion protocols to vmagent/vminsert components. See this [doc](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) for examples. Thanks to @AndrewChubatiuk for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5094).
* FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format). * FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add [day_of_year()](https://docs.victoriametrics.com/MetricsQL.html#day_of_year) function, which returns the day of the year for each of the given unix timestamps. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5345) for details. Thanks to @luckyxiaoqiang for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5368/). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add [day_of_year()](https://docs.victoriametrics.com/MetricsQL.html#day_of_year) function, which returns the day of the year for each of the given unix timestamps. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5345) for details. Thanks to @luckyxiaoqiang for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5368/).

View File

@ -30,10 +30,10 @@ func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn fu
if ts <= 0 { if ts <= 0 {
ts = float64(currentTimestamp) ts = float64(currentTimestamp)
} }
samples[j] = prompbmarshal.Sample{ samples = append(samples, prompbmarshal.Sample{
Timestamp: int64(ts * 1000), Timestamp: int64(ts * 1000),
Value: val, Value: val,
} })
} }
ts := prompbmarshal.TimeSeries{ ts := prompbmarshal.TimeSeries{
Samples: samples, Samples: samples,

View File

@ -3,6 +3,8 @@ package datadog
import ( import (
"reflect" "reflect"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
) )
func TestRequestUnmarshalFailure(t *testing.T) { func TestRequestUnmarshalFailure(t *testing.T) {
@ -20,7 +22,8 @@ func TestRequestUnmarshalFailure(t *testing.T) {
f(`[]`) f(`[]`)
} }
func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) { func TestRequestExtract(t *testing.T) {
fn := func(s []byte, reqExpected *Request, samplesExp int) {
t.Helper() t.Helper()
req := new(Request) req := new(Request)
if err := req.Unmarshal(s); err != nil { if err := req.Unmarshal(s); err != nil {
@ -29,13 +32,27 @@ func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) {
if !reflect.DeepEqual(req, reqExpected) { if !reflect.DeepEqual(req, reqExpected) {
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected) t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
} }
}
func TestRequestUnmarshalSuccess(t *testing.T) { var samplesTotal int
unmarshalRequestValidator( cb := func(ts prompbmarshal.TimeSeries) error {
t, []byte("{}"), new(Request), samplesTotal += len(ts.Samples)
) return nil
unmarshalRequestValidator(t, []byte(` }
sanitizeFn := func(name string) string {
return name
}
if err := req.Extract(cb, sanitizeFn); err != nil {
t.Fatalf("error when extracting data: %s", err)
}
if samplesTotal != samplesExp {
t.Fatalf("expected to extract %d samples; got %d", samplesExp, samplesTotal)
}
}
fn([]byte("{}"), new(Request), 0)
fn([]byte(`
{ {
"series": [ "series": [
{ {
@ -67,5 +84,5 @@ func TestRequestUnmarshalSuccess(t *testing.T) {
"environment:test", "environment:test",
}, },
}}, }},
}) }, 1)
} }

View File

@ -3,6 +3,8 @@ package datadog
import ( import (
"reflect" "reflect"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
) )
func TestRequestUnmarshalFailure(t *testing.T) { func TestRequestUnmarshalFailure(t *testing.T) {
@ -20,7 +22,8 @@ func TestRequestUnmarshalFailure(t *testing.T) {
f(`[]`) f(`[]`)
} }
func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) { func TestRequestExtract(t *testing.T) {
fn := func(s []byte, reqExpected *Request, samplesExp int) {
t.Helper() t.Helper()
req := new(Request) req := new(Request)
if err := req.Unmarshal(s); err != nil { if err := req.Unmarshal(s); err != nil {
@ -29,13 +32,27 @@ func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) {
if !reflect.DeepEqual(req, reqExpected) { if !reflect.DeepEqual(req, reqExpected) {
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected) t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
} }
}
func TestRequestUnmarshalSuccess(t *testing.T) { var samplesTotal int
unmarshalRequestValidator( cb := func(ts prompbmarshal.TimeSeries) error {
t, []byte("{}"), new(Request), samplesTotal += len(ts.Samples)
) return nil
unmarshalRequestValidator(t, []byte(` }
sanitizeFn := func(name string) string {
return name
}
if err := req.Extract(cb, sanitizeFn); err != nil {
t.Fatalf("error when extracting data: %s", err)
}
if samplesTotal != samplesExp {
t.Fatalf("expected to extract %d samples; got %d", samplesExp, samplesTotal)
}
}
fn([]byte("{}"), new(Request), 0)
fn([]byte(`
{ {
"series": [ "series": [
{ {
@ -53,6 +70,9 @@ func TestRequestUnmarshalSuccess(t *testing.T) {
"points": [{ "points": [{
"timestamp": 1575317847, "timestamp": 1575317847,
"value": 0.5 "value": 0.5
},{
"timestamp": 1575317848,
"value": 0.6
}], }],
"tags": [ "tags": [
"environment:test" "environment:test"
@ -74,10 +94,13 @@ func TestRequestUnmarshalSuccess(t *testing.T) {
Points: []point{{ Points: []point{{
Timestamp: 1575317847, Timestamp: 1575317847,
Value: 0.5, Value: 0.5,
}, {
Timestamp: 1575317848,
Value: 0.6,
}}, }},
Tags: []string{ Tags: []string{
"environment:test", "environment:test",
}, },
}}, }},
}) }, 2)
} }

View File

@ -25,7 +25,7 @@ func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn fu
for _, sketch := range r.SketchPayload.Sketches { for _, sketch := range r.SketchPayload.Sketches {
sketchSeries := make([]prompbmarshal.TimeSeries, 5) sketchSeries := make([]prompbmarshal.TimeSeries, 5)
for _, point := range sketch.Dogsketches { for _, point := range sketch.Dogsketches {
timestamp := int64(point.Ts * 1000) timestamp := point.Ts * 1000
updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{ updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{
"max": point.Max, "max": point.Max,
"min": point.Min, "min": point.Min,
@ -35,7 +35,7 @@ func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn fu
}) })
} }
for _, point := range sketch.Distributions { for _, point := range sketch.Distributions {
timestamp := int64(point.Ts * 1000) timestamp := point.Ts * 1000
updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{ updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{
"max": point.Max, "max": point.Max,
"min": point.Min, "min": point.Min,

View File

@ -70,35 +70,36 @@ func Parse(req *http.Request, callback func(prompbmarshal.TimeSeries) error) err
apiVersion := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${version}") apiVersion := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${version}")
apiKind := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${kind}") apiKind := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${kind}")
ddReq := getRequest() var ddReq datadog.Request
defer putRequest(ddReq)
switch apiKind { switch apiKind {
case "series": case "series":
switch apiVersion { switch apiVersion {
case "v1": case "v1":
ddReq = new(apiSeriesV1.Request) ddReq = getSeriesV1Request()
defer putSeriesV1Request(ddReq)
case "v2": case "v2":
ddReq = new(apiSeriesV2.Request) ddReq = getSeriesV2Request()
defer putSeriesV2Request(ddReq)
default: default:
return fmt.Errorf( return fmt.Errorf(
"API version %q of Datadog series endpoint is not supported", "API version %q of DataDog series endpoint is not supported",
apiVersion, apiVersion,
) )
} }
case "sketches": case "sketches":
switch apiVersion { switch apiVersion {
case "beta": case "beta":
ddReq = new(apiSketchesBeta.Request) ddReq = getSketchesBetaRequest()
defer putSketchesBetaRequest(ddReq)
default: default:
return fmt.Errorf( return fmt.Errorf(
"API version %q of Datadog sketches endpoint is not supported", "API version %q of DataDog sketches endpoint is not supported",
apiVersion, apiVersion,
) )
} }
default: default:
return fmt.Errorf( return fmt.Errorf(
"API kind %q of Datadog API is not supported", "API kind %q of DataDog API is not supported",
apiKind, apiKind,
) )
} }
@ -182,19 +183,47 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() datadog.Request { func getSeriesV1Request() *apiSeriesV1.Request {
v := requestPool.Get() v := seriesV1RequestPool.Get()
if v == nil { if v == nil {
return nil return &apiSeriesV1.Request{}
} }
return v.(datadog.Request) return v.(*apiSeriesV1.Request)
} }
func putRequest(req datadog.Request) { func putSeriesV1Request(req datadog.Request) {
requestPool.Put(req) seriesV1RequestPool.Put(req)
} }
var requestPool sync.Pool var seriesV1RequestPool sync.Pool
func getSeriesV2Request() *apiSeriesV2.Request {
v := seriesV2RequestPool.Get()
if v == nil {
return &apiSeriesV2.Request{}
}
return v.(*apiSeriesV2.Request)
}
func putSeriesV2Request(req datadog.Request) {
seriesV2RequestPool.Put(req)
}
var seriesV2RequestPool sync.Pool
func getSketchesBetaRequest() *apiSketchesBeta.Request {
v := sketchesBetaRequestPool.Get()
if v == nil {
return &apiSketchesBeta.Request{}
}
return v.(*apiSketchesBeta.Request)
}
func putSketchesBetaRequest(req datadog.Request) {
sketchesBetaRequestPool.Put(req)
}
var sketchesBetaRequestPool sync.Pool
// sanitizeName performs DataDog-compatible sanitizing for metric names // sanitizeName performs DataDog-compatible sanitizing for metric names
// //