mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-04 16:51:11 +01:00
app/vminsert: fix inifinite loop when reading two lines without newline in the end
Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/82
This commit is contained in:
parent
7ce1f73ada
commit
1f7d9a213a
@ -24,6 +24,7 @@ func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error)
|
|||||||
dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize)
|
dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize)
|
||||||
}
|
}
|
||||||
dstBuf = append(dstBuf[:0], tailBuf...)
|
dstBuf = append(dstBuf[:0], tailBuf...)
|
||||||
|
tailBuf = tailBuf[:0]
|
||||||
again:
|
again:
|
||||||
n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
|
n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
|
||||||
// Check for error only if zero bytes read from r, i.e. no forward progress made.
|
// Check for error only if zero bytes read from r, i.e. no forward progress made.
|
||||||
@ -34,7 +35,7 @@ again:
|
|||||||
}
|
}
|
||||||
if err == io.EOF && len(dstBuf) > 0 {
|
if err == io.EOF && len(dstBuf) > 0 {
|
||||||
// Missing newline in the end of stream. This is OK,
|
// Missing newline in the end of stream. This is OK,
|
||||||
/// so suppress io.EOF for now. It will be returned during the next
|
// so suppress io.EOF for now. It will be returned during the next
|
||||||
// call to ReadLinesBlock.
|
// call to ReadLinesBlock.
|
||||||
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 .
|
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 .
|
||||||
return dstBuf, tailBuf, nil
|
return dstBuf, tailBuf, nil
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -40,7 +41,73 @@ func (fr *failureReader) Read(p []byte) (int, error) {
|
|||||||
return 0, fmt.Errorf("some error")
|
return 0, fmt.Errorf("some error")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReadLineBlockSuccessSingleByteReader(t *testing.T) {
|
func TestReadLinesBlockMultiLinesSingleByteReader(t *testing.T) {
|
||||||
|
f := func(s string, linesExpected []string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
r := &singleByteReader{
|
||||||
|
b: []byte(s),
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
var dstBuf, tailBuf []byte
|
||||||
|
var lines []string
|
||||||
|
for {
|
||||||
|
dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
t.Fatalf("unexpected error in ReadLinesBlock(%q): %s", s, err)
|
||||||
|
}
|
||||||
|
lines = append(lines, string(dstBuf))
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(lines, linesExpected) {
|
||||||
|
t.Fatalf("unexpected lines after reading %q: got %q; want %q", s, lines, linesExpected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f("", nil)
|
||||||
|
f("foo", []string{"foo"})
|
||||||
|
f("foo\n", []string{"foo"})
|
||||||
|
f("foo\nbar", []string{"foo", "bar"})
|
||||||
|
f("\nfoo\nbar", []string{"", "foo", "bar"})
|
||||||
|
f("\nfoo\nbar\n", []string{"", "foo", "bar"})
|
||||||
|
f("\nfoo\nbar\n\n", []string{"", "foo", "bar", ""})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadLinesBlockMultiLinesBytesBuffer(t *testing.T) {
|
||||||
|
f := func(s string, linesExpected []string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
r := bytes.NewBufferString(s)
|
||||||
|
var err error
|
||||||
|
var dstBuf, tailBuf []byte
|
||||||
|
var lines []string
|
||||||
|
for {
|
||||||
|
dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
t.Fatalf("unexpected error in ReadLinesBlock(%q): %s", s, err)
|
||||||
|
}
|
||||||
|
lines = append(lines, string(dstBuf))
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(lines, linesExpected) {
|
||||||
|
t.Fatalf("unexpected lines after reading %q: got %q; want %q", s, lines, linesExpected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f("", nil)
|
||||||
|
f("foo", []string{"foo"})
|
||||||
|
f("foo\n", []string{"foo"})
|
||||||
|
f("foo\nbar", []string{"foo", "bar"})
|
||||||
|
f("\nfoo\nbar", []string{"\nfoo", "bar"})
|
||||||
|
f("\nfoo\nbar\n", []string{"\nfoo\nbar"})
|
||||||
|
f("\nfoo\nbar\n\n", []string{"\nfoo\nbar\n"})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadLinesBlockSuccessSingleByteReader(t *testing.T) {
|
||||||
f := func(s, dstBufExpected, tailBufExpected string) {
|
f := func(s, dstBufExpected, tailBufExpected string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
@ -87,7 +154,7 @@ func TestReadLineBlockSuccessSingleByteReader(t *testing.T) {
|
|||||||
f(string(b), string(b[:maxLineSize]), "")
|
f(string(b), string(b[:maxLineSize]), "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReadLineBlockSuccessBytesBuffer(t *testing.T) {
|
func TestReadLinesBlockSuccessBytesBuffer(t *testing.T) {
|
||||||
f := func(s, dstBufExpected, tailBufExpected string) {
|
f := func(s, dstBufExpected, tailBufExpected string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
@ -348,4 +348,30 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
// No newline after the second line.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/82
|
||||||
|
f("foo,tag=xyz field=1.23 48934\n"+
|
||||||
|
"bar x=-1i", &Rows{
|
||||||
|
Rows: []Row{
|
||||||
|
{
|
||||||
|
Measurement: "foo",
|
||||||
|
Tags: []Tag{{
|
||||||
|
Key: "tag",
|
||||||
|
Value: "xyz",
|
||||||
|
}},
|
||||||
|
Fields: []Field{{
|
||||||
|
Key: "field",
|
||||||
|
Value: 1.23,
|
||||||
|
}},
|
||||||
|
Timestamp: 48934,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Measurement: "bar",
|
||||||
|
Fields: []Field{{
|
||||||
|
Key: "x",
|
||||||
|
Value: -1,
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user