mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-22 16:36:27 +01:00
b58c429044
- Consistently return the first `limit` log entries if the total size of found log entries doesn't exceed 1Mb. See app/vlselect/logsql/sort_writer.go. Previously random log entries could be returned with each request.
- Document the change at docs/VictoriaLogs/CHANGELOG.md
- Document the `limit` query arg at docs/VictoriaLogs/querying/README.md
- Make the change less intrusive.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5778
293 lines
5.4 KiB
Go
293 lines
5.4 KiB
Go
package logsql
|
|
|
|
import (
|
|
"bytes"
|
|
"io"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
)
|
|
|
|
func getSortWriter() *sortWriter {
|
|
v := sortWriterPool.Get()
|
|
if v == nil {
|
|
return &sortWriter{}
|
|
}
|
|
return v.(*sortWriter)
|
|
}
|
|
|
|
func putSortWriter(sw *sortWriter) {
|
|
sw.reset()
|
|
sortWriterPool.Put(sw)
|
|
}
|
|
|
|
// sortWriterPool holds sortWriter instances for reuse by getSortWriter and putSortWriter.
var sortWriterPool sync.Pool
|
|
|
|
// sortWriter accepts a stream of JSON lines.
//
// Incoming data is collected in buf while it fits into maxBufLen.
// As soon as the limit is reached, the collected data and everything
// written afterwards is passed to w as is.
//
// FinalFlush() must be called after the last write. If the collected data
// hasn't been passed to w by that time, then FinalFlush() sorts it
// by _time field before writing it to w.
type sortWriter struct {
	// mu serializes TryWrite calls.
	mu sync.Mutex

	// w is the destination for the resulting data.
	w io.Writer

	// maxLines is the maximum number of lines to write to w.
	// Non-positive value disables the limit.
	maxLines int

	// linesWritten is the number of lines already written to w
	// while maxLines limit is enabled.
	linesWritten int

	// maxBufLen is the size limit for buf. The sortWriter switches
	// to streaming mode when the limit is reached.
	maxBufLen int

	// buf contains the data collected before switching to streaming mode.
	buf []byte

	// bufFlushed is set to true after switching to streaming mode.
	bufFlushed bool

	// hasErr is set to true after the first unsuccessful write to w.
	// All the subsequent writes are rejected.
	hasErr bool
}
|
|
|
|
func (sw *sortWriter) reset() {
|
|
sw.w = nil
|
|
|
|
sw.maxLines = 0
|
|
sw.linesWritten = 0
|
|
|
|
sw.maxBufLen = 0
|
|
sw.buf = sw.buf[:0]
|
|
sw.bufFlushed = false
|
|
sw.hasErr = false
|
|
}
|
|
|
|
// Init initializes sw.
|
|
//
|
|
// If maxLines is set to positive value, then sw accepts up to maxLines
|
|
// and then rejects all the other lines by returning false from TryWrite.
|
|
func (sw *sortWriter) Init(w io.Writer, maxBufLen, maxLines int) {
|
|
sw.reset()
|
|
|
|
sw.w = w
|
|
sw.maxBufLen = maxBufLen
|
|
sw.maxLines = maxLines
|
|
}
|
|
|
|
// TryWrite writes p to sw.
|
|
//
|
|
// True is returned on successful write, false otherwise.
|
|
//
|
|
// Unsuccessful write may occur on underlying write error or when maxLines lines are already written to sw.
|
|
func (sw *sortWriter) TryWrite(p []byte) bool {
|
|
sw.mu.Lock()
|
|
defer sw.mu.Unlock()
|
|
|
|
if sw.hasErr {
|
|
return false
|
|
}
|
|
|
|
if sw.bufFlushed {
|
|
if !sw.writeToUnderlyingWriterLocked(p) {
|
|
sw.hasErr = true
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
if len(sw.buf)+len(p) < sw.maxBufLen {
|
|
sw.buf = append(sw.buf, p...)
|
|
return true
|
|
}
|
|
|
|
sw.bufFlushed = true
|
|
if !sw.writeToUnderlyingWriterLocked(sw.buf) {
|
|
sw.hasErr = true
|
|
return false
|
|
}
|
|
sw.buf = sw.buf[:0]
|
|
|
|
if !sw.writeToUnderlyingWriterLocked(p) {
|
|
sw.hasErr = true
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (sw *sortWriter) writeToUnderlyingWriterLocked(p []byte) bool {
|
|
if len(p) == 0 {
|
|
return true
|
|
}
|
|
if sw.maxLines > 0 {
|
|
if sw.linesWritten >= sw.maxLines {
|
|
return false
|
|
}
|
|
var linesLeft int
|
|
p, linesLeft = trimLines(p, sw.maxLines-sw.linesWritten)
|
|
println("DEBUG: end trimLines", string(p), linesLeft)
|
|
sw.linesWritten += linesLeft
|
|
}
|
|
if _, err := sw.w.Write(p); err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// trimLines truncates p, so it contains at most maxLines newline-terminated lines.
//
// It returns the (possibly truncated) p together with the number
// of newline-terminated lines in the returned data.
//
// If p contains fewer than maxLines newlines, then it is returned unchanged,
// including the trailing data without a newline. This trailing data
// isn't counted as a line.
//
// nil and 0 are returned if maxLines isn't positive.
func trimLines(p []byte, maxLines int) ([]byte, int) {
	if maxLines <= 0 {
		return nil, 0
	}

	// Scan forward and stop right after the maxLines-th newline.
	// This avoids inspecting the data beyond the limit.
	end := 0
	for lines := 0; lines < maxLines; lines++ {
		n := bytes.IndexByte(p[end:], '\n')
		if n < 0 {
			// p contains fewer than maxLines complete lines; keep it as is.
			return p, lines
		}
		end += n + 1
	}
	return p[:end], maxLines
}
|
|
|
|
// newline holds a single line feed byte.
var newline = []byte("\n")
|
|
|
|
func (sw *sortWriter) FinalFlush() {
|
|
if sw.hasErr || sw.bufFlushed {
|
|
return
|
|
}
|
|
|
|
rs := getRowsSorter()
|
|
rs.parseRows(sw.buf)
|
|
rs.sort()
|
|
|
|
rows := rs.rows
|
|
if sw.maxLines > 0 && len(rows) > sw.maxLines {
|
|
rows = rows[:sw.maxLines]
|
|
}
|
|
WriteJSONRows(sw.w, rows)
|
|
|
|
putRowsSorter(rs)
|
|
}
|
|
|
|
func getRowsSorter() *rowsSorter {
|
|
v := rowsSorterPool.Get()
|
|
if v == nil {
|
|
return &rowsSorter{}
|
|
}
|
|
return v.(*rowsSorter)
|
|
}
|
|
|
|
func putRowsSorter(rs *rowsSorter) {
|
|
rs.reset()
|
|
rowsSorterPool.Put(rs)
|
|
}
|
|
|
|
// rowsSorterPool holds rowsSorter instances for reuse by getRowsSorter and putRowsSorter.
var rowsSorterPool sync.Pool
|
|
|
|
type rowsSorter struct {
|
|
buf []byte
|
|
fieldsBuf []logstorage.Field
|
|
rows [][]logstorage.Field
|
|
times []string
|
|
}
|
|
|
|
func (rs *rowsSorter) reset() {
|
|
rs.buf = rs.buf[:0]
|
|
|
|
fieldsBuf := rs.fieldsBuf
|
|
for i := range fieldsBuf {
|
|
fieldsBuf[i].Reset()
|
|
}
|
|
rs.fieldsBuf = fieldsBuf[:0]
|
|
|
|
rows := rs.rows
|
|
for i := range rows {
|
|
rows[i] = nil
|
|
}
|
|
rs.rows = rows[:0]
|
|
|
|
times := rs.times
|
|
for i := range times {
|
|
times[i] = ""
|
|
}
|
|
rs.times = times[:0]
|
|
}
|
|
|
|
func (rs *rowsSorter) parseRows(src []byte) {
|
|
rs.reset()
|
|
|
|
buf := rs.buf
|
|
fieldsBuf := rs.fieldsBuf
|
|
rows := rs.rows
|
|
times := rs.times
|
|
|
|
p := logjson.GetParser()
|
|
for len(src) > 0 {
|
|
var line []byte
|
|
n := bytes.IndexByte(src, '\n')
|
|
if n < 0 {
|
|
line = src
|
|
src = nil
|
|
} else {
|
|
line = src[:n]
|
|
src = src[n+1:]
|
|
}
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
|
|
if err := p.ParseLogMessage(line); err != nil {
|
|
logger.Panicf("BUG: unexpected invalid JSON line: %s", err)
|
|
}
|
|
|
|
timeValue := ""
|
|
fieldsBufLen := len(fieldsBuf)
|
|
for _, f := range p.Fields {
|
|
bufLen := len(buf)
|
|
buf = append(buf, f.Name...)
|
|
name := bytesutil.ToUnsafeString(buf[bufLen:])
|
|
|
|
bufLen = len(buf)
|
|
buf = append(buf, f.Value...)
|
|
value := bytesutil.ToUnsafeString(buf[bufLen:])
|
|
|
|
fieldsBuf = append(fieldsBuf, logstorage.Field{
|
|
Name: name,
|
|
Value: value,
|
|
})
|
|
|
|
if name == "_time" {
|
|
timeValue = value
|
|
}
|
|
}
|
|
rows = append(rows, fieldsBuf[fieldsBufLen:])
|
|
times = append(times, timeValue)
|
|
}
|
|
logjson.PutParser(p)
|
|
|
|
rs.buf = buf
|
|
rs.fieldsBuf = fieldsBuf
|
|
rs.rows = rows
|
|
rs.times = times
|
|
}
|
|
|
|
func (rs *rowsSorter) Len() int {
|
|
return len(rs.rows)
|
|
}
|
|
|
|
func (rs *rowsSorter) Less(i, j int) bool {
|
|
times := rs.times
|
|
return times[i] < times[j]
|
|
}
|
|
|
|
func (rs *rowsSorter) Swap(i, j int) {
|
|
times := rs.times
|
|
rows := rs.rows
|
|
times[i], times[j] = times[j], times[i]
|
|
rows[i], rows[j] = rows[j], rows[i]
|
|
}
|
|
|
|
func (rs *rowsSorter) sort() {
|
|
sort.Sort(rs)
|
|
}
|