lib/logstorage: add block_stats pipe for analyzing per-block storage stats

(cherry picked from commit 5ed54ebadf)
This commit is contained in:
Aliaksandr Valialkin 2024-11-06 18:07:52 +01:00 committed by hagen1778
parent 83c9d42263
commit 7a39f526ec
No known key found for this signature in database
GPG Key ID: E92986095E0DD614
11 changed files with 366 additions and 7 deletions

View File

@ -18,7 +18,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: add [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe), which can be used for performing SQL-like joins. * FEATURE: add [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe), which can be used for performing SQL-like joins.
* FEATURE: support returning historical logs from [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) via `start_offset` query arg. For example, request to `/select/logsql/tail?query=*&start_offset=5m` returns logs for the last 5 minutes before starting returning live tailing logs for the given `query`. * FEATURE: support returning historical logs from [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) via `start_offset` query arg. For example, request to `/select/logsql/tail?query=*&start_offset=5m` returns logs for the last 5 minutes before starting returning live tailing logs for the given `query`.
* FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters). * FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters).
* BUGFIX: Properly parse structured metadata when ingesting logs with Loki ingestion protocol. An issue has been introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details. * FEATURE: add [`block_stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe) for returning various per-block stats. This pipe is useful for debugging.
* BUGFIX: properly parse structured metadata when ingesting logs with [Loki ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). The issue has been introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details.
## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs) ## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs)

View File

@ -1293,6 +1293,7 @@ _time:5m | stats by (_stream) count() per_stream_logs | sort by (per_stream_logs
LogsQL supports the following pipes: LogsQL supports the following pipes:
- [`block_stats`](#block_stats-pipe) returns various stats for the selected blocks with logs.
- [`blocks_count`](#blocks_count-pipe) counts the number of blocks with logs processed by the query. - [`blocks_count`](#blocks_count-pipe) counts the number of blocks with logs processed by the query.
- [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -1325,10 +1326,34 @@ LogsQL supports the following pipes:
- [`unpack_syslog`](#unpack_syslog-pipe) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unpack_syslog`](#unpack_syslog-pipe) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
### block_stats pipe
`<q> | block_stats` [pipe](#pipes) returns the following stats per each block processed by `<q>`. This pipe is needed mostly for debugging.
The returned per-block stats:
- `field` - [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) name
- `rows` - the number of rows at the given `field.
- `type` - internal storage type for the given `field`
- `values_bytes` - on-disk size of the data for the given `field`
- `bloom_bytes` - on-disk size of bloom filter data for the given `field`
- `dict_bytes` - on-disk size of the dictionary data for the given `field`
- `dict_items` - the number of unique values in the dictionary for the given `field`
See also:
- [`blocks_count` pipe](#blocks_count-pipe)
- [`len` pipe](#len-pipe)
### blocks_count pipe ### blocks_count pipe
`<q> | blocks_count` [pipe](#pipes) counts the number of blocks with logs processed by `<q>`. This pipe is needed mostly for debugging. `<q> | blocks_count` [pipe](#pipes) counts the number of blocks with logs processed by `<q>`. This pipe is needed mostly for debugging.
See also:
- [`block_stats` pipe](#block_stats-pipe)
- [`len` pipe](#len-pipe)
### copy pipe ### copy pipe
If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be copied, then `| copy src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used. If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be copied, then `| copy src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used.
@ -1820,6 +1845,7 @@ See also:
- [`sum_len` stats function](#sum_len-stats) - [`sum_len` stats function](#sum_len-stats)
- [`sort` pipe](#sort-pipe) - [`sort` pipe](#sort-pipe)
- [`limit` pipe](#limit-pipe) - [`limit` pipe](#limit-pipe)
- [`block_stats` pipe](#block_stats-pipe)
### limit pipe ### limit pipe

View File

@ -243,14 +243,25 @@ func marshalBytesBlock(dst, src []byte) []byte {
// Compress the block // Compress the block
dst = append(dst, marshalBytesTypeZSTD) dst = append(dst, marshalBytesTypeZSTD)
compressLevel := getCompressLevel(len(src))
bb := bbPool.Get() bb := bbPool.Get()
bb.B = encoding.CompressZSTDLevel(bb.B[:0], src, 1) bb.B = encoding.CompressZSTDLevel(bb.B[:0], src, compressLevel)
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B))) dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
dst = append(dst, bb.B...) dst = append(dst, bb.B...)
bbPool.Put(bb) bbPool.Put(bb)
return dst return dst
} }
func getCompressLevel(dataLen int) int {
if dataLen <= 512 {
return 1
}
if dataLen <= 4*1024 {
return 2
}
return 3
}
func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) { func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) {
if len(src) < 1 { if len(src) < 1 {
return dst, src, fmt.Errorf("cannot unmarshal block type from empty src") return dst, src, fmt.Errorf("cannot unmarshal block type from empty src")

View File

@ -349,9 +349,11 @@ func (q *Query) Clone(timestamp int64) *Query {
func (q *Query) CanReturnLastNResults() bool { func (q *Query) CanReturnLastNResults() bool {
for _, p := range q.pipes { for _, p := range q.pipes {
switch p.(type) { switch p.(type) {
case *pipeBlocksCount, case *pipeBlockStats,
*pipeBlocksCount,
*pipeFieldNames, *pipeFieldNames,
*pipeFieldValues, *pipeFieldValues,
*pipeJoin,
*pipeLimit, *pipeLimit,
*pipeOffset, *pipeOffset,
*pipeTop, *pipeTop,

View File

@ -982,6 +982,9 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | field_values x`, `* | field_values x`) f(`* | field_values x`, `* | field_values x`)
f(`* | field_values (x)`, `* | field_values x`) f(`* | field_values (x)`, `* | field_values x`)
// block_stats pipe
f(`foo | block_stats`, `foo | block_stats`)
// blocks_count pipe // blocks_count pipe
f(`foo | blocks_count as x`, `foo | blocks_count as x`) f(`foo | blocks_count as x`, `foo | blocks_count as x`)
f(`foo | blocks_count y`, `foo | blocks_count as y`) f(`foo | blocks_count y`, `foo | blocks_count as y`)
@ -1222,6 +1225,11 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | unpack_logfmt from x`, `* | unpack_logfmt from x`) f(`* | unpack_logfmt from x`, `* | unpack_logfmt from x`)
f(`* | unpack_logfmt from x result_prefix y`, `* | unpack_logfmt from x result_prefix y`) f(`* | unpack_logfmt from x result_prefix y`, `* | unpack_logfmt from x result_prefix y`)
// join pipe
f(`* | join by (x) (foo:bar)`, `* | join by (x) (foo:bar)`)
f(`* | join on (x, y) (foo:bar)`, `* | join by (x, y) (foo:bar)`)
f(`* | join (x, y) (foo:bar)`, `* | join by (x, y) (foo:bar)`)
// multiple different pipes // multiple different pipes
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`) f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`) f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
@ -1504,6 +1512,11 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | field_names x y`) f(`foo | field_names x y`)
f(`foo | field_names x, y`) f(`foo | field_names x, y`)
// invalid block_stats
f(`foo | block_stats foo`)
f(`foo | block_stats ()`)
f(`foo | block_stats (foo)`)
// invalid blocks_count // invalid blocks_count
f(`foo | blocks_count |`) f(`foo | blocks_count |`)
f(`foo | blocks_count (`) f(`foo | blocks_count (`)
@ -1894,6 +1907,10 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``) f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``)
f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`) f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)
f(`* | block_stats`, `*`, ``)
f(`* | block_stats | fields foo`, `*`, ``)
f(`* | block_stats | rm foo`, `*`, ``)
f(`* | blocks_count as foo`, ``, ``) f(`* | blocks_count as foo`, ``, ``)
f(`* | blocks_count foo | fields bar`, ``, ``) f(`* | blocks_count foo | fields bar`, ``, ``)
f(`* | blocks_count foo | fields foo`, ``, ``) f(`* | blocks_count foo | fields foo`, ``, ``)
@ -2014,6 +2031,7 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | extract_regexp if (q:w p:a) "(?P<f1>.*)bar" from x | count() r1`, `p,q`, ``) f(`* | extract_regexp if (q:w p:a) "(?P<f1>.*)bar" from x | count() r1`, `p,q`, ``)
f(`* | field_names | count() r1`, ``, ``) f(`* | field_names | count() r1`, ``, ``)
f(`* | limit 10 | field_names as abc | count() r1`, `*`, ``) f(`* | limit 10 | field_names as abc | count() r1`, `*`, ``)
f(`* | block_stats | count() r1`, `*`, ``)
f(`* | blocks_count | count() r1`, ``, ``) f(`* | blocks_count | count() r1`, ``, ``)
f(`* | limit 10 | blocks_count as abc | count() r1`, ``, ``) f(`* | limit 10 | blocks_count as abc | count() r1`, ``, ``)
f(`* | fields a, b | count() r1`, ``, ``) f(`* | fields a, b | count() r1`, ``, ``)
@ -2117,10 +2135,12 @@ func TestQueryCanReturnLastNResults(t *testing.T) {
f("* | limit 10", false) f("* | limit 10", false)
f("* | offset 10", false) f("* | offset 10", false)
f("* | uniq (x)", false) f("* | uniq (x)", false)
f("* | block_stats", false)
f("* | blocks_count", false) f("* | blocks_count", false)
f("* | field_names", false) f("* | field_names", false)
f("* | field_values x", false) f("* | field_values x", false)
f("* | top 5 by (x)", false) f("* | top 5 by (x)", false)
f("* | join by (x) (foo)", false)
} }
@ -2144,6 +2164,7 @@ func TestQueryCanLiveTail(t *testing.T) {
f("* | drop_empty_fields", true) f("* | drop_empty_fields", true)
f("* | extract 'foo<bar>baz'", true) f("* | extract 'foo<bar>baz'", true)
f("* | extract_regexp 'foo(?P<bar>baz)'", true) f("* | extract_regexp 'foo(?P<bar>baz)'", true)
f("* | block_stats", false)
f("* | blocks_count a", false) f("* | blocks_count a", false)
f("* | field_names a", false) f("* | field_names a", false)
f("* | fields a, b", true) f("* | fields a, b", true)
@ -2341,6 +2362,7 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) {
f(`foo | count() | drop_empty_fields`) f(`foo | count() | drop_empty_fields`)
f(`foo | count() | extract "foo<bar>baz"`) f(`foo | count() | extract "foo<bar>baz"`)
f(`foo | count() | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)"`) f(`foo | count() | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)"`)
f(`foo | count() | block_stats`)
f(`foo | count() | blocks_count`) f(`foo | count() | blocks_count`)
f(`foo | count() | field_names`) f(`foo | count() | field_names`)
f(`foo | count() | field_values abc`) f(`foo | count() | field_values abc`)

View File

@ -99,6 +99,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
func parsePipe(lex *lexer) (pipe, error) { func parsePipe(lex *lexer) (pipe, error) {
switch { switch {
case lex.isKeyword("block_stats"):
ps, err := parsePipeBlockStats(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'block_stats' pipe: %w", err)
}
return ps, nil
case lex.isKeyword("blocks_count"): case lex.isKeyword("blocks_count"):
pc, err := parsePipeBlocksCount(lex) pc, err := parsePipeBlocksCount(lex)
if err != nil { if err != nil {
@ -302,6 +308,7 @@ func parsePipe(lex *lexer) (pipe, error) {
var pipeNames = func() map[string]struct{} { var pipeNames = func() map[string]struct{} {
a := []string{ a := []string{
"block_stats",
"blocks_count", "blocks_count",
"copy", "cp", "copy", "cp",
"delete", "del", "rm", "drop", "delete", "del", "rm", "drop",

View File

@ -0,0 +1,218 @@
package logstorage
import (
"fmt"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// pipeBlockStats processes '| block_stats ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe
type pipeBlockStats struct {
}
func (ps *pipeBlockStats) String() string {
return "block_stats"
}
func (ps *pipeBlockStats) canLiveTail() bool {
return false
}
func (ps *pipeBlockStats) optimize() {
// nothing to do
}
func (ps *pipeBlockStats) hasFilterInWithQuery() bool {
return false
}
func (ps *pipeBlockStats) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return ps, nil
}
func (ps *pipeBlockStats) updateNeededFields(neededFields, unneededFields fieldsSet) {
unneededFields.reset()
neededFields.add("*")
}
func (ps *pipeBlockStats) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
return &pipeBlockStatsProcessor{
ppNext: ppNext,
shards: make([]pipeBlockStatsProcessorShard, workersCount),
}
}
type pipeBlockStatsProcessor struct {
ppNext pipeProcessor
shards []pipeBlockStatsProcessorShard
}
type pipeBlockStatsProcessorShard struct {
pipeBlockStatsProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeBlockStatsProcessorShardNopad{})%128]byte
}
type pipeBlockStatsProcessorShardNopad struct {
wctx pipeBlockStatsWriteContext
}
func (psp *pipeBlockStatsProcessor) writeBlock(workerID uint, br *blockResult) {
if br.rowsLen == 0 {
return
}
shard := &psp.shards[workerID]
shard.wctx.init(workerID, psp.ppNext, br.rowsLen)
cs := br.getColumns()
for _, c := range cs {
if c.isConst {
shard.wctx.writeRow(c.name, "const", uint64(len(c.valuesEncoded[0])), 0, 0, 0)
continue
}
if c.isTime {
var blockSize uint64
if br.bs != nil {
blockSize = br.bs.bsw.bh.timestampsHeader.blockSize
}
shard.wctx.writeRow(c.name, "time", blockSize, 0, 0, 0)
continue
}
if br.bs == nil {
shard.wctx.writeRow(c.name, "inmemory", 0, 0, 0, 0)
continue
}
typ := c.valueType.String()
ch := br.bs.getColumnHeader(c.name)
dictSize := 0
dictItemsCount := len(ch.valuesDict.values)
if c.valueType == valueTypeDict {
for _, v := range ch.valuesDict.values {
dictSize += len(v)
}
}
shard.wctx.writeRow(c.name, typ, ch.valuesSize, ch.bloomFilterSize, uint64(dictItemsCount), uint64(dictSize))
}
shard.wctx.flush()
shard.wctx.reset()
}
func (psp *pipeBlockStatsProcessor) flush() error {
return nil
}
func parsePipeBlockStats(lex *lexer) (*pipeBlockStats, error) {
if !lex.isKeyword("block_stats") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "block_stats")
}
lex.nextToken()
ps := &pipeBlockStats{}
return ps, nil
}
type pipeBlockStatsWriteContext struct {
workerID uint
ppNext pipeProcessor
a arena
rowsLen int
tmpBuf []byte
rcs []resultColumn
br blockResult
// rowsCount is the number of rows in the current block
rowsCount int
}
func (wctx *pipeBlockStatsWriteContext) reset() {
wctx.workerID = 0
wctx.ppNext = nil
wctx.a.reset()
wctx.rowsLen = 0
wctx.tmpBuf = wctx.tmpBuf[:0]
rcs := wctx.rcs
for i := range rcs {
rcs[i].reset()
}
wctx.rcs = rcs[:0]
wctx.rowsCount = 0
}
func (wctx *pipeBlockStatsWriteContext) init(workerID uint, ppNext pipeProcessor, rowsLen int) {
wctx.reset()
wctx.workerID = workerID
wctx.ppNext = ppNext
wctx.rowsLen = rowsLen
}
func (wctx *pipeBlockStatsWriteContext) writeRow(columnName, columnType string, valuesSize, bloomSize, dictItems, dictSize uint64) {
rcs := wctx.rcs
if len(rcs) == 0 {
wctx.rcs = slicesutil.SetLength(wctx.rcs, 7)
rcs = wctx.rcs
rcs[0].name = "field"
rcs[1].name = "type"
rcs[2].name = "values_bytes"
rcs[3].name = "bloom_bytes"
rcs[4].name = "dict_items"
rcs[5].name = "dict_bytes"
rcs[6].name = "rows"
}
wctx.addValue(&rcs[0], columnName)
wctx.addValue(&rcs[1], columnType)
wctx.addUint64Value(&rcs[2], valuesSize)
wctx.addUint64Value(&rcs[3], bloomSize)
wctx.addUint64Value(&rcs[4], dictItems)
wctx.addUint64Value(&rcs[5], dictSize)
wctx.addUint64Value(&rcs[6], uint64(wctx.rowsLen))
wctx.rowsCount++
if len(wctx.a.b) >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeBlockStatsWriteContext) addUint64Value(rc *resultColumn, n uint64) {
wctx.tmpBuf = marshalUint64String(wctx.tmpBuf[:0], n)
wctx.addValue(rc, bytesutil.ToUnsafeString(wctx.tmpBuf))
}
func (wctx *pipeBlockStatsWriteContext) addValue(rc *resultColumn, v string) {
vCopy := wctx.a.copyString(v)
rc.addValue(vCopy)
}
func (wctx *pipeBlockStatsWriteContext) flush() {
rcs := wctx.rcs
// Flush rcs to ppNext
br := &wctx.br
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.ppNext.writeBlock(wctx.workerID, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
wctx.a.reset()
}

View File

@ -0,0 +1,41 @@
package logstorage
import (
"testing"
)
func TestParsePipeBlockStatsSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`block_stats`)
}
func TestParsePipeBlockStatsFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`block_stats foo`)
f(`block_stats ()`)
f(`block_stats (foo)`)
}
func TestPipeBlockStatsUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("block_stats", "*", "", "*", "")
// all the needed fields, plus unneeded fields
f("block_stats", "*", "f1,f2", "*", "")
// needed fields
f("block_stats", "f1,f2", "", "*", "")
}

View File

@ -38,7 +38,7 @@ func (pj *pipeJoin) hasFilterInWithQuery() bool {
return false return false
} }
func (pj *pipeJoin) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { func (pj *pipeJoin) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pj, nil return pj, nil
} }
@ -122,6 +122,9 @@ func (pjp *pipeJoinProcessor) writeBlock(workerID uint, br *blockResult) {
continue continue
} }
for _, extraFields := range matchingRows { for _, extraFields := range matchingRows {
if needStop(pjp.stopCh) {
return
}
shard.wctx.writeRow(rowIdx, extraFields) shard.wctx.writeRow(rowIdx, extraFields)
} }
} }

