mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
lib/protoparser: removed unnecessary call to SetReadDeadline when reading a stream of data
The OS should return any buffered data in the stream without the need to set the read timeout. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696
This commit is contained in:
parent
98217e4c40
commit
d9f7ea1c6e
@ -2,13 +2,10 @@ package common
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// The maximum size of a single line returned by ReadLinesBlock.
|
// The maximum size of a single line returned by ReadLinesBlock.
|
||||||
@ -44,7 +41,6 @@ func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]b
|
|||||||
dstBuf = append(dstBuf[:0], tailBuf...)
|
dstBuf = append(dstBuf[:0], tailBuf...)
|
||||||
tailBuf = tailBuf[:0]
|
tailBuf = tailBuf[:0]
|
||||||
again:
|
again:
|
||||||
startTime := fasttime.UnixTimestamp()
|
|
||||||
n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
|
n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
|
||||||
// Check for error only if zero bytes read from r, i.e. no forward progress made.
|
// Check for error only if zero bytes read from r, i.e. no forward progress made.
|
||||||
// Otherwise process the read data.
|
// Otherwise process the read data.
|
||||||
@ -59,16 +55,6 @@ again:
|
|||||||
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 .
|
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 .
|
||||||
return dstBuf, tailBuf, nil
|
return dstBuf, tailBuf, nil
|
||||||
}
|
}
|
||||||
var ne net.Error
|
|
||||||
if errors.As(err, &ne) && ne.Timeout() {
|
|
||||||
if fasttime.UnixTimestamp() == startTime {
|
|
||||||
// Prevent from busy loop when timeout erorrs are returned immediately.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 .
|
|
||||||
return dstBuf, tailBuf, fmt.Errorf("detected busy loop with repeated timeout error")
|
|
||||||
}
|
|
||||||
// Return empty results for an ordinary timeout.
|
|
||||||
return dstBuf, tailBuf, nil
|
|
||||||
}
|
|
||||||
return dstBuf, tailBuf, err
|
return dstBuf, tailBuf, err
|
||||||
}
|
}
|
||||||
dstBuf = dstBuf[:len(dstBuf)+n]
|
dstBuf = dstBuf[:len(dstBuf)+n]
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -37,20 +36,11 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
|
|||||||
return ctx.Error()
|
return ctx.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
const flushTimeout = 3 * time.Second
|
|
||||||
|
|
||||||
func (ctx *streamContext) Read(r io.Reader) bool {
|
func (ctx *streamContext) Read(r io.Reader) bool {
|
||||||
readCalls.Inc()
|
readCalls.Inc()
|
||||||
if ctx.err != nil {
|
if ctx.err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if c, ok := r.(net.Conn); ok {
|
|
||||||
if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil {
|
|
||||||
readErrors.Inc()
|
|
||||||
ctx.err = fmt.Errorf("cannot set read deadline: %w", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
||||||
if ctx.err != nil {
|
if ctx.err != nil {
|
||||||
if ctx.err != io.EOF {
|
if ctx.err != io.EOF {
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -36,20 +35,11 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
|
|||||||
return ctx.Error()
|
return ctx.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
const flushTimeout = 3 * time.Second
|
|
||||||
|
|
||||||
func (ctx *streamContext) Read(r io.Reader) bool {
|
func (ctx *streamContext) Read(r io.Reader) bool {
|
||||||
readCalls.Inc()
|
readCalls.Inc()
|
||||||
if ctx.err != nil {
|
if ctx.err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if c, ok := r.(net.Conn); ok {
|
|
||||||
if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil {
|
|
||||||
readErrors.Inc()
|
|
||||||
ctx.err = fmt.Errorf("cannot set read deadline: %w", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
||||||
if ctx.err != nil {
|
if ctx.err != nil {
|
||||||
if ctx.err != io.EOF {
|
if ctx.err != io.EOF {
|
||||||
|
@ -3,7 +3,6 @@ package prometheus
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -37,20 +36,11 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e
|
|||||||
return ctx.Error()
|
return ctx.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
const flushTimeout = 3 * time.Second
|
|
||||||
|
|
||||||
func (ctx *streamContext) Read(r io.Reader) bool {
|
func (ctx *streamContext) Read(r io.Reader) bool {
|
||||||
readCalls.Inc()
|
readCalls.Inc()
|
||||||
if ctx.err != nil {
|
if ctx.err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if c, ok := r.(net.Conn); ok {
|
|
||||||
if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil {
|
|
||||||
readErrors.Inc()
|
|
||||||
ctx.err = fmt.Errorf("cannot set read deadline: %w", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
||||||
if ctx.err != nil {
|
if ctx.err != nil {
|
||||||
if ctx.err != io.EOF {
|
if ctx.err != io.EOF {
|
||||||
|
Loading…
Reference in New Issue
Block a user