Statsd protocol compatibility (#5053)

In this PR I added compatibility with [statsd
protocol](https://github.com/b/statsd_spec) with tags to be able to send
metrics directly from statsd clients to vmagent or directly to VM.
For example, it is compatible with the
[statsd-instrument](https://github.com/Shopify/statsd-instrument) and
[dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby) gems.

Related issues: #5052, #206, #4600

(cherry picked from commit c6c5a5a186)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
hagen1778 2024-05-10 14:28:37 +02:00
parent 76af930e4a
commit 864fbf9125
No known key found for this signature in database
GPG Key ID: 3BF75F3741CA9640
6 changed files with 1069 additions and 0 deletions

View File

@ -0,0 +1,173 @@
package statsd
import (
"errors"
"io"
"net"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
)
// Counters exported for the statsd ingest server, split by transport.
//
// The request counters are incremented once per accepted TCP connection and
// once per received UDP packet; the error counters are incremented when
// reading from the socket or running the insert handler fails.
var (
writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="statsd", name="write", net="tcp"}`)
writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="statsd", name="write", net="tcp"}`)
writeRequestsUDP = metrics.NewCounter(`vm_ingestserver_requests_total{type="statsd", name="write", net="udp"}`)
writeErrorsUDP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="statsd", name="write", net="udp"}`)
)
// Server accepts Statsd plaintext lines over TCP and UDP.
//
// A Server is created by MustStart and must be released with MustStop.
type Server struct {
// addr is the address both listeners were started on; it is used in log messages.
addr string
// lnTCP is the TCP listener served by serveTCP.
lnTCP net.Listener
// lnUDP is the UDP socket read by the serveUDP workers.
lnUDP net.PacketConn
// wg tracks the serveTCP and serveUDP goroutines started by MustStart.
wg sync.WaitGroup
// cm tracks accepted TCP connections, so MustStop can close them.
cm ingestserver.ConnsMap
}
// MustStart starts a statsd server listening on addr over both TCP and UDP.
//
// Every accepted TCP connection and every received UDP packet is passed
// to insertHandler as an io.Reader.
//
// If useProxyProtocol is set to true, then the incoming TCP connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// Failures to open either listener are reported via logger.Fatalf.
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server {
	logger.Infof("starting TCP Statsd server at %q", addr)
	tcpListener, tcpErr := netutil.NewTCPListener("statsd", addr, useProxyProtocol, nil)
	if tcpErr != nil {
		logger.Fatalf("cannot start TCP Statsd server at %q: %s", addr, tcpErr)
	}

	logger.Infof("starting UDP Statsd server at %q", addr)
	udpConn, udpErr := net.ListenPacket(netutil.GetUDPNetwork(), addr)
	if udpErr != nil {
		logger.Fatalf("cannot start UDP Statsd server at %q: %s", addr, udpErr)
	}

	srv := &Server{
		addr:  addr,
		lnTCP: tcpListener,
		lnUDP: udpConn,
	}
	srv.cm.Init("statsd")

	// One serving goroutine per transport; MustStop waits for both.
	srv.wg.Add(2)
	go func() {
		defer srv.wg.Done()
		srv.serveTCP(insertHandler)
		logger.Infof("stopped TCP Statsd server at %q", addr)
	}()
	go func() {
		defer srv.wg.Done()
		srv.serveUDP(insertHandler)
		logger.Infof("stopped UDP Statsd server at %q", addr)
	}()
	return srv
}
// MustStop stops the server.
//
// It closes the TCP listener and the UDP socket, closes all the tracked
// TCP connections and then waits until the serving goroutines exit.
// Close errors are logged and do not abort the shutdown.
func (s *Server) MustStop() {
	logger.Infof("stopping TCP Statsd server at %q...", s.addr)
	tcpErr := s.lnTCP.Close()
	if tcpErr != nil {
		logger.Errorf("cannot close TCP Statsd server: %s", tcpErr)
	}

	logger.Infof("stopping UDP Statsd server at %q...", s.addr)
	udpErr := s.lnUDP.Close()
	if udpErr != nil {
		logger.Errorf("cannot close UDP Statsd server: %s", udpErr)
	}

	s.cm.CloseAll(0)
	s.wg.Wait()
	logger.Infof("TCP and UDP Statsd servers at %q have been stopped", s.addr)
}
// serveTCP accepts connections on s.lnTCP until the listener is closed
// and runs insertHandler for every accepted connection in a dedicated goroutine.
//
// It returns after the listener is closed (or s.cm refuses a new connection)
// and all the in-flight connection handlers have finished.
func (s *Server) serveTCP(insertHandler func(r io.Reader) error) {
	var wg sync.WaitGroup
	for {
		c, err := s.lnTCP.Accept()
		if err != nil {
			// A closed listener is the normal shutdown path triggered by MustStop.
			// Check it before anything else, so it is recognized even when the error
			// doesn't implement net.Error. The string match is kept as a fallback
			// for listeners which do not wrap net.ErrClosed.
			if errors.Is(err, net.ErrClosed) || strings.Contains(err.Error(), "use of closed network connection") {
				break
			}
			var ne net.Error
			if errors.As(err, &ne) {
				if ne.Temporary() {
					logger.Errorf("statsd: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err)
					time.Sleep(time.Second)
					continue
				}
				logger.Fatalf("unrecoverable error when accepting TCP Statsd connections: %s", err)
			}
			logger.Fatalf("unexpected error when accepting TCP Statsd connections: %s", err)
		}
		if !s.cm.Add(c) {
			// The connection has been refused by the connections map; stop accepting.
			_ = c.Close()
			break
		}
		wg.Add(1)
		go func() {
			defer func() {
				s.cm.Delete(c)
				_ = c.Close()
				wg.Done()
			}()
			writeRequestsTCP.Inc()
			if err := insertHandler(c); err != nil {
				writeErrorsTCP.Inc()
				logger.Errorf("error in TCP Statsd conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
			}
		}()
	}
	wg.Wait()
}
// serveUDP reads packets from s.lnUDP with cgroup.AvailableCPUs() concurrent workers
// and passes every received packet to insertHandler.
//
// It returns when all the workers have exited, which happens after s.lnUDP is closed.
func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
	gomaxprocs := cgroup.AvailableCPUs()
	var wg sync.WaitGroup
	for i := 0; i < gomaxprocs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every worker owns a 64KiB read buffer, which is reused for all the packets.
			var bb bytesutil.ByteBuffer
			bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
			for {
				bb.Reset()
				bb.B = bb.B[:cap(bb.B)]
				n, addr, err := s.lnUDP.ReadFrom(bb.B)
				if err != nil {
					// A closed socket is the normal shutdown path triggered by MustStop.
					// Handle it first: it must not be counted as an ingestion error and
					// it must terminate the worker even if the error isn't a net.Error,
					// since otherwise the worker would spin on the closed socket forever.
					if errors.Is(err, net.ErrClosed) || strings.Contains(err.Error(), "use of closed network connection") {
						break
					}
					writeErrorsUDP.Inc()
					var ne net.Error
					if errors.As(err, &ne) && ne.Temporary() {
						logger.Errorf("statsd: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err)
						time.Sleep(time.Second)
						continue
					}
					logger.Errorf("cannot read Statsd UDP data: %s", err)
					continue
				}
				bb.B = bb.B[:n]
				writeRequestsUDP.Inc()
				if err := insertHandler(bb.NewReader()); err != nil {
					writeErrorsUDP.Inc()
					logger.Errorf("error in UDP Statsd conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err)
				}
			}
		}()
	}
	wg.Wait()
}

View File

@ -0,0 +1,226 @@
package statsd
import (
"fmt"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson/fastfloat"
)
// Statsd metric format with tags: MetricName:value|type|@sample_rate|#tag1:value1,tag2:value2...

// statsdSeparator delimits the top-level sections of a statsd line.
const statsdSeparator = '|'
// statsdPairsSeparator delimits the metric name from its value and a tag key from its value.
const statsdPairsSeparator = ':'
// statsdTagsStartSeparator marks the start of the tags section.
const statsdTagsStartSeparator = '#'
// statsdTagsSeparator delimits individual tags inside the tags section.
const statsdTagsSeparator = ','
// Rows contains parsed statsd rows.
//
// Rows is intended to be re-used between Unmarshal calls in order to avoid allocations.
type Rows struct {
// Rows holds the rows parsed by the last Unmarshal call.
Rows []Row
// tagsPool is the shared backing storage for the Tags of all the parsed rows.
tagsPool []Tag
}
// Reset resets rs, keeping the allocated capacity for re-use.
func (rs *Rows) Reset() {
	// Clear every item before truncating, so the strings
	// and slices they reference can be GC'ed.
	rows := rs.Rows
	for i := range rows {
		rows[i].reset()
	}
	rs.Rows = rows[:0]

	tags := rs.tagsPool
	for i := range tags {
		tags[i].reset()
	}
	rs.tagsPool = tags[:0]
}
// Unmarshal unmarshals statsd plaintext protocol rows from s.
//
// The previously parsed rows are discarded, while their storage is re-used.
//
// s shouldn't be modified when rs is in use.
func (rs *Rows) Unmarshal(s string) {
	rows, tags := rs.Rows[:0], rs.tagsPool[:0]
	rs.Rows, rs.tagsPool = unmarshalRows(rows, s, tags)
}
// Row is a single statsd row.
type Row struct {
// Metric is the metric name, i.e. the part of the line before the value separator.
Metric string
// Tags holds the tags parsed from the tags section; it is nil when the line has no tags section.
Tags []Tag
// Value is the parsed metric value.
Value float64
// Timestamp is left at 0 by the parser in this file.
Timestamp int64
}
// reset clears all the fields of r.
func (r *Row) reset() {
	*r = Row{}
}
// unmarshal parses a single statsd line s into r.
//
// The expected layout is `name:value|type|@sample_rate|#tag1:v1,tag2:v2`,
// optionally followed by further `|`-delimited fields. Only the name,
// the value and the tags are extracted; the remaining fields are ignored.
//
// Parsed tags are appended to tagsPool and r.Tags points into it.
// The returned slice is the (possibly grown) tagsPool.
// An error is returned if the metric name or the value cannot be parsed.
func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
	r.reset()
	originalString := s

	// `name:value` is everything in front of the first section separator.
	s = stripTrailingWhitespace(s)
	separatorPosition := strings.IndexByte(s, statsdSeparator)
	if separatorPosition >= 0 {
		s = stripTrailingWhitespace(s[:separatorPosition])
	}
	valuesSeparatorPosition := strings.LastIndexByte(s, statsdPairsSeparator)
	if valuesSeparatorPosition == 0 {
		return tagsPool, fmt.Errorf("cannot find metric name for %q", s)
	}
	if valuesSeparatorPosition < 0 {
		return tagsPool, fmt.Errorf("cannot find separator for %q", s)
	}
	r.Metric = s[:valuesSeparatorPosition]
	valueStr := s[valuesSeparatorPosition+1:]
	v, err := fastfloat.Parse(valueStr)
	if err != nil {
		return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", valueStr, err, originalString)
	}
	r.Value = v

	// Tags can only follow the first section separator. Searching for '#' in the whole line
	// would fabricate tags from a metric name containing '#'.
	if separatorPosition < 0 {
		// no tags
		return tagsPool, nil
	}
	meta := originalString[separatorPosition+1:]
	tagsSeparatorPosition := strings.LastIndexByte(meta, statsdTagsStartSeparator)
	if tagsSeparatorPosition < 0 {
		// no tags
		return tagsPool, nil
	}
	tagsStr := meta[tagsSeparatorPosition+1:]
	// The tags section ends at the next section separator. Fields, which may follow the tags
	// (for instance, DogStatsD container id or timestamp), mustn't leak into the last tag value.
	if n := strings.IndexByte(tagsStr, statsdSeparator); n >= 0 {
		tagsStr = tagsStr[:n]
	}
	tagsStart := len(tagsPool)
	tagsPool = unmarshalTags(tagsPool, tagsStr)
	tags := tagsPool[tagsStart:]
	// Limit the capacity, so appending to r.Tags cannot overwrite tags of other rows in tagsPool.
	r.Tags = tags[:len(tags):len(tags)]
	return tagsPool, nil
}
// unmarshalRows splits s into '\n'-delimited lines and appends the successfully
// parsed rows to dst, using tagsPool as the storage for their tags.
//
// It returns the updated dst and tagsPool.
func unmarshalRows(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
	for len(s) > 0 {
		line := s
		if n := strings.IndexByte(s, '\n'); n >= 0 {
			line, s = s[:n], s[n+1:]
		} else {
			// The last line has no trailing newline.
			s = ""
		}
		dst, tagsPool = unmarshalRow(dst, line, tagsPool)
	}
	return dst, tagsPool
}
// unmarshalRow parses a single line s and appends the resulting row to dst.
//
// A trailing '\r' and leading whitespace are dropped before parsing.
// Empty lines are skipped. Lines that fail to parse are logged, counted
// in invalidLines and are not added to dst.
func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
	s = strings.TrimSuffix(s, "\r")
	s = stripLeadingWhitespace(s)
	if s == "" {
		// Nothing to parse on this line.
		return dst, tagsPool
	}

	// Re-use the spare capacity of dst when possible.
	if len(dst) < cap(dst) {
		dst = dst[:len(dst)+1]
	} else {
		dst = append(dst, Row{})
	}
	last := len(dst) - 1

	var err error
	tagsPool, err = dst[last].unmarshal(s, tagsPool)
	if err == nil {
		return dst, tagsPool
	}

	// Drop the slot reserved for the broken line.
	dst = dst[:last]
	logger.Errorf("cannot unmarshal Statsd line %q: %s", s, err)
	invalidLines.Inc()
	return dst, tagsPool
}
// invalidLines counts statsd lines that failed to parse and were skipped by unmarshalRow.
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="statsd"}`)
// unmarshalTags parses the comma-delimited `key:value` tags from s and appends them to dst.
//
// Tags with an empty key or an empty value are skipped.
// The updated dst is returned.
func unmarshalTags(dst []Tag, s string) []Tag {
	for {
		// Cut the next tag from s.
		item, rest, hasMore := s, "", false
		if n := strings.IndexByte(s, statsdTagsSeparator); n >= 0 {
			item, rest, hasMore = s[:n], s[n+1:], true
		}

		// Reserve a slot for the tag, re-using the spare capacity of dst when possible.
		if len(dst) < cap(dst) {
			dst = dst[:len(dst)+1]
		} else {
			dst = append(dst, Tag{})
		}
		tag := &dst[len(dst)-1]
		tag.unmarshal(item)
		if tag.Key == "" || tag.Value == "" {
			// Release the slot taken by the incomplete tag.
			dst = dst[:len(dst)-1]
		}

		if !hasMore {
			return dst
		}
		s = rest
	}
}
// Tag is a statsd tag.
type Tag struct {
// Key is the tag name, i.e. the part before the first ':' of the tag.
Key string
// Value is the tag value, i.e. the part after the first ':' of the tag.
Value string
}
// reset clears the key and the value of t.
func (t *Tag) reset() {
	*t = Tag{}
}
// unmarshal parses a `key:value` tag from s into t.
//
// s is split at the first ':', so the value may contain ':' itself.
// If s contains no ':', then the whole s becomes the key and the value is empty.
func (t *Tag) unmarshal(s string) {
	t.reset()
	n := strings.IndexByte(s, statsdPairsSeparator)
	if n >= 0 {
		t.Key, t.Value = s[:n], s[n+1:]
		return
	}
	// Empty tag value.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1100
	t.Key, t.Value = s, s[len(s):]
}
// stripTrailingWhitespace returns s without trailing spaces and tabs.
func stripTrailingWhitespace(s string) string {
	end := len(s)
	for end > 0 && (s[end-1] == ' ' || s[end-1] == '\t') {
		end--
	}
	return s[:end]
}
// stripLeadingWhitespace returns s without leading spaces and tabs.
func stripLeadingWhitespace(s string) string {
	start := 0
	for start < len(s) && (s[start] == ' ' || s[start] == '\t') {
		start++
	}
	return s[start:]
}

View File

@ -0,0 +1,367 @@
package statsd
import (
"reflect"
"testing"
)
// TestUnmarshalTagsSuccess verifies that unmarshalTags extracts the expected tags
// and returns the same result when called again with the same arguments.
func TestUnmarshalTagsSuccess(t *testing.T) {
	f := func(dst []Tag, s string, tagsPoolExpected []Tag) {
		t.Helper()
		got := unmarshalTags(dst, s)
		if !reflect.DeepEqual(got, tagsPoolExpected) {
			t.Fatalf("unexpected tags;\ngot\n%+v;\nwant\n%+v", got, tagsPoolExpected)
		}
		// The second pass must produce the same result.
		got = unmarshalTags(dst, s)
		if !reflect.DeepEqual(got, tagsPoolExpected) {
			t.Fatalf("unexpected tags on second unmarshal;\ngot\n%+v;\nwant\n%+v", got, tagsPoolExpected)
		}
	}

	cases := []struct {
		in   string
		want []Tag
	}{
		{"foo:bar", []Tag{{Key: "foo", Value: "bar"}}},
		{"foo:bar,qwe:123", []Tag{{Key: "foo", Value: "bar"}, {Key: "qwe", Value: "123"}}},
		{"foo.qwe:bar", []Tag{{Key: "foo.qwe", Value: "bar"}}},
		{"foo:10", []Tag{{Key: "foo", Value: "10"}}},
		// Whitespace is preserved in keys and values.
		{"foo: _qwe", []Tag{{Key: "foo", Value: " _qwe"}}},
		{"foo:qwe ", []Tag{{Key: "foo", Value: "qwe "}}},
		{"foo asd:qwe ", []Tag{{Key: "foo asd", Value: "qwe "}}},
		// Only the first ':' delimits the key from the value.
		{"foo:var:123", []Tag{{Key: "foo", Value: "var:123"}}},
		// Invalid tags are dropped.
		{":bar", []Tag{}},
		{"foo:", []Tag{}},
		{" ", []Tag{}},
	}
	for _, tc := range cases {
		f([]Tag{}, tc.in, tc.want)
	}
}
// TestRowsUnmarshalSuccess verifies the rows produced by Rows.Unmarshal for valid input,
// including re-parsing into the same Rows and resetting it afterwards.
func TestRowsUnmarshalSuccess(t *testing.T) {
	f := func(s string, rowsExpected *Rows) {
		t.Helper()
		var parsed Rows
		parsed.Unmarshal(s)
		if !reflect.DeepEqual(parsed.Rows, rowsExpected.Rows) {
			t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", parsed.Rows, rowsExpected.Rows)
		}

		// Parsing into the already used Rows must give the same result.
		parsed.Unmarshal(s)
		if !reflect.DeepEqual(parsed.Rows, rowsExpected.Rows) {
			t.Fatalf("unexpected rows on second unmarshal;\ngot\n%+v;\nwant\n%+v", parsed.Rows, rowsExpected.Rows)
		}

		parsed.Reset()
		if len(parsed.Rows) != 0 {
			t.Fatalf("non-empty rows after reset: %+v", parsed.Rows)
		}
	}
	// rowsOf builds the expected result; it yields nil Rows when called without arguments.
	rowsOf := func(rows ...Row) *Rows {
		return &Rows{Rows: rows}
	}

	// Input without any rows
	f("", rowsOf())
	f("\r", rowsOf())
	f("\n\n", rowsOf())
	f("\n\r\n", rowsOf())

	// A single row
	f(" 123:455", rowsOf(Row{Metric: "123", Value: 455}))
	f("123:455 |c", rowsOf(Row{Metric: "123", Value: 455}))
	f("foobar:-123.456|c", rowsOf(Row{Metric: "foobar", Value: -123.456}))
	f("foo.bar:123.456|c\n", rowsOf(Row{Metric: "foo.bar", Value: 123.456}))

	// Sample rate is present
	f("foo.bar:1|c|@0.1", rowsOf(Row{Metric: "foo.bar", Value: 1}))

	// Metric type is missing
	f("foo.bar:123", rowsOf(Row{Metric: "foo.bar", Value: 123}))

	// Metric type is missing, while tags are present
	f("foo.bar:123|#foo:bar", rowsOf(Row{
		Metric: "foo.bar",
		Value:  123,
		Tags:   []Tag{{Key: "foo", Value: "bar"}},
	}))
	f("foo.bar:123.456|c|#foo:bar,qwe:asd", rowsOf(Row{
		Metric: "foo.bar",
		Value:  123.456,
		Tags:   []Tag{{Key: "foo", Value: "bar"}, {Key: "qwe", Value: "asd"}},
	}))

	// Whitespace inside metric name, tag name and tag value
	f("s a:1|c|#ta g1:aaa1,tag2:bb b2", rowsOf(Row{
		Metric: "s a",
		Value:  1,
		Tags:   []Tag{{Key: "ta g1", Value: "aaa1"}, {Key: "tag2", Value: "bb b2"}},
	}))

	// No tags section
	f("foo:1|c", rowsOf(Row{Metric: "foo", Value: 1}))

	// Tag with empty name is dropped
	f("foo:1|#:123", rowsOf(Row{Metric: "foo", Tags: []Tag{}, Value: 1}))

	// Tag with empty value is dropped
	f("foo:1|#tag1:", rowsOf(Row{Metric: "foo", Tags: []Tag{}, Value: 1}))
	f("foo:1|#bar:baz,aa:,x:y,:z", rowsOf(Row{
		Metric: "foo",
		Tags:   []Tag{{Key: "bar", Value: "baz"}, {Key: "x", Value: "y"}},
		Value:  1,
	}))

	// Multiple lines
	f("foo:0.3|c\naaa:3|g\nbar.baz:0.34|c\n", rowsOf(
		Row{Metric: "foo", Value: 0.3},
		Row{Metric: "aaa", Value: 3},
		Row{Metric: "bar.baz", Value: 0.34},
	))
	f("foo:0.3|c|#tag1:1,tag2:2\naaa:3|g|#tag3:3,tag4:4", rowsOf(
		Row{
			Metric: "foo",
			Value:  0.3,
			Tags:   []Tag{{Key: "tag1", Value: "1"}, {Key: "tag2", Value: "2"}},
		},
		Row{
			Metric: "aaa",
			Value:  3,
			Tags:   []Tag{{Key: "tag3", Value: "3"}, {Key: "tag4", Value: "4"}},
		},
	))

	// Multiple lines, the invalid one is skipped
	f("foo:0.3|c\naaa\nbar.baz:0.34\n", rowsOf(
		Row{Metric: "foo", Value: 0.3},
		Row{Metric: "bar.baz", Value: 0.34},
	))

	// Trailing whitespace on the last line
	f("foo.baz:125|c\na:1.34\t ", rowsOf(
		Row{Metric: "foo.baz", Value: 125},
		Row{Metric: "a", Value: 1.34},
	))

	// Sample rate is ignored, tags following it are parsed
	f("foo.baz:125|c|@0.5#tag1:12", rowsOf(Row{
		Metric: "foo.baz",
		Value:  125,
		Tags:   []Tag{{Key: "tag1", Value: "12"}},
	}))
}
// TestRowsUnmarshalFailure verifies that invalid lines produce no rows,
// both on the first parse and when the same Rows is re-used.
func TestRowsUnmarshalFailure(t *testing.T) {
	f := func(s string) {
		t.Helper()
		var parsed Rows
		// Parse twice in order to cover the re-use of parsed.
		for attempt := 0; attempt < 2; attempt++ {
			parsed.Unmarshal(s)
			if n := len(parsed.Rows); n != 0 {
				t.Fatalf("unexpected number of rows parsed; got %d; want 0", n)
			}
		}
	}

	invalidInputs := []string{
		"aaa",  // random string
		"foo:", // empty value
		":12",  // empty metric name
	}
	for _, s := range invalidInputs {
		f(s)
	}
}

View File

@ -0,0 +1,25 @@
package statsd
import (
"fmt"
"testing"
)
// BenchmarkRowsUnmarshal measures parallel parsing of a fixed four-line statsd payload.
func BenchmarkRowsUnmarshal(b *testing.B) {
	input := `cpu.usage_user:1.23|c
