mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 16:20:05 +01:00
55ecf4f766
This pipe is useful for debugging purposes when the number of processed blocks must be calculated for the given query: <query> | blocks_count This helps detecting the root cause of query performance slowdown in cases like https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070
137 lines
3.1 KiB
Go
137 lines
3.1 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"unsafe"
|
|
)
|
|
|
|
// pipeBlocksCount processes '| blocks_count' pipe.
|
|
//
|
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#blocks_count-pipe
|
|
type pipeBlocksCount struct {
|
|
// resultName is an optional name of the column to write results to.
|
|
// By default results are written into 'blocks_count' column.
|
|
resultName string
|
|
}
|
|
|
|
func (pc *pipeBlocksCount) String() string {
|
|
s := "blocks_count"
|
|
if pc.resultName != "blocks_count" {
|
|
s += " as " + quoteTokenIfNeeded(pc.resultName)
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (pc *pipeBlocksCount) canLiveTail() bool {
|
|
return false
|
|
}
|
|
|
|
func (pc *pipeBlocksCount) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
|
neededFields.reset()
|
|
unneededFields.reset()
|
|
}
|
|
|
|
func (pc *pipeBlocksCount) optimize() {
|
|
// nothing to do
|
|
}
|
|
|
|
func (pc *pipeBlocksCount) hasFilterInWithQuery() bool {
|
|
return false
|
|
}
|
|
|
|
func (pc *pipeBlocksCount) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
|
|
return pc, nil
|
|
}
|
|
|
|
func (pc *pipeBlocksCount) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
|
shards := make([]pipeBlocksCountProcessorShard, workersCount)
|
|
|
|
pcp := &pipeBlocksCountProcessor{
|
|
pc: pc,
|
|
stopCh: stopCh,
|
|
ppNext: ppNext,
|
|
|
|
shards: shards,
|
|
}
|
|
return pcp
|
|
}
|
|
|
|
type pipeBlocksCountProcessor struct {
|
|
pc *pipeBlocksCount
|
|
stopCh <-chan struct{}
|
|
ppNext pipeProcessor
|
|
|
|
shards []pipeBlocksCountProcessorShard
|
|
}
|
|
|
|
type pipeBlocksCountProcessorShard struct {
|
|
pipeBlocksCountProcessorShardNopad
|
|
|
|
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
|
_ [128 - unsafe.Sizeof(pipeBlocksCountProcessorShardNopad{})%128]byte
|
|
}
|
|
|
|
type pipeBlocksCountProcessorShardNopad struct {
|
|
blocksCount uint64
|
|
}
|
|
|
|
func (pcp *pipeBlocksCountProcessor) writeBlock(workerID uint, _ *blockResult) {
|
|
shard := &pcp.shards[workerID]
|
|
shard.blocksCount++
|
|
}
|
|
|
|
func (pcp *pipeBlocksCountProcessor) flush() error {
|
|
if needStop(pcp.stopCh) {
|
|
return nil
|
|
}
|
|
|
|
// merge state across shards
|
|
shards := pcp.shards
|
|
blocksCount := shards[0].blocksCount
|
|
shards = shards[1:]
|
|
for i := range shards {
|
|
blocksCount += shards[i].blocksCount
|
|
}
|
|
|
|
// write result
|
|
rowsCountStr := string(marshalUint64String(nil, blocksCount))
|
|
|
|
rcs := [1]resultColumn{}
|
|
rcs[0].name = pcp.pc.resultName
|
|
rcs[0].addValue(rowsCountStr)
|
|
|
|
var br blockResult
|
|
br.setResultColumns(rcs[:], 1)
|
|
pcp.ppNext.writeBlock(0, &br)
|
|
|
|
return nil
|
|
}
|
|
|
|
func parsePipeBlocksCount(lex *lexer) (*pipeBlocksCount, error) {
|
|
if !lex.isKeyword("blocks_count") {
|
|
return nil, fmt.Errorf("expecting 'blocks_count'; got %q", lex.token)
|
|
}
|
|
lex.nextToken()
|
|
|
|
resultName := "blocks_count"
|
|
if lex.isKeyword("as") {
|
|
lex.nextToken()
|
|
name, err := parseFieldName(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)
|
|
}
|
|
resultName = name
|
|
} else if !lex.isKeyword("", "|") {
|
|
name, err := parseFieldName(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)
|
|
}
|
|
resultName = name
|
|
}
|
|
|
|
pc := &pipeBlocksCount{
|
|
resultName: resultName,
|
|
}
|
|
return pc, nil
|
|
}
|