VictoriaMetrics/lib/logstorage/pipe_topk.go
2024-05-15 04:55:44 +02:00

554 lines
12 KiB
Go

package logstorage
import (
"container/heap"
"fmt"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
)
// newPipeTopkProcessor returns a pipeProcessor that evaluates ps while retaining
// only a bounded number of rows per worker shard (see pipeTopkProcessorShard.addRow).
//
// The state of all the shards is limited to 20% of memory.Allowed(). Each of the
// workersCount shards is handed stateSizeBudgetChunk of it up front; what remains
// becomes the shared budget that shards borrow from in pipeTopkProcessor.writeBlock.
func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
	sharedBudget := int64(float64(memory.Allowed()) * 0.2)

	shards := make([]pipeTopkProcessorShard, workersCount)
	for idx := range shards {
		shards[idx].ps = ps
		shards[idx].stateSizeBudget = stateSizeBudgetChunk
		sharedBudget -= stateSizeBudgetChunk
	}

	ptp := &pipeTopkProcessor{
		ps:     ps,
		stopCh: stopCh,
		cancel: cancel,
		ppBase: ppBase,

		shards: shards,

		maxStateSize: sharedBudget,
	}
	ptp.stateSizeBudget.Store(sharedBudget)
	return ptp
}
// pipeTopkProcessor processes pipeSort by retaining a bounded number of rows per
// worker shard (see pipeTopkProcessorShard.addRow). On flush it merges the shards'
// sorted rows and writes the result to ppBase.
type pipeTopkProcessor struct {
// ps is the pipe being executed.
ps *pipeSort
// stopCh is checked via needStop in flush in order to abort early.
stopCh <-chan struct{}
// cancel is called from writeBlock once the shared state-size budget is exhausted.
cancel func()
// ppBase receives the resulting blocks.
ppBase pipeProcessor
// shards holds the per-worker state, indexed by the workerID passed to writeBlock.
shards []pipeTopkProcessorShard
// maxStateSize is the initial shared budget (already reduced by the chunks handed
// to the shards at construction). It is reported in the error returned by flush.
maxStateSize int64
// stateSizeBudget is the remaining shared budget. Shards withdraw from it in
// stateSizeBudgetChunk portions.
stateSizeBudget atomic.Int64
}
// pipeTopkProcessorShard holds the state of a single worker of pipeTopkProcessor.
// It implements heap.Interface over its rows (see the Len/Swap/Less/Push/Pop methods).
type pipeTopkProcessorShard struct {
pipeTopkProcessorShardNopad
// The padding prevents false sharing between adjacent shards on platforms whose
// cache line size divides 128. Note that the expression yields a full 128 bytes
// when the size of pipeTopkProcessorShardNopad is already a multiple of 128.
_ [128 - unsafe.Sizeof(pipeTopkProcessorShardNopad{})%128]byte
}
// pipeTopkProcessorShardNopad holds the actual per-worker state of pipeTopkProcessor.
// pipeTopkProcessorShard wraps it with padding.
type pipeTopkProcessorShardNopad struct {
// ps points to the parent pipeSort.
ps *pipeSort
// rows contains the rows retained by the shard. While rows are being added it is
// a max-heap with respect to topkLess (see Less), so rows[0] is the retained row
// that sorts last. After sortRows it is sorted in topkLess order.
rows []*pipeTopkRow
// rowNext is the index in rows of the next row to write while
// pipeTopkProcessor.flush merges the sorted shards.
rowNext int
// tmpRow is a reusable row used by addRow to decide whether the next ingested row
// must be stored in the shard. Its slices alias caller-owned buffers, so addRow
// clones it before storing it.
tmpRow pipeTopkRow
// The following scratch buffers are reused across writeBlock calls in order to
// build the rows passed to addRow.
byColumnValues [][]string
otherColumnValues []pipeTopkOtherColumn
byColumns []string
otherColumns []Field
// stateSizeBudget is the remaining state-size budget (in bytes) for the shard.
// It may drop below zero; pipeTopkProcessor.writeBlock then replenishes it in
// stateSizeBudgetChunk portions taken from the parent's shared budget.
stateSizeBudget int
}
// pipeTopkRow is a single row tracked by a pipeTopkProcessorShard.
type pipeTopkRow struct {
// byColumns holds the values the row is ordered by (compared by topkLess).
// When the pipe has no `by (...)` fields, it holds a single synthetic key built
// from all the columns of the row.
byColumns []string
// otherColumns holds the (name, value) pairs written in addition to the by-field
// values. When the pipe has no `by (...)` fields, it holds all the row's columns.
otherColumns []Field
}
// pipeTopkOtherColumn holds the per-row values of a column that is not listed in the
// `by (...)` fields, for the block currently processed by writeBlock.
type pipeTopkOtherColumn struct {
name string
values []string
}
// clone returns a deep copy of r.
//
// Every string is copied via strings.Clone, so the returned row doesn't reference
// any memory owned by r (e.g. temporary buffers that r's strings may point into).
func (r *pipeTopkRow) clone() *pipeTopkRow {
	dup := &pipeTopkRow{
		byColumns:    make([]string, 0, len(r.byColumns)),
		otherColumns: make([]Field, 0, len(r.otherColumns)),
	}
	for _, v := range r.byColumns {
		dup.byColumns = append(dup.byColumns, strings.Clone(v))
	}
	for _, f := range r.otherColumns {
		dup.otherColumns = append(dup.otherColumns, Field{
			Name:  strings.Clone(f.Name),
			Value: strings.Clone(f.Value),
		})
	}
	return dup
}
// sizeBytes returns an estimate of the memory occupied by r. The estimate covers
// the row header, the slice elements (string and Field headers), and the bytes
// of every string the row references.
func (r *pipeTopkRow) sizeBytes() int {
	var strHeader string
	var fieldHeader Field

	total := int(unsafe.Sizeof(*r)) +
		len(r.byColumns)*int(unsafe.Sizeof(strHeader)) +
		len(r.otherColumns)*int(unsafe.Sizeof(fieldHeader))
	for _, s := range r.byColumns {
		total += len(s)
	}
	for _, f := range r.otherColumns {
		total += len(f.Name) + len(f.Value)
	}
	return total
}
// Len implements heap.Interface.
func (shard *pipeTopkProcessorShard) Len() int { return len(shard.rows) }

