diff --git a/app/vmagent/opentsdb/request_handler.go b/app/vmagent/opentsdb/request_handler.go index e1e42ba4b..8388a5238 100644 --- a/app/vmagent/opentsdb/request_handler.go +++ b/app/vmagent/opentsdb/request_handler.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb/stream" "github.com/VictoriaMetrics/metrics" ) @@ -19,7 +20,7 @@ var ( // // See http://opentsdb.net/docs/build/html/api_telnet/put.html func InsertHandler(r io.Reader) error { - return parser.ParseStream(r, insertRows) + return stream.Parse(r, insertRows) } func insertRows(rows []parser.Row) error { diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 44bf3eb98..9f3f0fbdf 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb/stream" "github.com/VictoriaMetrics/metrics" ) @@ -18,7 +19,7 @@ var ( // // See http://opentsdb.net/docs/build/html/api_telnet/put.html func InsertHandler(r io.Reader) error { - return parser.ParseStream(r, insertRows) + return stream.Parse(r, insertRows) } func insertRows(rows []parser.Row) error { diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/stream/streamparser.go similarity index 92% rename from lib/protoparser/opentsdb/streamparser.go rename to lib/protoparser/opentsdb/stream/streamparser.go index e094d4d8b..d7bdbea0e 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/stream/streamparser.go @@ -1,4 +1,4 @@ -package opentsdb +package stream import ( "bufio" @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -21,12 +22,12 @@ var ( "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") ) -// ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. +// Parse parses OpenTSDB lines from r and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from r. // // callback shouldn't hold rows after returning. -func ParseStream(r io.Reader, callback func(rows []Row) error) error { +func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -134,9 +135,9 @@ var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { - rows Rows + rows opentsdb.Rows ctx *streamContext - callback func(rows []Row) error + callback func(rows []opentsdb.Row) error reqBuf []byte } @@ -147,7 +148,7 @@ func (uw *unmarshalWork) reset() { uw.reqBuf = uw.reqBuf[:0] } -func (uw *unmarshalWork) runCallback(rows []Row) { +func (uw *unmarshalWork) runCallback(rows []opentsdb.Row) { ctx := uw.ctx if err := uw.callback(rows); err != nil { ctx.callbackErrLock.Lock()