VictoriaMetrics/lib/protoparser/newrelic/stream/streamparser.go

74 lines
1.9 KiB
Go
Raw Normal View History

package stream
import (
"fmt"
"io"
"github.com/valyala/fastjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
)
var parserPool fastjson.ParserPool
// Parse parses NewRelic POST request for newrelic/infra/v2/metrics/events/bulk from reader and calls callback for the parsed request.
//
// callback shouldn't hold series after returning.
func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
if isGzip {
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped Newrelic agent data: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
}
ctx := getPushCtx(r)
defer putPushCtx(ctx)
if err := ctx.Read(); err != nil {
return err
}
p := parserPool.Get()
defer parserPool.Put(p)
v, err := p.ParseBytes(ctx.reqBuf.B)
if err != nil {
return fmt.Errorf("cannot parse NewRelic POST request with size %d bytes: %w", len(ctx.reqBuf.B), err)
}
metricsPost, err := v.Array()
if err != nil {
return fmt.Errorf("cannot fetch data from Newrelic POST request: %w", err)
}
var events newrelic.Events
if err := events.Unmarshal(metricsPost); err != nil {
unmarshalErrors.Inc()
return fmt.Errorf("cannot unmarshal NewRelic POST request: %w", err)
}
// Fill in missing timestamps
currentTimestamp := int64(fasttime.UnixTimestamp())
for i := range events.Metrics {
m := &events.Metrics[i]
if m.Timestamp == 0 {
m.Timestamp = currentTimestamp * 1e3
}
}
if err := callback(events.Metrics); err != nil {
return fmt.Errorf("error when processing imported data: %w", err)
}
return nil
}