// Swap implements heap.Interface.
func (shard *pipeTopkProcessorShard) Swap(i, j int) {
	shard.rows[j], shard.rows[i] = shard.rows[i], shard.rows[j]
}

// Less implements heap.Interface.
//
// The topkLess arguments are flipped, so shard.rows is a max-heap: the root
// rows[0] is the retained row that sorts last, i.e. the first eviction candidate.
func (shard *pipeTopkProcessorShard) Less(i, j int) bool {
	a, b := shard.rows[i], shard.rows[j]
	return topkLess(shard.ps, b, a)
}

// Push implements heap.Interface. x must be a *pipeTopkRow.
func (shard *pipeTopkProcessorShard) Push(x any) {
	shard.rows = append(shard.rows, x.(*pipeTopkRow))
}

// Pop implements heap.Interface. It removes and returns the last row, clearing
// the vacated slot so the backing array no longer references the row.
func (shard *pipeTopkProcessorShard) Pop() any {
	last := len(shard.rows) - 1
	x := shard.rows[last]
	shard.rows[last] = nil
	shard.rows = shard.rows[:last]
	return x
}
// writeBlock offers every row of br to the shard's bounded heap via addRow.
//
// When the pipe has no `by (...)` fields, rows are ordered by a synthetic key
// built from all their columns (see writeBlockAllFields). Otherwise they are
// ordered by the by-field values (see writeBlockByFields).
func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
	if len(shard.ps.byFields) == 0 {
		shard.writeBlockAllFields(br)
		return
	}
	shard.writeBlockByFields(br)
}

