VictoriaMetrics/lib/logstorage/pipe_sort.go
Aliaksandr Valialkin 66b2987f49
lib/logstorage: optimize query immediately after its parsing
This eliminates possible bugs related to forgotten Query.Optimize() calls.

This also allows removing optimize() function from pipe interface.

While at it, drop filterNoop inside filterAnd.
2024-11-08 16:43:54 +01:00

892 lines
20 KiB
Go

package logstorage
import (
"container/heap"
"fmt"
"math"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/valyala/quicktemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// pipeSort processes '| sort ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe
type pipeSort struct {
// byFields contains field names for sorting from 'by(...)' clause.
//
// If byFields is empty, then rows are sorted by all their fields.
byFields []*bySortField
// isDesc indicates whether to apply descending order.
isDesc bool
// offset is the number of leading sorted results to skip.
offset uint64
// limit is the maximum number of results to return.
//
// If zero, then all the results are returned.
limit uint64
// rankFieldName is the name of the field to store the row rank.
//
// The rank isn't stored if rankFieldName is empty.
rankFieldName string
}
// String returns the LogsQL text representation of ps.
//
// Optional parts ('by (...)', 'desc', 'offset', 'limit' and the rank clause)
// are emitted only when the corresponding option is set.
func (ps *pipeSort) String() string {
	var sb strings.Builder
	sb.WriteString("sort")

	if n := len(ps.byFields); n > 0 {
		parts := make([]string, 0, n)
		for _, bf := range ps.byFields {
			parts = append(parts, bf.String())
		}
		sb.WriteString(" by (")
		sb.WriteString(strings.Join(parts, ", "))
		sb.WriteString(")")
	}
	if ps.isDesc {
		sb.WriteString(" desc")
	}
	if ps.offset > 0 {
		fmt.Fprintf(&sb, " offset %d", ps.offset)
	}
	if ps.limit > 0 {
		fmt.Fprintf(&sb, " limit %d", ps.limit)
	}
	if ps.rankFieldName != "" {
		sb.WriteString(rankFieldNameString(ps.rankFieldName))
	}
	return sb.String()
}
// canLiveTail always returns false for the sort pipe.
func (ps *pipeSort) canLiveTail() bool {
return false
}
// updateNeededFields adjusts neededFields and unneededFields according to the fields
// read and produced by the sort pipe.
//
// Nothing is changed when neededFields is empty. The rank field is produced by the pipe,
// so it is dropped from neededFields. Sorting without 'by(...)' needs all the fields,
// while sorting with 'by(...)' additionally needs every listed field.
func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) {
	if neededFields.isEmpty() {
		return
	}

	if rank := ps.rankFieldName; rank != "" {
		neededFields.remove(rank)
		if neededFields.contains("*") {
			unneededFields.add(rank)
		}
	}

	if len(ps.byFields) == 0 {
		// Sorting by all the fields requires reading all of them.
		neededFields.add("*")
		unneededFields.reset()
		return
	}

	for _, bf := range ps.byFields {
		neededFields.add(bf.name)
		unneededFields.remove(bf.name)
	}
}
// hasFilterInWithQuery always returns false, since pipeSort contains no filters.
func (ps *pipeSort) hasFilterInWithQuery() bool {
return false
}
// initFilterInValues returns ps as is; both arguments are ignored.
func (ps *pipeSort) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return ps, nil
}
// newPipeProcessor returns the processor for ps.
//
// The full sort processor is used when no limit is set;
// otherwise the top-k processor is used.
func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
	if ps.limit == 0 {
		return newPipeSortProcessor(ps, workersCount, stopCh, cancel, ppNext)
	}
	return newPipeTopkProcessor(ps, workersCount, stopCh, cancel, ppNext)
}
// newPipeSortProcessor creates a processor, which sorts all the incoming rows
// and passes the sorted rows to ppNext on flush.
//
// One shard is created per worker. The total state size is limited
// by 20% of memory.Allowed().
func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
	shards := make([]pipeSortProcessorShard, workersCount)
	for i := range shards {
		// Freshly allocated shards are zero-valued, so only ps must be set.
		shards[i].ps = ps
	}

	maxStateSize := int64(float64(memory.Allowed()) * 0.2)

	psp := &pipeSortProcessor{
		ps:           ps,
		stopCh:       stopCh,
		cancel:       cancel,
		ppNext:       ppNext,
		shards:       shards,
		maxStateSize: maxStateSize,
	}
	psp.stateSizeBudget.Store(maxStateSize)

	return psp
}
// pipeSortProcessor collects rows in per-worker shards and emits them in sorted order on flush.
type pipeSortProcessor struct {
// ps is the parent pipeSort.
ps *pipeSort
// stopCh is checked via needStop in order to interrupt flush.
stopCh <-chan struct{}
// cancel is called when the state size budget is exhausted.
cancel func()
// ppNext receives the sorted rows.
ppNext pipeProcessor
// shards holds per-worker state; it is indexed by workerID.
shards []pipeSortProcessorShard
// maxStateSize is the initial value for stateSizeBudget; it is used in the error message at flush.
maxStateSize int64
// stateSizeBudget is the remaining global budget for the state size.
// Shards take chunks of stateSizeBudgetChunk from it.
stateSizeBudget atomic.Int64
}
// pipeSortProcessorShard is pipeSortProcessorShardNopad padded to a multiple of 128 bytes.
type pipeSortProcessorShard struct {
pipeSortProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeSortProcessorShardNopad{})%128]byte
}
// pipeSortProcessorShardNopad holds the per-worker sort state without the trailing padding.
type pipeSortProcessorShardNopad struct {
// ps points to the parent pipeSort.
ps *pipeSort
// blocks holds all the blocks with logs written to the shard.
blocks []sortBlock
// rowRefs holds references to all the rows stored in blocks.
//
// Sorting sorts rowRefs, while blocks remain unchanged. This should speed up sorting.
rowRefs []sortRowRef
// rowRefNext points to the next index at rowRefs during merge shards phase
rowRefNext int
// stateSizeBudget is the remaining budget for the whole state size for the shard.
// The per-shard budget is provided in chunks from the parent pipeSortProcessor.
// It may become negative; it is then refilled before the next block is written.
stateSizeBudget int
// columnValues is used as temporary buffer at pipeSortProcessorShard.writeBlock
columnValues [][]string
}
// sortBlock represents a block of logs for sorting.
type sortBlock struct {
// br is a result block to sort.
// It is a clone owned by the shard.
br *blockResult
// byColumns refers block data for 'by(...)' columns.
//
// It holds a single synthetic column when sorting by all the columns.
byColumns []sortBlockByColumn
// otherColumns refers block data for other than 'by(...)' columns
otherColumns []*blockResultColumn
}
// sortBlockByColumn represents data for a single column from 'sort by(...)' clause.
type sortBlockByColumn struct {
// c contains column data
c *blockResultColumn
// i64Values contains int64 numbers parsed from values.
//
// It contains a single item for const columns and it is nil for time columns.
i64Values []int64
// f64Values contains float64 numbers parsed from values.
// Values, which cannot be parsed, are stored as NaN.
//
// It contains a single item for const columns and it is nil for time columns.
f64Values []float64
}
// sortRowRef is the reference to a single log entry written to `sort` pipe.
//
// Sorting reorders sortRowRef items instead of moving the referenced rows.
type sortRowRef struct {
// blockIdx is the index of the block at pipeSortProcessorShard.blocks.
blockIdx int
// rowIdx is the index of the log entry inside the block referenced by blockIdx.
rowIdx int
}
// getI64ValueAtRow returns the pre-parsed int64 value for the given rowIdx.
//
// Const columns store a single value, which is shared by all the rows.
func (c *sortBlockByColumn) getI64ValueAtRow(rowIdx int) int64 {
	idx := rowIdx
	if c.c.isConst {
		idx = 0
	}
	return c.i64Values[idx]
}
// getF64ValueAtRow returns the pre-parsed float64 value for the given rowIdx.
//
// Const columns store a single value, which is shared by all the rows.
func (c *sortBlockByColumn) getF64ValueAtRow(rowIdx int) float64 {
	idx := rowIdx
	if c.c.isConst {
		idx = 0
	}
	return c.f64Values[idx]
}
// writeBlock writes br to shard.
//
// The block is cloned, registered at shard.blocks together with the data needed for sorting,
// and a row reference is appended to shard.rowRefs for every row in the block.
// shard.stateSizeBudget is decreased by the estimated size of the added state.
func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
// clone br, so it could be owned by shard
br = br.clone()
cs := br.getColumns()
byFields := shard.ps.byFields
if len(byFields) == 0 {
// Sort by all the columns
columnValues := shard.columnValues[:0]
for _, c := range cs {
values := c.getValues(br)
columnValues = append(columnValues, values)
}
shard.columnValues = columnValues
// Generate byColumns
valuesEncoded := make([]string, br.rowsLen)
shard.stateSizeBudget -= len(valuesEncoded) * int(unsafe.Sizeof(valuesEncoded[0]))
bb := bbPool.Get()
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
// Marshal all the columns per each row into a single string
// and sort rows by the resulting string.
bb.B = bb.B[:0]
for i, values := range columnValues {
v := values[rowIdx]
bb.B = marshalJSONKeyValue(bb.B, cs[i].name, v)
bb.B = append(bb.B, ',')
}
// Re-use the previous string for adjacent identical rows instead of allocating a copy.
if rowIdx > 0 && valuesEncoded[rowIdx-1] == string(bb.B) {
valuesEncoded[rowIdx] = valuesEncoded[rowIdx-1]
} else {
valuesEncoded[rowIdx] = string(bb.B)
shard.stateSizeBudget -= len(bb.B)
}
}
bbPool.Put(bb)
// Zero int64 values and NaN float64 values make sortBlockLess
// fall back to comparing the marshaled strings.
//
// NOTE(review): i64Values and f64Values aren't subtracted from stateSizeBudget here - confirm this is intended.
i64Values := make([]int64, br.rowsLen)
f64Values := make([]float64, br.rowsLen)
for i := range f64Values {
f64Values[i] = nan
}
byColumns := []sortBlockByColumn{
{
c: &blockResultColumn{
valueType: valueTypeString,
valuesEncoded: valuesEncoded,
},
i64Values: i64Values,
f64Values: f64Values,
},
}
shard.stateSizeBudget -= int(unsafe.Sizeof(byColumns[0]) + unsafe.Sizeof(*byColumns[0].c))
// Append br to shard.blocks.
shard.blocks = append(shard.blocks, sortBlock{
br: br,
byColumns: byColumns,
otherColumns: cs,
})
} else {
// Collect values for columns from byFields.
byColumns := make([]sortBlockByColumn, len(byFields))
for i, bf := range byFields {
c := br.getColumnByName(bf.name)
bc := &byColumns[i]
bc.c = c
if c.isTime {
// Do not initialize bc.i64Values and bc.f64Values, since they aren't used.
// This saves some memory.
continue
}
if c.isConst {
// Parse the const value stored at c.valuesEncoded once instead of once per row.
bc.i64Values = shard.createInt64Values(c.valuesEncoded)
bc.f64Values = shard.createFloat64Values(c.valuesEncoded)
continue
}
// pre-populate values in order to track better br memory usage
values := c.getValues(br)
bc.i64Values = shard.createInt64Values(values)
bc.f64Values = shard.createFloat64Values(values)
}
shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0]))
// Collect values for other columns.
otherColumns := make([]*blockResultColumn, 0, len(cs))
for _, c := range cs {
isByField := false
for _, bf := range byFields {
if bf.name == c.name {
isByField = true
break
}
}
if !isByField {
otherColumns = append(otherColumns, c)
}
}
shard.stateSizeBudget -= len(otherColumns) * int(unsafe.Sizeof(otherColumns[0]))
// Append br to shard.blocks.
shard.blocks = append(shard.blocks, sortBlock{
br: br,
byColumns: byColumns,
otherColumns: otherColumns,
})
}
shard.stateSizeBudget -= br.sizeBytes()
shard.stateSizeBudget -= int(unsafe.Sizeof(shard.blocks[0]))
// Add row references to rowRefs.
blockIdx := len(shard.blocks) - 1
rowRefs := shard.rowRefs
rowRefsLen := len(rowRefs)
for i := 0; i < br.rowsLen; i++ {
rowRefs = append(rowRefs, sortRowRef{
blockIdx: blockIdx,
rowIdx: i,
})
}
shard.rowRefs = rowRefs
shard.stateSizeBudget -= (len(rowRefs) - rowRefsLen) * int(unsafe.Sizeof(rowRefs[0]))
}
// createInt64Values returns int64 representations for the given values
// and subtracts the size of the returned slice from shard.stateSizeBudget.
//
// Every value is parsed as int64 at first. If this fails, the result of tryParseIPv4 is stored.
func (shard *pipeSortProcessorShard) createInt64Values(values []string) []int64 {
	parsed := make([]int64, len(values))
	for idx := range values {
		v := values[idx]
		if n, ok := tryParseInt64(v); ok {
			parsed[idx] = n
			continue
		}

		// Do not try parsing timestamp and duration, since they may be negative.
		// This breaks sorting.
		ip, _ := tryParseIPv4(v)
		parsed[idx] = int64(ip)
	}

	shard.stateSizeBudget -= len(parsed) * int(unsafe.Sizeof(parsed[0]))

	return parsed
}
// createFloat64Values returns float64 representations for the given values
// and subtracts the size of the returned slice from shard.stateSizeBudget.
//
// Values, which cannot be parsed by tryParseFloat64, are stored as nan.
func (shard *pipeSortProcessorShard) createFloat64Values(values []string) []float64 {
	parsed := make([]float64, len(values))
	for idx := range values {
		if f, ok := tryParseFloat64(values[idx]); ok {
			parsed[idx] = f
		} else {
			parsed[idx] = nan
		}
	}

	shard.stateSizeBudget -= len(parsed) * int(unsafe.Sizeof(parsed[0]))

	return parsed
}
// Len returns the number of row references in the shard.
//
// Together with Swap and Less it allows passing the shard to sort.Sort.
func (shard *pipeSortProcessorShard) Len() int {
return len(shard.rowRefs)
}
// Swap exchanges row references at positions i and j.
//
// Only the references are moved; the referenced blocks remain untouched.
func (shard *pipeSortProcessorShard) Swap(i, j int) {
	shard.rowRefs[i], shard.rowRefs[j] = shard.rowRefs[j], shard.rowRefs[i]
}
// Less reports whether the row referenced by rowRefs[i] must go before the row referenced by rowRefs[j].
//
// The comparison is delegated to sortBlockLess with the shard on both sides.
func (shard *pipeSortProcessorShard) Less(i, j int) bool {
return sortBlockLess(shard, i, shard, j)
}
// writeBlock stores br in the shard for the given workerID.
//
// Empty blocks are ignored. The block is silently dropped when the global
// state size budget is exhausted; flush reports the error in this case.
func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) {
	if br.rowsLen == 0 {
		return
	}

	shard := &psp.shards[workerID]

	// Refill the shard budget from the global budget until it becomes non-negative.
	for shard.stateSizeBudget < 0 {
		remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk)
		if remaining >= 0 {
			shard.stateSizeBudget += stateSizeBudgetChunk
			continue
		}

		// The state size is too big. Stop processing data in order to avoid OOM crash.
		if remaining+stateSizeBudgetChunk >= 0 {
			// This call moved the global budget below zero.
			// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
			psp.cancel()
		}
		return
	}

	shard.writeBlock(br)
}
// flush sorts the collected rows and writes them to ppNext.
//
// Every shard is sorted in its own goroutine, then the sorted shards are merged via a heap.
// An error is returned if the state size budget has been exhausted.
// nil is returned without finishing the work when stopCh is closed.
func (psp *pipeSortProcessor) flush() error {
if n := psp.stateSizeBudget.Load(); n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
}
if needStop(psp.stopCh) {
return nil
}
// Sort every shard in parallel
var wg sync.WaitGroup
shards := psp.shards
for i := range shards {
wg.Add(1)
go func(shard *pipeSortProcessorShard) {
// TODO: interrupt long sorting when psp.stopCh is closed.
sort.Sort(shard)
wg.Done()
}(&shards[i])
}
wg.Wait()
if needStop(psp.stopCh) {
return nil
}
// Merge sorted results across shards
// Only non-empty shards participate in the merge.
sh := pipeSortProcessorShardsHeap(make([]*pipeSortProcessorShard, 0, len(shards)))
for i := range shards {
shard := &shards[i]
if len(shard.rowRefs) > 0 {
sh = append(sh, shard)
}
}
if len(sh) == 0 {
return nil
}
heap.Init(&sh)
wctx := &pipeSortWriteContext{
psp: psp,
}
// shardNextIdx is the heap index (1 or 2) of the smaller child of the heap top.
// Zero means it must be re-calculated, since the heap has been modified.
shardNextIdx := 0
for len(sh) > 1 {
shard := sh[0]
wctx.writeNextRow(shard)
if shard.rowRefNext >= len(shard.rowRefs) {
// The top shard is exhausted - remove it from the heap.
_ = heap.Pop(&sh)
shardNextIdx = 0
if needStop(psp.stopCh) {
return nil
}
continue
}
if shardNextIdx == 0 {
shardNextIdx = 1
if len(sh) > 2 && sh.Less(2, 1) {
shardNextIdx = 2
}
}
// Fix the heap only if the next row at the top shard must go after the next row at the smaller child.
// Otherwise the top shard remains at the top.
if sh.Less(shardNextIdx, 0) {
heap.Fix(&sh, 0)
shardNextIdx = 0
if needStop(psp.stopCh) {
return nil
}
}
}
if len(sh) == 1 {
// A single shard is left - write its remaining rows without heap operations.
shard := sh[0]
for shard.rowRefNext < len(shard.rowRefs) {
wctx.writeNextRow(shard)
}
}
wctx.flush()
return nil
}
// pipeSortWriteContext accumulates sorted rows into blocks and passes the blocks to psp.ppNext.
type pipeSortWriteContext struct {
// psp is the parent pipeSortProcessor.
psp *pipeSortProcessor
// rcs holds the columns of the current non-flushed block.
rcs []resultColumn
// br is re-used for passing rcs to ppNext.
br blockResult
// buf is a temporary buffer for non-flushed block.
// It holds the marshaled rank values.
buf []byte
// rowsWritten is the total number of rows passed to writeNextRow.
// It includes the rows skipped because of the offset.
rowsWritten uint64
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the length of all the values in the current block
valuesLen int
}
// writeNextRow appends the next row from shard to the current block and advances shard.rowRefNext.
//
// The first ps.offset rows are skipped. The current block is flushed to ppNext
// when the set of columns changes or when the block accumulates at least 1_000_000 bytes of values.
func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
ps := shard.ps
rankFieldName := ps.rankFieldName
// rankFields is the number of leading columns at rcs occupied by the rank field (0 or 1).
rankFields := 0
if rankFieldName != "" {
rankFields = 1
}
rowIdx := shard.rowRefNext
shard.rowRefNext++
wctx.rowsWritten++
if wctx.rowsWritten <= ps.offset {
return
}
rr := shard.rowRefs[rowIdx]
b := &shard.blocks[rr.blockIdx]
byFields := ps.byFields
rcs := wctx.rcs
// The rank and by(...) columns are the same for all the blocks, so only other columns must be compared.
areEqualColumns := len(rcs) == rankFields+len(byFields)+len(b.otherColumns)
if areEqualColumns {
for i, c := range b.otherColumns {
if rcs[rankFields+len(byFields)+i].name != c.name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to ppNext and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
if rankFieldName != "" {
rcs = appendResultColumnWithName(rcs, rankFieldName)
}
for _, bf := range byFields {
rcs = appendResultColumnWithName(rcs, bf.name)
}
for _, c := range b.otherColumns {
rcs = appendResultColumnWithName(rcs, c.name)
}
wctx.rcs = rcs
}
if rankFieldName != "" {
// The rank is the 1-based position of the row in the sorted output, counting the rows skipped by the offset.
// The value refers to wctx.buf, which is truncated only at flush.
bufLen := len(wctx.buf)
wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten)
v := bytesutil.ToUnsafeString(wctx.buf[bufLen:])
rcs[0].addValue(v)
}
br := b.br
byColumns := b.byColumns
for i := range byFields {
v := byColumns[i].c.getValueAtRow(br, rr.rowIdx)
rcs[rankFields+i].addValue(v)
wctx.valuesLen += len(v)
}
for i, c := range b.otherColumns {
v := c.getValueAtRow(br, rr.rowIdx)
rcs[rankFields+len(byFields)+i].addValue(v)
wctx.valuesLen += len(v)
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
// flush passes the accumulated rows to ppNext and prepares wctx for the next block.
//
// Column names are kept at wctx.rcs, while their values, the row counters
// and the temporary buffer are reset.
func (wctx *pipeSortWriteContext) flush() {
	rowsCount := wctx.rowsCount
	wctx.rowsCount = 0
	wctx.valuesLen = 0

	// Flush rcs to ppNext
	wctx.br.setResultColumns(wctx.rcs, rowsCount)
	wctx.psp.ppNext.writeBlock(0, &wctx.br)

	wctx.br.reset()
	for i := range wctx.rcs {
		wctx.rcs[i].resetValues()
	}
	wctx.buf = wctx.buf[:0]
}
// pipeSortProcessorShardsHeap is a heap of sorted shards used for merging them.
//
// Shards are ordered by the row at their rowRefNext position.
// The methods below implement heap.Interface from container/heap.
type pipeSortProcessorShardsHeap []*pipeSortProcessorShard

// Len returns the number of shards in the heap.
func (sh *pipeSortProcessorShardsHeap) Len() int {
	shards := *sh
	return len(shards)
}

// Swap exchanges shards at positions i and j.
func (sh *pipeSortProcessorShardsHeap) Swap(i, j int) {
	(*sh)[i], (*sh)[j] = (*sh)[j], (*sh)[i]
}

// Less reports whether the next row of the shard at position i
// must go before the next row of the shard at position j.
func (sh *pipeSortProcessorShardsHeap) Less(i, j int) bool {
	x, y := (*sh)[i], (*sh)[j]
	return sortBlockLess(x, x.rowRefNext, y, y.rowRefNext)
}

// Push appends x, which must be *pipeSortProcessorShard, to the heap storage.
func (sh *pipeSortProcessorShardsHeap) Push(x any) {
	*sh = append(*sh, x.(*pipeSortProcessorShard))
}

// Pop removes the last shard from the heap storage and returns it.
func (sh *pipeSortProcessorShardsHeap) Pop() any {
	shards := *sh
	last := len(shards) - 1
	shard := shards[last]

	// Clear the vacated slot, so it doesn't hold the reference to the shard.
	shards[last] = nil
	*sh = shards[:last]

	return shard
}
// sortBlockLess reports whether the row referenced by shardA.rowRefs[rowIdxA] must be
// placed before the row referenced by shardB.rowRefs[rowIdxB] in the sorted output.
//
// The 'by(...)' columns are compared one by one; the first column with distinct values
// decides the result. The per-column 'desc' flag is inverted when the pipe-level 'desc' is set.
// false is returned if the rows are equal in all the columns.
//
// Values are compared in the following order of preference: timestamps (when both columns
// are time columns), pre-parsed int64 values (when both are non-zero), pre-parsed float64 values
// (when both aren't NaN) and, finally, strings via lessString.
func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSortProcessorShard, rowIdxB int) bool {
	ps := shardA.ps
	byFields := ps.byFields

	rrA := shardA.rowRefs[rowIdxA]
	rrB := shardB.rowRefs[rowIdxB]
	bA := &shardA.blocks[rrA.blockIdx]
	bB := &shardB.blocks[rrB.blockIdx]

	for idx := range bA.byColumns {
		cA := &bA.byColumns[idx]
		cB := &bB.byColumns[idx]

		// byFields is empty when sorting by all the columns; byColumns holds a single synthetic column then.
		isDesc := len(byFields) > 0 && byFields[idx].isDesc
		if ps.isDesc {
			isDesc = !isDesc
		}

		if cA.c.isConst && cB.c.isConst && cA.c.valuesEncoded[0] == cB.c.valuesEncoded[0] {
			// Fast path - identical const values
			continue
		}
		// Distinct const values are compared by the generic logic below.
		// Comparing them as raw strings would order "10" before "9" and would make
		// the result depend on whether a block stores the column as const.

		if cA.c.isTime && cB.c.isTime {
			// Fast path - sort by _time
			timestampsA := bA.br.getTimestamps()
			timestampsB := bB.br.getTimestamps()
			tA := timestampsA[rrA.rowIdx]
			tB := timestampsB[rrB.rowIdx]
			if tA == tB {
				continue
			}
			if isDesc {
				return tB < tA
			}
			return tA < tB
		}
		// NOTE(review): the two branches below ignore isDesc - confirm this is intended.
		if cA.c.isTime {
			// treat timestamps as smaller than other values
			return true
		}
		if cB.c.isTime {
			// treat timestamps as smaller than other values
			return false
		}

		// Try sorting by int64 values at first.
		// Zero means either the zero number or a value, which couldn't be parsed,
		// so such values are compared by the subsequent steps.
		uA := cA.getI64ValueAtRow(rrA.rowIdx)
		uB := cB.getI64ValueAtRow(rrB.rowIdx)
		if uA != 0 && uB != 0 {
			if uA == uB {
				continue
			}
			if isDesc {
				return uB < uA
			}
			return uA < uB
		}

		// Try sorting by float64 then
		fA := cA.getF64ValueAtRow(rrA.rowIdx)
		fB := cB.getF64ValueAtRow(rrB.rowIdx)
		if !math.IsNaN(fA) && !math.IsNaN(fB) {
			if fA == fB {
				continue
			}
			if isDesc {
				return fB < fA
			}
			return fA < fB
		}

		// Fall back to string sorting
		sA := cA.c.getValueAtRow(bA.br, rrA.rowIdx)
		sB := cB.c.getValueAtRow(bB.br, rrB.rowIdx)
		if sA == sB {
			continue
		}
		if isDesc {
			return lessString(sB, sA)
		}
		return lessString(sA, sB)
	}
	return false
}
// parsePipeSort parses the sort pipe from lex.
//
// The pipe must start with the 'sort' or 'order' keyword. It may be followed by
// an optional '[by] (...)' clause, an optional 'asc' or 'desc' keyword and
// 'offset N', 'limit N' and rank clauses in arbitrary order.
//
// An error is returned on unexpected keyword, on invalid 'by (...)' clause,
// on non-numeric or repeated non-zero 'offset' / 'limit' values and on invalid rank clause.
func parsePipeSort(lex *lexer) (*pipeSort, error) {
	if !lex.isKeyword("sort") && !lex.isKeyword("order") {
		return nil, fmt.Errorf("expecting 'sort' or 'order'; got %q", lex.token)
	}
	lex.nextToken()

	var ps pipeSort
	if lex.isKeyword("by", "(") {
		// The 'by' keyword is optional in front of '(...)'.
		if lex.isKeyword("by") {
			lex.nextToken()
		}
		bfs, err := parseBySortFields(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
		}
		ps.byFields = bfs
	}

	switch {
	case lex.isKeyword("desc"):
		lex.nextToken()
		ps.isDesc = true
	case lex.isKeyword("asc"):
		lex.nextToken()
	}

	for {
		switch {
		case lex.isKeyword("offset"):
			lex.nextToken()
			s := lex.token
			n, ok := tryParseUint64(s)
			lex.nextToken()
			if !ok {
				return nil, fmt.Errorf("cannot parse 'offset %s'", s)
			}
			if ps.offset > 0 {
				return nil, fmt.Errorf("duplicate 'offset'; the previous one is %d; the new one is %s", ps.offset, s)
			}
			ps.offset = n
		case lex.isKeyword("limit"):
			lex.nextToken()
			s := lex.token
			n, ok := tryParseUint64(s)
			lex.nextToken()
			if !ok {
				return nil, fmt.Errorf("cannot parse 'limit %s'", s)
			}
			if ps.limit > 0 {
				return nil, fmt.Errorf("duplicate 'limit'; the previous one is %d; the new one is %s", ps.limit, s)
			}
			ps.limit = n
		case lex.isKeyword("rank"):
			rankFieldName, err := parseRankFieldName(lex)
			if err != nil {
				// Wrap with %w, so the callers can inspect the original error.
				return nil, fmt.Errorf("cannot read rank field name: %w", err)
			}
			ps.rankFieldName = rankFieldName
		default:
			return &ps, nil
		}
	}
}
// bySortField represents a single field from the 'by (...)' part of the pipeSort.
type bySortField struct {
// name is the name of the field to sort by.
name string
// isDesc indicates whether the given field is sorted in descending order.
isDesc bool
}
// String returns the text representation of bf: the field name,
// quoted if needed, followed by " desc" for descending order.
func (bf *bySortField) String() string {
	name := quoteTokenIfNeeded(bf.name)
	if !bf.isDesc {
		return name
	}
	return name + " desc"
}
// parseBySortFields parses '(field1 [asc|desc], ..., fieldN [asc|desc])' from lex.
//
// lex must point to the opening '('. An empty list and a trailing comma are accepted.
func parseBySortFields(lex *lexer) ([]*bySortField, error) {
	if !lex.isKeyword("(") {
		return nil, fmt.Errorf("missing `(`")
	}

	var bfs []*bySortField
	for {
		// Skip the opening '(' on the first iteration and ',' on the subsequent iterations.
		lex.nextToken()
		if lex.isKeyword(")") {
			lex.nextToken()
			return bfs, nil
		}

		name, err := parseFieldName(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse field name: %w", err)
		}

		isDesc := false
		if lex.isKeyword("desc") {
			lex.nextToken()
			isDesc = true
		} else if lex.isKeyword("asc") {
			lex.nextToken()
		}
		bfs = append(bfs, &bySortField{
			name:   name,
			isDesc: isDesc,
		})

		if lex.isKeyword(")") {
			lex.nextToken()
			return bfs, nil
		}
		if !lex.isKeyword(",") {
			return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
		}
	}
}
// tryParseInt64 parses s as a decimal int64 with an optional leading '-'.
//
// It returns false if s is empty, if the part after the optional '-' cannot be parsed
// by tryParseUint64 or if the resulting number doesn't fit int64.
func tryParseInt64(s string) (int64, bool) {
	if s == "" {
		return 0, false
	}

	negative := s[0] == '-'
	digits := s
	if negative {
		digits = s[1:]
	}

	magnitude, ok := tryParseUint64(digits)
	if !ok {
		return 0, false
	}

	if negative {
		// The magnitude of math.MinInt64 is bigger by one than math.MaxInt64.
		if magnitude > -math.MinInt64 {
			return 0, false
		}
		return -int64(magnitude), true
	}

	if magnitude > math.MaxInt64 {
		return 0, false
	}
	return int64(magnitude), true
}
// marshalJSONKeyValue appends the JSON-encoded `"k":"v"` pair to dst and returns the result.
func marshalJSONKeyValue(dst []byte, k, v string) []byte {
	dst = append(quicktemplate.AppendJSONString(dst, k, true), ':')
	return quicktemplate.AppendJSONString(dst, v, true)
}