diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index 968ea936b..7fecea6b3 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream" "github.com/VictoriaMetrics/metrics" ) @@ -24,7 +25,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } - return parser.ParseStream(req, func(rows []parser.Row) error { + return stream.Parse(req, func(rows []parser.Row) error { return insertRows(at, rows, extraLabels) }) } diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index 3dafe29f9..2213e7dee 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream" "github.com/VictoriaMetrics/metrics" ) @@ -27,7 +28,7 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } - return parser.ParseStream(req, func(rows []parser.Row) error { + return stream.Parse(req, func(rows []parser.Row) error { return insertRows(rows, extraLabels) }) default: diff --git a/lib/protoparser/opentsdbhttp/parser_pool.go b/lib/protoparser/opentsdbhttp/parser_pool.go index 3863528a1..b7196d7c4 100644 --- a/lib/protoparser/opentsdbhttp/parser_pool.go +++ b/lib/protoparser/opentsdbhttp/parser_pool.go @@ -4,17 +4,17 @@ import ( "github.com/valyala/fastjson" ) -// getJSONParser returns JSON parser. +// GetJSONParser returns JSON parser. // -// The parser must be returned to the pool via putJSONParser when no longer needed. -func getJSONParser() *fastjson.Parser { +// The parser must be returned to the pool via PutJSONParser when no longer needed. +func GetJSONParser() *fastjson.Parser { return parserPool.Get() } -// putJSONParser returns p to the pool. +// PutJSONParser returns p to the pool. // // p cannot be used after returning to the pool. -func putJSONParser(p *fastjson.Parser) { +func PutJSONParser(p *fastjson.Parser) { parserPool.Put(p) } diff --git a/lib/protoparser/opentsdbhttp/parser_test.go b/lib/protoparser/opentsdbhttp/parser_test.go index 7b75bbaa5..7092a3486 100644 --- a/lib/protoparser/opentsdbhttp/parser_test.go +++ b/lib/protoparser/opentsdbhttp/parser_test.go @@ -9,8 +9,8 @@ func TestRowsUnmarshalFailure(t *testing.T) { f := func(s string) { t.Helper() var rows Rows - p := getJSONParser() - defer putJSONParser(p) + p := GetJSONParser() + defer PutJSONParser(p) v, err := p.Parse(s) if err != nil { // Expected JSON parser error @@ -84,8 +84,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) { t.Helper() var rows Rows - p := getJSONParser() - defer putJSONParser(p) + p := GetJSONParser() + defer PutJSONParser(p) v, err := p.Parse(s) if err != nil { t.Fatalf("cannot parse json %s: %s", s, err) diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/stream/streamparser.go similarity index 90% rename from lib/protoparser/opentsdbhttp/streamparser.go rename to lib/protoparser/opentsdbhttp/stream/streamparser.go index cfa0512a6..e4dd2119a 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/stream/streamparser.go @@ -1,4 +1,4 @@ -package opentsdbhttp +package stream import ( "bufio" @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -24,12 +25,12 @@ var ( "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") ) -// ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows. +// Parse parses OpenTSDB http lines from req and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from req. // // callback shouldn't hold rows after returning. -func ParseStream(req *http.Request, callback func(rows []Row) error) error { +func Parse(req *http.Request, callback func(rows []opentsdbhttp.Row) error) error { wcr := writeconcurrencylimiter.GetReader(req.Body) defer writeconcurrencylimiter.PutReader(wcr) r := io.Reader(req.Body) @@ -62,8 +63,8 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { // Process the request synchronously, since there is no sense in processing a single request asynchronously. // Sync code is easier to read and understand. - p := getJSONParser() - defer putJSONParser(p) + p := opentsdbhttp.GetJSONParser() + defer opentsdbhttp.PutJSONParser(p) v, err := p.ParseBytes(ctx.reqBuf.B) if err != nil { unmarshalErrors.Inc() @@ -155,15 +156,15 @@ func putStreamContext(ctx *streamContext) { var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) -func getRows() *Rows { +func getRows() *opentsdbhttp.Rows { v := rowsPool.Get() if v == nil { - return &Rows{} + return &opentsdbhttp.Rows{} } - return v.(*Rows) + return v.(*opentsdbhttp.Rows) } -func putRows(rs *Rows) { +func putRows(rs *opentsdbhttp.Rows) { rs.Reset() rowsPool.Put(rs) }