From 5cdad60a6fd369eb2514bb75be086dfe079e0d64 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 28 Sep 2020 02:06:27 +0300 Subject: [PATCH] lib/protoparser: use 64KB read buffer instead of default 4KB buffer provided by net/http.Server This should reduce syscall overhead when reading big amounts of data --- lib/protoparser/csvimport/streamparser.go | 23 +++++++++++------- lib/protoparser/graphite/parser_test.go | 4 ++-- lib/protoparser/graphite/streamparser.go | 22 +++++++++++------ lib/protoparser/influx/streamparser.go | 22 +++++++++++------ lib/protoparser/native/streamparser.go | 22 ++++++++++++++--- lib/protoparser/opentsdb/streamparser.go | 22 +++++++++++------ lib/protoparser/opentsdbhttp/streamparser.go | 18 ++++++++++---- lib/protoparser/prometheus/streamparser.go | 22 +++++++++++------ .../promremotewrite/streamparser.go | 23 ++++++++++++------ lib/protoparser/vmimport/streamparser.go | 24 +++++++++++-------- 10 files changed, 139 insertions(+), 63 deletions(-) diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 4222a968b..5828ea7ff 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -1,6 +1,7 @@ package csvimport import ( + "bufio" "flag" "fmt" "io" @@ -40,10 +41,9 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { defer common.PutGzipReader(zr) r = zr } - - ctx := getStreamContext() + ctx := getStreamContext(r) defer putStreamContext(ctx) - for ctx.Read(r, cds) { + for ctx.Read(cds) { if err := callback(ctx.Rows.Rows); err != nil { return err } @@ -51,12 +51,12 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { return ctx.Error() } -func (ctx *streamContext) Read(r io.Reader, cds []ColumnDescriptor) bool { +func (ctx *streamContext) Read(cds []ColumnDescriptor) bool { readCalls.Inc() if ctx.err != nil { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -97,6 +97,7 @@ var ( type streamContext struct { Rows Rows + br *bufio.Reader reqBuf []byte tailBuf []byte err error @@ -111,20 +112,26 @@ func (ctx *streamContext) Error() error { func (ctx *streamContext) reset() { ctx.Rows.Reset() + ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil } -func getStreamContext() *streamContext { +func getStreamContext(r io.Reader) *streamContext { select { case ctx := <-streamContextPoolCh: + ctx.br.Reset(r) return ctx default: if v := streamContextPool.Get(); v != nil { - return v.(*streamContext) + ctx := v.(*streamContext) + ctx.br.Reset(r) + return ctx + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } - return &streamContext{} } } diff --git a/lib/protoparser/graphite/parser_test.go b/lib/protoparser/graphite/parser_test.go index 982d51fa0..1ad5a5dfd 100644 --- a/lib/protoparser/graphite/parser_test.go +++ b/lib/protoparser/graphite/parser_test.go @@ -184,8 +184,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) { func Test_streamContext_Read(t *testing.T) { f := func(s string, rowsExpected *Rows) { t.Helper() - ctx := &streamContext{} - ctx.Read(strings.NewReader(s)) + ctx := getStreamContext(strings.NewReader(s)) + ctx.Read() if len(ctx.Rows.Rows) != len(rowsExpected.Rows) { t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", ctx.Rows, rowsExpected.Rows) } diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index 45a9159af..08942bf3e 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -1,6 +1,7 @@ package graphite import ( + "bufio" "flag" "fmt" "io" @@ -25,10 +26,10 @@ var ( // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, callback func(rows []Row) error) error { - ctx := getStreamContext() + ctx := getStreamContext(r) defer putStreamContext(ctx) - for ctx.Read(r) { + for ctx.Read() { if err := callback(ctx.Rows.Rows); err != nil { return err } @@ -36,12 +37,12 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { return ctx.Error() } -func (ctx *streamContext) Read(r io.Reader) bool { +func (ctx *streamContext) Read() bool { readCalls.Inc() if ctx.err != nil { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -81,6 +82,7 @@ func (ctx *streamContext) Read(r io.Reader) bool { type streamContext struct { Rows Rows + br *bufio.Reader reqBuf []byte tailBuf []byte err error @@ -95,6 +97,7 @@ func (ctx *streamContext) Error() error { func (ctx *streamContext) reset() { ctx.Rows.Reset() + ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil @@ -106,15 +109,20 @@ var ( rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="graphite"}`) ) -func getStreamContext() *streamContext { +func getStreamContext(r io.Reader) *streamContext { select { case ctx := <-streamContextPoolCh: + ctx.br.Reset(r) return ctx default: if v := streamContextPool.Get(); v != nil { - return v.(*streamContext) + ctx := v.(*streamContext) + ctx.br.Reset(r) + return ctx + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } - return &streamContext{} } } diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index a964b4616..6a6a6c381 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -1,6 +1,7 @@ package influx import ( + "bufio" "flag" "fmt" "io" @@ -50,9 +51,9 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun tsMultiplier = -1e3 * 3600 } - ctx := getStreamContext() + ctx := getStreamContext(r) defer putStreamContext(ctx) - for ctx.Read(r, tsMultiplier) { + for ctx.Read(tsMultiplier) { if err := callback(db, ctx.Rows.Rows); err != nil { return err } @@ -60,12 +61,12 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun return ctx.Error() } -func (ctx *streamContext) Read(r io.Reader, tsMultiplier int64) bool { +func (ctx *streamContext) Read(tsMultiplier int64) bool { readCalls.Inc() if ctx.err != nil { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -121,6 +122,7 @@ var ( type streamContext struct { Rows Rows + br *bufio.Reader reqBuf []byte tailBuf []byte err error @@ -135,20 +137,26 @@ func (ctx *streamContext) Error() error { func (ctx *streamContext) reset() { ctx.Rows.Reset() + ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil } -func getStreamContext() *streamContext { +func getStreamContext(r io.Reader) *streamContext { select { case ctx := <-streamContextPoolCh: + ctx.br.Reset(r) return ctx default: if v := streamContextPool.Get(); v != nil { - return v.(*streamContext) + ctx := v.(*streamContext) + ctx.br.Reset(r) + return ctx + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } - return &streamContext{} } } diff --git a/lib/protoparser/native/streamparser.go b/lib/protoparser/native/streamparser.go index cc3bcb18f..8728040c3 100644 --- a/lib/protoparser/native/streamparser.go +++ b/lib/protoparser/native/streamparser.go @@ -32,9 +32,8 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error { defer common.PutGzipReader(zr) r = zr } - // By default req.Body uses 4Kb buffer. This size is too small for typical request to /api/v1/import/native, - // so use slightly bigger buffer in order to reduce read syscall overhead. - br := bufio.NewReaderSize(r, 1024*1024) + br := getBufferedReader(r) + defer putBufferedReader(br) // Read time range (tr) trBuf := make([]byte, 16) @@ -195,3 +194,20 @@ func putUnmarshalWork(uw *unmarshalWork) { } var unmarshalWorkPool sync.Pool + +func getBufferedReader(r io.Reader) *bufio.Reader { + v := bufferedReaderPool.Get() + if v == nil { + return bufio.NewReaderSize(r, 64*1024) + } + br := v.(*bufio.Reader) + br.Reset(r) + return br +} + +func putBufferedReader(br *bufio.Reader) { + br.Reset(nil) + bufferedReaderPool.Put(br) +} + +var bufferedReaderPool sync.Pool diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index 4a7844e87..7f1762f06 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -1,6 +1,7 @@ package opentsdb import ( + "bufio" "flag" "fmt" "io" @@ -25,9 +26,9 @@ var ( // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, callback func(rows []Row) error) error { - ctx := getStreamContext() + ctx := getStreamContext(r) defer putStreamContext(ctx) - for ctx.Read(r) { + for ctx.Read() { if err := callback(ctx.Rows.Rows); err != nil { return err } @@ -35,12 +36,12 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { return ctx.Error() } -func (ctx *streamContext) Read(r io.Reader) bool { +func (ctx *streamContext) Read() bool { readCalls.Inc() if ctx.err != nil { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -80,6 +81,7 @@ func (ctx *streamContext) Read(r io.Reader) bool { type streamContext struct { Rows Rows + br *bufio.Reader reqBuf []byte tailBuf []byte err error @@ -94,6 +96,7 @@ func (ctx *streamContext) Error() error { func (ctx *streamContext) reset() { ctx.Rows.Reset() + ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil @@ -105,15 +108,20 @@ var ( rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentsdb"}`) ) -func getStreamContext() *streamContext { +func getStreamContext(r io.Reader) *streamContext { select { case ctx := <-streamContextPoolCh: + ctx.br.Reset(r) return ctx default: if v := streamContextPool.Get(); v != nil { - return v.(*streamContext) + ctx := v.(*streamContext) + ctx.br.Reset(r) + return ctx + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } - return &streamContext{} } } diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index 3ba3bb11a..eaa1df9cd 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -1,6 +1,7 @@ package opentsdbhttp import ( + "bufio" "flag" "fmt" "io" @@ -40,11 +41,11 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { r = zr } - ctx := getStreamContext() + ctx := getStreamContext(r) defer putStreamContext(ctx) // Read the request in ctx.reqBuf - lr := io.LimitReader(r, int64(maxInsertRequestSize.N)+1) + lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1) reqLen, err := ctx.reqBuf.ReadFrom(lr) if err != nil { readErrors.Inc() @@ -102,11 +103,13 @@ const secondMask int64 = 0x7FFFFFFF00000000 type streamContext struct { Rows Rows + br *bufio.Reader reqBuf bytesutil.ByteBuffer } func (ctx *streamContext) reset() { ctx.Rows.Reset() + ctx.br.Reset(nil) ctx.reqBuf.Reset() } @@ -117,15 +120,20 @@ var ( unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="opentsdbhttp"}`) ) -func getStreamContext() *streamContext { +func getStreamContext(r io.Reader) *streamContext { select { case ctx := <-streamContextPoolCh: + ctx.br.Reset(r) return ctx default: if v := streamContextPool.Get(); v != nil { - return v.(*streamContext) + ctx := v.(*streamContext) + ctx.br.Reset(r) + return ctx + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } - return &streamContext{} } } diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index fe17e6a36..b4d317696 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -1,6 +1,7 @@ package prometheus import ( + "bufio" "fmt" "io" "runtime" @@ -26,9 +27,9 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f defer common.PutGzipReader(zr) r = zr } - ctx := getStreamContext() + ctx := getStreamContext(r) defer putStreamContext(ctx) - for ctx.Read(r, defaultTimestamp) { + for ctx.Read(defaultTimestamp) { if err := callback(ctx.Rows.Rows); err != nil { return err } @@ -36,12 +37,12 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f return ctx.Error() } -func (ctx *streamContext) Read(r io.Reader, defaultTimestamp int64) bool { +func (ctx *streamContext) Read(defaultTimestamp int64) bool { readCalls.Inc() if ctx.err != nil { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -69,6 +70,7 @@ func (ctx *streamContext) Read(r io.Reader, defaultTimestamp int64) bool { type streamContext struct { Rows Rows + br *bufio.Reader reqBuf []byte tailBuf []byte err error @@ -83,6 +85,7 @@ func (ctx *streamContext) Error() error { func (ctx *streamContext) reset() { ctx.Rows.Reset() + ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil @@ -94,15 +97,20 @@ var ( rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="prometheus"}`) ) -func getStreamContext() *streamContext { +func getStreamContext(r io.Reader) *streamContext { select { case ctx := <-streamContextPoolCh: + ctx.br.Reset(r) return ctx default: if v := streamContextPool.Get(); v != nil { - return v.(*streamContext) + ctx := v.(*streamContext) + ctx.br.Reset(r) + return ctx + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } - return &streamContext{} } } diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go index 265e5df70..a4ef73846 100644 --- a/lib/protoparser/promremotewrite/streamparser.go +++ b/lib/protoparser/promremotewrite/streamparser.go @@ -1,6 +1,7 @@ package promremotewrite import ( + "bufio" "fmt" "io" "net/http" @@ -20,9 +21,9 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102 // // callback shouldn't hold timeseries after returning. func ParseStream(req *http.Request, callback func(timeseries []prompb.TimeSeries) error) error { - ctx := getPushCtx() + ctx := getPushCtx(req.Body) defer putPushCtx(ctx) - if err := ctx.Read(req); err != nil { + if err := ctx.Read(); err != nil { return err } return callback(ctx.wr.Timeseries) @@ -30,18 +31,21 @@ func ParseStream(req *http.Request, callback func(timeseries []prompb.TimeSeries type pushCtx struct { wr prompb.WriteRequest + br *bufio.Reader reqBuf []byte } func (ctx *pushCtx) reset() { ctx.wr.Reset() + ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] } -func (ctx *pushCtx) Read(r *http.Request) error { +func (ctx *pushCtx) Read() error { readCalls.Inc() var err error - ctx.reqBuf, err = readSnappy(ctx.reqBuf[:0], r.Body) + + ctx.reqBuf, err = readSnappy(ctx.reqBuf[:0], ctx.br) if err != nil { readErrors.Inc() return fmt.Errorf("cannot read prompb.WriteRequest: %w", err) @@ -68,15 +72,20 @@ var ( unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="promremotewrite"}`) ) -func getPushCtx() *pushCtx { +func getPushCtx(r io.Reader) *pushCtx { select { case ctx := <-pushCtxPoolCh: + ctx.br.Reset(r) return ctx default: if v := pushCtxPool.Get(); v != nil { - return v.(*pushCtx) + ctx := v.(*pushCtx) + ctx.br.Reset(r) + return ctx + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } - return &pushCtx{} } } diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index 781c13238..38040ddac 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -34,9 +34,6 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { defer common.PutGzipReader(zr) r = zr } - // By default req.Body uses 4Kb buffer. This size is too small for typical request to /api/v1/import, - // so use slightly bigger buffer in order to reduce read syscall overhead. - br := bufio.NewReaderSize(r, 1024*1024) // Start gomaxprocs workers for processing the parsed data in parallel. gomaxprocs := runtime.GOMAXPROCS(-1) @@ -67,9 +64,9 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { }() } - ctx := getStreamContext() + ctx := getStreamContext(r) defer putStreamContext(ctx) - for ctx.Read(br) { + for ctx.Read() { uw := getUnmarshalWork() uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...) workCh <- uw @@ -77,12 +74,12 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { return ctx.Error() } -func (ctx *streamContext) Read(r io.Reader) bool { +func (ctx *streamContext) Read() bool { readCalls.Inc() if ctx.err != nil { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(r, ctx.reqBuf, ctx.tailBuf, maxLineLen.N) + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.N) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -100,6 +97,7 @@ var ( ) type streamContext struct { + br *bufio.Reader reqBuf []byte tailBuf []byte err error @@ -113,20 +111,26 @@ func (ctx *streamContext) Error() error { } func (ctx *streamContext) reset() { + ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil } -func getStreamContext() *streamContext { +func getStreamContext(r io.Reader) *streamContext { select { case ctx := <-streamContextPoolCh: + ctx.br.Reset(r) return ctx default: if v := streamContextPool.Get(); v != nil { - return v.(*streamContext) + ctx := v.(*streamContext) + ctx.br.Reset(r) + return ctx + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } - return &streamContext{} } }