// writeBlockAllFields handles writeBlock when no `by (...)` fields are given.
//
// For every row the sort key is the concatenation of marshalJSONKeyValue(name, value)
// followed by ',' over all the columns of br. All the columns are also kept as the
// row's other columns.
func (shard *pipeTopkProcessorShard) writeBlockAllFields(br *blockResult) {
	cs := br.getColumns()

	columnValues := shard.byColumnValues[:0]
	for _, c := range cs {
		columnValues = append(columnValues, c.getValues(br))
	}
	shard.byColumnValues = columnValues

	byColumns := shard.byColumns[:0]
	otherColumns := shard.otherColumns[:0]
	keyBuf := bbPool.Get()
	for rowIdx := range br.timestamps {
		keyBuf.B = keyBuf.B[:0]
		otherColumns = otherColumns[:0]
		for colIdx, values := range columnValues {
			name := cs[colIdx].name
			value := values[rowIdx]
			keyBuf.B = marshalJSONKeyValue(keyBuf.B, name, value)
			keyBuf.B = append(keyBuf.B, ',')
			otherColumns = append(otherColumns, Field{
				Name:  name,
				Value: value,
			})
		}
		// The key aliases keyBuf, which is overwritten for the next row.
		// This is safe because addRow clones whatever it decides to keep.
		byColumns = append(byColumns[:0], bytesutil.ToUnsafeString(keyBuf.B))
		shard.addRow(byColumns, otherColumns)
	}
	bbPool.Put(keyBuf)

	shard.byColumns = byColumns
	shard.otherColumns = otherColumns
}

