VictoriaMetrics/lib/logstorage/block_search.go
Aliaksandr Valialkin 92b9b13df1
lib/logstorage: optimize performance for queries, which select all the log fields for logs containing hundreds of log fields (aka "wide events")
Unpack the full columnsHeader block instead of unpacking meta-information per each individual column
when the query, which selects all the columns, is executed. This improves performance when scanning
logs with big number of fields.

(cherry picked from commit 2023f017b1)
2024-10-18 11:42:15 +02:00

580 lines
16 KiB
Go

package logstorage
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// blockSearchWorksPerBatch is the number of blocks to search at once by a single worker.
//
// It is used as the capacity of the work slice in newly allocated blockSearchWorkBatch objects.
//
// This number must be increased on systems with many CPU cores in order to amortize
// the overhead for passing the blockSearchWork to worker goroutines.
const blockSearchWorksPerBatch = 64
// blockSearchWork is a single unit of search work: one block, identified by its header, in one part.
type blockSearchWork struct {
// p is the part where the block belongs to.
p *part
// so contains search options for the block search.
so *searchOptions
// bh is the header of the block to search.
bh blockHeader
}
// reset drops the references to the part and to the search options held by bsw
// and resets the embedded block header.
func (bsw *blockSearchWork) reset() {
	bsw.p, bsw.so = nil, nil
	bsw.bh.reset()
}
// blockSearchWorkBatch is a batch of blockSearchWork items.
//
// Batches are obtained via getBlockSearchWorkBatch() and returned via putBlockSearchWorkBatch().
type blockSearchWorkBatch struct {
// bsws holds the work items in the batch.
bsws []blockSearchWork
}
// reset resets every work item stored in bswb and truncates the batch to zero length.
//
// The underlying slice is re-sliced rather than dropped, so its capacity is kept.
func (bswb *blockSearchWorkBatch) reset() {
	for i := range bswb.bsws {
		bswb.bsws[i].reset()
	}
	bswb.bsws = bswb.bsws[:0]
}
// getBlockSearchWorkBatch returns a blockSearchWorkBatch obtained from blockSearchWorkBatchPool.
//
// If the pool has nothing to offer, a new batch with room for
// blockSearchWorksPerBatch work items is allocated.
func getBlockSearchWorkBatch() *blockSearchWorkBatch {
	if v := blockSearchWorkBatchPool.Get(); v != nil {
		return v.(*blockSearchWorkBatch)
	}
	return &blockSearchWorkBatch{
		bsws: make([]blockSearchWork, 0, blockSearchWorksPerBatch),
	}
}
// putBlockSearchWorkBatch resets bswb and returns it to blockSearchWorkBatchPool.
//
// The caller should not use bswb after the call, since the pool may hand it out again.
func putBlockSearchWorkBatch(bswb *blockSearchWorkBatch) {
bswb.reset()
blockSearchWorkBatchPool.Put(bswb)
}
// blockSearchWorkBatchPool is the pool of reusable blockSearchWorkBatch objects
// used by getBlockSearchWorkBatch() and putBlockSearchWorkBatch().
var blockSearchWorkBatchPool sync.Pool
// appendBlockSearchWork appends a work item for the block with the header bh from the part p to bswb.
//
// The header is copied into the appended item via blockHeader.copyFrom().
//
// It returns true if the length of the batch is still below its capacity after the append,
// e.g. when more work items fit into the batch without growing it.
func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool {
	bswb.bsws = append(bswb.bsws, blockSearchWork{
		p:  p,
		so: so,
	})
	last := &bswb.bsws[len(bswb.bsws)-1]
	last.bh.copyFrom(bh)

	return len(bswb.bsws) < cap(bswb.bsws)
}
// getBlockSearch returns a blockSearch obtained from blockSearchPool.
//
// A zero blockSearch is allocated if the pool has nothing to offer.
func getBlockSearch() *blockSearch {
	if v := blockSearchPool.Get(); v != nil {
		return v.(*blockSearch)
	}
	return &blockSearch{}
}
// putBlockSearch resets bs and returns it to blockSearchPool.
//
// The caller should not use bs after the call, since the pool may hand it out again.
func putBlockSearch(bs *blockSearch) {
bs.reset()
// bs.reset() keeps seenStreams intact, so drop it explicitly
// before returning bs to the pool in order to reduce memory usage.
bs.seenStreams = nil
blockSearchPool.Put(bs)
}
// blockSearchPool is the pool of reusable blockSearch objects
// used by getBlockSearch() and putBlockSearch().
var blockSearchPool sync.Pool
// blockSearch holds the state needed for searching a single block.
//
// The per-block caches below are filled lazily by the corresponding getters
// and are released by reset().
type blockSearch struct {
// bsw is the actual work to perform on the block described by bsw.bh
bsw *blockSearchWork
// br contains result for the search in the block after search() call
br blockResult
// timestampsCache contains cached timestamps for the given block.
//
// It is initialized lazily by calling getTimestamps().
timestampsCache *encoding.Int64s
// bloomFilterCache contains cached bloom filters for requested columns in the given block
//
// It is keyed by column name and is filled by getBloomFilterForColumn().
bloomFilterCache map[string]*bloomFilter
// valuesCache contains cached values for requested columns in the given block
//
// It is keyed by column name and is filled by getValuesForColumn().
valuesCache map[string]*stringBucket
// sbu is used for unmarshaling local columns
sbu stringsBlockUnmarshaler
// cshIndexBlockCache holds columnsHeaderIndex data for the given block.
//
// It is initialized lazily by calling getColumnsHeaderIndex().
cshIndexBlockCache []byte
// cshBlockCache holds columnsHeader data for the given block.
//
// It is initialized lazily by calling getColumnsHeaderBlock().
cshBlockCache []byte
// cshBlockInitialized is set to true after cshBlockCache has been read by getColumnsHeaderBlock().
cshBlockInitialized bool
// ccsCache is the cache for accessed const columns
//
// It is filled by getConstColumnValue().
ccsCache []Field
// chsCache is the cache for accessed column headers
//
// It is filled by getColumnHeader().
chsCache []columnHeader
// cshIndexCache is the columnsHeaderIndex associated with the given block.
//
// It is initialized lazily by calling getColumnsHeaderIndex().
cshIndexCache *columnsHeaderIndex
// cshCache is the columnsHeader associated with the given block.
//
// It is initialized lazily by calling getColumnsHeader().
cshCache *columnsHeader
// seenStreams contains seen streamIDs for the recent searches.
//
// It is used for speeding up fetching _stream column.
// It is not cleared by reset(); see getStreamStr() and putBlockSearch().
seenStreams map[u128]string
}
// reset releases all the per-block state held by bs, so bs can be used for searching another block.
//
// Pooled objects (timestamps, bloom filters, string buckets, columns header and its index)
// are returned to their pools, while byte and slice caches are truncated with their capacity kept.
func (bs *blockSearch) reset() {
	bs.bsw = nil
	bs.br.reset()

	if bs.timestampsCache != nil {
		encoding.PutInt64s(bs.timestampsCache)
		bs.timestampsCache = nil
	}

	// Return the cached per-column objects to their pools and empty the maps in place.
	for name, bf := range bs.bloomFilterCache {
		putBloomFilter(bf)
		delete(bs.bloomFilterCache, name)
	}
	for name, sb := range bs.valuesCache {
		putStringBucket(sb)
		delete(bs.valuesCache, name)
	}

	bs.sbu.reset()

	bs.cshIndexBlockCache = bs.cshIndexBlockCache[:0]
	bs.cshBlockCache = bs.cshBlockCache[:0]
	bs.cshBlockInitialized = false

	// Reset every cached entry before truncating the slices.
	for i := range bs.ccsCache {
		bs.ccsCache[i].Reset()
	}
	bs.ccsCache = bs.ccsCache[:0]

	for i := range bs.chsCache {
		bs.chsCache[i].reset()
	}
	bs.chsCache = bs.chsCache[:0]

	if bs.cshIndexCache != nil {
		putColumnsHeaderIndex(bs.cshIndexCache)
		bs.cshIndexCache = nil
	}
	if bs.cshCache != nil {
		putColumnsHeader(bs.cshCache)
		bs.cshCache = nil
	}

	// Do not reset seenStreams, since its lifetime is managed by blockResult.addStreamColumn() code.
}
// partPath returns the path of the part the current block belongs to.
//
// It dereferences bs.bsw, so it must be called only while bs.bsw is set.
func (bs *blockSearch) partPath() string {
return bs.bsw.p.path
}
// search applies the filter from bsw.so to the block described by bsw
// and stores the result into bs.br.
//
// bm is initialized to the number of rows in the block and is then narrowed down by the filter.
// If the filter leaves no rows in bm, then bs.br stays in the state left by bs.reset().
func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) {
	bs.reset()
	bs.bsw = bsw

	so := bsw.so

	// search rows matching the given filter
	bm.init(int(bsw.bh.rowsCount))
	bm.setBits()
	so.filter.applyToBlockSearch(bs, bm)
	if bm.isZero() {
		// The filter doesn't match any logs in the current block.
		return
	}

	bs.br.mustInit(bs, bm)

	// fetch the requested columns to bs.br.
	if so.needAllColumns {
		bs.br.initAllColumns()
		return
	}
	bs.br.initRequestedColumns()
}
// partFormatVersion returns the format version recorded in the header of the part
// the current block belongs to.
//
// It dereferences bs.bsw, so it must be called only while bs.bsw is set.
func (bs *blockSearch) partFormatVersion() uint {
return bs.bsw.p.ph.FormatVersion
}
// getConstColumnValue returns the value of the const column with the given name in the current block.
//
// An empty string is returned if the block has no const column with such a name.
// The "_msg" name is looked up under the empty name.
//
// The found column is remembered in bs.ccsCache, so repeated lookups
// for the same name do not unmarshal it again.
func (bs *blockSearch) getConstColumnValue(name string) string {
	if name == "_msg" {
		name = ""
	}

	if bs.partFormatVersion() < 1 {
		// Parts below format version 1 are served from the fully unmarshaled columns header.
		for _, cc := range bs.getColumnsHeader().constColumns {
			if cc.Name == name {
				return cc.Value
			}
		}
		return ""
	}

	columnNameID, ok := bs.getColumnNameID(name)
	if !ok {
		return ""
	}

	// Fast path - the column has been already unmarshaled for this block.
	for i := range bs.ccsCache {
		if cached := &bs.ccsCache[i]; cached.Name == name {
			return cached.Value
		}
	}

	// Slow path - locate the column via the index and unmarshal only this column.
	for _, ref := range bs.getColumnsHeaderIndex().constColumnsRefs {
		if ref.columnNameID != columnNameID {
			continue
		}

		block := bs.getColumnsHeaderBlock()
		if ref.offset > uint64(len(block)) {
			logger.Panicf("FATAL: %s: header offset for const column %q cannot exceed %d bytes; got %d bytes", bs.partPath(), name, len(block), ref.offset)
		}
		tail := block[ref.offset:]

		n := len(bs.ccsCache)
		bs.ccsCache = slicesutil.SetLength(bs.ccsCache, n+1)
		cc := &bs.ccsCache[n]
		if _, err := cc.unmarshalNoArena(tail, false); err != nil {
			logger.Panicf("FATAL: %s: cannot unmarshal header for const column %q: %s", bs.partPath(), name, err)
		}
		// Clone the name, so the cached entry doesn't hold the caller's string.
		cc.Name = strings.Clone(name)
		return cc.Value
	}
	return ""
}
// getColumnHeader returns the header of the column with the given name in the current block.
//
// nil is returned if the block has no such column.
// The "_msg" name is looked up under the empty name.
//
// The returned header points into state owned by bs (bs.cshCache or bs.chsCache),
// which is released by bs.reset().
func (bs *blockSearch) getColumnHeader(name string) *columnHeader {
	if name == "_msg" {
		name = ""
	}

	if bs.partFormatVersion() < 1 {
		// Parts below format version 1 are served from the fully unmarshaled columns header.
		headers := bs.getColumnsHeader().columnHeaders
		for i := range headers {
			if headers[i].name == name {
				return &headers[i]
			}
		}
		return nil
	}

	columnNameID, ok := bs.getColumnNameID(name)
	if !ok {
		return nil
	}

	// Fast path - the header has been already unmarshaled for this block.
	for i := range bs.chsCache {
		if cached := &bs.chsCache[i]; cached.name == name {
			return cached
		}
	}

	// Slow path - locate the header via the index and unmarshal only this header.
	for _, ref := range bs.getColumnsHeaderIndex().columnHeadersRefs {
		if ref.columnNameID != columnNameID {
			continue
		}

		block := bs.getColumnsHeaderBlock()
		if ref.offset > uint64(len(block)) {
			logger.Panicf("FATAL: %s: header offset for column %q cannot exceed %d bytes; got %d bytes", bs.partPath(), name, len(block), ref.offset)
		}
		tail := block[ref.offset:]

		n := len(bs.chsCache)
		bs.chsCache = slicesutil.SetLength(bs.chsCache, n+1)
		ch := &bs.chsCache[n]
		// NOTE(review): the header is decoded with partFormatLatestVersion rather than
		// bs.partFormatVersion() - confirm this stays correct when a newer format version is added.
		if _, err := ch.unmarshalNoArena(tail, partFormatLatestVersion); err != nil {
			logger.Panicf("FATAL: %s: cannot unmarshal header for column %q: %s", bs.partPath(), name, err)
		}
		// Clone the name, so the cached entry doesn't hold the caller's string.
		ch.name = strings.Clone(name)
		return ch
	}
	return nil
}
// getColumnNameID returns the ID registered for the given column name in the part
// the current block belongs to.
//
// The second returned value is false if the part has no ID for the given name.
func (bs *blockSearch) getColumnNameID(name string) (uint64, bool) {
id, ok := bs.bsw.p.columnNameIDs[name]
return id, ok
}
// getColumnsHeaderIndex returns the columnsHeaderIndex for the current block.
//
// The index is read and unmarshaled on the first call and is cached in bs until bs.reset().
//
// It must be called only for parts with format version 1 or newer.
func (bs *blockSearch) getColumnsHeaderIndex() *columnsHeaderIndex {
	if v := bs.partFormatVersion(); v < 1 {
		logger.Panicf("BUG: getColumnsHeaderIndex() can be called only for part encoding v1+, while it has been called for v%d", v)
	}

	if bs.cshIndexCache != nil {
		return bs.cshIndexCache
	}

	bs.cshIndexBlockCache = readColumnsHeaderIndexBlock(bs.cshIndexBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
	bs.cshIndexCache = getColumnsHeaderIndex()
	if err := bs.cshIndexCache.unmarshalNoArena(bs.cshIndexBlockCache); err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal columns header index: %s", bs.partPath(), err)
	}
	return bs.cshIndexCache
}
// getColumnsHeader returns the fully unmarshaled columnsHeader for the current block.
//
// The header is unmarshaled on the first call and is cached in bs until bs.reset().
// For parts with format version 1 or newer the column names are additionally
// filled from the columns header index and the column names stored in the part.
func (bs *blockSearch) getColumnsHeader() *columnsHeader {
	if bs.cshCache != nil {
		return bs.cshCache
	}

	block := bs.getColumnsHeaderBlock()
	version := bs.partFormatVersion()

	csh := getColumnsHeader()
	if err := csh.unmarshalNoArena(block, version); err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", bs.partPath(), err)
	}
	if version >= 1 {
		if err := csh.setColumnNames(bs.getColumnsHeaderIndex(), bs.bsw.p.columnNames); err != nil {
			logger.Panicf("FATAL: %s: %s", bs.partPath(), err)
		}
	}

	bs.cshCache = csh
	return csh
}
// getColumnsHeaderBlock returns the raw columnsHeader data for the current block.
//
// The data is read on the first call and is cached in bs until bs.reset().
// The returned slice is owned by bs.
func (bs *blockSearch) getColumnsHeaderBlock() []byte {
	if bs.cshBlockInitialized {
		return bs.cshBlockCache
	}

	bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
	bs.cshBlockInitialized = true
	return bs.cshBlockCache
}
// readColumnsHeaderIndexBlock reads the columns header index data for the block with the header bh
// from the part p into the space following len(dst) and returns the resulting slice.
//
// It panics if the size recorded in bh exceeds maxColumnsHeaderIndexSize.
//
// NOTE(review): dst is resized with a "NoCopy" helper, so bytes already stored in dst
// presumably do not survive a reallocation. Callers in this file pass dst[:0] - confirm
// before passing a non-empty dst.
func readColumnsHeaderIndexBlock(dst []byte, p *part, bh *blockHeader) []byte {
	size := bh.columnsHeaderIndexSize
	if size > maxColumnsHeaderIndexSize {
		logger.Panicf("FATAL: %s: columns header index size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderIndexSize, size)
	}

	start := len(dst)
	dst = bytesutil.ResizeNoCopyMayOverallocate(dst, start+int(size))
	p.columnsHeaderIndexFile.MustReadAt(dst[start:], int64(bh.columnsHeaderIndexOffset))
	return dst
}
// readColumnsHeaderBlock reads the columns header data for the block with the header bh
// from the part p into the space following len(dst) and returns the resulting slice.
//
// It panics if the size recorded in bh exceeds maxColumnsHeaderSize.
//
// NOTE(review): dst is resized with a "NoCopy" helper, so bytes already stored in dst
// presumably do not survive a reallocation. Callers in this file pass dst[:0] - confirm
// before passing a non-empty dst.
func readColumnsHeaderBlock(dst []byte, p *part, bh *blockHeader) []byte {
	size := bh.columnsHeaderSize
	if size > maxColumnsHeaderSize {
		logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, size)
	}

	start := len(dst)
	dst = bytesutil.ResizeNoCopyMayOverallocate(dst, start+int(size))
	p.columnsHeaderFile.MustReadAt(dst[start:], int64(bh.columnsHeaderOffset))
	return dst
}
// getBloomFilterForColumn returns the bloom filter for the column described by ch.
//
// The bloom filter is read and unmarshaled on the first call for the given column name
// and is cached in bs.bloomFilterCache afterwards.
//
// The returned bloom filter belongs to bs, so it becomes invalid after bs reset.
func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter {
	if cached := bs.bloomFilterCache[ch.name]; cached != nil {
		return cached
	}

	file := bs.bsw.p.getBloomValuesFileForColumnName(ch.name)

	// Read the marshaled bloom filter into a temporary pooled buffer.
	buf := longTermBufPool.Get()
	size := ch.bloomFilterSize
	if size > maxBloomFilterBlockSize {
		logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxBloomFilterBlockSize, size)
	}
	buf.B = bytesutil.ResizeNoCopyMayOverallocate(buf.B, int(size))
	file.bloom.MustReadAt(buf.B, int64(ch.bloomFilterOffset))

	bf := getBloomFilter()
	if err := bf.unmarshal(buf.B); err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal bloom filter: %s", bs.partPath(), err)
	}
	longTermBufPool.Put(buf)

	// The cache map is allocated on first use.
	if bs.bloomFilterCache == nil {
		bs.bloomFilterCache = make(map[string]*bloomFilter)
	}
	bs.bloomFilterCache[ch.name] = bf
	return bf
}
// getValuesForColumn returns the values stored in the current block for the column described by ch.
//
// The values are read and unmarshaled on the first call for the given column name
// and are cached in bs.valuesCache afterwards.
//
// The returned values belong to bs, so they become invalid after bs reset.
func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string {
	if cached := bs.valuesCache[ch.name]; cached != nil {
		return cached.a
	}

	file := bs.bsw.p.getBloomValuesFileForColumnName(ch.name)

	// Read the marshaled values into a temporary pooled buffer.
	buf := longTermBufPool.Get()
	size := ch.valuesSize
	if size > maxValuesBlockSize {
		logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxValuesBlockSize, size)
	}
	buf.B = bytesutil.ResizeNoCopyMayOverallocate(buf.B, int(size))
	file.values.MustReadAt(buf.B, int64(ch.valuesOffset))

	bucket := getStringBucket()
	var err error
	bucket.a, err = bs.sbu.unmarshal(bucket.a[:0], buf.B, bs.bsw.bh.rowsCount)
	longTermBufPool.Put(buf)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal column %q: %s", bs.partPath(), ch.name, err)
	}

	// The cache map is allocated on first use.
	if bs.valuesCache == nil {
		bs.valuesCache = make(map[string]*stringBucket)
	}
	bs.valuesCache[ch.name] = bucket
	return bucket.a
}
// getTimestamps returns the timestamps for the current block.
//
// The timestamps are read and unmarshaled on the first call and are cached
// in bs.timestampsCache afterwards.
//
// The returned timestamps belong to bs, so they become invalid after bs reset.
func (bs *blockSearch) getTimestamps() []int64 {
	if cached := bs.timestampsCache; cached != nil {
		return cached.A
	}

	bh := &bs.bsw.bh
	th := &bh.timestampsHeader

	// Read the marshaled timestamps into a temporary pooled buffer.
	buf := longTermBufPool.Get()
	size := th.blockSize
	if size > maxTimestampsBlockSize {
		logger.Panicf("FATAL: %s: timestamps block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxTimestampsBlockSize, size)
	}
	buf.B = bytesutil.ResizeNoCopyMayOverallocate(buf.B, int(size))
	bs.bsw.p.timestampsFile.MustReadAt(buf.B, int64(th.blockOffset))

	rowsCount := int(bh.rowsCount)
	ts := encoding.GetInt64s(rowsCount)
	var err error
	ts.A, err = encoding.UnmarshalTimestamps(ts.A[:0], buf.B, th.marshalType, th.minTimestamp, rowsCount)
	longTermBufPool.Put(buf)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal timestamps: %s", bs.partPath(), err)
	}

	bs.timestampsCache = ts
	return ts.A
}
// mustReadBlockHeaders reads the block headers referred by ih from p, appends them to dst and returns the result.
//
// The index block is read from p.indexFile, decompressed with ZSTD and then unmarshaled
// according to the format version of the part.
// It panics if the index block is too big or cannot be decompressed or unmarshaled.
func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []blockHeader {
	// Read the compressed index block.
	compressed := longTermBufPool.Get()
	size := ih.indexBlockSize
	if size > maxIndexBlockSize {
		logger.Panicf("FATAL: %s: index block size cannot exceed %d bytes; got %d bytes", p.indexFile.Path(), maxIndexBlockSize, size)
	}
	compressed.B = bytesutil.ResizeNoCopyMayOverallocate(compressed.B, int(size))
	p.indexFile.MustReadAt(compressed.B, int64(ih.indexBlockOffset))

	// Decompress it; the compressed buffer isn't needed after that.
	raw := longTermBufPool.Get()
	var err error
	raw.B, err = encoding.DecompressZSTD(raw.B, compressed.B)
	longTermBufPool.Put(compressed)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot decompress indexBlock read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)
	}

	// Unmarshal block headers from the decompressed data.
	dst, err = unmarshalBlockHeaders(dst, raw.B, p.ph.FormatVersion)
	longTermBufPool.Put(raw)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal block headers read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)
	}

	return dst
}
// getStreamStr returns _stream value for the given block at bs.
//
// Non-empty values are remembered in bs.seenStreams keyed by stream ID.
// An empty string is returned if the stream value cannot be loaded; it isn't remembered.
func (bs *blockSearch) getStreamStr() string {
	sid := bs.bsw.bh.streamID.id

	// Fast path - the stream has been already seen.
	if cached := bs.seenStreams[sid]; cached != "" {
		return cached
	}

	// Slow path - load the stream value from the storage.
	streamStr := bs.getStreamStrSlow()
	if streamStr == "" {
		return ""
	}

	// Start a fresh map when there is none yet or when the existing one
	// holds more than 20_000 entries.
	if bs.seenStreams == nil || len(bs.seenStreams) > 20_000 {
		bs.seenStreams = make(map[u128]string)
	}
	bs.seenStreams[sid] = streamStr
	return streamStr
}
// getStreamStrSlow loads stream tags for the stream ID of the current block from idb
// and returns them marshaled to a string.
//
// An empty string is returned if no stream tags are found for the stream ID.
func (bs *blockSearch) getStreamStrSlow() string {
	buf := bbPool.Get()
	defer bbPool.Put(buf)

	bsw := bs.bsw
	buf.B = bsw.p.pt.idb.appendStreamTagsByStreamID(buf.B[:0], &bsw.bh.streamID)
	if len(buf.B) == 0 {
		// Couldn't find stream tags by sid. This may be the case when the corresponding log stream
		// was recently registered and its tags aren't visible to search yet.
		// The stream tags must become visible in a few seconds.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
		return ""
	}

	tags := GetStreamTags()
	mustUnmarshalStreamTags(tags, buf.B)
	// NOTE(review): buf.B is overwritten from the start while tags is being marshaled;
	// this assumes tags doesn't reference buf.B after unmarshaling - confirm.
	buf.B = tags.marshalString(buf.B[:0])
	PutStreamTags(tags)

	// string() copies the bytes, so the result stays valid after buf returns to the pool.
	return string(buf.B)
}