cpu.usage_system:23.344|c
cpu.usage_iowait:3.3443|c
cpu.usage_irq:0.34432|c
`
	const wantRows = 4

	b.SetBytes(int64(len(input)))
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		// Every parallel worker re-uses its own Rows.
		var parsed Rows
		for pb.Next() {
			parsed.Unmarshal(input)
			if n := len(parsed.Rows); n != wantRows {
				panic(fmt.Errorf("unexpected number of rows unmarshaled: got %d; want 4", n))
			}
		}
	})
}

View File

@ -0,0 +1,218 @@
package stream
import (
"bufio"
"flag"
"fmt"
"io"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
// trimTimestamp is the -statsdTrimTimestamp command-line flag.
// Row timestamps are rounded down to a multiple of this duration when it exceeds one second.
trimTimestamp = flag.Duration("statsdTrimTimestamp", time.Second, "Trim timestamps for Statsd data to this duration. "+
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
)
// Parse parses Statsd lines from r and calls callback for the parsed rows.
//
// If isGzipped is set, then the data read from r is decompressed with gzip before parsing.
//
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold rows after returning.
//
// Parse returns the read error if any; otherwise it returns the first error returned by callback.
func Parse(r io.Reader, isGzipped bool, callback func(rows []statsd.Row) error) error {
// Wrap r with the reader obtained from writeconcurrencylimiter; it is returned on exit.
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
if isGzipped {
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped statsd data: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
}
ctx := getStreamContext(r)
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.ctx = ctx
uw.callback = callback
// Hand the just read block over to uw and take the empty buffer of uw
// for the next read, so the block isn't copied.
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
// The matching ctx.wg.Done() is called by uw.runCallback.
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
// NOTE(review): presumably this releases the concurrency slot held by wcr
// until the next read from it - confirm against writeconcurrencylimiter.
wcr.DecConcurrency()
}
// Wait until all the scheduled work is processed before inspecting errors and releasing ctx.
ctx.wg.Wait()
if err := ctx.Error(); err != nil {
return err
}
return ctx.callbackErr
}
// Read reads the next block of lines into ctx.reqBuf.
//
// It returns false if the block cannot be read, if the previous read has failed
// or if a callback has already returned an error. The read error, if any,
// is available via ctx.Error().
func (ctx *streamContext) Read() bool {
	readCalls.Inc()
	if ctx.err != nil || ctx.hasCallbackError() {
		return false
	}

	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
	if ctx.err == nil {
		return true
	}
	// io.EOF is kept as is: it marks the regular end of the stream and isn't counted as an error.
	if ctx.err != io.EOF {
		readErrors.Inc()
		ctx.err = fmt.Errorf("cannot read statsd plaintext protocol data: %w", ctx.err)
	}
	return false
}
// streamContext holds the state of a single Parse call.
type streamContext struct {
// br is the buffered reader over the input stream.
br *bufio.Reader
// reqBuf holds the block of lines returned by the last Read call.
reqBuf []byte
// tailBuf is passed to and returned from common.ReadLinesBlock between Read calls.
tailBuf []byte
// err is the error returned by the last read; it may be io.EOF.
err error
// wg tracks the scheduled unmarshalWork items which aren't processed yet.
wg sync.WaitGroup
// callbackErrLock protects callbackErr.
callbackErrLock sync.Mutex
// callbackErr is the first error returned by the callback.
callbackErr error
}
// Error returns the read error stored in ctx, treating io.EOF as a successful end of the stream.
func (ctx *streamContext) Error() error {
	if err := ctx.err; err != io.EOF {
		return err
	}
	return nil
}
// hasCallbackError reports whether a callback has already returned an error.
func (ctx *streamContext) hasCallbackError() bool {
	ctx.callbackErrLock.Lock()
	defer ctx.callbackErrLock.Unlock()
	return ctx.callbackErr != nil
}
// reset prepares ctx for re-use: errors are cleared, buffers are truncated
// with their capacity kept, and br is detached from its reader.
func (ctx *streamContext) reset() {
	ctx.err = nil
	ctx.callbackErr = nil
	ctx.reqBuf = ctx.reqBuf[:0]
	ctx.tailBuf = ctx.tailBuf[:0]
	ctx.br.Reset(nil)
}
var (
// readCalls counts streamContext.Read calls.
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="statsd"}`)
// readErrors counts read failures other than io.EOF.
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="statsd"}`)
// rowsRead counts successfully parsed rows.
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="statsd"}`)
)
// getStreamContext returns a streamContext reading from r.
//
// A context is taken from streamContextPoolCh when available, then from
// streamContextPool; a new one is allocated only if both are empty.
func getStreamContext(r io.Reader) *streamContext {
	select {
	case pooled := <-streamContextPoolCh:
		pooled.br.Reset(r)
		return pooled
	default:
	}

	v := streamContextPool.Get()
	if v == nil {
		return &streamContext{
			br: bufio.NewReaderSize(r, 64*1024),
		}
	}
	pooled := v.(*streamContext)
	pooled.br.Reset(r)
	return pooled
}
// putStreamContext resets ctx and returns it for re-use.
//
// ctx goes to streamContextPoolCh if it has free room and to streamContextPool otherwise.
// ctx mustn't be used after the call.
func putStreamContext(ctx *streamContext) {
	ctx.reset()
	select {
	case streamContextPoolCh <- ctx:
		return
	default:
	}
	streamContextPool.Put(ctx)
}
// streamContextPool holds the streamContext items which don't fit into streamContextPoolCh.
var streamContextPool sync.Pool
// streamContextPoolCh is the first-choice pool of streamContext items; its capacity equals cgroup.AvailableCPUs().
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
// unmarshalWork is a unit of parsing work for a single block of lines read by Parse.
type unmarshalWork struct {
// rows holds the rows parsed from reqBuf.
rows statsd.Rows
// ctx is the stream context the block has been read from.
ctx *streamContext
// callback is called with the parsed rows.
callback func(rows []statsd.Row) error
// reqBuf is the block of lines to parse.
reqBuf []byte
}
// reset prepares uw for re-use: references to the stream context and the callback
// are dropped, while the storage of rows and reqBuf is kept.
func (uw *unmarshalWork) reset() {
	uw.ctx, uw.callback = nil, nil
	uw.reqBuf = uw.reqBuf[:0]
	uw.rows.Reset()
}
// runCallback passes rows to uw.callback and marks the work as done on uw.ctx.wg.
//
// Only the first callback error is stored in the stream context.
func (uw *unmarshalWork) runCallback(rows []statsd.Row) {
	sc := uw.ctx
	cbErr := uw.callback(rows)
	if cbErr != nil {
		sc.callbackErrLock.Lock()
		if sc.callbackErr == nil {
			sc.callbackErr = fmt.Errorf("error when processing imported data: %w", cbErr)
		}
		sc.callbackErrLock.Unlock()
	}
	sc.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
//
// It parses uw.reqBuf, assigns timestamps in milliseconds to the parsed rows,
// passes the rows to the callback and then returns uw to the pool.
func (uw *unmarshalWork) Unmarshal() {
	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
	rows := uw.rows.Rows
	rowsRead.Add(len(rows))

	nowSecs := int64(fasttime.UnixTimestamp())
	trimMs := trimTimestamp.Milliseconds()
	for i := range rows {
		row := &rows[i]
		// Rows without a timestamp get the current time rounded to seconds.
		if row.Timestamp == 0 || row.Timestamp == -1 {
			row.Timestamp = nowSecs
		}
		// Seconds -> milliseconds.
		row.Timestamp *= 1e3
		// Trimming makes sense only for durations above one second,
		// since the timestamp already has the second precision.
		if trimMs > 1000 {
			row.Timestamp -= row.Timestamp % trimMs
		}
	}

	uw.runCallback(rows)
	putUnmarshalWork(uw)
}
// getUnmarshalWork returns an unmarshalWork from the pool or allocates a new one if the pool is empty.
func getUnmarshalWork() *unmarshalWork {
	if v := unmarshalWorkPool.Get(); v != nil {
		return v.(*unmarshalWork)
	}
	return &unmarshalWork{}
}
// putUnmarshalWork resets uw and returns it to the pool.
//
// uw mustn't be used after the call.
func putUnmarshalWork(uw *unmarshalWork) {
	uw.reset()

	unmarshalWorkPool.Put(uw)
}
// unmarshalWorkPool holds unmarshalWork items for re-use by getUnmarshalWork.
var unmarshalWorkPool sync.Pool

View File

@ -0,0 +1,60 @@
package stream
import (
"reflect"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd"
)
// Test_streamContext_Read verifies that a block read via streamContext.Read
// is unmarshaled into the expected rows and is passed to the callback exactly once.
func Test_streamContext_Read(t *testing.T) {
	// The expected timestamps are taken by the caller of f before Unmarshal reads the clock,
	// so the clock may tick in between. Allow a bounded skew instead of requiring
	// the exact match, which makes the test fail on a second boundary.
	const maxTimestampSkewMs = 5000

	f := func(s string, rowsExpected *statsd.Rows) {
		t.Helper()
		ctx := getStreamContext(strings.NewReader(s))
		if !ctx.Read() {
			t.Fatalf("expecting successful read")
		}
		uw := getUnmarshalWork()
		callbackCalls := 0
		uw.ctx = ctx
		uw.callback = func(rows []statsd.Row) error {
			callbackCalls++
			if len(rows) != len(rowsExpected.Rows) {
				t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
			}
			for i := range rows {
				want := rowsExpected.Rows[i].Timestamp
				if skew := rows[i].Timestamp - want; skew < 0 || skew > maxTimestampSkewMs {
					t.Fatalf("unexpected timestamp for row %d; got %d; want a value in [%d, %d]",
						i, rows[i].Timestamp, want, want+maxTimestampSkewMs)
				}
				// The timestamp has been verified above; align it, so the remaining fields
				// can be compared with a single DeepEqual call.
				rows[i].Timestamp = want
			}
			if !reflect.DeepEqual(rows, rowsExpected.Rows) {
				t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
			}
			return nil
		}
		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
		// uw.Unmarshal calls ctx.wg.Done via runCallback.
		ctx.wg.Add(1)
		uw.Unmarshal()
		if callbackCalls != 1 {
			t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
		}
	}

	// Full line without tags
	f("aaa:1123|c", &statsd.Rows{
		Rows: []statsd.Row{{
			Metric:    "aaa",
			Value:     1123,
			Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
		}},
	})

	// Full line with tags
	f("aaa:1123|c|#x:y", &statsd.Rows{
		Rows: []statsd.Row{{
			Metric: "aaa",
			Tags: []statsd.Tag{{
				Key:   "x",
				Value: "y",
			}},
			Value:     1123,
			Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
		}},
	})
}