2023-02-13 18:51:35 +01:00
package stream
2021-09-28 21:47:45 +02:00
import (
"bufio"
2022-09-26 12:57:20 +02:00
"flag"
2021-09-28 21:47:45 +02:00
"fmt"
"io"
2023-11-28 14:52:29 +01:00
"net/http"
2022-09-26 12:57:20 +02:00
"regexp"
2021-09-28 21:47:45 +02:00
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2023-11-28 14:52:29 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2021-09-28 21:47:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2023-02-13 18:51:35 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
2023-11-28 14:52:29 +01:00
apiSeriesV1 "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/series/v1"
apiSeriesV2 "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/series/v2"
apiSketchesBeta "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/api/sketches/beta"
2023-01-07 03:59:39 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
2021-09-28 21:47:45 +02:00
"github.com/VictoriaMetrics/metrics"
)
2022-09-26 12:57:20 +02:00
var (
// The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
2023-11-28 14:52:29 +01:00
maxInsertRequestSize = flagutil . NewBytes ( "datadog.maxInsertRequestSize" , 64 * 1024 * 1024 , "The maximum size in bytes of a single DataDog POST request to /api/v1/series, /api/v2/series, /api/beta/sketches" )
2022-09-26 12:57:20 +02:00
// If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply:
// https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
// But there's some hidden behaviour. In addition to what it states in the docs, the following is also done:
// - Consecutive underscores are replaced with just one underscore
// - Underscore immediately before or after a dot are removed
sanitizeMetricName = flag . Bool ( "datadog.sanitizeMetricName" , true , "Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at " +
"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics" )
)
2021-09-28 21:47:45 +02:00
2023-11-28 14:52:29 +01:00
// Parse parses DataDog POST request for /api/v1/series, /api/v2/series, /api/beta/sketches from reader and calls callback for the parsed request.
2021-09-28 21:47:45 +02:00
//
// callback shouldn't hold series after returning.
2023-11-28 14:52:29 +01:00
func Parse ( req * http . Request , callback func ( prompbmarshal . TimeSeries ) error ) error {
var r io . Reader
wcr := writeconcurrencylimiter . GetReader ( req . Body )
2023-01-07 03:59:39 +01:00
defer writeconcurrencylimiter . PutReader ( wcr )
r = wcr
2023-11-28 14:52:29 +01:00
contentEncoding := req . Header . Get ( "Content-Encoding" )
2023-01-07 03:59:39 +01:00
2021-09-28 21:47:45 +02:00
switch contentEncoding {
case "gzip" :
zr , err := common . GetGzipReader ( r )
if err != nil {
return fmt . Errorf ( "cannot read gzipped DataDog data: %w" , err )
}
defer common . PutGzipReader ( zr )
r = zr
case "deflate" :
zlr , err := common . GetZlibReader ( r )
if err != nil {
return fmt . Errorf ( "cannot read deflated DataDog data: %w" , err )
}
defer common . PutZlibReader ( zlr )
r = zlr
}
ctx := getPushCtx ( r )
defer putPushCtx ( ctx )
if err := ctx . Read ( ) ; err != nil {
return err
}
2023-11-28 14:52:29 +01:00
apiVersion := insertApisVersionRegex . ReplaceAllString ( req . URL . Path , "${version}" )
apiKind := insertApisVersionRegex . ReplaceAllString ( req . URL . Path , "${kind}" )
2023-11-28 15:04:15 +01:00
var ddReq datadog . Request
2023-11-28 14:52:29 +01:00
switch apiKind {
case "series" :
switch apiVersion {
case "v1" :
2023-11-28 15:04:15 +01:00
ddReq = getSeriesV1Request ( )
defer putSeriesV1Request ( ddReq )
2023-11-28 14:52:29 +01:00
case "v2" :
2023-11-28 15:04:15 +01:00
ddReq = getSeriesV2Request ( )
defer putSeriesV2Request ( ddReq )
2023-11-28 14:52:29 +01:00
default :
return fmt . Errorf (
2023-11-28 15:04:15 +01:00
"API version %q of DataDog series endpoint is not supported" ,
2023-11-28 14:52:29 +01:00
apiVersion ,
)
}
case "sketches" :
switch apiVersion {
case "beta" :
2023-11-28 15:04:15 +01:00
ddReq = getSketchesBetaRequest ( )
defer putSketchesBetaRequest ( ddReq )
2023-11-28 14:52:29 +01:00
default :
return fmt . Errorf (
2023-11-28 15:04:15 +01:00
"API version %q of DataDog sketches endpoint is not supported" ,
2023-11-28 14:52:29 +01:00
apiVersion ,
)
}
default :
return fmt . Errorf (
2023-11-28 15:04:15 +01:00
"API kind %q of DataDog API is not supported" ,
2023-11-28 14:52:29 +01:00
apiKind ,
)
}
if err := ddReq . Unmarshal ( ctx . reqBuf . B ) ; err != nil {
2021-09-28 21:47:45 +02:00
unmarshalErrors . Inc ( )
2023-10-25 21:24:01 +02:00
return fmt . Errorf ( "cannot unmarshal DataDog POST request with size %d bytes: %w" , len ( ctx . reqBuf . B ) , err )
2021-09-28 21:47:45 +02:00
}
2023-11-28 14:52:29 +01:00
cb := func ( series prompbmarshal . TimeSeries ) error {
rowsRead . Add ( len ( series . Samples ) )
return callback ( series )
2021-09-28 21:47:45 +02:00
}
2023-11-28 14:52:29 +01:00
if err := ddReq . Extract ( cb , sanitizeName ( * sanitizeMetricName ) ) ; err != nil {
2021-09-28 21:47:45 +02:00
return fmt . Errorf ( "error when processing imported data: %w" , err )
}
2023-11-28 14:52:29 +01:00
2021-09-28 21:47:45 +02:00
return nil
}
// pushCtx bundles the reusable state needed to read one request body.
type pushCtx struct {
// br is the buffered reader the request body is read from.
br * bufio . Reader
// reqBuf receives the request body in Read.
reqBuf bytesutil . ByteBuffer
}
// reset prepares ctx for reuse: the request buffer is reset and the buffered
// reader is detached from its underlying reader.
func (ctx *pushCtx) reset() {
	ctx.reqBuf.Reset()
	ctx.br.Reset(nil)
}
func ( ctx * pushCtx ) Read ( ) error {
readCalls . Inc ( )
lr := io . LimitReader ( ctx . br , int64 ( maxInsertRequestSize . N ) + 1 )
startTime := fasttime . UnixTimestamp ( )
reqLen , err := ctx . reqBuf . ReadFrom ( lr )
if err != nil {
readErrors . Inc ( )
2023-10-16 00:25:23 +02:00
return fmt . Errorf ( "cannot read request in %d seconds: %w" , fasttime . UnixTimestamp ( ) - startTime , err )
2021-09-28 21:47:45 +02:00
}
if reqLen > int64 ( maxInsertRequestSize . N ) {
readErrors . Inc ( )
2023-10-16 00:25:23 +02:00
return fmt . Errorf ( "too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes" , maxInsertRequestSize . N )
2021-09-28 21:47:45 +02:00
}
return nil
}
// Counters describing the work done by the DataDog parser.
//
// The metric names must not contain whitespace around the name or the label
// braces, so they are written in the compact `name{label="value"}` form.
var (
	// readCalls counts pushCtx.Read invocations.
	readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadog"}`)
	// readErrors counts failed or oversized body reads.
	readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadog"}`)
	// rowsRead counts samples passed to the Parse callback.
	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadog"}`)
	// unmarshalErrors counts request bodies that could not be unmarshaled.
	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadog"}`)
)
// getPushCtx returns a pushCtx whose buffered reader is bound to r.
//
// A context is taken from pushCtxPoolCh when one is immediately available,
// then from pushCtxPool; a new one with a 64KiB read buffer is allocated only
// when both are empty.
func getPushCtx(r io.Reader) *pushCtx {
	var ctx *pushCtx
	select {
	case ctx = <-pushCtxPoolCh:
	default:
		v := pushCtxPool.Get()
		if v == nil {
			return &pushCtx{
				br: bufio.NewReaderSize(r, 64*1024),
			}
		}
		ctx = v.(*pushCtx)
	}
	ctx.br.Reset(r)
	return ctx
}
// putPushCtx resets ctx and makes it available for reuse.
//
// The context goes to pushCtxPoolCh if the channel has room; otherwise it is
// handed to pushCtxPool. ctx must not be used by the caller afterwards.
func putPushCtx(ctx *pushCtx) {
	ctx.reset()
	select {
	case pushCtxPoolCh <- ctx:
		return
	default:
	}
	pushCtxPool.Put(ctx)
}
// pushCtxPool is the fallback store for pushCtx values that do not fit into pushCtxPoolCh.
var pushCtxPool sync . Pool
// pushCtxPoolCh is the first-choice store for reusable pushCtx values;
// its capacity equals cgroup.AvailableCPUs().
var pushCtxPoolCh = make ( chan * pushCtx , cgroup . AvailableCPUs ( ) )
2023-11-28 15:04:15 +01:00
// seriesV1RequestPool recycles request objects for /api/v1/series.
var seriesV1RequestPool sync.Pool

