From c6b054784739159a64fb778a4a077151bee6cee0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 14 Aug 2020 20:13:15 +0300 Subject: [PATCH] lib/protoparser: prevent from busy loop on repeated timeout errors when reading streams of ingested data Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 --- lib/protoparser/common/lines_reader.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/lib/protoparser/common/lines_reader.go b/lib/protoparser/common/lines_reader.go index 260cdc4eb5..ab666a11ce 100644 --- a/lib/protoparser/common/lines_reader.go +++ b/lib/protoparser/common/lines_reader.go @@ -2,10 +2,13 @@ package common import ( "bytes" + "errors" "fmt" "io" + "net" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // The maximum size of a single line returned by ReadLinesBlock. @@ -19,6 +22,8 @@ const defaultBlockSize = 64 * 1024 // Trailing chars after the last newline are put into tailBuf. // // Returns (dstBuf, tailBuf). +// +// It is expected that read timeout on r exceeds 1 second. func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) { return ReadLinesBlockExt(r, dstBuf, tailBuf, maxLineSize) } @@ -30,6 +35,8 @@ func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) // Returns (dstBuf, tailBuf). // // maxLineLen limits the maximum length of a single line. +// +// It is expected that read timeout on r exceeds 1 second. func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) { if cap(dstBuf) < defaultBlockSize { dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize) @@ -37,6 +44,7 @@ func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]b dstBuf = append(dstBuf[:0], tailBuf...) tailBuf = tailBuf[:0] again: + startTime := fasttime.UnixTimestamp() n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)]) // Check for error only if zero bytes read from r, i.e. no forward progress made. // Otherwise process the read data. @@ -51,6 +59,12 @@ again: // This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 . return dstBuf, tailBuf, nil } + var ne net.Error + if errors.As(err, &ne) && ne.Timeout() && fasttime.UnixTimestamp() == startTime { + // Prevent from busy loop when timeout erorrs are returned immediately. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 . + return dstBuf, tailBuf, fmt.Errorf("detected busy loop with repeated timeout error") + } return dstBuf, tailBuf, err } dstBuf = dstBuf[:len(dstBuf)+n]