diff --git a/lib/protoparser/common/lines_reader.go b/lib/protoparser/common/lines_reader.go index 07433d76d5..170bb317a1 100644 --- a/lib/protoparser/common/lines_reader.go +++ b/lib/protoparser/common/lines_reader.go @@ -2,13 +2,10 @@ package common import ( "bytes" - "errors" "fmt" "io" - "net" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // The maximum size of a single line returned by ReadLinesBlock. @@ -44,7 +41,6 @@ func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]b dstBuf = append(dstBuf[:0], tailBuf...) tailBuf = tailBuf[:0] again: - startTime := fasttime.UnixTimestamp() n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)]) // Check for error only if zero bytes read from r, i.e. no forward progress made. // Otherwise process the read data. @@ -59,16 +55,6 @@ again: // This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 . return dstBuf, tailBuf, nil } - var ne net.Error - if errors.As(err, &ne) && ne.Timeout() { - if fasttime.UnixTimestamp() == startTime { - // Prevent from busy loop when timeout erorrs are returned immediately. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 . - return dstBuf, tailBuf, fmt.Errorf("detected busy loop with repeated timeout error") - } - // Return empty results for an ordinary timeout. - return dstBuf, tailBuf, nil - } return dstBuf, tailBuf, err } dstBuf = dstBuf[:len(dstBuf)+n] diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index 3a1f30b8aa..45a9159af9 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -4,7 +4,6 @@ import ( "flag" "fmt" "io" - "net" "runtime" "sync" "time" @@ -37,20 +36,11 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { return ctx.Error() } -const flushTimeout = 3 * time.Second - func (ctx *streamContext) Read(r io.Reader) bool { readCalls.Inc() if ctx.err != nil { return false } - if c, ok := r.(net.Conn); ok { - if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil { - readErrors.Inc() - ctx.err = fmt.Errorf("cannot set read deadline: %w", err) - return false - } - } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index fae719063c..4a7844e877 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -4,7 +4,6 @@ import ( "flag" "fmt" "io" - "net" "runtime" "sync" "time" @@ -36,20 +35,11 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { return ctx.Error() } -const flushTimeout = 3 * time.Second - func (ctx *streamContext) Read(r io.Reader) bool { readCalls.Inc() if ctx.err != nil { return false } - if c, ok := r.(net.Conn); ok { - if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil { - readErrors.Inc() - ctx.err = fmt.Errorf("cannot set read deadline: %w", err) - return false - } - } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index da5c5dac9b..871960e44f 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -3,7 +3,6 @@ package prometheus import ( "fmt" "io" - "net" "runtime" "sync" "time" @@ -37,20 +36,11 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e return ctx.Error() } -const flushTimeout = 3 * time.Second - func (ctx *streamContext) Read(r io.Reader) bool { readCalls.Inc() if ctx.err != nil { return false } - if c, ok := r.(net.Conn); ok { - if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil { - readErrors.Inc() - ctx.err = fmt.Errorf("cannot set read deadline: %w", err) - return false - } - } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF {