VictoriaMetrics/lib/logstorage/json_parser.go
2024-05-22 21:01:28 +02:00

142 lines
3.9 KiB
Go

package logstorage
import (
"fmt"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/valyala/fastjson"
)
// JSONParser parses a single JSON log message into Fields.
//
// See https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model
//
// Use GetParser() for obtaining the parser.
type JSONParser struct {
// Fields contains the parsed JSON line after Parse() call
//
// The Fields are valid until the next call to ParseLogMessage()
// or until the parser is returned to the pool with PutParser() call.
Fields []Field
// p is used for fast JSON parsing
p fastjson.Parser
// buf is used for holding the backing data for Fields
buf []byte
// prefixBuf is used for holding the current key prefix
// when it is composed from multiple keys.
prefixBuf []byte
}
func (p *JSONParser) reset() {
clear(p.Fields)
p.Fields = p.Fields[:0]
p.buf = p.buf[:0]
}
// GetJSONParser returns JSONParser ready to parse JSON lines.
//
// Return the parser to the pool when it is no longer needed by calling PutJSONParser().
func GetJSONParser() *JSONParser {
v := parserPool.Get()
if v == nil {
return &JSONParser{}
}
return v.(*JSONParser)
}
// PutJSONParser returns the parser to the pool.
//
// The parser cannot be used after returning to the pool.
func PutJSONParser(p *JSONParser) {
p.reset()
parserPool.Put(p)
}
var parserPool sync.Pool
// ParseLogMessage parses the given JSON log message msg into p.Fields.
//
// The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser().
func (p *JSONParser) ParseLogMessage(msg []byte) error {
p.reset()
msgStr := bytesutil.ToUnsafeString(msg)
v, err := p.p.Parse(msgStr)
if err != nil {
return fmt.Errorf("cannot parse json: %w", err)
}
if t := v.Type(); t != fastjson.TypeObject {
return fmt.Errorf("expecting json dictionary; got %s", t)
}
p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v)
return nil
}
// RenameField renames field with the oldName to newName in p.Fields
func (p *JSONParser) RenameField(oldName, newName string) {
if oldName == "" {
return
}
fields := p.Fields
for i := range fields {
f := &fields[i]
if f.Name == oldName {
f.Name = newName
return
}
}
}
func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]Field, []byte, []byte) {
o := v.GetObject()
o.Visit(func(k []byte, v *fastjson.Value) {
t := v.Type()
switch t {
case fastjson.TypeNull:
// Skip nulls
case fastjson.TypeObject:
// Flatten nested JSON objects.
// For example, {"foo":{"bar":"baz"}} is converted to {"foo.bar":"baz"}
prefixLen := len(prefixBuf)
prefixBuf = append(prefixBuf, k...)
prefixBuf = append(prefixBuf, '.')
dst, dstBuf, prefixBuf = appendLogFields(dst, dstBuf, prefixBuf, v)
prefixBuf = prefixBuf[:prefixLen]
case fastjson.TypeArray, fastjson.TypeNumber, fastjson.TypeTrue, fastjson.TypeFalse:
// Convert JSON arrays, numbers, true and false values to their string representation
dstBufLen := len(dstBuf)
dstBuf = v.MarshalTo(dstBuf)
value := dstBuf[dstBufLen:]
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
case fastjson.TypeString:
// Decode JSON strings
dstBufLen := len(dstBuf)
dstBuf = append(dstBuf, v.GetStringBytes()...)
value := dstBuf[dstBufLen:]
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
default:
logger.Panicf("BUG: unexpected JSON type: %s", t)
}
})
return dst, dstBuf, prefixBuf
}
func appendLogField(dst []Field, dstBuf, prefixBuf, k, value []byte) ([]Field, []byte) {
dstBufLen := len(dstBuf)
dstBuf = append(dstBuf, prefixBuf...)
dstBuf = append(dstBuf, k...)
name := dstBuf[dstBufLen:]
dst = append(dst, Field{
Name: bytesutil.ToUnsafeString(name),
Value: bytesutil.ToUnsafeString(value),
})
return dst, dstBuf
}