mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-12 12:46:23 +01:00
Revert "app/vlselect/logsql: use buffered writer in order to save syscalls when sending big amounts of data to clients"
This reverts commit c19048dc13
.
Reason for revert: it has been appeared that the net/http.ResponseWriter is already buffered,
so there in no need in double bufferring
This commit is contained in:
parent
19870d42c5
commit
352429486a
@ -1,70 +0,0 @@
|
|||||||
package logsql
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
func getBufferedWriter() *bufferedWriter {
|
|
||||||
v := bufferedWriterPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
return &bufferedWriter{}
|
|
||||||
}
|
|
||||||
return v.(*bufferedWriter)
|
|
||||||
}
|
|
||||||
|
|
||||||
func putBufferedWriter(bw *bufferedWriter) {
|
|
||||||
bw.reset()
|
|
||||||
bufferedWriterPool.Put(bw)
|
|
||||||
}
|
|
||||||
|
|
||||||
var bufferedWriterPool sync.Pool
|
|
||||||
|
|
||||||
type bufferedWriter struct {
|
|
||||||
w io.Writer
|
|
||||||
buf []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bw *bufferedWriter) reset() {
|
|
||||||
bw.w = nil
|
|
||||||
bw.buf = bw.buf[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bw *bufferedWriter) Init(w io.Writer, bufLen int) {
|
|
||||||
bw.reset()
|
|
||||||
bw.w = w
|
|
||||||
|
|
||||||
buf := bw.buf
|
|
||||||
if n := bufLen - cap(buf); n > 0 {
|
|
||||||
buf = append(buf[:cap(buf)], make([]byte, n)...)
|
|
||||||
}
|
|
||||||
bw.buf = buf[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bw *bufferedWriter) Write(p []byte) (int, error) {
|
|
||||||
buf := bw.buf
|
|
||||||
if len(buf)+len(p) <= cap(buf) {
|
|
||||||
bw.buf = append(buf, p...)
|
|
||||||
return len(p), nil
|
|
||||||
}
|
|
||||||
if len(buf) > 0 {
|
|
||||||
if _, err := bw.w.Write(buf); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
buf = buf[:0]
|
|
||||||
}
|
|
||||||
if len(p) <= cap(buf) {
|
|
||||||
bw.buf = append(buf, p...)
|
|
||||||
return len(p), nil
|
|
||||||
}
|
|
||||||
bw.buf = buf
|
|
||||||
return bw.w.Write(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bw *bufferedWriter) FlushIgnoreErrors() {
|
|
||||||
buf := bw.buf
|
|
||||||
if len(buf) > 0 {
|
|
||||||
_, _ = bw.w.Write(buf)
|
|
||||||
bw.buf = buf[:0]
|
|
||||||
}
|
|
||||||
}
|
|
@ -37,7 +37,7 @@ var sortWriterPool sync.Pool
|
|||||||
// is sorted by _time field.
|
// is sorted by _time field.
|
||||||
type sortWriter struct {
|
type sortWriter struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
bw *bufferedWriter
|
w io.Writer
|
||||||
maxBufLen int
|
maxBufLen int
|
||||||
buf []byte
|
buf []byte
|
||||||
bufFlushed bool
|
bufFlushed bool
|
||||||
@ -46,10 +46,7 @@ type sortWriter struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sw *sortWriter) reset() {
|
func (sw *sortWriter) reset() {
|
||||||
if sw.bw != nil {
|
sw.w = nil
|
||||||
putBufferedWriter(sw.bw)
|
|
||||||
sw.bw = nil
|
|
||||||
}
|
|
||||||
sw.maxBufLen = 0
|
sw.maxBufLen = 0
|
||||||
sw.buf = sw.buf[:0]
|
sw.buf = sw.buf[:0]
|
||||||
sw.bufFlushed = false
|
sw.bufFlushed = false
|
||||||
@ -59,8 +56,7 @@ func (sw *sortWriter) reset() {
|
|||||||
func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
|
func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
|
||||||
sw.reset()
|
sw.reset()
|
||||||
|
|
||||||
sw.bw = getBufferedWriter()
|
sw.w = w
|
||||||
sw.bw.Init(w, 64*1024)
|
|
||||||
sw.maxBufLen = maxBufLen
|
sw.maxBufLen = maxBufLen
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,7 +69,7 @@ func (sw *sortWriter) MustWrite(p []byte) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if sw.bufFlushed {
|
if sw.bufFlushed {
|
||||||
if _, err := sw.bw.Write(p); err != nil {
|
if _, err := sw.w.Write(p); err != nil {
|
||||||
sw.hasErr = true
|
sw.hasErr = true
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@ -84,26 +80,26 @@ func (sw *sortWriter) MustWrite(p []byte) {
|
|||||||
}
|
}
|
||||||
sw.bufFlushed = true
|
sw.bufFlushed = true
|
||||||
if len(sw.buf) > 0 {
|
if len(sw.buf) > 0 {
|
||||||
if _, err := sw.bw.Write(sw.buf); err != nil {
|
if _, err := sw.w.Write(sw.buf); err != nil {
|
||||||
sw.hasErr = true
|
sw.hasErr = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sw.buf = sw.buf[:0]
|
sw.buf = sw.buf[:0]
|
||||||
}
|
}
|
||||||
if _, err := sw.bw.Write(p); err != nil {
|
if _, err := sw.w.Write(p); err != nil {
|
||||||
sw.hasErr = true
|
sw.hasErr = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sw *sortWriter) FinalFlush() {
|
func (sw *sortWriter) FinalFlush() {
|
||||||
if !sw.hasErr && !sw.bufFlushed {
|
if sw.hasErr || sw.bufFlushed {
|
||||||
rs := getRowsSorter()
|
return
|
||||||
rs.parseRows(sw.buf)
|
|
||||||
rs.sort()
|
|
||||||
WriteJSONRows(sw.bw, rs.rows)
|
|
||||||
putRowsSorter(rs)
|
|
||||||
}
|
}
|
||||||
sw.bw.FlushIgnoreErrors()
|
rs := getRowsSorter()
|
||||||
|
rs.parseRows(sw.buf)
|
||||||
|
rs.sort()
|
||||||
|
WriteJSONRows(sw.w, rs.rows)
|
||||||
|
putRowsSorter(rs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRowsSorter() *rowsSorter {
|
func getRowsSorter() *rowsSorter {
|
||||||
|
Loading…
Reference in New Issue
Block a user