// getSeriesV1Request returns a pooled /api/v1/series request, or a fresh one
// when the pool is empty.
func getSeriesV1Request() *apiSeriesV1.Request {
	if pooled := seriesV1RequestPool.Get(); pooled != nil {
		return pooled.(*apiSeriesV1.Request)
	}
	return &apiSeriesV1.Request{}
}

// putSeriesV1Request returns req to seriesV1RequestPool.
//
// req must hold a *apiSeriesV1.Request; any other dynamic type makes the type
// assertion in getSeriesV1Request panic.
// NOTE(review): req is stored without being reset here - presumably Unmarshal
// clears previous state; confirm.
func putSeriesV1Request(req datadog.Request) {
	seriesV1RequestPool.Put(req)
}
// seriesV2RequestPool recycles request objects for /api/v2/series.
var seriesV2RequestPool sync.Pool

// getSeriesV2Request returns a pooled /api/v2/series request, or a fresh one
// when the pool is empty.
func getSeriesV2Request() *apiSeriesV2.Request {
	if pooled := seriesV2RequestPool.Get(); pooled != nil {
		return pooled.(*apiSeriesV2.Request)
	}
	return &apiSeriesV2.Request{}
}

// putSeriesV2Request returns req to seriesV2RequestPool.
//
// req must hold a *apiSeriesV2.Request; any other dynamic type makes the type
// assertion in getSeriesV2Request panic.
// NOTE(review): req is stored without being reset here - presumably Unmarshal
// clears previous state; confirm.
func putSeriesV2Request(req datadog.Request) {
	seriesV2RequestPool.Put(req)
}
func getSketchesBetaRequest ( ) * apiSketchesBeta . Request {
v := sketchesBetaRequestPool . Get ( )
2021-09-28 21:47:45 +02:00
if v == nil {
2023-11-28 15:04:15 +01:00
return & apiSketchesBeta . Request { }
2021-09-28 21:47:45 +02:00
}
2023-11-28 15:04:15 +01:00
return v . ( * apiSketchesBeta . Request )
2021-09-28 21:47:45 +02:00
}
2023-11-28 15:04:15 +01:00
func putSketchesBetaRequest ( req datadog . Request ) {
sketchesBetaRequestPool . Put ( req )
2021-09-28 21:47:45 +02:00
}
2023-11-28 15:04:15 +01:00
var sketchesBetaRequestPool sync . Pool
2022-09-26 12:57:20 +02:00
2023-02-13 13:27:13 +01:00
// sanitizeName performs DataDog-compatible sanitizing for metric names
2022-09-26 12:57:20 +02:00
//
// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
2023-11-28 14:52:29 +01:00
func sanitizeName ( sanitize bool ) func ( string ) string {
if sanitize {
return func ( name string ) string {
return namesSanitizer . Transform ( name )
}
}
return func ( name string ) string {
return name
}
2022-09-26 12:57:20 +02:00
}
2022-09-28 09:39:01 +02:00
// namesSanitizer rewrites a metric name in three ordered steps:
//  1. every match of unsupportedDatadogChars becomes "_";
//  2. every match of multiUnderscores becomes a single "_";
//  3. every match of underscoresWithDots becomes ".".
//
// The order matters: step 1 may produce the underscores that steps 2 and 3
// then collapse or remove.
var namesSanitizer = bytesutil.NewFastStringTransformer(func(name string) string {
	replaced := unsupportedDatadogChars.ReplaceAllString(name, "_")
	collapsed := multiUnderscores.ReplaceAllString(replaced, "_")
	return underscoresWithDots.ReplaceAllString(collapsed, ".")
})
2022-09-28 09:05:54 +02:00
2022-09-28 09:39:01 +02:00
// Regular expressions used for metric name sanitizing and request routing.
//
// The patterns must not carry leading or trailing whitespace: inside a raw
// string literal a space is a literal character that has to be matched.
var (
	// unsupportedDatadogChars matches runs of characters other than ASCII
	// letters, digits, underscores and dots.
	unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`)
	// multiUnderscores matches runs of one or more underscores.
	multiUnderscores = regexp.MustCompile(`_+`)
	// underscoresWithDots matches a dot together with an optional underscore
	// directly before and after it.
	underscoresWithDots = regexp.MustCompile(`_?\._?`)
	// insertApisVersionRegex captures the API version and kind from paths such
	// as /api/v1/series, with any prefix in front of /api/.
	insertApisVersionRegex = regexp.MustCompile(`.*/api/(?P<version>[\w]+)/(?P<kind>[\w]+)`)
)