View File

@ -134,10 +134,10 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) {
fields := shard.fields fields := shard.fields
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
if needStop(pup.stopCh) {
return
}
if bm.isSetBit(rowIdx) { if bm.isSetBit(rowIdx) {
if needStop(pup.stopCh) {
return
}
shard.writeUnrolledFields(pu.fields, columnValues, rowIdx) shard.writeUnrolledFields(pu.fields, columnValues, rowIdx)
} else { } else {
fields = fields[:0] fields = fields[:0]

View File

@ -55,6 +55,33 @@ const (
valueTypeTimestampISO8601 = valueType(9) valueTypeTimestampISO8601 = valueType(9)
) )
func (t valueType) String() string {
switch t {
case valueTypeUnknown:
return "unknown"
case valueTypeString:
return "string"
case valueTypeDict:
return "dict"
case valueTypeUint8:
return "uint8"
case valueTypeUint16:
return "uint16"
case valueTypeUint32:
return "uint32"
case valueTypeUint64:
return "uint64"
case valueTypeFloat64:
return "float64"
case valueTypeIPv4:
return "ipv4"
case valueTypeTimestampISO8601:
return "iso8601"
default:
return fmt.Sprintf("unknown valueType=%d", t)
}
}
type valuesEncoder struct { type valuesEncoder struct {
// buf contains data for values. // buf contains data for values.
buf []byte buf []byte