mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 16:30:55 +01:00
lib/protoparser: prevent from busy loop on repeated timeout errors when reading streams of ingested data
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696
This commit is contained in:
parent
9d79a3a99d
commit
c6b0547847
@ -2,10 +2,13 @@ package common
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"net"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The maximum size of a single line returned by ReadLinesBlock.
|
// The maximum size of a single line returned by ReadLinesBlock.
|
||||||
@ -19,6 +22,8 @@ const defaultBlockSize = 64 * 1024
|
|||||||
// Trailing chars after the last newline are put into tailBuf.
|
// Trailing chars after the last newline are put into tailBuf.
|
||||||
//
|
//
|
||||||
// Returns (dstBuf, tailBuf).
|
// Returns (dstBuf, tailBuf).
|
||||||
|
//
|
||||||
|
// It is expected that read timeout on r exceeds 1 second.
|
||||||
func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) {
|
func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) {
|
||||||
return ReadLinesBlockExt(r, dstBuf, tailBuf, maxLineSize)
|
return ReadLinesBlockExt(r, dstBuf, tailBuf, maxLineSize)
|
||||||
}
|
}
|
||||||
@ -30,6 +35,8 @@ func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error)
|
|||||||
// Returns (dstBuf, tailBuf).
|
// Returns (dstBuf, tailBuf).
|
||||||
//
|
//
|
||||||
// maxLineLen limits the maximum length of a single line.
|
// maxLineLen limits the maximum length of a single line.
|
||||||
|
//
|
||||||
|
// It is expected that read timeout on r exceeds 1 second.
|
||||||
func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) {
|
func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) {
|
||||||
if cap(dstBuf) < defaultBlockSize {
|
if cap(dstBuf) < defaultBlockSize {
|
||||||
dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize)
|
dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize)
|
||||||
@ -37,6 +44,7 @@ func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]b
|
|||||||
dstBuf = append(dstBuf[:0], tailBuf...)
|
dstBuf = append(dstBuf[:0], tailBuf...)
|
||||||
tailBuf = tailBuf[:0]
|
tailBuf = tailBuf[:0]
|
||||||
again:
|
again:
|
||||||
|
startTime := fasttime.UnixTimestamp()
|
||||||
n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
|
n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
|
||||||
// Check for error only if zero bytes read from r, i.e. no forward progress made.
|
// Check for error only if zero bytes read from r, i.e. no forward progress made.
|
||||||
// Otherwise process the read data.
|
// Otherwise process the read data.
|
||||||
@ -51,6 +59,12 @@ again:
|
|||||||
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 .
|
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 .
|
||||||
return dstBuf, tailBuf, nil
|
return dstBuf, tailBuf, nil
|
||||||
}
|
}
|
||||||
|
var ne net.Error
|
||||||
|
if errors.As(err, &ne) && ne.Timeout() && fasttime.UnixTimestamp() == startTime {
|
||||||
|
// Prevent from busy loop when timeout erorrs are returned immediately.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 .
|
||||||
|
return dstBuf, tailBuf, fmt.Errorf("detected busy loop with repeated timeout error")
|
||||||
|
}
|
||||||
return dstBuf, tailBuf, err
|
return dstBuf, tailBuf, err
|
||||||
}
|
}
|
||||||
dstBuf = dstBuf[:len(dstBuf)+n]
|
dstBuf = dstBuf[:len(dstBuf)+n]
|
||||||
|
Loading…
Reference in New Issue
Block a user