app/vlselect/logsql: use buffered writer in order to save syscalls when sending big amounts of data to clients

This commit is contained in:
Aliaksandr Valialkin 2023-06-21 20:25:32 -07:00
parent 33625610c6
commit 19870d42c5
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 87 additions and 13 deletions

View File

@ -0,0 +1,70 @@
package logsql
import (
"io"
"sync"
)
func getBufferedWriter() *bufferedWriter {
v := bufferedWriterPool.Get()
if v == nil {
return &bufferedWriter{}
}
return v.(*bufferedWriter)
}
func putBufferedWriter(bw *bufferedWriter) {
bw.reset()
bufferedWriterPool.Put(bw)
}
var bufferedWriterPool sync.Pool
type bufferedWriter struct {
w io.Writer
buf []byte
}
func (bw *bufferedWriter) reset() {
bw.w = nil
bw.buf = bw.buf[:0]
}
func (bw *bufferedWriter) Init(w io.Writer, bufLen int) {
bw.reset()
bw.w = w
buf := bw.buf
if n := bufLen - cap(buf); n > 0 {
buf = append(buf[:cap(buf)], make([]byte, n)...)
}
bw.buf = buf[:0]
}
func (bw *bufferedWriter) Write(p []byte) (int, error) {
buf := bw.buf
if len(buf)+len(p) <= cap(buf) {
bw.buf = append(buf, p...)
return len(p), nil
}
if len(buf) > 0 {
if _, err := bw.w.Write(buf); err != nil {
return 0, err
}
buf = buf[:0]
}
if len(p) <= cap(buf) {
bw.buf = append(buf, p...)
return len(p), nil
}
bw.buf = buf
return bw.w.Write(p)
}
func (bw *bufferedWriter) FlushIgnoreErrors() {
buf := bw.buf
if len(buf) > 0 {
_, _ = bw.w.Write(buf)
bw.buf = buf[:0]
}
}

View File

@ -37,7 +37,7 @@ var sortWriterPool sync.Pool
// is sorted by _time field. // is sorted by _time field.
type sortWriter struct { type sortWriter struct {
mu sync.Mutex mu sync.Mutex
w io.Writer bw *bufferedWriter
maxBufLen int maxBufLen int
buf []byte buf []byte
bufFlushed bool bufFlushed bool
@ -46,7 +46,10 @@ type sortWriter struct {
} }
func (sw *sortWriter) reset() { func (sw *sortWriter) reset() {
sw.w = nil if sw.bw != nil {
putBufferedWriter(sw.bw)
sw.bw = nil
}
sw.maxBufLen = 0 sw.maxBufLen = 0
sw.buf = sw.buf[:0] sw.buf = sw.buf[:0]
sw.bufFlushed = false sw.bufFlushed = false
@ -56,7 +59,8 @@ func (sw *sortWriter) reset() {
func (sw *sortWriter) Init(w io.Writer, maxBufLen int) { func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
sw.reset() sw.reset()
sw.w = w sw.bw = getBufferedWriter()
sw.bw.Init(w, 64*1024)
sw.maxBufLen = maxBufLen sw.maxBufLen = maxBufLen
} }
@ -69,7 +73,7 @@ func (sw *sortWriter) MustWrite(p []byte) {
} }
if sw.bufFlushed { if sw.bufFlushed {
if _, err := sw.w.Write(p); err != nil { if _, err := sw.bw.Write(p); err != nil {
sw.hasErr = true sw.hasErr = true
} }
return return
@ -80,26 +84,26 @@ func (sw *sortWriter) MustWrite(p []byte) {
} }
sw.bufFlushed = true sw.bufFlushed = true
if len(sw.buf) > 0 { if len(sw.buf) > 0 {
if _, err := sw.w.Write(sw.buf); err != nil { if _, err := sw.bw.Write(sw.buf); err != nil {
sw.hasErr = true sw.hasErr = true
return return
} }
sw.buf = sw.buf[:0] sw.buf = sw.buf[:0]
} }
if _, err := sw.w.Write(p); err != nil { if _, err := sw.bw.Write(p); err != nil {
sw.hasErr = true sw.hasErr = true
} }
} }
func (sw *sortWriter) FinalFlush() { func (sw *sortWriter) FinalFlush() {
if sw.hasErr || sw.bufFlushed { if !sw.hasErr && !sw.bufFlushed {
return rs := getRowsSorter()
rs.parseRows(sw.buf)
rs.sort()
WriteJSONRows(sw.bw, rs.rows)
putRowsSorter(rs)
} }
rs := getRowsSorter() sw.bw.FlushIgnoreErrors()
rs.parseRows(sw.buf)
rs.sort()
WriteJSONRows(sw.w, rs.rows)
putRowsSorter(rs)
} }
func getRowsSorter() *rowsSorter { func getRowsSorter() *rowsSorter {