// writeBlockByFields handles writeBlock when `by (...)` fields are given.
//
// Rows are ordered by the values of ps.byFields. The remaining columns of br
// are carried along as the row's other columns.
func (shard *pipeTopkProcessorShard) writeBlockByFields(br *blockResult) {
	cs := br.getColumns()
	byFields := shard.ps.byFields

	keyValues := shard.byColumnValues[:0]
	for _, bf := range byFields {
		c := br.getColumnByName(bf.name)
		keyValues = append(keyValues, c.getValues(br))
	}
	shard.byColumnValues = keyValues

	// Collect the values of every column that isn't referenced by byFields.
	extraValues := shard.otherColumnValues[:0]
nextColumn:
	for _, c := range cs {
		for _, bf := range byFields {
			if bf.name == c.name {
				continue nextColumn
			}
		}
		extraValues = append(extraValues, pipeTopkOtherColumn{
			name:   c.name,
			values: c.getValues(br),
		})
	}
	shard.otherColumnValues = extraValues

	byColumns := shard.byColumns[:0]
	otherColumns := shard.otherColumns[:0]
	for rowIdx := range br.timestamps {
		byColumns = byColumns[:0]
		for _, values := range keyValues {
			byColumns = append(byColumns, values[rowIdx])
		}
		otherColumns = otherColumns[:0]
		for _, extra := range extraValues {
			otherColumns = append(otherColumns, Field{
				Name:  extra.name,
				Value: extra.values[rowIdx],
			})
		}
		shard.addRow(byColumns, otherColumns)
	}
	shard.byColumns = byColumns
	shard.otherColumns = otherColumns
}
// addRow offers the row made of byColumns and otherColumns to the shard.
//
// shard.rows is a max-heap with respect to topkLess (see Less), capped at
// ps.offset+ps.limit rows. The cap includes the offset because the merge phase
// (writeNextRow) skips the first ps.offset rows before writing ps.limit rows.
// While the heap has room, every row is stored. Once it is full, a row is stored
// only if it sorts before the current root, which it then replaces.
//
// byColumns and otherColumns may alias caller-owned buffers. They are cloned
// before being stored. shard.stateSizeBudget is decreased by the size of every
// stored row and increased by the size of every evicted row.
func (shard *pipeTopkProcessorShard) addRow(byColumns []string, otherColumns []Field) {
	r := &shard.tmpRow
	r.byColumns = byColumns
	r.otherColumns = otherColumns

	maxRows := shard.ps.offset + shard.ps.limit
	if maxRows == 0 {
		// Nothing can ever be written, and rows[0] below would be out of range.
		return
	}

	rows := shard.rows
	if uint64(len(rows)) >= maxRows && !topkLess(shard.ps, r, rows[0]) {
		// Fast path - the heap is full and r doesn't sort before its worst row.
		return
	}

	// Slow path - add r to shard.rows.
	r = r.clone()
	shard.stateSizeBudget -= r.sizeBytes()
	if uint64(len(rows)) < maxRows {
		heap.Push(shard, r)
		// Account for the pointer slot in shard.rows as well.
		shard.stateSizeBudget -= int(unsafe.Sizeof(r))
	} else {
		// Evict the row that sorts last and put r in its place.
		shard.stateSizeBudget += rows[0].sizeBytes()
		rows[0] = r
		heap.Fix(shard, 0)
	}
}
// sortRows turns the shard's max-heap of rows into a slice sorted in topkLess
// order. It repeatedly pops the heap root (the row that sorts last) into the
// tail of the backing slice.
//
// heap.Pop shrinks shard.rows, so the full-length slice is kept in a local
// variable and stored back once sorting completes. If needStop(stopCh) reports
// true midway, the method returns early and shard.rows is left truncated.
func (shard *pipeTopkProcessorShard) sortRows(stopCh <-chan struct{}) {
	sorted := shard.rows
	for n := len(sorted); n > 1; n-- {
		sorted[n-1] = heap.Pop(shard).(*pipeTopkRow)
		if needStop(stopCh) {
			return
		}
	}
	shard.rows = sorted
}
// writeBlock passes br to the shard owned by workerID.
//
// Before that, the shard's state-size budget is topped up from the shared budget
// in stateSizeBudgetChunk portions until it is non-negative again. If the shared
// budget runs out, br is dropped. The withdrawal that drives the shared budget from
// non-negative to negative also calls ptp.cancel(), so the workers stop calling
// writeBlock and save CPU time. flush later reports the exhausted budget as an error.
func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) {
	if len(br.timestamps) == 0 {
		return
	}

	shard := &ptp.shards[workerID]
	for shard.stateSizeBudget < 0 {
		remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk)
		if remaining >= 0 {
			shard.stateSizeBudget += stateSizeBudgetChunk
			continue
		}
		// The state size is too big. Stop processing data in order to avoid an OOM crash.
		if remaining+stateSizeBudgetChunk >= 0 {
			ptp.cancel()
		}
		return
	}

	shard.writeBlock(br)
}
// flush writes the final result of the pipe to ppBase.
//
// It returns an error when the shared state-size budget is exhausted (<= 0),
// because writeBlock drops blocks in that case and the result would be incomplete.
// Otherwise it sorts every shard in parallel and then merges the sorted shards.
// Rows are written via pipeTopkWriteContext.writeNextRow until it reports that
// ps.offset+ps.limit rows have been consumed.
// It returns nil without writing further rows whenever needStop(ptp.stopCh) reports true.
func (ptp *pipeTopkProcessor) flush() error {
if n := ptp.stateSizeBudget.Load(); n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.ps.String(), ptp.maxStateSize/(1<<20))
}
if needStop(ptp.stopCh) {
return nil
}
// Sort every shard in parallel.
var wg sync.WaitGroup
shards := ptp.shards
for i := range shards {
wg.Add(1)
go func(shard *pipeTopkProcessorShard) {
shard.sortRows(ptp.stopCh)
wg.Done()
}(&shards[i])
}
wg.Wait()
// sortRows may have returned early, leaving shard.rows truncated, so bail out.
if needStop(ptp.stopCh) {
return nil
}
// Merge sorted results across shards.
// sh is a heap of the non-empty shards ordered by each shard's next unwritten
// row (rows[rowNext]), so sh[0] always holds the row that sorts first.
sh := pipeTopkProcessorShardsHeap(make([]*pipeTopkProcessorShard, 0, len(shards)))
for i := range shards {
shard := &shards[i]
if len(shard.rows) > 0 {
sh = append(sh, shard)
}
}
if len(sh) == 0 {
return nil
}
heap.Init(&sh)
wctx := &pipeTopkWriteContext{
ptp: ptp,
}
// shardNextIdx caches the index (1 or 2) of the root's child whose next row
// sorts first. 0 means the cache is invalid and must be recomputed, since the
// heap has been restructured.
shardNextIdx := 0
for len(sh) > 1 {
shard := sh[0]
if !wctx.writeNextRow(shard) {
// ps.offset+ps.limit rows have been consumed.
break
}
if shard.rowNext >= len(shard.rows) {
// The root shard is exhausted - drop it from the heap.
_ = heap.Pop(&sh)
shardNextIdx = 0
if needStop(ptp.stopCh) {
return nil
}
continue
}
if shardNextIdx == 0 {
shardNextIdx = 1
if len(sh) > 2 && sh.Less(2, 1) {
shardNextIdx = 2
}
}
// Re-sift the root only when the cached child's next row sorts strictly
// before the root's next row. This avoids a heap.Fix call per written row
// while the root shard keeps winning.
if sh.Less(shardNextIdx, 0) {
heap.Fix(&sh, 0)
shardNextIdx = 0
if needStop(ptp.stopCh) {
return nil
}
}
}
// Only one shard is left (unless the loop above hit the limit): drain it
// directly without heap bookkeeping.
if len(sh) == 1 {
shard := sh[0]
for shard.rowNext < len(shard.rows) {
if !wctx.writeNextRow(shard) {
break
}
}
}
// Send the remaining accumulated values to ppBase.
wctx.flush()
return nil
}
// pipeTopkWriteContext accumulates merged rows into blocks and writes them to
// ptp.ppBase during pipeTopkProcessor.flush.
type pipeTopkWriteContext struct {
// ptp is the owning processor; blocks are written to ptp.ppBase.
ptp *pipeTopkProcessor
// rcs holds the columns of the block being accumulated. The by-field columns come
// first, followed by the row's other columns.
rcs []resultColumn
// br is reused for passing rcs to ppBase.
br blockResult
// rowsWritten counts the rows consumed from the shards, including the rows
// skipped because of ps.offset.
rowsWritten uint64
// valuesLen is the total length of the values accumulated since the last flush.
// The block is flushed once it reaches 1_000_000.
valuesLen int
}
// writeNextRow consumes the next row of shard (at shard.rowNext) during the merge phase.
//
// The first ps.offset consumed rows are skipped. The following ps.limit rows are
// appended to wctx.rcs. A new block is started whenever the row's column set
// differs from the accumulated one, and the block is flushed once its values reach
// 1_000_000 bytes. The method returns false once ps.offset+ps.limit rows have
// already been consumed, telling the caller to stop merging.
func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool {
	ps := shard.ps
	rowIdx := shard.rowNext
	shard.rowNext = rowIdx + 1
	wctx.rowsWritten++

	switch {
	case wctx.rowsWritten <= ps.offset:
		// Still inside the offset window - consume the row without writing it.
		return true
	case wctx.rowsWritten > ps.offset+ps.limit:
		// ps.limit rows have already been written.
		return false
	}

	r := shard.rows[rowIdx]
	byFields := ps.byFields
	nBy := len(byFields)

	if !wctx.hasSameColumns(nBy, r.otherColumns) {
		// Send the current block to ppBase and start a new block with r's column set.
		wctx.flush()
		newRcs := wctx.rcs[:0]
		for _, bf := range byFields {
			newRcs = append(newRcs, resultColumn{name: bf.name})
		}
		for _, f := range r.otherColumns {
			newRcs = append(newRcs, resultColumn{name: f.Name})
		}
		wctx.rcs = newRcs
	}

	rcs := wctx.rcs
	for i := range byFields {
		v := r.byColumns[i]
		rcs[i].addValue(v)
		wctx.valuesLen += len(v)
	}
	for i, f := range r.otherColumns {
		rcs[nBy+i].addValue(f.Value)
		wctx.valuesLen += len(f.Value)
	}

	if wctx.valuesLen >= 1_000_000 {
		wctx.flush()
	}
	return true
}

