2023-10-05 14:39:51 +02:00
package newrelic
import (
"fmt"
"github.com/valyala/fastjson"
"github.com/valyala/fastjson/fastfloat"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
2023-10-16 00:25:23 +02:00
// Rows contains rows parsed from NewRelic Event request
//
// See https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events
type Rows struct {
Rows [ ] Row
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
// Reset resets r, so it can be re-used
func ( r * Rows ) Reset ( ) {
rows := r . Rows
for i := range rows {
rows [ i ] . reset ( )
}
r . Rows = rows [ : 0 ]
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
// jsonParserPool is the pool of fastjson parsers used by Rows.Unmarshal.
var jsonParserPool fastjson . ParserPool
2023-10-05 14:39:51 +02:00
2023-10-16 00:25:23 +02:00
// Unmarshal parses NewRelic Event request from b to r.
//
// b can be re-used after returning from r.
func ( r * Rows ) Unmarshal ( b [ ] byte ) error {
p := jsonParserPool . Get ( )
defer jsonParserPool . Put ( p )
2023-10-05 14:39:51 +02:00
2023-10-16 00:25:23 +02:00
r . Reset ( )
v , err := p . ParseBytes ( b )
if err != nil {
return err
}
metricPosts , err := v . Array ( )
if err != nil {
return fmt . Errorf ( "cannot find the top-level array of MetricPost objects: %w" , err )
}
for _ , mp := range metricPosts {
o , err := mp . Object ( )
2023-10-05 14:39:51 +02:00
if err != nil {
2023-10-16 00:25:23 +02:00
return fmt . Errorf ( "cannot find MetricPost object: %w" , err )
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
rows := r . Rows
o . Visit ( func ( k [ ] byte , v * fastjson . Value ) {
2023-10-05 14:39:51 +02:00
if err != nil {
2023-10-16 00:25:23 +02:00
return
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
switch string ( k ) {
case "Events" :
events , errLocal := v . Array ( )
if errLocal != nil {
err = fmt . Errorf ( "cannot find Events array in MetricPost object: %w" , errLocal )
return
}
for _ , e := range events {
eventObject , errLocal := e . Object ( )
if errLocal != nil {
err = fmt . Errorf ( "cannot find EventObject: %w" , errLocal )
return
}
if cap ( rows ) > len ( rows ) {
rows = rows [ : len ( rows ) + 1 ]
} else {
rows = append ( rows , Row { } )
}
r := & rows [ len ( rows ) - 1 ]
if errLocal := r . unmarshal ( eventObject ) ; errLocal != nil {
err = fmt . Errorf ( "cannot unmarshal EventObject: %w" , errLocal )
return
}
}
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
} )
r . Rows = rows
if err != nil {
return fmt . Errorf ( "cannot parse MetricPost object: %w" , err )
2023-10-05 14:39:51 +02:00
}
}
return nil
}
2023-10-16 00:25:23 +02:00
// Row represents parsed row
type Row struct {
2023-10-05 14:39:51 +02:00
Tags [ ] Tag
2023-10-16 00:25:23 +02:00
Samples [ ] Sample
Timestamp int64
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
// Tag is a single key=value pair attached to a Row.
type Tag struct {
	// Key is the tag name.
	Key []byte

	// Value is the tag value.
	Value []byte
}
2023-10-05 14:39:51 +02:00
2023-10-16 00:25:23 +02:00
// Sample is a single named numeric value attached to a Row.
type Sample struct {
	// Name is the sample name.
	Name []byte

	// Value is the sample value.
	Value float64
}
2023-10-05 14:39:51 +02:00
2023-10-16 00:25:23 +02:00
func ( r * Row ) reset ( ) {
tags := r . Tags
for i := range tags {
tags [ i ] . reset ( )
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
r . Tags = tags [ : 0 ]
2023-10-05 14:39:51 +02:00
2023-10-16 00:25:23 +02:00
samples := r . Samples
for i := range samples {
samples [ i ] . reset ( )
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
r . Samples = samples [ : 0 ]
2023-10-05 14:39:51 +02:00
2023-10-16 00:25:23 +02:00
r . Timestamp = 0
}
// reset truncates the key and the value of t to zero length,
// while keeping their backing arrays.
func (t *Tag) reset() {
	t.Key, t.Value = t.Key[:0], t.Value[:0]
}
2023-10-05 14:39:51 +02:00
2023-10-16 00:25:23 +02:00
// reset zeroes the value of s and truncates its name to zero length,
// while keeping the backing array of the name.
func (s *Sample) reset() {
	s.Value = 0
	s.Name = s.Name[:0]
}
func ( r * Row ) unmarshal ( o * fastjson . Object ) ( err error ) {
r . reset ( )
tags := r . Tags [ : 0 ]
samples := r . Samples [ : 0 ]
o . Visit ( func ( k [ ] byte , v * fastjson . Value ) {
if err != nil {
return
}
if len ( k ) == 0 {
2023-10-05 14:39:51 +02:00
return
}
switch v . Type ( ) {
case fastjson . TypeString :
2023-10-16 00:25:23 +02:00
// Register new tag
valueBytes := v . GetStringBytes ( )
if len ( valueBytes ) == 0 {
2023-10-05 14:39:51 +02:00
return
}
2023-10-16 00:25:23 +02:00
if cap ( tags ) > len ( tags ) {
tags = tags [ : len ( tags ) + 1 ]
} else {
tags = append ( tags , Tag { } )
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
t := & tags [ len ( tags ) - 1 ]
t . Key = append ( t . Key [ : 0 ] , k ... )
t . Value = append ( t . Value [ : 0 ] , valueBytes ... )
case fastjson . TypeNumber :
if string ( k ) == "timestamp" {
// Parse timestamp
ts , errLocal := getFloat64 ( v )
if errLocal != nil {
err = fmt . Errorf ( "cannot parse `timestamp` field: %w" , errLocal )
return
}
if ts < ( 1 << 32 ) {
// The timestamp is in seconds. Convert it to milliseconds.
ts *= 1e3
}
r . Timestamp = int64 ( ts )
2023-10-05 14:39:51 +02:00
return
}
2023-10-16 00:25:23 +02:00
// Register new sample
if cap ( samples ) > len ( samples ) {
samples = samples [ : len ( samples ) + 1 ]
} else {
samples = append ( samples , Sample { } )
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
s := & samples [ len ( samples ) - 1 ]
s . Name = append ( s . Name [ : 0 ] , k ... )
s . Value = v . GetFloat64 ( )
2023-10-05 14:39:51 +02:00
}
2023-10-16 00:25:23 +02:00
} )
r . Tags = tags
r . Samples = samples
return err
2023-10-05 14:39:51 +02:00
}
// getFloat64 returns the float64 value stored in v.
//
// A JSON number is returned via v.Float64, while a JSON string is parsed
// as a floating-point number. An error is returned for any other JSON type
// and for a string that cannot be parsed.
func getFloat64(v *fastjson.Value) (float64, error) {
	kind := v.Type()
	if kind == fastjson.TypeNumber {
		return v.Float64()
	}
	if kind != fastjson.TypeString {
		return 0, fmt.Errorf("value doesn't contain float64; it contains %s", kind)
	}

	// v is known to hold a string at this point,
	// so the error from StringBytes is deliberately ignored.
	raw, _ := v.StringBytes()
	f, err := fastfloat.Parse(bytesutil.ToUnsafeString(raw))
	if err != nil {
		return 0, fmt.Errorf("cannot parse value %q: %w", raw, err)
	}
	return f, nil
}