From 19870d42c57c6a1b6ed471dc9146bc6e144045dd Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 21 Jun 2023 20:25:32 -0700 Subject: [PATCH] app/vlselect/logsql: use buffered writer in order to save syscalls when sending big amounts of data to clients --- app/vlselect/logsql/buffered_writer.go | 70 ++++++++++++++++++++++++++ app/vlselect/logsql/sort_writer.go | 30 ++++++----- 2 files changed, 87 insertions(+), 13 deletions(-) create mode 100644 app/vlselect/logsql/buffered_writer.go diff --git a/app/vlselect/logsql/buffered_writer.go b/app/vlselect/logsql/buffered_writer.go new file mode 100644 index 0000000000..cf539d1b01 --- /dev/null +++ b/app/vlselect/logsql/buffered_writer.go @@ -0,0 +1,70 @@ +package logsql + +import ( + "io" + "sync" +) + +func getBufferedWriter() *bufferedWriter { + v := bufferedWriterPool.Get() + if v == nil { + return &bufferedWriter{} + } + return v.(*bufferedWriter) +} + +func putBufferedWriter(bw *bufferedWriter) { + bw.reset() + bufferedWriterPool.Put(bw) +} + +var bufferedWriterPool sync.Pool + +type bufferedWriter struct { + w io.Writer + buf []byte +} + +func (bw *bufferedWriter) reset() { + bw.w = nil + bw.buf = bw.buf[:0] +} + +func (bw *bufferedWriter) Init(w io.Writer, bufLen int) { + bw.reset() + bw.w = w + + buf := bw.buf + if n := bufLen - cap(buf); n > 0 { + buf = append(buf[:cap(buf)], make([]byte, n)...) + } + bw.buf = buf[:0] +} + +func (bw *bufferedWriter) Write(p []byte) (int, error) { + buf := bw.buf + if len(buf)+len(p) <= cap(buf) { + bw.buf = append(buf, p...) + return len(p), nil + } + if len(buf) > 0 { + if _, err := bw.w.Write(buf); err != nil { + return 0, err + } + buf = buf[:0] + } + if len(p) <= cap(buf) { + bw.buf = append(buf, p...) + return len(p), nil + } + bw.buf = buf + return bw.w.Write(p) +} + +func (bw *bufferedWriter) FlushIgnoreErrors() { + buf := bw.buf + if len(buf) > 0 { + _, _ = bw.w.Write(buf) + bw.buf = buf[:0] + } +} diff --git a/app/vlselect/logsql/sort_writer.go b/app/vlselect/logsql/sort_writer.go index 919fc8310b..b26148f164 100644 --- a/app/vlselect/logsql/sort_writer.go +++ b/app/vlselect/logsql/sort_writer.go @@ -37,7 +37,7 @@ var sortWriterPool sync.Pool // is sorted by _time field. type sortWriter struct { mu sync.Mutex - w io.Writer + bw *bufferedWriter maxBufLen int buf []byte bufFlushed bool @@ -46,7 +46,10 @@ type sortWriter struct { } func (sw *sortWriter) reset() { - sw.w = nil + if sw.bw != nil { + putBufferedWriter(sw.bw) + sw.bw = nil + } sw.maxBufLen = 0 sw.buf = sw.buf[:0] sw.bufFlushed = false @@ -56,7 +59,8 @@ func (sw *sortWriter) reset() { func (sw *sortWriter) Init(w io.Writer, maxBufLen int) { sw.reset() - sw.w = w + sw.bw = getBufferedWriter() + sw.bw.Init(w, 64*1024) sw.maxBufLen = maxBufLen } @@ -69,7 +73,7 @@ func (sw *sortWriter) MustWrite(p []byte) { } if sw.bufFlushed { - if _, err := sw.w.Write(p); err != nil { + if _, err := sw.bw.Write(p); err != nil { sw.hasErr = true } return @@ -80,26 +84,26 @@ func (sw *sortWriter) MustWrite(p []byte) { } sw.bufFlushed = true if len(sw.buf) > 0 { - if _, err := sw.w.Write(sw.buf); err != nil { + if _, err := sw.bw.Write(sw.buf); err != nil { sw.hasErr = true return } sw.buf = sw.buf[:0] } - if _, err := sw.w.Write(p); err != nil { + if _, err := sw.bw.Write(p); err != nil { sw.hasErr = true } } func (sw *sortWriter) FinalFlush() { - if sw.hasErr || sw.bufFlushed { - return + if !sw.hasErr && !sw.bufFlushed { + rs := getRowsSorter() + rs.parseRows(sw.buf) + rs.sort() + WriteJSONRows(sw.bw, rs.rows) + putRowsSorter(rs) } - rs := getRowsSorter() - rs.parseRows(sw.buf) - rs.sort() - WriteJSONRows(sw.w, rs.rows) - putRowsSorter(rs) + sw.bw.FlushIgnoreErrors() } func getRowsSorter() *rowsSorter {