From 352429486ac6707d802dc2b733eeccccacae7c83 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 21 Jun 2023 20:40:01 -0700 Subject: [PATCH] Revert "app/vlselect/logsql: use buffered writer in order to save syscalls when sending big amounts of data to clients" This reverts commit c19048dc13d2b49d3b7d901ba81f4d9dd172508b. Reason for revert: it has been appeared that the net/http.ResponseWriter is already buffered, so there in no need in double bufferring --- app/vlselect/logsql/buffered_writer.go | 70 -------------------------- app/vlselect/logsql/sort_writer.go | 30 +++++------ 2 files changed, 13 insertions(+), 87 deletions(-) delete mode 100644 app/vlselect/logsql/buffered_writer.go diff --git a/app/vlselect/logsql/buffered_writer.go b/app/vlselect/logsql/buffered_writer.go deleted file mode 100644 index cf539d1b01..0000000000 --- a/app/vlselect/logsql/buffered_writer.go +++ /dev/null @@ -1,70 +0,0 @@ -package logsql - -import ( - "io" - "sync" -) - -func getBufferedWriter() *bufferedWriter { - v := bufferedWriterPool.Get() - if v == nil { - return &bufferedWriter{} - } - return v.(*bufferedWriter) -} - -func putBufferedWriter(bw *bufferedWriter) { - bw.reset() - bufferedWriterPool.Put(bw) -} - -var bufferedWriterPool sync.Pool - -type bufferedWriter struct { - w io.Writer - buf []byte -} - -func (bw *bufferedWriter) reset() { - bw.w = nil - bw.buf = bw.buf[:0] -} - -func (bw *bufferedWriter) Init(w io.Writer, bufLen int) { - bw.reset() - bw.w = w - - buf := bw.buf - if n := bufLen - cap(buf); n > 0 { - buf = append(buf[:cap(buf)], make([]byte, n)...) - } - bw.buf = buf[:0] -} - -func (bw *bufferedWriter) Write(p []byte) (int, error) { - buf := bw.buf - if len(buf)+len(p) <= cap(buf) { - bw.buf = append(buf, p...) - return len(p), nil - } - if len(buf) > 0 { - if _, err := bw.w.Write(buf); err != nil { - return 0, err - } - buf = buf[:0] - } - if len(p) <= cap(buf) { - bw.buf = append(buf, p...) - return len(p), nil - } - bw.buf = buf - return bw.w.Write(p) -} - -func (bw *bufferedWriter) FlushIgnoreErrors() { - buf := bw.buf - if len(buf) > 0 { - _, _ = bw.w.Write(buf) - bw.buf = buf[:0] - } -} diff --git a/app/vlselect/logsql/sort_writer.go b/app/vlselect/logsql/sort_writer.go index b26148f164..919fc8310b 100644 --- a/app/vlselect/logsql/sort_writer.go +++ b/app/vlselect/logsql/sort_writer.go @@ -37,7 +37,7 @@ var sortWriterPool sync.Pool // is sorted by _time field. type sortWriter struct { mu sync.Mutex - bw *bufferedWriter + w io.Writer maxBufLen int buf []byte bufFlushed bool @@ -46,10 +46,7 @@ type sortWriter struct { } func (sw *sortWriter) reset() { - if sw.bw != nil { - putBufferedWriter(sw.bw) - sw.bw = nil - } + sw.w = nil sw.maxBufLen = 0 sw.buf = sw.buf[:0] sw.bufFlushed = false @@ -59,8 +56,7 @@ func (sw *sortWriter) reset() { func (sw *sortWriter) Init(w io.Writer, maxBufLen int) { sw.reset() - sw.bw = getBufferedWriter() - sw.bw.Init(w, 64*1024) + sw.w = w sw.maxBufLen = maxBufLen } @@ -73,7 +69,7 @@ func (sw *sortWriter) MustWrite(p []byte) { } if sw.bufFlushed { - if _, err := sw.bw.Write(p); err != nil { + if _, err := sw.w.Write(p); err != nil { sw.hasErr = true } return @@ -84,26 +80,26 @@ func (sw *sortWriter) MustWrite(p []byte) { } sw.bufFlushed = true if len(sw.buf) > 0 { - if _, err := sw.bw.Write(sw.buf); err != nil { + if _, err := sw.w.Write(sw.buf); err != nil { sw.hasErr = true return } sw.buf = sw.buf[:0] } - if _, err := sw.bw.Write(p); err != nil { + if _, err := sw.w.Write(p); err != nil { sw.hasErr = true } } func (sw *sortWriter) FinalFlush() { - if !sw.hasErr && !sw.bufFlushed { - rs := getRowsSorter() - rs.parseRows(sw.buf) - rs.sort() - WriteJSONRows(sw.bw, rs.rows) - putRowsSorter(rs) + if sw.hasErr || sw.bufFlushed { + return } - sw.bw.FlushIgnoreErrors() + rs := getRowsSorter() + rs.parseRows(sw.buf) + rs.sort() + WriteJSONRows(sw.w, rs.rows) + putRowsSorter(rs) } func getRowsSorter() *rowsSorter {