// hasSameColumns reports whether wctx.rcs consists of nBy leading columns followed
// by columns named exactly like otherColumns, in the same order. The names of the
// leading nBy columns aren't checked, since writeNextRow always builds them from
// ps.byFields.
func (wctx *pipeTopkWriteContext) hasSameColumns(nBy int, otherColumns []Field) bool {
	rcs := wctx.rcs
	if len(rcs) != nBy+len(otherColumns) {
		return false
	}
	for i := range otherColumns {
		if rcs[nBy+i].name != otherColumns[i].Name {
			return false
		}
	}
	return true
}
// flush writes the accumulated columns as a block to ppBase and resets their values
// while keeping their names, so the same column set can be reused.
// The accumulated-values counter is reset even when there are no columns.
func (wctx *pipeTopkWriteContext) flush() {
	wctx.valuesLen = 0

	rcs := wctx.rcs
	if len(rcs) == 0 {
		return
	}

	br := &wctx.br
	br.setResultColumns(rcs)
	wctx.ptp.ppBase.writeBlock(0, br)
	br.reset()
	for i := range rcs {
		rcs[i].resetKeepName()
	}
}
// pipeTopkProcessorShardsHeap is a min-heap of sorted shards. Shards are keyed by
// their next unwritten row (rows[rowNext]), so the root holds the row that sorts
// first across all the shards. It is used for merging the shards in
// pipeTopkProcessor.flush. Every shard in the heap must have rowNext < len(rows).
type pipeTopkProcessorShardsHeap []*pipeTopkProcessorShard

