mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-11 20:52:24 +01:00
2f28819bb1
`| unroll` pipe can make multiple copies of rows from the input row. This doesn't break live tailing, so allow `| unroll` pipe in live tailing.
285 lines
6.2 KiB
Go
285 lines
6.2 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"slices"
|
|
"unsafe"
|
|
|
|
"github.com/valyala/fastjson"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
|
)
|
|
|
|
// pipeUnroll processes '| unroll ...' pipe.
|
|
//
|
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe
|
|
type pipeUnroll struct {
|
|
// fields to unroll
|
|
fields []string
|
|
|
|
// iff is an optional filter for skipping the unroll
|
|
iff *ifFilter
|
|
}
|
|
|
|
func (pu *pipeUnroll) String() string {
|
|
s := "unroll"
|
|
if pu.iff != nil {
|
|
s += " " + pu.iff.String()
|
|
}
|
|
s += " by (" + fieldNamesString(pu.fields) + ")"
|
|
return s
|
|
}
|
|
|
|
func (pu *pipeUnroll) canLiveTail() bool {
|
|
return true
|
|
}
|
|
|
|
func (pu *pipeUnroll) optimize() {
|
|
pu.iff.optimizeFilterIn()
|
|
}
|
|
|
|
func (pu *pipeUnroll) hasFilterInWithQuery() bool {
|
|
return pu.iff.hasFilterInWithQuery()
|
|
}
|
|
|
|
func (pu *pipeUnroll) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
|
|
iffNew, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
puNew := *pu
|
|
puNew.iff = iffNew
|
|
return &puNew, nil
|
|
}
|
|
|
|
func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
|
if neededFields.contains("*") {
|
|
if pu.iff != nil {
|
|
unneededFields.removeFields(pu.iff.neededFields)
|
|
}
|
|
unneededFields.removeFields(pu.fields)
|
|
} else {
|
|
if pu.iff != nil {
|
|
neededFields.addFields(pu.iff.neededFields)
|
|
}
|
|
neededFields.addFields(pu.fields)
|
|
}
|
|
}
|
|
|
|
func (pu *pipeUnroll) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
|
return &pipeUnrollProcessor{
|
|
pu: pu,
|
|
stopCh: stopCh,
|
|
ppNext: ppNext,
|
|
|
|
shards: make([]pipeUnrollProcessorShard, workersCount),
|
|
}
|
|
}
|
|
|
|
type pipeUnrollProcessor struct {
|
|
pu *pipeUnroll
|
|
stopCh <-chan struct{}
|
|
ppNext pipeProcessor
|
|
|
|
shards []pipeUnrollProcessorShard
|
|
}
|
|
|
|
type pipeUnrollProcessorShard struct {
|
|
pipeUnrollProcessorShardNopad
|
|
|
|
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
|
_ [128 - unsafe.Sizeof(pipeUnrollProcessorShardNopad{})%128]byte
|
|
}
|
|
|
|
type pipeUnrollProcessorShardNopad struct {
|
|
bm bitmap
|
|
|
|
wctx pipeUnpackWriteContext
|
|
a arena
|
|
|
|
columnValues [][]string
|
|
unrolledValues [][]string
|
|
valuesBuf []string
|
|
fields []Field
|
|
}
|
|
|
|
func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) {
|
|
if len(br.timestamps) == 0 {
|
|
return
|
|
}
|
|
|
|
pu := pup.pu
|
|
shard := &pup.shards[workerID]
|
|
shard.wctx.init(workerID, pup.ppNext, false, false, br)
|
|
|
|
bm := &shard.bm
|
|
bm.init(len(br.timestamps))
|
|
bm.setBits()
|
|
if iff := pu.iff; iff != nil {
|
|
iff.f.applyToBlockResult(br, bm)
|
|
if bm.isZero() {
|
|
pup.ppNext.writeBlock(workerID, br)
|
|
return
|
|
}
|
|
}
|
|
|
|
shard.columnValues = slicesutil.SetLength(shard.columnValues, len(pu.fields))
|
|
columnValues := shard.columnValues
|
|
for i, f := range pu.fields {
|
|
c := br.getColumnByName(f)
|
|
columnValues[i] = c.getValues(br)
|
|
}
|
|
|
|
fields := shard.fields
|
|
for rowIdx := range br.timestamps {
|
|
if bm.isSetBit(rowIdx) {
|
|
if needStop(pup.stopCh) {
|
|
return
|
|
}
|
|
shard.writeUnrolledFields(pu.fields, columnValues, rowIdx)
|
|
} else {
|
|
fields = fields[:0]
|
|
for i, f := range pu.fields {
|
|
v := columnValues[i][rowIdx]
|
|
fields = append(fields, Field{
|
|
Name: f,
|
|
Value: v,
|
|
})
|
|
}
|
|
shard.wctx.writeRow(rowIdx, fields)
|
|
}
|
|
}
|
|
|
|
shard.wctx.flush()
|
|
shard.wctx.reset()
|
|
shard.a.reset()
|
|
}
|
|
|
|
func (shard *pipeUnrollProcessorShard) writeUnrolledFields(fieldNames []string, columnValues [][]string, rowIdx int) {
|
|
// unroll values at rowIdx row
|
|
|
|
shard.unrolledValues = slicesutil.SetLength(shard.unrolledValues, len(columnValues))
|
|
unrolledValues := shard.unrolledValues
|
|
|
|
valuesBuf := shard.valuesBuf[:0]
|
|
for i, values := range columnValues {
|
|
v := values[rowIdx]
|
|
valuesBufLen := len(valuesBuf)
|
|
valuesBuf = unpackJSONArray(valuesBuf, &shard.a, v)
|
|
unrolledValues[i] = valuesBuf[valuesBufLen:]
|
|
}
|
|
shard.valuesBuf = valuesBuf
|
|
|
|
// find the number of rows across unrolled values
|
|
rows := len(unrolledValues[0])
|
|
for _, values := range unrolledValues[1:] {
|
|
if len(values) > rows {
|
|
rows = len(values)
|
|
}
|
|
}
|
|
if rows == 0 {
|
|
// Unroll too a single row with empty unrolled values.
|
|
rows = 1
|
|
}
|
|
|
|
// write unrolled values to the next pipe.
|
|
fields := shard.fields
|
|
for unrollIdx := 0; unrollIdx < rows; unrollIdx++ {
|
|
fields = fields[:0]
|
|
for i, values := range unrolledValues {
|
|
v := ""
|
|
if unrollIdx < len(values) {
|
|
v = values[unrollIdx]
|
|
}
|
|
fields = append(fields, Field{
|
|
Name: fieldNames[i],
|
|
Value: v,
|
|
})
|
|
}
|
|
shard.wctx.writeRow(rowIdx, fields)
|
|
}
|
|
shard.fields = fields
|
|
}
|
|
|
|
func (pup *pipeUnrollProcessor) flush() error {
|
|
return nil
|
|
}
|
|
|
|
func parsePipeUnroll(lex *lexer) (*pipeUnroll, error) {
|
|
if !lex.isKeyword("unroll") {
|
|
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unroll")
|
|
}
|
|
lex.nextToken()
|
|
|
|
// parse optional if (...)
|
|
var iff *ifFilter
|
|
if lex.isKeyword("if") {
|
|
f, err := parseIfFilter(lex)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
iff = f
|
|
}
|
|
|
|
// parse by (...)
|
|
if lex.isKeyword("by") {
|
|
lex.nextToken()
|
|
}
|
|
|
|
fields, err := parseFieldNamesInParens(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'by(...)' at 'unroll': %w", err)
|
|
}
|
|
if len(fields) == 0 {
|
|
return nil, fmt.Errorf("'by(...)' at 'unroll' must contain at least a single field")
|
|
}
|
|
if slices.Contains(fields, "*") {
|
|
return nil, fmt.Errorf("unroll by '*' isn't supported")
|
|
}
|
|
|
|
pu := &pipeUnroll{
|
|
fields: fields,
|
|
iff: iff,
|
|
}
|
|
|
|
return pu, nil
|
|
}
|
|
|
|
func unpackJSONArray(dst []string, a *arena, s string) []string {
|
|
if s == "" || s[0] != '[' {
|
|
return dst
|
|
}
|
|
|
|
p := jspp.Get()
|
|
defer jspp.Put(p)
|
|
|
|
jsv, err := p.Parse(s)
|
|
if err != nil {
|
|
return dst
|
|
}
|
|
jsa, err := jsv.Array()
|
|
if err != nil {
|
|
return dst
|
|
}
|
|
for _, jsv := range jsa {
|
|
if jsv.Type() == fastjson.TypeString {
|
|
sb, err := jsv.StringBytes()
|
|
if err != nil {
|
|
logger.Panicf("BUG: unexpected error returned from StringBytes(): %s", err)
|
|
}
|
|
v := a.copyBytesToString(sb)
|
|
dst = append(dst, v)
|
|
} else {
|
|
bLen := len(a.b)
|
|
a.b = jsv.MarshalTo(a.b)
|
|
v := bytesutil.ToUnsafeString(a.b[bLen:])
|
|
dst = append(dst, v)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
var jspp fastjson.ParserPool
|