diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go
index 1a15264207..ed72cd0cfb 100644
--- a/app/vlinsert/elasticsearch/elasticsearch.go
+++ b/app/vlinsert/elasticsearch/elasticsearch.go
@@ -9,7 +9,6 @@ import (
 	"net/http"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
@@ -19,11 +18,11 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
 	"github.com/VictoriaMetrics/metrics"
-	"github.com/valyala/fastjson"
 )
 
 var (
@@ -240,28 +239,21 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
 		return false, fmt.Errorf(`missing log message after the "create" or "index" command`)
 	}
 	line = sc.Bytes()
-	pctx := getParserCtx()
-	if err := pctx.parseLogMessage(line); err != nil {
+	p := logjson.GetParser()
+	if err := p.ParseLogMessage(line); err != nil {
 		return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
 	}
-	timestamp, err := extractTimestampFromFields(timeField, pctx.fields)
+	timestamp, err := extractTimestampFromFields(timeField, p.Fields)
 	if err != nil {
 		return false, fmt.Errorf("cannot parse timestamp: %w", err)
 	}
-	updateMessageFieldName(msgField, pctx.fields)
-	processLogMessage(timestamp, pctx.fields)
-	putParserCtx(pctx)
+	updateMessageFieldName(msgField, p.Fields)
+	processLogMessage(timestamp, p.Fields)
+	logjson.PutParser(p)
 	return true, nil
 }
 
-var parserPool fastjson.ParserPool
-
-var (
-	invalidTimestampLogger = logger.WithThrottler("invalidTimestampLogger", 5*time.Second)
-	invalidJSONLineLogger  = logger.WithThrottler("invalidJSONLineLogger", 5*time.Second)
-)
-
 func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
 	for i := range fields {
 		f := &fields[i]
@@ -291,102 +283,6 @@ func updateMessageFieldName(msgField string, fields []logstorage.Field) {
 	}
 }
 
-type parserCtx struct {
-	p         fastjson.Parser
-	buf       []byte
-	prefixBuf []byte
-	fields    []logstorage.Field
-}
-
-func (pctx *parserCtx) reset() {
-	pctx.buf = pctx.buf[:0]
-	pctx.prefixBuf = pctx.prefixBuf[:0]
-
-	fields := pctx.fields
-	for i := range fields {
-		lf := &fields[i]
-		lf.Name = ""
-		lf.Value = ""
-	}
-	pctx.fields = fields[:0]
-}
-
-func getParserCtx() *parserCtx {
-	v := parserCtxPool.Get()
-	if v == nil {
-		return &parserCtx{}
-	}
-	return v.(*parserCtx)
-}
-
-func putParserCtx(pctx *parserCtx) {
-	pctx.reset()
-	parserCtxPool.Put(pctx)
-}
-
-var parserCtxPool sync.Pool
-
-func (pctx *parserCtx) parseLogMessage(msg []byte) error {
-	s := bytesutil.ToUnsafeString(msg)
-	v, err := pctx.p.Parse(s)
-	if err != nil {
-		return fmt.Errorf("cannot parse json: %w", err)
-	}
-	if t := v.Type(); t != fastjson.TypeObject {
-		return fmt.Errorf("expecting json dictionary; got %s", t)
-	}
-	pctx.reset()
-	pctx.fields, pctx.buf, pctx.prefixBuf = appendLogFields(pctx.fields, pctx.buf, pctx.prefixBuf, v)
-	return nil
-}
-
-func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) {
-	o := v.GetObject()
-	o.Visit(func(k []byte, v *fastjson.Value) {
-		t := v.Type()
-		switch t {
-		case fastjson.TypeNull:
-			// Skip nulls
-		case fastjson.TypeObject:
-			// Flatten nested JSON objects.
-			// For example, {"foo":{"bar":"baz"}} is converted to {"foo.bar":"baz"}
-			prefixLen := len(prefixBuf)
-			prefixBuf = append(prefixBuf, k...)
-			prefixBuf = append(prefixBuf, '.')
-			dst, dstBuf, prefixBuf = appendLogFields(dst, dstBuf, prefixBuf, v)
-			prefixBuf = prefixBuf[:prefixLen]
-		case fastjson.TypeArray, fastjson.TypeNumber, fastjson.TypeTrue, fastjson.TypeFalse:
-			// Convert JSON arrays, numbers, true and false values to their string representation
-			dstBufLen := len(dstBuf)
-			dstBuf = v.MarshalTo(dstBuf)
-			value := dstBuf[dstBufLen:]
-			dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
-		case fastjson.TypeString:
-			// Decode JSON strings
-			dstBufLen := len(dstBuf)
-			dstBuf = append(dstBuf, v.GetStringBytes()...)
-			value := dstBuf[dstBufLen:]
-			dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
-		default:
-			logger.Panicf("BUG: unexpected JSON type: %s", t)
-		}
-	})
-	return dst, dstBuf, prefixBuf
-}
-
-func appendLogField(dst []logstorage.Field, dstBuf, prefixBuf, k, value []byte) ([]logstorage.Field, []byte) {
-	dstBufLen := len(dstBuf)
-	dstBuf = append(dstBuf, prefixBuf...)
-	dstBuf = append(dstBuf, k...)
-	name := dstBuf[dstBufLen:]
-
-	dst = append(dst, logstorage.Field{
-		Name:  bytesutil.ToUnsafeString(name),
-		Value: bytesutil.ToUnsafeString(value),
-	})
-	return dst, dstBuf
-}
-
 func parseElasticsearchTimestamp(s string) (int64, error) {
 	if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
 		// Try parsing timestamp in milliseconds
diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index 05d6f88301..72dacb3c76 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -4,12 +4,18 @@ import (
 	"net/http"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 )
 
+var (
+	maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+
+		"if their total size doesn't exceed this value; otherwise query results are streamed in the response without sorting; "+
+		"too big a value for this flag may result in high memory usage, since the sorting is performed in memory")
+)
+
 // ProcessQueryRequest handles /select/logsql/query request
 func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}) {
 	// Extract tenantID
@@ -27,9 +33,8 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
 	}
 	w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
 
-	bw := bufferedwriter.Get(w)
-	defer bufferedwriter.Put(bw)
-
+	sw := getSortWriter()
+	sw.Init(w, maxSortBufferSize.IntN())
 	tenantIDs := []logstorage.TenantID{tenantID}
 	vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) {
 		if len(columns) == 0 {
@@ -41,13 +46,11 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
 		for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
 			WriteJSONRow(bb, columns, rowIdx)
 		}
-		// Do not check for error here, since the only valid error is when the client
-		// closes the connection during Write() call. There is no need in logging this error,
-		// since it may be too verbose and it doesn't give any actionable info.
-		_, _ = bw.Write(bb.B)
+		sw.MustWrite(bb.B)
 		blockResultPool.Put(bb)
 	})
-	_ = bw.Flush()
+	sw.FinalFlush()
+	putSortWriter(sw)
 }
 
 var blockResultPool bytesutil.ByteBufferPool
diff --git a/app/vlselect/logsql/query_response.qtpl b/app/vlselect/logsql/query_response.qtpl
index c98b0c9bdf..06205615eb 100644
--- a/app/vlselect/logsql/query_response.qtpl
+++ b/app/vlselect/logsql/query_response.qtpl
@@ -17,4 +17,25 @@
 	}{% newline %}
 {% endfunc %}
 
+// JSONRows prints formatted rows
+{% func JSONRows(rows [][]logstorage.Field) %}
+	{% if len(rows) == 0 %}
+		{% return %}
+	{% endif %}
+	{% for _, fields := range rows %}
+		{
+		{% if len(fields) > 0 %}
+			{% code
+				f := fields[0]
+				fields = fields[1:]
+			%}
+			{%q= f.Name %}:{%q= f.Value %}
+			{% for _, f := range fields %}
+				,{%q= f.Name %}:{%q= f.Value %}
+			{% endfor %}
+		{% endif %}
+		}{% newline %}
+	{% endfor %}
+{% endfunc %}
+
 {% endstripspace %}
diff --git a/app/vlselect/logsql/query_response.qtpl.go b/app/vlselect/logsql/query_response.qtpl.go
index d3d6cf1c18..dd3458c21b 100644
--- a/app/vlselect/logsql/query_response.qtpl.go
+++ b/app/vlselect/logsql/query_response.qtpl.go
@@ -88,3 +88,79 @@ func JSONRow(columns []logstorage.BlockColumn, rowIdx int) string {
 	return qs422016
 //line app/vlselect/logsql/query_response.qtpl:18
 }
+
+// JSONRows prints formatted rows
+
+//line app/vlselect/logsql/query_response.qtpl:21
+func StreamJSONRows(qw422016 *qt422016.Writer, rows [][]logstorage.Field) {
+//line app/vlselect/logsql/query_response.qtpl:22
+	if len(rows) == 0 {
+//line app/vlselect/logsql/query_response.qtpl:23
+		return
+//line app/vlselect/logsql/query_response.qtpl:24
+	}
+//line app/vlselect/logsql/query_response.qtpl:25
+	for _, fields := range rows {
+//line app/vlselect/logsql/query_response.qtpl:25
+		qw422016.N().S(`{`)
+//line app/vlselect/logsql/query_response.qtpl:27
+		if len(fields) > 0 {
+//line app/vlselect/logsql/query_response.qtpl:29
+			f := fields[0]
+			fields = fields[1:]
+
+//line app/vlselect/logsql/query_response.qtpl:32
+			qw422016.N().Q(f.Name)
+//line app/vlselect/logsql/query_response.qtpl:32
+			qw422016.N().S(`:`)
+//line app/vlselect/logsql/query_response.qtpl:32
+			qw422016.N().Q(f.Value)
+//line app/vlselect/logsql/query_response.qtpl:33
+			for _, f := range fields {
+//line app/vlselect/logsql/query_response.qtpl:33
+				qw422016.N().S(`,`)
+//line app/vlselect/logsql/query_response.qtpl:34
+				qw422016.N().Q(f.Name)
+//line app/vlselect/logsql/query_response.qtpl:34
+				qw422016.N().S(`:`)
+//line app/vlselect/logsql/query_response.qtpl:34
+				qw422016.N().Q(f.Value)
+//line app/vlselect/logsql/query_response.qtpl:35
+			}
+//line app/vlselect/logsql/query_response.qtpl:36
+		}
+//line app/vlselect/logsql/query_response.qtpl:36
+		qw422016.N().S(`}`)
+//line app/vlselect/logsql/query_response.qtpl:37
+		qw422016.N().S(`
+`)
+//line app/vlselect/logsql/query_response.qtpl:38
+	}
+//line app/vlselect/logsql/query_response.qtpl:39
+}
+
+//line app/vlselect/logsql/query_response.qtpl:39
+func WriteJSONRows(qq422016 qtio422016.Writer, rows [][]logstorage.Field) {
+//line app/vlselect/logsql/query_response.qtpl:39
+	qw422016 := qt422016.AcquireWriter(qq422016)
+//line app/vlselect/logsql/query_response.qtpl:39
+	StreamJSONRows(qw422016, rows)
+//line app/vlselect/logsql/query_response.qtpl:39
+	qt422016.ReleaseWriter(qw422016)
+//line app/vlselect/logsql/query_response.qtpl:39
+}
+
+//line app/vlselect/logsql/query_response.qtpl:39
+func JSONRows(rows [][]logstorage.Field) string {
+//line app/vlselect/logsql/query_response.qtpl:39
+	qb422016 := qt422016.AcquireByteBuffer()
+//line app/vlselect/logsql/query_response.qtpl:39
+	WriteJSONRows(qb422016, rows)
+//line app/vlselect/logsql/query_response.qtpl:39
+	qs422016 := string(qb422016.B)
+//line app/vlselect/logsql/query_response.qtpl:39
+	qt422016.ReleaseByteBuffer(qb422016)
+//line app/vlselect/logsql/query_response.qtpl:39
+	return qs422016
+//line app/vlselect/logsql/query_response.qtpl:39
+}
diff --git a/app/vlselect/logsql/sort_writer.go b/app/vlselect/logsql/sort_writer.go
new file mode 100644
index 0000000000..dcb3b8893b
--- /dev/null
+++ b/app/vlselect/logsql/sort_writer.go
@@ -0,0 +1,222 @@
+package logsql
+
+import (
+	"bytes"
+	"io"
+	"sort"
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
+)
+
+func getSortWriter() *sortWriter {
+	v := sortWriterPool.Get()
+	if v == nil {
+		return &sortWriter{}
+	}
+	return v.(*sortWriter)
+}
+
+func putSortWriter(sw *sortWriter) {
+	sw.reset()
+	sortWriterPool.Put(sw)
+}
+
+var sortWriterPool sync.Pool
+
+// sortWriter expects a JSON line stream to be written to it.
+//
+// It buffers the incoming data until its size reaches maxBufLen.
+// Then it streams the buffered data and all the subsequent data to w.
+//
+// FinalFlush() must be called when all the data has been written.
+// If buf isn't empty when FinalFlush() is called, then the buffered data
+// is sorted by the _time field before being written to w.
+type sortWriter struct {
+	mu         sync.Mutex
+	w          io.Writer
+	maxBufLen  int
+	buf        []byte
+	bufFlushed bool
+
+	hasErr bool
+}
+
+func (sw *sortWriter) reset() {
+	sw.w = nil
+	sw.maxBufLen = 0
+	sw.buf = sw.buf[:0]
+	sw.bufFlushed = false
+	sw.hasErr = false
+}
+
+func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
+	sw.reset()
+
+	sw.w = w
+	sw.maxBufLen = maxBufLen
+}
+
+func (sw *sortWriter) MustWrite(p []byte) {
+	sw.mu.Lock()
+	defer sw.mu.Unlock()
+
+	if sw.hasErr {
+		return
+	}
+
+	if sw.bufFlushed {
+		if _, err := sw.w.Write(p); err != nil {
+			sw.hasErr = true
+		}
+		return
+	}
+	if len(sw.buf)+len(p) < sw.maxBufLen {
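+		// p still fits into the buffer, so keep buffering it.
+		// FinalFlush() can then sort all the buffered rows by _time before sending them to w.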
+		sw.buf = append(sw.buf, p...)
+		return
+	}
+	sw.bufFlushed = true
+	if len(sw.buf) > 0 {
+		if _, err := sw.w.Write(sw.buf); err != nil {
+			sw.hasErr = true
+			return
+		}
+		sw.buf = sw.buf[:0]
+	}
+	if _, err := sw.w.Write(p); err != nil {
+		sw.hasErr = true
+	}
+}
+
+func (sw *sortWriter) FinalFlush() {
+	if sw.hasErr || sw.bufFlushed {
+		return
+	}
+	rs := getRowsSorter()
+	rs.parseRows(sw.buf)
+	rs.sort()
+	WriteJSONRows(sw.w, rs.rows)
+	putRowsSorter(rs)
+}
+
+func getRowsSorter() *rowsSorter {
+	v := rowsSorterPool.Get()
+	if v == nil {
+		return &rowsSorter{}
+	}
+	return v.(*rowsSorter)
+}
+
+func putRowsSorter(rs *rowsSorter) {
+	rs.reset()
+	rowsSorterPool.Put(rs)
+}
+
+var rowsSorterPool sync.Pool
+
+type rowsSorter struct {
+	buf       []byte
+	fieldsBuf []logstorage.Field
+	rows      [][]logstorage.Field
+	times     []string
+}
+
+func (rs *rowsSorter) reset() {
+	rs.buf = rs.buf[:0]
+
+	fieldsBuf := rs.fieldsBuf
+	for i := range fieldsBuf {
+		fieldsBuf[i].Reset()
+	}
+	rs.fieldsBuf = fieldsBuf[:0]
+
+	rows := rs.rows
+	for i := range rows {
+		rows[i] = nil
+	}
+	rs.rows = rows[:0]
+
+	times := rs.times
+	for i := range times {
+		times[i] = ""
+	}
+	rs.times = times[:0]
+}
+
+func (rs *rowsSorter) parseRows(src []byte) {
+	rs.reset()
+
+	buf := rs.buf
+	fieldsBuf := rs.fieldsBuf
+	rows := rs.rows
+	times := rs.times
+
+	p := logjson.GetParser()
+	for len(src) > 0 {
+		var line []byte
+		n := bytes.IndexByte(src, '\n')
+		if n < 0 {
+			line = src
+			src = nil
+		} else {
+			line = src[:n]
+			src = src[n+1:]
+		}
+		if len(line) == 0 {
+			continue
+		}
+
+		p.ParseLogMessage(line)
+
+		timeValue := ""
+		fieldsBufLen := len(fieldsBuf)
+		for _, f := range p.Fields {
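+			// Copy the field name and value into buf, so the collected rows
+			// remain valid after p is returned to the parser pool below.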
+			bufLen := len(buf)
+			buf = append(buf, f.Name...)
+			name := bytesutil.ToUnsafeString(buf[bufLen:])
+
+			bufLen = len(buf)
+			buf = append(buf, f.Value...)
+			value := bytesutil.ToUnsafeString(buf[bufLen:])
+
+			fieldsBuf = append(fieldsBuf, logstorage.Field{
+				Name:  name,
+				Value: value,
+			})
+
+			if name == "_time" {
+				timeValue = value
+			}
+		}
+		rows = append(rows, fieldsBuf[fieldsBufLen:])
+		times = append(times, timeValue)
+	}
+	logjson.PutParser(p)
+
+	rs.buf = buf
+	rs.fieldsBuf = fieldsBuf
+	rs.rows = rows
+	rs.times = times
+}
+
+func (rs *rowsSorter) Len() int {
+	return len(rs.rows)
+}
+
+func (rs *rowsSorter) Less(i, j int) bool {
+	times := rs.times
+	return times[i] < times[j]
+}
+
+func (rs *rowsSorter) Swap(i, j int) {
+	times := rs.times
+	rows := rs.rows
+	times[i], times[j] = times[j], times[i]
+	rows[i], rows[j] = rows[j], rows[i]
+}
+
+func (rs *rowsSorter) sort() {
+	sort.Sort(rs)
+}
diff --git a/app/vlselect/logsql/sort_writer_test.go b/app/vlselect/logsql/sort_writer_test.go
new file mode 100644
index 0000000000..9313bed85d
--- /dev/null
+++ b/app/vlselect/logsql/sort_writer_test.go
@@ -0,0 +1,39 @@
+package logsql
+
+import (
+	"bytes"
+	"strings"
+	"testing"
+)
+
+func TestSortWriter(t *testing.T) {
+	f := func(maxBufLen int, data string, expectedResult string) {
+		t.Helper()
+
+		var bb bytes.Buffer
+		sw := getSortWriter()
+		sw.Init(&bb, maxBufLen)
+
+		for _, s := range strings.Split(data, "\n") {
+			sw.MustWrite([]byte(s + "\n"))
+		}
+		sw.FinalFlush()
+		putSortWriter(sw)
+
+		result := bb.String()
+		if result != expectedResult {
+			t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, expectedResult)
+		}
+	}
+
+	f(100, "", "")
+	f(100, "{}", "{}\n")
+
+	data := `{"_time":"def","_msg":"xxx"}
+{"_time":"abc","_msg":"foo"}`
+	resultExpected := `{"_time":"abc","_msg":"foo"}
+{"_time":"def","_msg":"xxx"}
+`
+	f(100, data, resultExpected)
+	f(10, data, data+"\n")
+}
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index b40476d017..fc0c67f92c 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1058,8 +1058,9 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo
 
 ## Sorting
 
-By default VictoriaLogs doesn't sort the returned results because of performance and efficiency concerns
-described [here](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
+By default VictoriaLogs sorts the returned results by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
+if their total size doesn't exceed the `-select.maxSortBufferSize` command-line flag value (by default it is set to one megabyte).
+Otherwise sorting is skipped because of performance and efficiency concerns described [here](https://docs.victoriametrics.com/VictoriaLogs/querying/).
 
 It is possible to sort the [selected log entries](#filters) at client side with `sort` Unix command
 according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md
index 23586354c4..ec5ada43bb 100644
--- a/docs/VictoriaLogs/querying/README.md
+++ b/docs/VictoriaLogs/querying/README.md
@@ -31,7 +31,9 @@ The response can be interrupted at any time by closing the connection to Victori
 This allows post-processing the returned lines at the client side with the usual Unix commands
 such as `grep`, `jq`, `less`, `head`, etc.
 See [these docs](#command-line) for more details.
 
-The returned lines aren't sorted by default, since sorting disables the ability to send matching log entries to the response stream as soon as they are found.
+The returned lines are sorted by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
+if their total size doesn't exceed the `-select.maxSortBufferSize` command-line flag value (by default it is set to one megabyte).
+Otherwise the returned lines aren't sorted, since sorting disables the ability to send matching log entries to the response stream as soon as they are found.
 
 Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sorting)
 or at client side with the usual `sort` command according to [these docs](#command-line).
diff --git a/lib/logjson/parser.go b/lib/logjson/parser.go
new file mode 100644
index 0000000000..6bc8919d5f
--- /dev/null
+++ b/lib/logjson/parser.go
@@ -0,0 +1,132 @@
+package logjson
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
+	"github.com/valyala/fastjson"
+)
+
+// Parser parses a single JSON log message into Fields.
+//
+// See https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model
+//
+// Use GetParser() to obtain a parser.
+type Parser struct {
+	// Fields contains the parsed JSON line after ParseLogMessage() call.
+	//
+	// The Fields are valid until the next call to ParseLogMessage()
+	// or until the parser is returned to the pool with PutParser() call.
+	Fields []logstorage.Field
+
+	// p is used for fast JSON parsing
+	p fastjson.Parser
+
+	// buf is used for holding the backing data for Fields
+	buf []byte
+
+	// prefixBuf is used for holding the current key prefix
+	// when it is composed from multiple keys.
+	prefixBuf []byte
+}
+
+func (p *Parser) reset() {
+	fields := p.Fields
+	for i := range fields {
+		lf := &fields[i]
+		lf.Name = ""
+		lf.Value = ""
+	}
+	p.Fields = fields[:0]
+
+	p.buf = p.buf[:0]
+	p.prefixBuf = p.prefixBuf[:0]
+}
+
+// GetParser returns a Parser ready to parse JSON lines.
+//
+// Return the parser to the pool when it is no longer needed by calling PutParser().
+func GetParser() *Parser {
+	v := parserPool.Get()
+	if v == nil {
+		return &Parser{}
+	}
+	return v.(*Parser)
+}
+
+// PutParser returns the parser to the pool.
+//
+// The parser cannot be used after returning to the pool.
+func PutParser(p *Parser) {
+	p.reset()
+	parserPool.Put(p)
+}
+
+var parserPool sync.Pool
+
+// ParseLogMessage parses the given JSON log message msg into p.Fields.
+//
+// The p.Fields remain valid until the next call to ParseLogMessage() or PutParser().
+func (p *Parser) ParseLogMessage(msg []byte) error {
+	s := bytesutil.ToUnsafeString(msg)
+	v, err := p.p.Parse(s)
+	if err != nil {
+		return fmt.Errorf("cannot parse json: %w", err)
+	}
+	if t := v.Type(); t != fastjson.TypeObject {
+		return fmt.Errorf("expecting json dictionary; got %s", t)
+	}
+	p.reset()
+	p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v)
+	return nil
+}
+
+func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) {
+	o := v.GetObject()
+	o.Visit(func(k []byte, v *fastjson.Value) {
+		t := v.Type()
+		switch t {
+		case fastjson.TypeNull:
+			// Skip nulls
+		case fastjson.TypeObject:
+			// Flatten nested JSON objects.
+			// For example, {"foo":{"bar":"baz"}} is converted to {"foo.bar":"baz"}
+			prefixLen := len(prefixBuf)
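+			// Append "<key>." to prefixBuf before recursing into the nested object,
+			// then truncate prefixBuf back to prefixLen once the object is processed.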
+			prefixBuf = append(prefixBuf, k...)
+			prefixBuf = append(prefixBuf, '.')
+			dst, dstBuf, prefixBuf = appendLogFields(dst, dstBuf, prefixBuf, v)
+			prefixBuf = prefixBuf[:prefixLen]
+		case fastjson.TypeArray, fastjson.TypeNumber, fastjson.TypeTrue, fastjson.TypeFalse:
+			// Convert JSON arrays, numbers, true and false values to their string representation
+			dstBufLen := len(dstBuf)
+			dstBuf = v.MarshalTo(dstBuf)
+			value := dstBuf[dstBufLen:]
+			dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
+		case fastjson.TypeString:
+			// Decode JSON strings
+			dstBufLen := len(dstBuf)
+			dstBuf = append(dstBuf, v.GetStringBytes()...)
+			value := dstBuf[dstBufLen:]
+			dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
+		default:
+			logger.Panicf("BUG: unexpected JSON type: %s", t)
+		}
+	})
+	return dst, dstBuf, prefixBuf
+}
+
+func appendLogField(dst []logstorage.Field, dstBuf, prefixBuf, k, value []byte) ([]logstorage.Field, []byte) {
+	dstBufLen := len(dstBuf)
+	dstBuf = append(dstBuf, prefixBuf...)
+	dstBuf = append(dstBuf, k...)
+	name := dstBuf[dstBufLen:]
+
+	dst = append(dst, logstorage.Field{
+		Name:  bytesutil.ToUnsafeString(name),
+		Value: bytesutil.ToUnsafeString(value),
+	})
+	return dst, dstBuf
+}
diff --git a/lib/logjson/parser_test.go b/lib/logjson/parser_test.go
new file mode 100644
index 0000000000..ee4f40faf1
--- /dev/null
+++ b/lib/logjson/parser_test.go
@@ -0,0 +1,71 @@
+package logjson
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
+)
+
+func TestParserFailure(t *testing.T) {
+	f := func(data string) {
+		t.Helper()
+
+		p := GetParser()
+		err := p.ParseLogMessage([]byte(data))
+		if err == nil {
+			t.Fatalf("expecting non-nil error")
+		}
+		PutParser(p)
+	}
+	f("")
+	f("{foo")
+	f("[1,2,3]")
+	f(`{"foo",}`)
+}
+
+func TestParserSuccess(t *testing.T) {
+	f := func(data string, fieldsExpected []logstorage.Field) {
+		t.Helper()
+
+		p := GetParser()
+		err := p.ParseLogMessage([]byte(data))
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		if !reflect.DeepEqual(p.Fields, fieldsExpected) {
+			t.Fatalf("unexpected fields;\ngot\n%s\nwant\n%s", p.Fields, fieldsExpected)
+		}
+		PutParser(p)
+	}
+
+	f("{}", nil)
+	f(`{"foo":"bar"}`, []logstorage.Field{
+		{
+			Name:  "foo",
+			Value: "bar",
+		},
+	})
+	f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, []logstorage.Field{
+		{
+			Name:  "foo.bar",
+			Value: "baz",
+		},
+		{
+			Name:  "a",
+			Value: "1",
+		},
+		{
+			Name:  "b",
+			Value: "true",
+		},
+		{
+			Name:  "c",
+			Value: "[1,2]",
+		},
+		{
+			Name:  "d",
+			Value: "false",
+		},
+	})
+}
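
Usage note (not part of the patch): a minimal sketch of how the new lib/logjson API introduced above is meant to be consumed. Only GetParser(), ParseLogMessage(), Fields and PutParser() come from the patch; the sample log line and the printed values below are made up for illustration.

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
)

func main() {
	p := logjson.GetParser()
	// Nested objects are flattened, so {"host":{"name":"web-1"}} becomes the field "host.name"="web-1",
	// while numbers, bools and arrays are converted to their string representation.
	line := []byte(`{"_time":"2023-07-01T10:00:00Z","_msg":"hello","host":{"name":"web-1"},"code":200}`)
	if err := p.ParseLogMessage(line); err != nil {
		panic(err)
	}
	for _, f := range p.Fields {
		fmt.Printf("%s=%q\n", f.Name, f.Value)
	}
	// p.Fields become invalid after PutParser() returns p to the pool,
	// so copy them first if they must outlive p.
	logjson.PutParser(p)
}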