diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index d7211e385b..7ff651d5b9 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -36,7 +37,7 @@ var ( // // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ func InsertHandlerForReader(r io.Reader, isGzipped bool) error { - return parser.ParseStream(r, isGzipped, "", "", func(db string, rows []parser.Row) error { + return stream.Parse(r, isGzipped, "", "", func(db string, rows []parser.Row) error { return insertRows(nil, db, rows, nil) }) } @@ -54,7 +55,7 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { precision := q.Get("precision") // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") - return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { + return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { return insertRows(at, db, rows, extraLabels) }) } diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 3a82157e74..732641ecf7 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" @@ -37,7 +38,7 @@ var ( // // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ func InsertHandlerForReader(at *auth.Token, r io.Reader) error { - return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { + return stream.Parse(r, false, "", "", func(db string, rows []parser.Row) error { return insertRows(at, db, rows, nil) }) } @@ -55,7 +56,7 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { precision := q.Get("precision") // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") - return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { + return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { return insertRows(at, db, rows, extraLabels) }) } diff --git a/lib/protoparser/datadog/stream/streamparser.go b/lib/protoparser/datadog/stream/streamparser.go index de90d51d2b..a657c56189 100644 --- a/lib/protoparser/datadog/stream/streamparser.go +++ b/lib/protoparser/datadog/stream/streamparser.go @@ -31,7 +31,7 @@ var ( "https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics") ) -// ParseStream parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request. +// Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request. // // callback shouldn't hold series after returning. func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error { diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/stream/streamparser.go similarity index 93% rename from lib/protoparser/influx/streamparser.go rename to lib/protoparser/influx/stream/streamparser.go index 1d1a1057f3..a01de56393 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/stream/streamparser.go @@ -1,4 +1,4 @@ -package influx +package stream import ( "bufio" @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -22,12 +23,12 @@ var ( "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") ) -// ParseStream parses r with the given args and calls callback for the parsed rows. +// Parse parses r with the given args and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from r. // // callback shouldn't hold rows after returning. -func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error { +func Parse(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []influx.Row) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -162,9 +163,9 @@ var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { - rows Rows + rows influx.Rows ctx *streamContext - callback func(db string, rows []Row) error + callback func(db string, rows []influx.Row) error db string tsMultiplier int64 reqBuf []byte @@ -179,7 +180,7 @@ func (uw *unmarshalWork) reset() { uw.reqBuf = uw.reqBuf[:0] } -func (uw *unmarshalWork) runCallback(rows []Row) { +func (uw *unmarshalWork) runCallback(rows []influx.Row) { ctx := uw.ctx if err := uw.callback(uw.db, rows); err != nil { ctx.callbackErrLock.Lock() diff --git a/lib/protoparser/influx/streamparser_test.go b/lib/protoparser/influx/stream/streamparser_test.go similarity index 97% rename from lib/protoparser/influx/streamparser_test.go rename to lib/protoparser/influx/stream/streamparser_test.go index ca7bd1b30a..4722d17fa7 100644 --- a/lib/protoparser/influx/streamparser_test.go +++ b/lib/protoparser/influx/stream/streamparser_test.go @@ -1,4 +1,4 @@ -package influx +package stream import ( "testing"