VictoriaMetrics/lib/logstorage/pipe_pack.go

115 lines
2.4 KiB
Go
Raw Normal View History

2024-06-05 03:18:12 +02:00
package logstorage
import (
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
func updateNeededFieldsForPipePack(neededFields, unneededFields fieldsSet, resultField string, fields []string) {
if neededFields.contains("*") {
if !unneededFields.contains(resultField) {
if len(fields) > 0 {
unneededFields.removeFields(fields)
} else {
unneededFields.reset()
}
}
} else {
if neededFields.contains(resultField) {
neededFields.remove(resultField)
if len(fields) > 0 {
neededFields.addFields(fields)
} else {
neededFields.add("*")
}
}
}
}
func newPipePackProcessor(workersCount int, ppNext pipeProcessor, resultField string, fields []string, marshalFields func(dst []byte, fields []Field) []byte) pipeProcessor {
return &pipePackProcessor{
ppNext: ppNext,
resultField: resultField,
fields: fields,
marshalFields: marshalFields,
shards: make([]pipePackProcessorShard, workersCount),
}
}
type pipePackProcessor struct {
ppNext pipeProcessor
resultField string
fields []string
marshalFields func(dst []byte, fields []Field) []byte
shards []pipePackProcessorShard
}
type pipePackProcessorShard struct {
pipePackProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipePackProcessorShardNopad{})%128]byte
}
type pipePackProcessorShardNopad struct {
rc resultColumn
buf []byte
fields []Field
cs []*blockResultColumn
}
func (ppp *pipePackProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &ppp.shards[workerID]
shard.rc.name = ppp.resultField
cs := shard.cs[:0]
if len(ppp.fields) == 0 {
csAll := br.getColumns()
cs = append(cs, csAll...)
} else {
for _, f := range ppp.fields {
c := br.getColumnByName(f)
cs = append(cs, c)
}
}
shard.cs = cs
buf := shard.buf[:0]
fields := shard.fields
for rowIdx := range br.timestamps {
fields = fields[:0]
for _, c := range cs {
v := c.getValueAtRow(br, rowIdx)
fields = append(fields, Field{
Name: c.name,
Value: v,
})
}
bufLen := len(buf)
buf = ppp.marshalFields(buf, fields)
v := bytesutil.ToUnsafeString(buf[bufLen:])
shard.rc.addValue(v)
}
shard.fields = fields
br.addResultColumn(&shard.rc)
ppp.ppNext.writeBlock(workerID, br)
shard.rc.reset()
}
func (ppp *pipePackProcessor) flush() error {
return nil
}