lib/protoparser: move common code for detecting timeouts to ReadLinesBlockExt

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696
This commit is contained in:
Aliaksandr Valialkin 2020-08-14 20:39:43 +03:00
parent c6b0547847
commit 3efa4e4e1c
4 changed files with 20 additions and 37 deletions

View File

@ -60,11 +60,15 @@ again:
return dstBuf, tailBuf, nil return dstBuf, tailBuf, nil
} }
var ne net.Error var ne net.Error
if errors.As(err, &ne) && ne.Timeout() && fasttime.UnixTimestamp() == startTime { if errors.As(err, &ne) && ne.Timeout() {
if fasttime.UnixTimestamp() == startTime {
// Prevent from busy loop when timeout erorrs are returned immediately. // Prevent from busy loop when timeout erorrs are returned immediately.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 . // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 .
return dstBuf, tailBuf, fmt.Errorf("detected busy loop with repeated timeout error") return dstBuf, tailBuf, fmt.Errorf("detected busy loop with repeated timeout error")
} }
// Return empty results for an ordinary timeout.
return dstBuf, tailBuf, nil
}
return dstBuf, tailBuf, err return dstBuf, tailBuf, err
} }
dstBuf = dstBuf[:len(dstBuf)+n] dstBuf = dstBuf[:len(dstBuf)+n]

View File

@ -1,7 +1,6 @@
package graphite package graphite
import ( import (
"errors"
"flag" "flag"
"fmt" "fmt"
"io" "io"
@ -54,18 +53,12 @@ func (ctx *streamContext) Read(r io.Reader) bool {
} }
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil { if ctx.err != nil {
var ne net.Error
if errors.As(ctx.err, &ne) && ne.Timeout() {
// Flush the read data on timeout and try reading again.
ctx.err = nil
} else {
if ctx.err != io.EOF { if ctx.err != io.EOF {
readErrors.Inc() readErrors.Inc()
ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %w", ctx.err) ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %w", ctx.err)
} }
return false return false
} }
}
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows)) rowsRead.Add(len(ctx.Rows.Rows))

View File

@ -1,7 +1,6 @@
package opentsdb package opentsdb
import ( import (
"errors"
"flag" "flag"
"fmt" "fmt"
"io" "io"
@ -53,18 +52,12 @@ func (ctx *streamContext) Read(r io.Reader) bool {
} }
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil { if ctx.err != nil {
var ne net.Error
if errors.As(ctx.err, &ne) && ne.Timeout() {
// Flush the read data on timeout and try reading again.
ctx.err = nil
} else {
if ctx.err != io.EOF { if ctx.err != io.EOF {
readErrors.Inc() readErrors.Inc()
ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %w", ctx.err) ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %w", ctx.err)
} }
return false return false
} }
}
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows)) rowsRead.Add(len(ctx.Rows.Rows))

View File

@ -1,7 +1,6 @@
package prometheus package prometheus
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -54,18 +53,12 @@ func (ctx *streamContext) Read(r io.Reader) bool {
} }
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil { if ctx.err != nil {
var ne net.Error
if errors.As(ctx.err, &ne) && ne.Timeout() {
// Flush the read data on timeout and try reading again.
ctx.err = nil
} else {
if ctx.err != io.EOF { if ctx.err != io.EOF {
readErrors.Inc() readErrors.Inc()
ctx.err = fmt.Errorf("cannot read Prometheus exposition data: %w", ctx.err) ctx.err = fmt.Errorf("cannot read Prometheus exposition data: %w", ctx.err)
} }
return false return false
} }
}
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows)) rowsRead.Add(len(ctx.Rows.Rows))