// Len implements heap.Interface.
func (sh *pipeTopkProcessorShardsHeap) Len() int { return len(*sh) }

// Swap implements heap.Interface.
func (sh *pipeTopkProcessorShardsHeap) Swap(i, j int) {
	(*sh)[i], (*sh)[j] = (*sh)[j], (*sh)[i]
}

// Less implements heap.Interface by comparing the next unwritten rows of the shards.
func (sh *pipeTopkProcessorShardsHeap) Less(i, j int) bool {
	left, right := (*sh)[i], (*sh)[j]
	return topkLess(left.ps, left.rows[left.rowNext], right.rows[right.rowNext])
}

// Push implements heap.Interface. x must be a *pipeTopkProcessorShard.
func (sh *pipeTopkProcessorShardsHeap) Push(x any) {
	*sh = append(*sh, x.(*pipeTopkProcessorShard))
}

// Pop implements heap.Interface. It removes and returns the last shard, clearing
// the vacated slot.
func (sh *pipeTopkProcessorShardsHeap) Pop() any {
	old := *sh
	last := len(old) - 1
	top := old[last]
	old[last] = nil
	*sh = old[:last]
	return top
}
// topkLess reports whether row a sorts strictly before row b.
//
// Rows are compared column by column over a.byColumns using natural string order.
// The direction is ps.isDesc, flipped for a column whose by-field has isDesc set
// (when by-fields are present). The first differing column decides the result;
// equal rows are not less than each other.
func topkLess(ps *pipeSort, a, b *pipeTopkRow) bool {
	byFields := ps.byFields
	for i, vA := range a.byColumns {
		desc := ps.isDesc
		if len(byFields) > 0 && byFields[i].isDesc {
			desc = !desc
		}
		vB := b.byColumns[i]
		if vA == vB {
			continue
		}
		first, second := vA, vB
		if desc {
			first, second = vB, vA
		}
		return stringsutil.LessNatural(first, second)
	}
	return false
}