mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-12 12:46:23 +01:00
app/vlselect/logsql: sort query results by _time if their summary size doesnt exceed -select.maxSortBufferSize
This commit is contained in:
parent
4e99bf8c9e
commit
efee71986f
@ -9,7 +9,6 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||
@ -19,11 +18,11 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/valyala/fastjson"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -240,28 +239,21 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||
return false, fmt.Errorf(`missing log message after the "create" or "index" command`)
|
||||
}
|
||||
line = sc.Bytes()
|
||||
pctx := getParserCtx()
|
||||
if err := pctx.parseLogMessage(line); err != nil {
|
||||
p := logjson.GetParser()
|
||||
if err := p.ParseLogMessage(line); err != nil {
|
||||
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
|
||||
}
|
||||
|
||||
timestamp, err := extractTimestampFromFields(timeField, pctx.fields)
|
||||
timestamp, err := extractTimestampFromFields(timeField, p.Fields)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("cannot parse timestamp: %w", err)
|
||||
}
|
||||
updateMessageFieldName(msgField, pctx.fields)
|
||||
processLogMessage(timestamp, pctx.fields)
|
||||
putParserCtx(pctx)
|
||||
updateMessageFieldName(msgField, p.Fields)
|
||||
processLogMessage(timestamp, p.Fields)
|
||||
logjson.PutParser(p)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var parserPool fastjson.ParserPool
|
||||
|
||||
var (
|
||||
invalidTimestampLogger = logger.WithThrottler("invalidTimestampLogger", 5*time.Second)
|
||||
invalidJSONLineLogger = logger.WithThrottler("invalidJSONLineLogger", 5*time.Second)
|
||||
)
|
||||
|
||||
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
|
||||
for i := range fields {
|
||||
f := &fields[i]
|
||||
@ -291,102 +283,6 @@ func updateMessageFieldName(msgField string, fields []logstorage.Field) {
|
||||
}
|
||||
}
|
||||
|
||||
type parserCtx struct {
|
||||
p fastjson.Parser
|
||||
buf []byte
|
||||
prefixBuf []byte
|
||||
fields []logstorage.Field
|
||||
}
|
||||
|
||||
func (pctx *parserCtx) reset() {
|
||||
pctx.buf = pctx.buf[:0]
|
||||
pctx.prefixBuf = pctx.prefixBuf[:0]
|
||||
|
||||
fields := pctx.fields
|
||||
for i := range fields {
|
||||
lf := &fields[i]
|
||||
lf.Name = ""
|
||||
lf.Value = ""
|
||||
}
|
||||
pctx.fields = fields[:0]
|
||||
}
|
||||
|
||||
func getParserCtx() *parserCtx {
|
||||
v := parserCtxPool.Get()
|
||||
if v == nil {
|
||||
return &parserCtx{}
|
||||
}
|
||||
return v.(*parserCtx)
|
||||
}
|
||||
|
||||
func putParserCtx(pctx *parserCtx) {
|
||||
pctx.reset()
|
||||
parserCtxPool.Put(pctx)
|
||||
}
|
||||
|
||||
var parserCtxPool sync.Pool
|
||||
|
||||
func (pctx *parserCtx) parseLogMessage(msg []byte) error {
|
||||
s := bytesutil.ToUnsafeString(msg)
|
||||
v, err := pctx.p.Parse(s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse json: %w", err)
|
||||
}
|
||||
if t := v.Type(); t != fastjson.TypeObject {
|
||||
return fmt.Errorf("expecting json dictionary; got %s", t)
|
||||
}
|
||||
pctx.reset()
|
||||
pctx.fields, pctx.buf, pctx.prefixBuf = appendLogFields(pctx.fields, pctx.buf, pctx.prefixBuf, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) {
|
||||
o := v.GetObject()
|
||||
o.Visit(func(k []byte, v *fastjson.Value) {
|
||||
t := v.Type()
|
||||
switch t {
|
||||
case fastjson.TypeNull:
|
||||
// Skip nulls
|
||||
case fastjson.TypeObject:
|
||||
// Flatten nested JSON objects.
|
||||
// For example, {"foo":{"bar":"baz"}} is converted to {"foo.bar":"baz"}
|
||||
prefixLen := len(prefixBuf)
|
||||
prefixBuf = append(prefixBuf, k...)
|
||||
prefixBuf = append(prefixBuf, '.')
|
||||
dst, dstBuf, prefixBuf = appendLogFields(dst, dstBuf, prefixBuf, v)
|
||||
prefixBuf = prefixBuf[:prefixLen]
|
||||
case fastjson.TypeArray, fastjson.TypeNumber, fastjson.TypeTrue, fastjson.TypeFalse:
|
||||
// Convert JSON arrays, numbers, true and false values to their string representation
|
||||
dstBufLen := len(dstBuf)
|
||||
dstBuf = v.MarshalTo(dstBuf)
|
||||
value := dstBuf[dstBufLen:]
|
||||
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
|
||||
case fastjson.TypeString:
|
||||
// Decode JSON strings
|
||||
dstBufLen := len(dstBuf)
|
||||
dstBuf = append(dstBuf, v.GetStringBytes()...)
|
||||
value := dstBuf[dstBufLen:]
|
||||
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
|
||||
default:
|
||||
logger.Panicf("BUG: unexpected JSON type: %s", t)
|
||||
}
|
||||
})
|
||||
return dst, dstBuf, prefixBuf
|
||||
}
|
||||
|
||||
func appendLogField(dst []logstorage.Field, dstBuf, prefixBuf, k, value []byte) ([]logstorage.Field, []byte) {
|
||||
dstBufLen := len(dstBuf)
|
||||
dstBuf = append(dstBuf, prefixBuf...)
|
||||
dstBuf = append(dstBuf, k...)
|
||||
name := dstBuf[dstBufLen:]
|
||||
|
||||
dst = append(dst, logstorage.Field{
|
||||
Name: bytesutil.ToUnsafeString(name),
|
||||
Value: bytesutil.ToUnsafeString(value),
|
||||
})
|
||||
return dst, dstBuf
|
||||
}
|
||||
|
||||
func parseElasticsearchTimestamp(s string) (int64, error) {
|
||||
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
|
||||
// Try parsing timestamp in milliseconds
|
||||
|
@ -4,12 +4,18 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
)
|
||||
|
||||
var (
|
||||
maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+
|
||||
"if their summary size doesn't exceed this value; otherwise query results are streamed in the response without sorting; "+
|
||||
"too big value for this flag may result in high memory usage, since the sorting is performed in memory")
|
||||
)
|
||||
|
||||
// ProcessQueryRequest handles /select/logsql/query request
|
||||
func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}) {
|
||||
// Extract tenantID
|
||||
@ -27,9 +33,8 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
|
||||
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
|
||||
sw := getSortWriter()
|
||||
sw.Init(w, maxSortBufferSize.IntN())
|
||||
tenantIDs := []logstorage.TenantID{tenantID}
|
||||
vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) {
|
||||
if len(columns) == 0 {
|
||||
@ -41,13 +46,11 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
|
||||
for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
|
||||
WriteJSONRow(bb, columns, rowIdx)
|
||||
}
|
||||
// Do not check for error here, since the only valid error is when the client
|
||||
// closes the connection during Write() call. There is no need in logging this error,
|
||||
// since it may be too verbose and it doesn't give any actionable info.
|
||||
_, _ = bw.Write(bb.B)
|
||||
sw.MustWrite(bb.B)
|
||||
blockResultPool.Put(bb)
|
||||
})
|
||||
_ = bw.Flush()
|
||||
sw.FinalFlush()
|
||||
putSortWriter(sw)
|
||||
}
|
||||
|
||||
var blockResultPool bytesutil.ByteBufferPool
|
||||
|
@ -17,4 +17,25 @@
|
||||
}{% newline %}
|
||||
{% endfunc %}
|
||||
|
||||
// JSONRows prints formatted rows
|
||||
{% func JSONRows(rows [][]logstorage.Field) %}
|
||||
{% if len(rows) == 0 %}
|
||||
{% return %}
|
||||
{% endif %}
|
||||
{% for _, fields := range rows %}
|
||||
{
|
||||
{% if len(fields) > 0 %}
|
||||
{% code
|
||||
f := fields[0]
|
||||
fields = fields[1:]
|
||||
%}
|
||||
{%q= f.Name %}:{%q= f.Value %}
|
||||
{% for _, f := range fields %}
|
||||
,{%q= f.Name %}:{%q= f.Value %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
}{% newline %}
|
||||
{% endfor %}
|
||||
{% endfunc %}
|
||||
|
||||
{% endstripspace %}
|
||||
|
@ -88,3 +88,79 @@ func JSONRow(columns []logstorage.BlockColumn, rowIdx int) string {
|
||||
return qs422016
|
||||
//line app/vlselect/logsql/query_response.qtpl:18
|
||||
}
|
||||
|
||||
// JSONRows prints formatted rows
|
||||
|
||||
//line app/vlselect/logsql/query_response.qtpl:21
|
||||
func StreamJSONRows(qw422016 *qt422016.Writer, rows [][]logstorage.Field) {
|
||||
//line app/vlselect/logsql/query_response.qtpl:22
|
||||
if len(rows) == 0 {
|
||||
//line app/vlselect/logsql/query_response.qtpl:23
|
||||
return
|
||||
//line app/vlselect/logsql/query_response.qtpl:24
|
||||
}
|
||||
//line app/vlselect/logsql/query_response.qtpl:25
|
||||
for _, fields := range rows {
|
||||
//line app/vlselect/logsql/query_response.qtpl:25
|
||||
qw422016.N().S(`{`)
|
||||
//line app/vlselect/logsql/query_response.qtpl:27
|
||||
if len(fields) > 0 {
|
||||
//line app/vlselect/logsql/query_response.qtpl:29
|
||||
f := fields[0]
|
||||
fields = fields[1:]
|
||||
|
||||
//line app/vlselect/logsql/query_response.qtpl:32
|
||||
qw422016.N().Q(f.Name)
|
||||
//line app/vlselect/logsql/query_response.qtpl:32
|
||||
qw422016.N().S(`:`)
|
||||
//line app/vlselect/logsql/query_response.qtpl:32
|
||||
qw422016.N().Q(f.Value)
|
||||
//line app/vlselect/logsql/query_response.qtpl:33
|
||||
for _, f := range fields {
|
||||
//line app/vlselect/logsql/query_response.qtpl:33
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vlselect/logsql/query_response.qtpl:34
|
||||
qw422016.N().Q(f.Name)
|
||||
//line app/vlselect/logsql/query_response.qtpl:34
|
||||
qw422016.N().S(`:`)
|
||||
//line app/vlselect/logsql/query_response.qtpl:34
|
||||
qw422016.N().Q(f.Value)
|
||||
//line app/vlselect/logsql/query_response.qtpl:35
|
||||
}
|
||||
//line app/vlselect/logsql/query_response.qtpl:36
|
||||
}
|
||||
//line app/vlselect/logsql/query_response.qtpl:36
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vlselect/logsql/query_response.qtpl:37
|
||||
qw422016.N().S(`
|
||||
`)
|
||||
//line app/vlselect/logsql/query_response.qtpl:38
|
||||
}
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
func WriteJSONRows(qq422016 qtio422016.Writer, rows [][]logstorage.Field) {
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
StreamJSONRows(qw422016, rows)
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
func JSONRows(rows [][]logstorage.Field) string {
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
WriteJSONRows(qb422016, rows)
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
return qs422016
|
||||
//line app/vlselect/logsql/query_response.qtpl:39
|
||||
}
|
||||
|
222
app/vlselect/logsql/sort_writer.go
Normal file
222
app/vlselect/logsql/sort_writer.go
Normal file
@ -0,0 +1,222 @@
|
||||
package logsql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
)
|
||||
|
||||
func getSortWriter() *sortWriter {
|
||||
v := sortWriterPool.Get()
|
||||
if v == nil {
|
||||
return &sortWriter{}
|
||||
}
|
||||
return v.(*sortWriter)
|
||||
}
|
||||
|
||||
func putSortWriter(sw *sortWriter) {
|
||||
sw.reset()
|
||||
sortWriterPool.Put(sw)
|
||||
}
|
||||
|
||||
var sortWriterPool sync.Pool
|
||||
|
||||
// sortWriter expects JSON line stream to be written to it.
|
||||
//
|
||||
// It buffers the incoming data until its size reaches maxBufLen.
|
||||
// Then it streams the buffered data and all the incoming data to w.
|
||||
//
|
||||
// The FinalFlush() must be called when all the data is written.
|
||||
// If the buf isn't empty at FinalFlush() call, then the buffered data
|
||||
// is sorted by _time field.
|
||||
type sortWriter struct {
|
||||
mu sync.Mutex
|
||||
w io.Writer
|
||||
maxBufLen int
|
||||
buf []byte
|
||||
bufFlushed bool
|
||||
|
||||
hasErr bool
|
||||
}
|
||||
|
||||
func (sw *sortWriter) reset() {
|
||||
sw.w = nil
|
||||
sw.maxBufLen = 0
|
||||
sw.buf = sw.buf[:0]
|
||||
sw.bufFlushed = false
|
||||
sw.hasErr = false
|
||||
}
|
||||
|
||||
func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
|
||||
sw.reset()
|
||||
|
||||
sw.w = w
|
||||
sw.maxBufLen = maxBufLen
|
||||
}
|
||||
|
||||
func (sw *sortWriter) MustWrite(p []byte) {
|
||||
sw.mu.Lock()
|
||||
defer sw.mu.Unlock()
|
||||
|
||||
if sw.hasErr {
|
||||
return
|
||||
}
|
||||
|
||||
if sw.bufFlushed {
|
||||
if _, err := sw.w.Write(p); err != nil {
|
||||
sw.hasErr = true
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(sw.buf)+len(p) < sw.maxBufLen {
|
||||
sw.buf = append(sw.buf, p...)
|
||||
return
|
||||
}
|
||||
sw.bufFlushed = true
|
||||
if len(sw.buf) > 0 {
|
||||
if _, err := sw.w.Write(sw.buf); err != nil {
|
||||
sw.hasErr = true
|
||||
return
|
||||
}
|
||||
sw.buf = sw.buf[:0]
|
||||
}
|
||||
if _, err := sw.w.Write(p); err != nil {
|
||||
sw.hasErr = true
|
||||
}
|
||||
}
|
||||
|
||||
func (sw *sortWriter) FinalFlush() {
|
||||
if sw.hasErr || sw.bufFlushed {
|
||||
return
|
||||
}
|
||||
rs := getRowsSorter()
|
||||
rs.parseRows(sw.buf)
|
||||
rs.sort()
|
||||
WriteJSONRows(sw.w, rs.rows)
|
||||
putRowsSorter(rs)
|
||||
}
|
||||
|
||||
func getRowsSorter() *rowsSorter {
|
||||
v := rowsSorterPool.Get()
|
||||
if v == nil {
|
||||
return &rowsSorter{}
|
||||
}
|
||||
return v.(*rowsSorter)
|
||||
}
|
||||
|
||||
func putRowsSorter(rs *rowsSorter) {
|
||||
rs.reset()
|
||||
rowsSorterPool.Put(rs)
|
||||
}
|
||||
|
||||
var rowsSorterPool sync.Pool
|
||||
|
||||
type rowsSorter struct {
|
||||
buf []byte
|
||||
fieldsBuf []logstorage.Field
|
||||
rows [][]logstorage.Field
|
||||
times []string
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) reset() {
|
||||
rs.buf = rs.buf[:0]
|
||||
|
||||
fieldsBuf := rs.fieldsBuf
|
||||
for i := range fieldsBuf {
|
||||
fieldsBuf[i].Reset()
|
||||
}
|
||||
rs.fieldsBuf = fieldsBuf[:0]
|
||||
|
||||
rows := rs.rows
|
||||
for i := range rows {
|
||||
rows[i] = nil
|
||||
}
|
||||
rs.rows = rows[:0]
|
||||
|
||||
times := rs.times
|
||||
for i := range times {
|
||||
times[i] = ""
|
||||
}
|
||||
rs.times = times[:0]
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) parseRows(src []byte) {
|
||||
rs.reset()
|
||||
|
||||
buf := rs.buf
|
||||
fieldsBuf := rs.fieldsBuf
|
||||
rows := rs.rows
|
||||
times := rs.times
|
||||
|
||||
p := logjson.GetParser()
|
||||
for len(src) > 0 {
|
||||
var line []byte
|
||||
n := bytes.IndexByte(src, '\n')
|
||||
if n < 0 {
|
||||
line = src
|
||||
src = nil
|
||||
} else {
|
||||
line = src[:n]
|
||||
src = src[n+1:]
|
||||
}
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
p.ParseLogMessage(line)
|
||||
|
||||
timeValue := ""
|
||||
fieldsBufLen := len(fieldsBuf)
|
||||
for _, f := range p.Fields {
|
||||
bufLen := len(buf)
|
||||
buf = append(buf, f.Name...)
|
||||
name := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
|
||||
bufLen = len(buf)
|
||||
buf = append(buf, f.Value...)
|
||||
value := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
|
||||
fieldsBuf = append(fieldsBuf, logstorage.Field{
|
||||
Name: name,
|
||||
Value: value,
|
||||
})
|
||||
|
||||
if name == "_time" {
|
||||
timeValue = value
|
||||
}
|
||||
}
|
||||
rows = append(rows, fieldsBuf[fieldsBufLen:])
|
||||
times = append(times, timeValue)
|
||||
}
|
||||
logjson.PutParser(p)
|
||||
|
||||
rs.buf = buf
|
||||
rs.fieldsBuf = fieldsBuf
|
||||
rs.rows = rows
|
||||
rs.times = times
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) Len() int {
|
||||
return len(rs.rows)
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) Less(i, j int) bool {
|
||||
times := rs.times
|
||||
return times[i] < times[j]
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) Swap(i, j int) {
|
||||
times := rs.times
|
||||
rows := rs.rows
|
||||
times[i], times[j] = times[j], times[i]
|
||||
rows[i], rows[j] = rows[j], rows[i]
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) sort() {
|
||||
sort.Sort(rs)
|
||||
}
|
39
app/vlselect/logsql/sort_writer_test.go
Normal file
39
app/vlselect/logsql/sort_writer_test.go
Normal file
@ -0,0 +1,39 @@
|
||||
package logsql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSortWriter(t *testing.T) {
|
||||
f := func(maxBufLen int, data string, expectedResult string) {
|
||||
t.Helper()
|
||||
|
||||
var bb bytes.Buffer
|
||||
sw := getSortWriter()
|
||||
sw.Init(&bb, maxBufLen)
|
||||
|
||||
for _, s := range strings.Split(data, "\n") {
|
||||
sw.MustWrite([]byte(s + "\n"))
|
||||
}
|
||||
sw.FinalFlush()
|
||||
putSortWriter(sw)
|
||||
|
||||
result := bb.String()
|
||||
if result != expectedResult {
|
||||
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, expectedResult)
|
||||
}
|
||||
}
|
||||
|
||||
f(100, "", "")
|
||||
f(100, "{}", "{}\n")
|
||||
|
||||
data := `{"_time":"def","_msg":"xxx"}
|
||||
{"_time":"abc","_msg":"foo"}`
|
||||
resultExpected := `{"_time":"abc","_msg":"foo"}
|
||||
{"_time":"def","_msg":"xxx"}
|
||||
`
|
||||
f(100, data, resultExpected)
|
||||
f(10, data, data+"\n")
|
||||
}
|
@ -1058,8 +1058,9 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo
|
||||
|
||||
## Sorting
|
||||
|
||||
By default VictoriaLogs doesn't sort the returned results because of performance and efficiency concerns
|
||||
described [here](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
|
||||
By default VictoriaLogs sorts the returned results by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
|
||||
if their total size doesn't exceed `-select.maxSortBufferSize` command-line value (by default it is set to one megabytes).
|
||||
Otherwise sorting is skipped because of performance and efficiency concerns described [here](https://docs.victoriametrics.com/VictoriaLogs/querying/).
|
||||
|
||||
It is possible to sort the [selected log entries](#filters) at client side with `sort` Unix command
|
||||
according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
|
||||
|
@ -31,7 +31,9 @@ The response can be interrupted at any time by closing the connection to Victori
|
||||
This allows post-processing the returned lines at the client side with the usual Unix commands such as `grep`, `jq`, `less`, `head`, etc.
|
||||
See [these docs](#command-line) for more details.
|
||||
|
||||
The returned lines aren't sorted by default, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
|
||||
The returned lines are sorted by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
|
||||
if their total size doesn't exceed `-select.maxSortBufferSize` command-line flag value (by default it is set to one megabyte).
|
||||
Otherwise the returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
|
||||
Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sorting)
|
||||
or at client side with the usual `sort` command according to [these docs](#command-line).
|
||||
|
||||
|
132
lib/logjson/parser.go
Normal file
132
lib/logjson/parser.go
Normal file
@ -0,0 +1,132 @@
|
||||
package logjson
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
"github.com/valyala/fastjson"
|
||||
)
|
||||
|
||||
// Parser parses a single JSON log message into Fields.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model
|
||||
//
|
||||
// Use GetParser() for obtaining the parser.
|
||||
type Parser struct {
|
||||
// Fields contains the parsed JSON line after Parse() call
|
||||
//
|
||||
// The Fields are valid until the next call to ParseLogMessage()
|
||||
// or until the parser is returned to the pool with PutParser() call.
|
||||
Fields []logstorage.Field
|
||||
|
||||
// p is used for fast JSON parsing
|
||||
p fastjson.Parser
|
||||
|
||||
// buf is used for holding the backing data for Fields
|
||||
buf []byte
|
||||
|
||||
// prefixBuf is used for holding the current key prefix
|
||||
// when it is composed from multiple keys.
|
||||
prefixBuf []byte
|
||||
}
|
||||
|
||||
func (p *Parser) reset() {
|
||||
fields := p.Fields
|
||||
for i := range fields {
|
||||
lf := &fields[i]
|
||||
lf.Name = ""
|
||||
lf.Value = ""
|
||||
}
|
||||
p.Fields = fields[:0]
|
||||
|
||||
p.buf = p.buf[:0]
|
||||
p.prefixBuf = p.prefixBuf[:0]
|
||||
}
|
||||
|
||||
// GetParser returns Parser ready to parse JSON lines.
|
||||
//
|
||||
// Return the parser to the pool when it is no longer needed by calling PutParser().
|
||||
func GetParser() *Parser {
|
||||
v := parserPool.Get()
|
||||
if v == nil {
|
||||
return &Parser{}
|
||||
}
|
||||
return v.(*Parser)
|
||||
}
|
||||
|
||||
// PutParser returns the parser to the pool.
|
||||
//
|
||||
// The parser cannot be used after returning to the pool.
|
||||
func PutParser(p *Parser) {
|
||||
p.reset()
|
||||
parserPool.Put(p)
|
||||
}
|
||||
|
||||
var parserPool sync.Pool
|
||||
|
||||
// ParseLogMessage parses the given JSON log message msg into p.Fields.
|
||||
//
|
||||
// The p.Fields remains valid until the next call to ParseLogMessage() or PutParser().
|
||||
func (p *Parser) ParseLogMessage(msg []byte) error {
|
||||
s := bytesutil.ToUnsafeString(msg)
|
||||
v, err := p.p.Parse(s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse json: %w", err)
|
||||
}
|
||||
if t := v.Type(); t != fastjson.TypeObject {
|
||||
return fmt.Errorf("expecting json dictionary; got %s", t)
|
||||
}
|
||||
p.reset()
|
||||
p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) {
|
||||
o := v.GetObject()
|
||||
o.Visit(func(k []byte, v *fastjson.Value) {
|
||||
t := v.Type()
|
||||
switch t {
|
||||
case fastjson.TypeNull:
|
||||
// Skip nulls
|
||||
case fastjson.TypeObject:
|
||||
// Flatten nested JSON objects.
|
||||
// For example, {"foo":{"bar":"baz"}} is converted to {"foo.bar":"baz"}
|
||||
prefixLen := len(prefixBuf)
|
||||
prefixBuf = append(prefixBuf, k...)
|
||||
prefixBuf = append(prefixBuf, '.')
|
||||
dst, dstBuf, prefixBuf = appendLogFields(dst, dstBuf, prefixBuf, v)
|
||||
prefixBuf = prefixBuf[:prefixLen]
|
||||
case fastjson.TypeArray, fastjson.TypeNumber, fastjson.TypeTrue, fastjson.TypeFalse:
|
||||
// Convert JSON arrays, numbers, true and false values to their string representation
|
||||
dstBufLen := len(dstBuf)
|
||||
dstBuf = v.MarshalTo(dstBuf)
|
||||
value := dstBuf[dstBufLen:]
|
||||
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
|
||||
case fastjson.TypeString:
|
||||
// Decode JSON strings
|
||||
dstBufLen := len(dstBuf)
|
||||
dstBuf = append(dstBuf, v.GetStringBytes()...)
|
||||
value := dstBuf[dstBufLen:]
|
||||
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
|
||||
default:
|
||||
logger.Panicf("BUG: unexpected JSON type: %s", t)
|
||||
}
|
||||
})
|
||||
return dst, dstBuf, prefixBuf
|
||||
}
|
||||
|
||||
func appendLogField(dst []logstorage.Field, dstBuf, prefixBuf, k, value []byte) ([]logstorage.Field, []byte) {
|
||||
dstBufLen := len(dstBuf)
|
||||
dstBuf = append(dstBuf, prefixBuf...)
|
||||
dstBuf = append(dstBuf, k...)
|
||||
name := dstBuf[dstBufLen:]
|
||||
|
||||
dst = append(dst, logstorage.Field{
|
||||
Name: bytesutil.ToUnsafeString(name),
|
||||
Value: bytesutil.ToUnsafeString(value),
|
||||
})
|
||||
return dst, dstBuf
|
||||
}
|
71
lib/logjson/parser_test.go
Normal file
71
lib/logjson/parser_test.go
Normal file
@ -0,0 +1,71 @@
|
||||
package logjson
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
)
|
||||
|
||||
func TestParserFailure(t *testing.T) {
|
||||
f := func(data string) {
|
||||
t.Helper()
|
||||
|
||||
p := GetParser()
|
||||
err := p.ParseLogMessage([]byte(data))
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
PutParser(p)
|
||||
}
|
||||
f("")
|
||||
f("{foo")
|
||||
f("[1,2,3]")
|
||||
f(`{"foo",}`)
|
||||
}
|
||||
|
||||
func TestParserSuccess(t *testing.T) {
|
||||
f := func(data string, fieldsExpected []logstorage.Field) {
|
||||
t.Helper()
|
||||
|
||||
p := GetParser()
|
||||
err := p.ParseLogMessage([]byte(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(p.Fields, fieldsExpected) {
|
||||
t.Fatalf("unexpected fields;\ngot\n%s\nwant\n%s", p.Fields, fieldsExpected)
|
||||
}
|
||||
PutParser(p)
|
||||
}
|
||||
|
||||
f("{}", nil)
|
||||
f(`{"foo":"bar"}`, []logstorage.Field{
|
||||
{
|
||||
Name: "foo",
|
||||
Value: "bar",
|
||||
},
|
||||
})
|
||||
f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, []logstorage.Field{
|
||||
{
|
||||
Name: "foo.bar",
|
||||
Value: "baz",
|
||||
},
|
||||
{
|
||||
Name: "a",
|
||||
Value: "1",
|
||||
},
|
||||
{
|
||||
Name: "b",
|
||||
Value: "true",
|
||||
},
|
||||
{
|
||||
Name: "c",
|
||||
Value: "[1,2]",
|
||||
},
|
||||
{
|
||||
Name: "d",
|
||||
Value: "false",
|
||||
},
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue
Block a user