2023-06-21 15:31:28 +02:00
|
|
|
package jsonline
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"testing"
|
2024-06-17 12:13:18 +02:00
|
|
|
|
2024-06-17 22:28:15 +02:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
2023-06-21 15:31:28 +02:00
|
|
|
)
|
|
|
|
|
2024-06-17 22:28:15 +02:00
|
|
|
func TestProcessStreamInternal_Success(t *testing.T) {
|
2023-06-21 15:31:28 +02:00
|
|
|
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
|
|
|
|
t.Helper()
|
|
|
|
|
2024-10-30 14:13:56 +01:00
|
|
|
msgFields := []string{msgField}
|
2024-06-17 22:28:15 +02:00
|
|
|
tlp := &insertutils.TestLogMessageProcessor{}
|
2023-06-21 15:31:28 +02:00
|
|
|
r := bytes.NewBufferString(data)
|
2024-10-30 14:13:56 +01:00
|
|
|
if err := processStreamInternal(r, timeField, msgFields, tlp); err != nil {
|
2024-06-17 22:28:15 +02:00
|
|
|
t.Fatalf("unexpected error: %s", err)
|
2023-06-21 15:31:28 +02:00
|
|
|
}
|
|
|
|
|
2024-06-17 22:28:15 +02:00
|
|
|
if err := tlp.Verify(rowsExpected, timestampsExpected, resultExpected); err != nil {
|
|
|
|
t.Fatal(err)
|
2023-06-21 15:31:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
|
2024-09-26 12:43:16 +02:00
|
|
|
{"@timestamp":"2023-06-06T04:48:12.735+01:00","message":"baz"}
|
|
|
|
{"message":"xyz","@timestamp":"2023-06-06 04:48:13.735Z","x":"y"}
|
2023-06-21 15:31:28 +02:00
|
|
|
`
|
|
|
|
timeField := "@timestamp"
|
|
|
|
msgField := "message"
|
|
|
|
rowsExpected := 3
|
2024-09-26 12:43:16 +02:00
|
|
|
timestampsExpected := []int64{1686026891735000000, 1686023292735000000, 1686026893735000000}
|
2024-10-14 23:39:29 +02:00
|
|
|
resultExpected := `{"log.offset":"71770","log.file.path":"/var/log/auth.log","_msg":"foobar"}
|
|
|
|
{"_msg":"baz"}
|
|
|
|
{"_msg":"xyz","x":"y"}`
|
2023-06-21 15:31:28 +02:00
|
|
|
f(data, timeField, msgField, rowsExpected, timestampsExpected, resultExpected)
|
2024-10-30 14:59:03 +01:00
|
|
|
|
|
|
|
// Non-existing msgField
|
|
|
|
data = `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
|
|
|
|
{"@timestamp":"2023-06-06T04:48:12.735+01:00","message":"baz"}
|
|
|
|
`
|
|
|
|
timeField = "@timestamp"
|
|
|
|
msgField = "foobar"
|
|
|
|
rowsExpected = 2
|
|
|
|
timestampsExpected = []int64{1686026891735000000, 1686023292735000000}
|
|
|
|
resultExpected = `{"log.offset":"71770","log.file.path":"/var/log/auth.log","message":"foobar"}
|
|
|
|
{"message":"baz","aa":"bb"}`
|
|
|
|
f(data, timeField, msgField, rowsExpected, timestampsExpected, resultExpected)
|
2023-06-21 15:31:28 +02:00
|
|
|
}
|
2024-06-17 22:28:15 +02:00
|
|
|
|
|
|
|
func TestProcessStreamInternal_Failure(t *testing.T) {
|
|
|
|
f := func(data string) {
|
|
|
|
t.Helper()
|
|
|
|
|
|
|
|
tlp := &insertutils.TestLogMessageProcessor{}
|
|
|
|
r := bytes.NewBufferString(data)
|
2024-10-30 14:13:56 +01:00
|
|
|
if err := processStreamInternal(r, "time", nil, tlp); err == nil {
|
2024-06-17 22:28:15 +02:00
|
|
|
t.Fatalf("expecting non-nil error")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// invalid json
|
|
|
|
f("foobar")
|
|
|
|
|
|
|
|
// invalid timestamp field
|
|
|
|
f(`{"time":"foobar"}`)
|
|
|
|
}
|