lib/logstorage: refactor storage format to be more efficient for querying wide events

It has been appeared that VictoriaLogs is frequently used for collecting logs with tens of fields.
For example, standard Kuberntes setup on top of Filebeat generates more than 20 fields per each log.
Such logs are also known as "wide events".

The previous storage format was optimized for logs with a few fields. When at least a single field
was referenced in the query, then the all the meta-information about all the log fields was unpacked
and parsed per each scanned block during the query. This could require a lot of additional disk IO
and CPU time when logs contain many fields. Resolve this issue by providing an (field -> metainfo_offset)
index per each field in every data block. This index allows reading and extracting only the needed
metainfo for fields used in the query. This index is stored in columnsHeaderIndexFilename ( columns_header_index.bin ).
This allows increasing performance for queries over wide events by 10x and more.

Another issue was that the data for bloom filters and field values across all the log fields except of _msg
was intermixed in two files - fieldBloomFilename ( field_bloom.bin ) and fieldValuesFilename ( field_values.bin ).
This could result in huge disk read IO overhead when some small field was referred in the query,
since the Operating System usually reads more data than requested. It reads the data from disk
in at least 4KiB blocks (usually the block size is much bigger in the range 64KiB - 512KiB).
So, if 512-byte bloom filter or values' block is read from the file, then the Operating System
reads up to 512KiB of data from disk, which results in 1000x disk read IO overhead. This overhead isn't visible
for recently accessed data, since this data is usually stored in RAM (aka Operating System page cache),
but this overhead may become very annoying when performing the query over large volumes of data
which isn't present in OS page cache.

The solution for this issue is to split bloom filters and field values across multiple shards.
This reduces the worst-case disk read IO overhead by at least Nx where N is the number of shards,
while the disk read IO overhead is completely removed in best case when the number of columns doesn't exceed N.
Currently the number of shards is 8 - see bloomValuesShardsCount . This solution increases
performance for queries over large volumes of newly ingested data by up to 1000x.

The new storage format is versioned as v1, while the old storage format is version as v0.
It is stored in the partHeader.FormatVersion.

Parts with the old storage format are converted into parts with the new storage format during background merge.
It is possible to force merge by querying /internal/force_merge HTTP endpoint - see https://docs.victoriametrics.com/victorialogs/#forced-merge .
This commit is contained in:
Aliaksandr Valialkin 2024-10-16 16:18:28 +02:00 committed by Zakhar Bessarab
parent 75790305c9
commit 3346576a3a
No known key found for this signature in database
GPG Key ID: 932B34D6FE062023
26 changed files with 1766 additions and 368 deletions

View File

@ -15,8 +15,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: optimize [LogsQL queries](https://docs.victoriametrics.com/victorialogs/logsql/), which need to scan big number of logs with big number of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) (aka `wide events`). The performance for such queries is improved by 10x and more depending on the number of log fields in the scanned logs. The performance improvement is visible when querying logs ingested after the upgrade to this release.
* FEATURE: add support for forced merge. See [these docs](https://docs.victoriametrics.com/victorialogs/#forced-merge).
* FEATURE: skip empty log fields in query results, since they are treated as non-existing fields in [VictoriaLogs data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
* FEATURE: skip empty log fields in query results, since they are treated as non-existing fields in [VictoriaLogs data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). This should reduce the level of confusion for end users when they see empty log fields.
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add the ability to cancel running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7097).
* BUGFIX: avoid possible panic when logs for a new day are ingested during execution of concurrent queries.

View File

@ -120,6 +120,13 @@ func (bm *bitmap) or(x *bitmap) {
}
}
func (bm *bitmap) setBit(i int) {
wordIdx := uint(i) / 64
wordOffset := uint(i) % 64
wordPtr := &bm.a[wordIdx]
*wordPtr |= (1 << wordOffset)
}
func (bm *bitmap) isSetBit(i int) bool {
wordIdx := uint(i) / 64
wordOffset := uint(i) % 64

View File

@ -27,7 +27,6 @@ func TestBitmap(t *testing.T) {
}
bm.setBits()
if n := bm.onesCount(); n != i {
t.Fatalf("unexpected number of set bits; got %d; want %d", n, i)
}
@ -106,6 +105,23 @@ func TestBitmap(t *testing.T) {
t.Fatalf("unexpected non-zero number of set bits remained: %d", bitsCount)
}
// Set bits via setBit() call
for i := 0; i < bitsLen; i++ {
if n := bm.onesCount(); n != i {
t.Fatalf("unexpected number of ones set; got %d; want %d", n, i)
}
if bm.isSetBit(i) {
t.Fatalf("the bit %d mustn't be set", i)
}
bm.setBit(i)
if !bm.isSetBit(i) {
t.Fatalf("the bit %d must be set", i)
}
if n := bm.onesCount(); n != i+1 {
t.Fatalf("unexpected number of ones set; got %d; want %d", n, i+1)
}
}
putBitmap(bm)
}
}

View File

@ -150,18 +150,13 @@ func (c *column) resizeValues(valuesLen int) []string {
// mustWriteTo writes c to sw and updates ch accordingly.
//
// ch is valid until c is changed.
func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
ch.reset()
valuesWriter := &sw.fieldValuesWriter
bloomFilterWriter := &sw.fieldBloomFilterWriter
if c.name == "" {
valuesWriter = &sw.messageValuesWriter
bloomFilterWriter = &sw.messageBloomFilterWriter
}
ch.name = c.name
bloomValuesWriter := sw.getBloomValuesWriterForColumnName(ch.name)
// encode values
ve := getValuesEncoder()
ch.valueType, ch.minValue, ch.maxValue = ve.encode(c.values, &ch.valuesDict)
@ -176,15 +171,15 @@ func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
if ch.valuesSize > maxValuesBlockSize {
logger.Panicf("BUG: too valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
}
ch.valuesOffset = valuesWriter.bytesWritten
valuesWriter.MustWrite(bb.B)
ch.valuesOffset = bloomValuesWriter.values.bytesWritten
bloomValuesWriter.values.MustWrite(bb.B)
// create and marshal bloom filter for c.values
if ch.valueType != valueTypeDict {
tokensBuf := getTokensBuf()
tokensBuf.A = tokenizeStrings(tokensBuf.A[:0], c.values)
bb.B = bloomFilterMarshal(bb.B[:0], tokensBuf.A)
putTokensBuf(tokensBuf)
hashesBuf := encoding.GetUint64s(0)
hashesBuf.A = tokenizeHashes(hashesBuf.A[:0], c.values)
bb.B = bloomFilterMarshalHashes(bb.B[:0], hashesBuf.A)
encoding.PutUint64s(hashesBuf)
} else {
// there is no need in ecoding bloom filter for dictionary type,
// since it isn't used during querying - all the dictionary values are available in ch.valuesDict
@ -194,8 +189,8 @@ func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
if ch.bloomFilterSize > maxBloomFilterBlockSize {
logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize)
}
ch.bloomFilterOffset = bloomFilterWriter.bytesWritten
bloomFilterWriter.MustWrite(bb.B)
ch.bloomFilterOffset = bloomValuesWriter.bloom.bytesWritten
bloomValuesWriter.bloom.MustWrite(bb.B)
}
func (b *block) assertValid() {
@ -436,7 +431,7 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v
}
// mustWriteTo writes b with the given sid to sw and updates bh accordingly.
func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
// Do not store the version used for encoding directly in the block data, since:
// - all the blocks in the same part use the same encoding
// - the block encoding version can be put in metadata file for the part (aka metadataFilename)
@ -458,22 +453,13 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
chs := csh.resizeColumnHeaders(len(cs))
for i := range cs {
cs[i].mustWriteToNoArena(&chs[i], sw)
cs[i].mustWriteTo(&chs[i], sw)
}
csh.constColumns = append(csh.constColumns[:0], b.constColumns...)
bb := longTermBufPool.Get()
bb.B = csh.marshal(bb.B)
csh.mustWriteTo(bh, sw, g)
putColumnsHeader(csh)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(bb.B))
if bh.columnsHeaderSize > maxColumnsHeaderSize {
logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize)
}
sw.columnsHeaderWriter.MustWrite(bb.B)
longTermBufPool.Put(bb)
}
// appendRowsTo appends log entries from b to dst.

View File

@ -93,7 +93,7 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *
}
// mustWriteTo writes bd to sw and updates bh accordingly
func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
// Do not store the version used for encoding directly in the block data, since:
// - all the blocks in the same part use the same encoding
// - the block encoding version can be put in metadata file for the part (aka metadataFilename)
@ -114,28 +114,19 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
chs := csh.resizeColumnHeaders(len(cds))
for i := range cds {
cds[i].mustWriteToNoArena(&chs[i], sw)
cds[i].mustWriteTo(&chs[i], sw)
}
csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)
bb := longTermBufPool.Get()
bb.B = csh.marshal(bb.B)
csh.mustWriteTo(bh, sw, g)
putColumnsHeader(csh)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(bb.B))
if bh.columnsHeaderSize > maxColumnsHeaderSize {
logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize)
}
sw.columnsHeaderWriter.MustWrite(bb.B)
longTermBufPool.Put(bb)
}
// mustReadFrom reads block data associated with bh from sr to bd.
//
// The bd is valid until a.reset() is called.
func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders) {
func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders, partFormatVersion uint, columnNames []string) {
bd.reset()
bd.streamID = bh.streamID
@ -159,19 +150,46 @@ func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders)
sr.columnsHeaderReader.MustReadFull(bb.B)
csh := getColumnsHeader()
if err := csh.unmarshalNoArena(bb.B); err != nil {
if err := csh.unmarshalNoArena(bb.B, partFormatVersion); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err)
}
if partFormatVersion >= 1 {
readColumnNamesFromColumnsHeaderIndex(bh, sr, csh, columnNames)
}
chs := csh.columnHeaders
cds := bd.resizeColumnsData(len(chs))
for i := range chs {
cds[i].mustReadFrom(a, &chs[i], sr)
cds[i].mustReadFrom(a, &chs[i], sr, partFormatVersion)
}
bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns)
putColumnsHeader(csh)
longTermBufPool.Put(bb)
}
func readColumnNamesFromColumnsHeaderIndex(bh *blockHeader, sr *streamReaders, csh *columnsHeader, columnNames []string) {
bb := longTermBufPool.Get()
defer longTermBufPool.Put(bb)
n := bh.columnsHeaderIndexSize
if n > maxColumnsHeaderIndexSize {
logger.Panicf("BUG: %s: too big columnsHeaderIndexSize: %d bytes; mustn't exceed %d bytes", sr.columnsHeaderIndexReader.Path(), n, maxColumnsHeaderIndexSize)
}
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(n))
sr.columnsHeaderIndexReader.MustReadFull(bb.B)
cshIndex := getColumnsHeaderIndex()
if err := cshIndex.unmarshalNoArena(bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columnsHeaderIndex: %s", sr.columnsHeaderIndexReader.Path(), err)
}
if err := csh.setColumnNames(cshIndex, columnNames); err != nil {
logger.Panicf("FATAL: %s: %s", sr.columnsHeaderIndexReader.Path(), err)
}
putColumnsHeaderIndex(cshIndex)
}
// timestampsData contains the encoded timestamps data.
type timestampsData struct {
// data contains packed timestamps data.
@ -307,16 +325,9 @@ func (cd *columnData) copyFrom(a *arena, src *columnData) {
// mustWriteTo writes cd to sw and updates ch accordingly.
//
// ch is valid until cd is changed.
func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
ch.reset()
valuesWriter := &sw.fieldValuesWriter
bloomFilterWriter := &sw.fieldBloomFilterWriter
if cd.name == "" {
valuesWriter = &sw.messageValuesWriter
bloomFilterWriter = &sw.messageBloomFilterWriter
}
ch.name = cd.name
ch.valueType = cd.valueType
@ -324,36 +335,31 @@ func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
ch.maxValue = cd.maxValue
ch.valuesDict.copyFromNoArena(&cd.valuesDict)
bloomValuesWriter := sw.getBloomValuesWriterForColumnName(ch.name)
// marshal values
ch.valuesSize = uint64(len(cd.valuesData))
if ch.valuesSize > maxValuesBlockSize {
logger.Panicf("BUG: too big valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
}
ch.valuesOffset = valuesWriter.bytesWritten
valuesWriter.MustWrite(cd.valuesData)
ch.valuesOffset = bloomValuesWriter.values.bytesWritten
bloomValuesWriter.values.MustWrite(cd.valuesData)
// marshal bloom filter
ch.bloomFilterSize = uint64(len(cd.bloomFilterData))
if ch.bloomFilterSize > maxBloomFilterBlockSize {
logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize)
}
ch.bloomFilterOffset = bloomFilterWriter.bytesWritten
bloomFilterWriter.MustWrite(cd.bloomFilterData)
ch.bloomFilterOffset = bloomValuesWriter.bloom.bytesWritten
bloomValuesWriter.bloom.MustWrite(cd.bloomFilterData)
}
// mustReadFrom reads columns data associated with ch from sr to cd.
//
// cd is valid until a.reset() is called.
func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders) {
func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders, partFormatVersion uint) {
cd.reset()
valuesReader := &sr.fieldValuesReader
bloomFilterReader := &sr.fieldBloomFilterReader
if ch.name == "" {
valuesReader = &sr.messageValuesReader
bloomFilterReader = &sr.messageBloomFilterReader
}
cd.name = a.copyString(ch.name)
cd.valueType = ch.valueType
@ -361,30 +367,32 @@ func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders
cd.maxValue = ch.maxValue
cd.valuesDict.copyFrom(a, &ch.valuesDict)
bloomValuesReader := sr.getBloomValuesReaderForColumnName(ch.name, partFormatVersion)
// read values
if ch.valuesOffset != valuesReader.bytesRead {
if ch.valuesOffset != bloomValuesReader.values.bytesRead {
logger.Panicf("FATAL: %s: unexpected columnHeader.valuesOffset=%d; must equal to the number of bytes read: %d",
valuesReader.Path(), ch.valuesOffset, valuesReader.bytesRead)
bloomValuesReader.values.Path(), ch.valuesOffset, bloomValuesReader.values.bytesRead)
}
valuesSize := ch.valuesSize
if valuesSize > maxValuesBlockSize {
logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", valuesReader.Path(), maxValuesBlockSize, valuesSize)
logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bloomValuesReader.values.Path(), maxValuesBlockSize, valuesSize)
}
cd.valuesData = a.newBytes(int(valuesSize))
valuesReader.MustReadFull(cd.valuesData)
bloomValuesReader.values.MustReadFull(cd.valuesData)
// read bloom filter
// bloom filter is missing in valueTypeDict.
if ch.valueType != valueTypeDict {
if ch.bloomFilterOffset != bloomFilterReader.bytesRead {
if ch.bloomFilterOffset != bloomValuesReader.bloom.bytesRead {
logger.Panicf("FATAL: %s: unexpected columnHeader.bloomFilterOffset=%d; must equal to the number of bytes read: %d",
bloomFilterReader.Path(), ch.bloomFilterOffset, bloomFilterReader.bytesRead)
bloomValuesReader.bloom.Path(), ch.bloomFilterOffset, bloomValuesReader.bloom.bytesRead)
}
bloomFilterSize := ch.bloomFilterSize
if bloomFilterSize > maxBloomFilterBlockSize {
logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bloomFilterReader.Path(), maxBloomFilterBlockSize, bloomFilterSize)
logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bloomValuesReader.bloom.Path(), maxBloomFilterBlockSize, bloomFilterSize)
}
cd.bloomFilterData = a.newBytes(int(bloomFilterSize))
bloomFilterReader.MustReadFull(cd.bloomFilterData)
bloomValuesReader.bloom.MustReadFull(cd.bloomFilterData)
}
}

View File

@ -27,6 +27,12 @@ type blockHeader struct {
// timestampsHeader contains information about timestamps for log entries in the block
timestampsHeader timestampsHeader
// columnsHeaderIndexOffset is the offset of columnsHeaderIndex at columnsHeaderIndexFilename
columnsHeaderIndexOffset uint64
// columnsHeaderIndexSize is the size of columnsHeaderIndex at columnsHeaderIndexFilename
columnsHeaderIndexSize uint64
// columnsHeaderOffset is the offset of columnsHeader at columnsHeaderFilename
columnsHeaderOffset uint64
@ -40,6 +46,8 @@ func (bh *blockHeader) reset() {
bh.uncompressedSizeBytes = 0
bh.rowsCount = 0
bh.timestampsHeader.reset()
bh.columnsHeaderIndexOffset = 0
bh.columnsHeaderIndexSize = 0
bh.columnsHeaderOffset = 0
bh.columnsHeaderSize = 0
}
@ -51,6 +59,8 @@ func (bh *blockHeader) copyFrom(src *blockHeader) {
bh.uncompressedSizeBytes = src.uncompressedSizeBytes
bh.rowsCount = src.rowsCount
bh.timestampsHeader.copyFrom(&src.timestampsHeader)
bh.columnsHeaderIndexOffset = src.columnsHeaderIndexOffset
bh.columnsHeaderIndexSize = src.columnsHeaderIndexSize
bh.columnsHeaderOffset = src.columnsHeaderOffset
bh.columnsHeaderSize = src.columnsHeaderSize
}
@ -59,12 +69,14 @@ func (bh *blockHeader) copyFrom(src *blockHeader) {
func (bh *blockHeader) marshal(dst []byte) []byte {
// Do not store the version used for encoding directly in the block header, since:
// - all the block headers in the same part use the same encoding
// - the block header encoding version can be put in metadata file for the part (aka metadataFilename)
// - the format encoding version is stored in metadata file for the part (aka metadataFilename)
dst = bh.streamID.marshal(dst)
dst = encoding.MarshalVarUint64(dst, bh.uncompressedSizeBytes)
dst = encoding.MarshalVarUint64(dst, bh.rowsCount)
dst = bh.timestampsHeader.marshal(dst)
dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderIndexOffset)
dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderIndexSize)
dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderOffset)
dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderSize)
@ -72,7 +84,7 @@ func (bh *blockHeader) marshal(dst []byte) []byte {
}
// unmarshal unmarshals bh from src and returns the remaining tail.
func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) {
func (bh *blockHeader) unmarshal(src []byte, partFormatVersion uint) ([]byte, error) {
bh.reset()
srcOrig := src
@ -110,6 +122,24 @@ func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) {
}
src = tail
if partFormatVersion >= 1 {
// unmarshal columnsHeaderIndexOffset
n, nSize = encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderIndexOffset")
}
src = src[nSize:]
bh.columnsHeaderIndexOffset = n
// unmarshal columnsHeaderIndexSize
n, nSize = encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderIndexSize")
}
src = src[nSize:]
bh.columnsHeaderIndexSize = n
}
// unmarshal columnsHeaderOffset
n, nSize = encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
@ -148,8 +178,8 @@ func putBlockHeader(bh *blockHeader) {
var blockHeaderPool sync.Pool
// unmarshalBlockHeaders appends unmarshaled from src blockHeader entries to dst and returns the result.
func unmarshalBlockHeaders(dst []blockHeader, src []byte) ([]blockHeader, error) {
dstOrig := dst
func unmarshalBlockHeaders(dst []blockHeader, src []byte, partFormatVersion uint) ([]blockHeader, error) {
dstLen := len(dst)
for len(src) > 0 {
if len(dst) < cap(dst) {
dst = dst[:len(dst)+1]
@ -157,14 +187,14 @@ func unmarshalBlockHeaders(dst []blockHeader, src []byte) ([]blockHeader, error)
dst = append(dst, blockHeader{})
}
bh := &dst[len(dst)-1]
tail, err := bh.unmarshal(src)
tail, err := bh.unmarshal(src, partFormatVersion)
if err != nil {
return dstOrig, fmt.Errorf("cannot unmarshal blockHeader entries: %w", err)
return dst, fmt.Errorf("cannot unmarshal blockHeader entries: %w", err)
}
src = tail
}
if err := validateBlockHeaders(dst[len(dstOrig):]); err != nil {
return dstOrig, err
if err := validateBlockHeaders(dst[dstLen:]); err != nil {
return dst, err
}
return dst, nil
}
@ -195,6 +225,128 @@ func resetBlockHeaders(bhs []blockHeader) []blockHeader {
return bhs[:0]
}
// columnHeaderRef references column header in the marshaled columnsHeader.
type columnHeaderRef struct {
// columnNameID is the ID of the column name. The column name can be obtained from part.columnNames.
columnNameID uint64
// offset is the offset of the the corresponding columnHeader inside marshaled columnsHeader.
offset uint64
}
// columnsHeaderIndex contains offsets for marshaled column headers.
type columnsHeaderIndex struct {
// columnHeadersRefs contains references to columnHeaders.
columnHeadersRefs []columnHeaderRef
// constColumnsRefs contains references to constColumns.
constColumnsRefs []columnHeaderRef
}
func getColumnsHeaderIndex() *columnsHeaderIndex {
v := columnsHeaderIndexPool.Get()
if v == nil {
return &columnsHeaderIndex{}
}
return v.(*columnsHeaderIndex)
}
func putColumnsHeaderIndex(cshIndex *columnsHeaderIndex) {
cshIndex.reset()
columnsHeaderIndexPool.Put(cshIndex)
}
var columnsHeaderIndexPool sync.Pool
func (cshIndex *columnsHeaderIndex) reset() {
clear(cshIndex.columnHeadersRefs)
cshIndex.columnHeadersRefs = cshIndex.columnHeadersRefs[:0]
clear(cshIndex.constColumnsRefs)
cshIndex.constColumnsRefs = cshIndex.constColumnsRefs[:0]
}
func (cshIndex *columnsHeaderIndex) resizeConstColumnsRefs(n int) []columnHeaderRef {
cshIndex.constColumnsRefs = slicesutil.SetLength(cshIndex.constColumnsRefs, n)
return cshIndex.constColumnsRefs
}
func (cshIndex *columnsHeaderIndex) resizeColumnHeadersRefs(n int) []columnHeaderRef {
cshIndex.columnHeadersRefs = slicesutil.SetLength(cshIndex.columnHeadersRefs, n)
return cshIndex.columnHeadersRefs
}
func (cshIndex *columnsHeaderIndex) marshal(dst []byte) []byte {
dst = marshalColumnHeadersRefs(dst, cshIndex.columnHeadersRefs)
dst = marshalColumnHeadersRefs(dst, cshIndex.constColumnsRefs)
return dst
}
// unmarshalNoArena unmarshals cshIndex from src.
//
// cshIndex is valid until src is changed.
func (cshIndex *columnsHeaderIndex) unmarshalNoArena(src []byte) error {
cshIndex.reset()
refs, tail, err := unmarshalColumnHeadersRefsNoArena(cshIndex.columnHeadersRefs[:0], src)
if err != nil {
return fmt.Errorf("cannot unmarshal columnHeadersRefs: %w", err)
}
cshIndex.columnHeadersRefs = refs
src = tail
refs, tail, err = unmarshalColumnHeadersRefsNoArena(cshIndex.constColumnsRefs[:0], src)
if err != nil {
return fmt.Errorf("cannot unmarshal constColumnsRefs: %w", err)
}
cshIndex.constColumnsRefs = refs
if len(tail) > 0 {
return fmt.Errorf("unexpected non-empty tail left after unmarshaling columnsHeaderIndex; len(tail)=%d", len(tail))
}
return nil
}
func marshalColumnHeadersRefs(dst []byte, refs []columnHeaderRef) []byte {
dst = encoding.MarshalVarUint64(dst, uint64(len(refs)))
for _, r := range refs {
dst = encoding.MarshalVarUint64(dst, r.columnNameID)
dst = encoding.MarshalVarUint64(dst, r.offset)
}
return dst
}
func unmarshalColumnHeadersRefsNoArena(dst []columnHeaderRef, src []byte) ([]columnHeaderRef, []byte, error) {
srcOrig := src
n, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return dst, srcOrig, fmt.Errorf("cannot unmarshal the number of columnHeaderRef items")
}
src = src[nSize:]
for i := uint64(0); i < n; i++ {
columnNameID, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return dst, srcOrig, fmt.Errorf("cannot unmarshal column name ID number %d out of %d", i, n)
}
src = src[nSize:]
offset, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return dst, srcOrig, fmt.Errorf("cannot unmarshal offset number %d out of %d", i, n)
}
src = src[nSize:]
dst = append(dst, columnHeaderRef{
columnNameID: columnNameID,
offset: offset,
})
}
return dst, src, nil
}
func getColumnsHeader() *columnsHeader {
v := columnsHeaderPool.Get()
if v == nil {
@ -235,27 +387,98 @@ func (csh *columnsHeader) reset() {
csh.constColumns = ccs[:0]
}
func (csh *columnsHeader) resizeConstColumns(columnsLen int) []Field {
csh.constColumns = slicesutil.SetLength(csh.constColumns, columnsLen)
func (csh *columnsHeader) resizeConstColumns(n int) []Field {
csh.constColumns = slicesutil.SetLength(csh.constColumns, n)
return csh.constColumns
}
func (csh *columnsHeader) resizeColumnHeaders(columnHeadersLen int) []columnHeader {
csh.columnHeaders = slicesutil.SetLength(csh.columnHeaders, columnHeadersLen)
func (csh *columnsHeader) resizeColumnHeaders(n int) []columnHeader {
csh.columnHeaders = slicesutil.SetLength(csh.columnHeaders, n)
return csh.columnHeaders
}
func (csh *columnsHeader) marshal(dst []byte) []byte {
func (csh *columnsHeader) setColumnNames(cshIndex *columnsHeaderIndex, columnNames []string) error {
if len(cshIndex.columnHeadersRefs) != len(csh.columnHeaders) {
return fmt.Errorf("unpexected number of column headers; got %d; want %d", len(cshIndex.columnHeadersRefs), len(csh.columnHeaders))
}
for i := range csh.columnHeaders {
columnNameID := cshIndex.columnHeadersRefs[i].columnNameID
if columnNameID >= uint64(len(columnNames)) {
return fmt.Errorf("unexpected columnNameID=%d in columnHeadersRef; len(columnNames)=%d; columnNames=%v", columnNameID, len(columnNames), columnNames)
}
csh.columnHeaders[i].name = columnNames[columnNameID]
}
if len(cshIndex.constColumnsRefs) != len(csh.constColumns) {
return fmt.Errorf("unexpected number of const columns; got %d; want %d", len(cshIndex.constColumnsRefs), len(csh.constColumns))
}
for i := range csh.constColumns {
columnNameID := cshIndex.constColumnsRefs[i].columnNameID
if columnNameID >= uint64(len(columnNames)) {
return fmt.Errorf("unexpected columnNameID=%d in constColumnsRefs; len(columnNames)=%d; columnNames=%v", columnNameID, len(columnNames), columnNames)
}
csh.constColumns[i].Name = columnNames[columnNameID]
}
return nil
}
func (csh *columnsHeader) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
bb := longTermBufPool.Get()
defer longTermBufPool.Put(bb)
cshIndex := getColumnsHeaderIndex()
bb.B = csh.marshal(bb.B, cshIndex, g)
columnsHeaderData := bb.B
bb.B = cshIndex.marshal(bb.B)
columnsHeaderIndexData := bb.B[len(columnsHeaderData):]
putColumnsHeaderIndex(cshIndex)
bh.columnsHeaderIndexOffset = sw.columnsHeaderIndexWriter.bytesWritten
bh.columnsHeaderIndexSize = uint64(len(columnsHeaderIndexData))
if bh.columnsHeaderIndexSize > maxColumnsHeaderIndexSize {
logger.Panicf("BUG: too big columnsHeaderIndexSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderIndexSize, maxColumnsHeaderIndexSize)
}
sw.columnsHeaderIndexWriter.MustWrite(columnsHeaderIndexData)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(columnsHeaderData))
if bh.columnsHeaderSize > maxColumnsHeaderSize {
logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize)
}
sw.columnsHeaderWriter.MustWrite(columnsHeaderData)
}
func (csh *columnsHeader) marshal(dst []byte, cshIndex *columnsHeaderIndex, g *columnNameIDGenerator) []byte {
dstLen := len(dst)
chs := csh.columnHeaders
chsRefs := cshIndex.resizeColumnHeadersRefs(len(chs))
dst = encoding.MarshalVarUint64(dst, uint64(len(chs)))
for i := range chs {
columnNameID := g.getColumnNameID(chs[i].name)
offset := len(dst) - dstLen
dst = chs[i].marshal(dst)
chsRefs[i] = columnHeaderRef{
columnNameID: columnNameID,
offset: uint64(offset),
}
}
ccs := csh.constColumns
ccsRefs := cshIndex.resizeConstColumnsRefs(len(ccs))
dst = encoding.MarshalVarUint64(dst, uint64(len(ccs)))
for i := range ccs {
dst = ccs[i].marshal(dst)
columnNameID := g.getColumnNameID(ccs[i].Name)
offset := len(dst) - dstLen
dst = ccs[i].marshal(dst, false)
ccsRefs[i] = columnHeaderRef{
columnNameID: columnNameID,
offset: uint64(offset),
}
}
return dst
@ -264,7 +487,7 @@ func (csh *columnsHeader) marshal(dst []byte) []byte {
// unmarshalNoArena unmarshals csh from src.
//
// csh is valid until src is changed.
func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
func (csh *columnsHeader) unmarshalNoArena(src []byte, partFormatVersion uint) error {
csh.reset()
// unmarshal columnHeaders
@ -279,7 +502,7 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
chs := csh.resizeColumnHeaders(int(n))
for i := range chs {
tail, err := chs[i].unmarshalNoArena(src)
tail, err := chs[i].unmarshalNoArena(src, partFormatVersion)
if err != nil {
return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err)
}
@ -299,7 +522,7 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
ccs := csh.resizeConstColumns(int(n))
for i := range ccs {
tail, err := ccs[i].unmarshalNoArena(src)
tail, err := ccs[i].unmarshalNoArena(src, partFormatVersion < 1)
if err != nil {
return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err)
}
@ -317,7 +540,8 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
// columnHeaders contains information for values, which belong to a single label in a single block.
//
// The main column with an empty name is stored in messageValuesFilename,
// while the rest of columns are stored in fieldValuesFilename.
// while the rest of columns are stored in smallValuesFilename or bigValuesFilename depending
// on the block size (see maxSmallValuesBlockSize).
// This allows minimizing disk read IO when filtering by non-message columns.
//
// Every block column contains also a bloom filter for all the tokens stored in the column.
@ -334,7 +558,8 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
// - valueTypeTimestampISO8601 stores encoded into uint64 timestamps
//
// Bloom filters for main column with an empty name is stored in messageBloomFilename,
// while the rest of columns are stored in fieldBloomFilename.
// while the rest of columns are stored in smallBloomFilename or bigBloomFilename depending on their size
// (see maxSmallBloomFilterBlockSize).
type columnHeader struct {
// name contains column name aka label name
name string
@ -355,16 +580,16 @@ type columnHeader struct {
// valuesDict contains unique values for valueType = valueTypeDict
valuesDict valuesDict
// valuesOffset contains the offset of the block in either messageValuesFilename or fieldValuesFilename
// valuesOffset contains the offset of the block in either messageValuesFilename, smallValuesFilename or bigValuesFilename
valuesOffset uint64
// valuesSize contains the size of the block in either messageValuesFilename or fieldValuesFilename
// valuesSize contains the size of the block in either messageValuesFilename, smallValuesFilename or bigValuesFilename
valuesSize uint64
// bloomFilterOffset contains the offset of the bloom filter in either messageBloomFilename or fieldBloomFilename
// bloomFilterOffset contains the offset of the bloom filter in messageBloomFilename, smallBloomFilename or bigBloomFilename
bloomFilterOffset uint64
// bloomFilterSize contains the size of the bloom filter in either messageBloomFilename or fieldBloomFilename
// bloomFilterSize contains the size of the bloom filter in messageBloomFilename, smallBloomFilename or bigBloomFilename
bloomFilterSize uint64
}
@ -403,8 +628,9 @@ func (ch *columnHeader) marshal(dst []byte) []byte {
logger.Panicf("BUG: minValue=%d must be smaller than maxValue=%d for valueType=%d", ch.minValue, ch.maxValue, ch.valueType)
}
// Encode common fields - ch.name and ch.valueType
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(ch.name))
// Do not encode ch.name, since it should be encoded at columnsHeaderIndex.columnHeadersRefs
// Encode common field - ch.valueType
dst = append(dst, byte(ch.valueType))
// Encode other fields depending on ch.valueType
@ -472,18 +698,20 @@ func (ch *columnHeader) marshalBloomFilters(dst []byte) []byte {
// unmarshalNoArena unmarshals ch from src and returns the tail left after unmarshaling.
//
// ch is valid until src is changed.
func (ch *columnHeader) unmarshalNoArena(src []byte) ([]byte, error) {
func (ch *columnHeader) unmarshalNoArena(src []byte, partFormatVersion uint) ([]byte, error) {
ch.reset()
srcOrig := src
// Unmarshal column name
data, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal column name")
if partFormatVersion < 1 {
data, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal column name")
}
src = src[nSize:]
ch.name = bytesutil.ToUnsafeString(data)
}
src = src[nSize:]
ch.name = bytesutil.ToUnsafeString(data)
// Unmarshal value type
if len(src) < 1 {

View File

@ -15,7 +15,7 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
t.Fatalf("unexpected lengths of the marshaled blockHeader; got %d; want %d", len(data), marshaledLen)
}
bh2 := &blockHeader{}
tail, err := bh2.unmarshal(data)
tail, err := bh2.unmarshal(data, partFormatLatestVersion)
if err != nil {
t.Fatalf("unexpected error in unmarshal: %s", err)
}
@ -26,7 +26,7 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", bh2, bh)
}
}
f(&blockHeader{}, 61)
f(&blockHeader{}, 63)
f(&blockHeader{
streamID: streamID{
tenantID: TenantID{
@ -47,24 +47,71 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
maxTimestamp: 23434,
marshalType: encoding.MarshalTypeNearestDelta2,
},
columnsHeaderOffset: 4384,
columnsHeaderSize: 894,
}, 65)
columnsHeaderIndexOffset: 8923481,
columnsHeaderIndexSize: 8989832,
columnsHeaderOffset: 4384,
columnsHeaderSize: 894,
}, 73)
}
func TestColumnsHeaderIndexMarshalUnmarshal(t *testing.T) {
f := func(cshIndex *columnsHeaderIndex, marshaledLen int) {
t.Helper()
data := cshIndex.marshal(nil)
if len(data) != marshaledLen {
t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen)
}
cshIndex2 := &columnsHeaderIndex{}
if err := cshIndex2.unmarshalNoArena(data); err != nil {
t.Fatalf("unexpected error in unmarshal: %s", err)
}
if !reflect.DeepEqual(cshIndex, cshIndex2) {
t.Fatalf("unexpected blockHeaderIndex unmarshaled\ngot\n%v\nwant\n%v", cshIndex2, cshIndex)
}
}
f(&columnsHeaderIndex{}, 2)
f(&columnsHeaderIndex{
columnHeadersRefs: []columnHeaderRef{
{
columnNameID: 234,
offset: 123432,
},
{
columnNameID: 23898,
offset: 0,
},
},
constColumnsRefs: []columnHeaderRef{
{
columnNameID: 0,
offset: 8989,
},
},
}, 14)
}
func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
f := func(csh *columnsHeader, marshaledLen int) {
t.Helper()
data := csh.marshal(nil)
cshIndex := getColumnsHeaderIndex()
g := &columnNameIDGenerator{}
data := csh.marshal(nil, cshIndex, g)
if len(data) != marshaledLen {
t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen)
t.Fatalf("unexpected length of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen)
}
csh2 := &columnsHeader{}
err := csh2.unmarshalNoArena(data)
if err != nil {
if err := csh2.unmarshalNoArena(data, partFormatLatestVersion); err != nil {
t.Fatalf("unexpected error in unmarshal: %s", err)
}
if err := csh2.setColumnNames(cshIndex, g.columnNames); err != nil {
t.Fatalf("cannot set column names: %s", err)
}
if !reflect.DeepEqual(csh, csh2) {
t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", csh2, csh)
}
@ -98,7 +145,7 @@ func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
Value: "bar",
},
},
}, 50)
}, 31)
}
func TestBlockHeaderUnmarshalFailure(t *testing.T) {
@ -107,7 +154,7 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) {
dataOrig := append([]byte{}, data...)
bh := getBlockHeader()
defer putBlockHeader(bh)
tail, err := bh.unmarshal(data)
tail, err := bh.unmarshal(data, partFormatLatestVersion)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -138,8 +185,10 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) {
maxTimestamp: 23434,
marshalType: encoding.MarshalTypeNearestDelta2,
},
columnsHeaderOffset: 4384,
columnsHeaderSize: 894,
columnsHeaderIndexOffset: 89434,
columnsHeaderIndexSize: 89123,
columnsHeaderOffset: 4384,
columnsHeaderSize: 894,
}
data := bh.marshal(nil)
for len(data) > 0 {
@ -148,14 +197,52 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) {
}
}
func TestColumnsHeaderIndexUnmarshalFailure(t *testing.T) {
f := func(data []byte) {
t.Helper()
cshIndex := getColumnsHeaderIndex()
defer putColumnsHeaderIndex(cshIndex)
if err := cshIndex.unmarshalNoArena(data); err == nil {
t.Fatalf("expecting non-nil error")
}
}
f(nil)
f([]byte("foo"))
cshIndex := &columnsHeaderIndex{
columnHeadersRefs: []columnHeaderRef{
{
columnNameID: 0,
offset: 123,
},
},
constColumnsRefs: []columnHeaderRef{
{
columnNameID: 2,
offset: 89834,
},
{
columnNameID: 234,
offset: 8934,
},
},
}
data := cshIndex.marshal(nil)
for len(data) > 0 {
data = data[:len(data)-1]
f(data)
}
}
func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
f := func(data []byte) {
t.Helper()
csh := getColumnsHeader()
defer putColumnsHeader(csh)
err := csh.unmarshalNoArena(data)
if err == nil {
if err := csh.unmarshalNoArena(data, partFormatLatestVersion); err == nil {
t.Fatalf("expecting non-nil error")
}
}
@ -163,7 +250,7 @@ func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
f(nil)
f([]byte("foo"))
csh := columnsHeader{
csh := &columnsHeader{
columnHeaders: []columnHeader{
{
name: "foobar",
@ -191,11 +278,14 @@ func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
},
},
}
data := csh.marshal(nil)
cshIndex := getColumnsHeaderIndex()
g := &columnNameIDGenerator{}
data := csh.marshal(nil, cshIndex, g)
for len(data) > 0 {
data = data[:len(data)-1]
f(data)
}
putColumnsHeaderIndex(cshIndex)
}
func TestBlockHeaderReset(t *testing.T) {
@ -219,8 +309,10 @@ func TestBlockHeaderReset(t *testing.T) {
maxTimestamp: 23434,
marshalType: encoding.MarshalTypeNearestDelta2,
},
columnsHeaderOffset: 12332,
columnsHeaderSize: 234,
columnsHeaderIndexOffset: 18934,
columnsHeaderIndexSize: 8912,
columnsHeaderOffset: 12332,
columnsHeaderSize: 234,
}
bh.reset()
bhZero := &blockHeader{}
@ -229,6 +321,35 @@ func TestBlockHeaderReset(t *testing.T) {
}
}
func TestColumnsHeaderIndexReset(t *testing.T) {
cshIndex := &columnsHeaderIndex{
columnHeadersRefs: []columnHeaderRef{
{
columnNameID: 234,
offset: 1234,
},
},
constColumnsRefs: []columnHeaderRef{
{
columnNameID: 328,
offset: 21344,
},
{
columnNameID: 1,
offset: 234,
},
},
}
cshIndex.reset()
cshIndexZero := &columnsHeaderIndex{
columnHeadersRefs: []columnHeaderRef{},
constColumnsRefs: []columnHeaderRef{},
}
if !reflect.DeepEqual(cshIndex, cshIndexZero) {
t.Fatalf("unexpected non-zero columnsHeaderIndex after reset: %v", cshIndex)
}
}
func TestColumnsHeaderReset(t *testing.T) {
csh := &columnsHeader{
columnHeaders: []columnHeader{
@ -278,7 +399,7 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) {
if len(data) != marshaledLen {
t.Fatalf("unexpected length for marshaled blockHeader entries; got %d; want %d", len(data), marshaledLen)
}
bhs2, err := unmarshalBlockHeaders(nil, data)
bhs2, err := unmarshalBlockHeaders(nil, data, partFormatLatestVersion)
if err != nil {
t.Fatalf("unexpected error when unmarshaling blockHeader entries: %s", err)
}
@ -287,7 +408,7 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) {
}
}
f(nil, 0)
f([]blockHeader{{}}, 61)
f([]blockHeader{{}}, 63)
f([]blockHeader{
{},
{
@ -310,10 +431,12 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) {
maxTimestamp: 23434,
marshalType: encoding.MarshalTypeNearestDelta2,
},
columnsHeaderOffset: 12332,
columnsHeaderSize: 234,
columnsHeaderIndexOffset: 1234,
columnsHeaderIndexSize: 89324,
columnsHeaderOffset: 12332,
columnsHeaderSize: 234,
},
}, 127)
}, 134)
}
func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
@ -325,13 +448,17 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
t.Fatalf("unexpected marshaled length of columnHeader; got %d; want %d", len(data), marshaledLen)
}
var ch2 columnHeader
tail, err := ch2.unmarshalNoArena(data)
tail, err := ch2.unmarshalNoArena(data, partFormatLatestVersion)
if err != nil {
t.Fatalf("unexpected error in umarshal(%v): %s", ch, err)
}
if len(tail) > 0 {
t.Fatalf("unexpected non-empty tail after unmarshal(%v): %X", ch, tail)
}
// columnHeader.name isn't marshaled, since it is marshaled via columnsHeaderIndex starting from part format v1.
ch2.name = ch.name
if !reflect.DeepEqual(ch, &ch2) {
t.Fatalf("unexpected columnHeader after unmarshal;\ngot\n%v\nwant\n%v", &ch2, ch)
}
@ -340,7 +467,7 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
f(&columnHeader{
name: "foo",
valueType: valueTypeUint8,
}, 11)
}, 7)
ch := &columnHeader{
name: "foobar",
valueType: valueTypeDict,
@ -349,7 +476,7 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
valuesSize: 254452,
}
ch.valuesDict.getOrAdd("abc")
f(ch, 18)
f(ch, 11)
}
func TestColumnHeaderUnmarshalFailure(t *testing.T) {
@ -358,7 +485,7 @@ func TestColumnHeaderUnmarshalFailure(t *testing.T) {
dataOrig := append([]byte{}, data...)
var ch columnHeader
tail, err := ch.unmarshalNoArena(data)
tail, err := ch.unmarshalNoArena(data, partFormatLatestVersion)
if err == nil {
t.Fatalf("expecting non-nil error")
}

View File

@ -1,11 +1,13 @@
package logstorage
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// The number of blocks to search at once by a single worker
@ -113,17 +115,35 @@ type blockSearch struct {
// sbu is used for unmarshaling local columns
sbu stringsBlockUnmarshaler
// cshIndexBlockCache holds columnsHeaderIndex data for the given block.
//
// It is initialized lazily by calling getColumnsHeaderIndex().
cshIndexBlockCache []byte
// cshBlockCache holds columnsHeader data for the given block.
//
// it is initialized lazily by calling getColumnsHeader().
cshBlockCache []byte
// It is initialized lazily by calling getColumnsHeaderBlock().
cshBlockCache []byte
cshBlockInitialized bool
// cshCache is the columnsHeader associated with the given block
// ccsCache is the cache for accessed const columns
ccsCache []Field
// chsCache is the cache for accessed column headers
chsCache []columnHeader
// cshIndexCache is the columnsHeaderIndex associated with the given block.
//
// it is initialized lazily by calling getColumnsHeader().
// It is initialized lazily by calling getColumnsHeaderIndex().
cshIndexCache *columnsHeaderIndex
// cshCache is the columnsHeader associated with the given block.
//
// It is initialized lazily by calling getColumnsHeaderV0().
cshCache *columnsHeader
// seenStreams contains seen streamIDs for the recent searches.
//
// It is used for speeding up fetching _stream column.
seenStreams map[u128]string
}
@ -151,7 +171,27 @@ func (bs *blockSearch) reset() {
bs.sbu.reset()
bs.cshIndexBlockCache = bs.cshIndexBlockCache[:0]
bs.cshBlockCache = bs.cshBlockCache[:0]
bs.cshBlockInitialized = false
ccsCache := bs.ccsCache
for i := range ccsCache {
ccsCache[i].Reset()
}
bs.ccsCache = ccsCache[:0]
chsCache := bs.chsCache
for i := range chsCache {
chsCache[i].reset()
}
bs.chsCache = chsCache[:0]
if bs.cshIndexCache != nil {
putColumnsHeaderIndex(bs.cshIndexCache)
bs.cshIndexCache = nil
}
if bs.cshCache != nil {
putColumnsHeader(bs.cshCache)
@ -190,16 +230,54 @@ func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) {
}
}
func (bs *blockSearch) partFormatVersion() uint {
return bs.bsw.p.ph.FormatVersion
}
func (bs *blockSearch) getConstColumnValue(name string) string {
if name == "_msg" {
name = ""
}
csh := bs.getColumnsHeader()
for _, cc := range csh.constColumns {
if cc.Name == name {
return cc.Value
if bs.partFormatVersion() < 1 {
csh := bs.getColumnsHeaderV0()
for _, cc := range csh.constColumns {
if cc.Name == name {
return cc.Value
}
}
return ""
}
columnNameID, ok := bs.getColumnNameID(name)
if !ok {
return ""
}
for i := range bs.ccsCache {
if bs.ccsCache[i].Name == name {
return bs.ccsCache[i].Value
}
}
cshIndex := bs.getColumnsHeaderIndex()
for _, cr := range cshIndex.constColumnsRefs {
if cr.columnNameID != columnNameID {
continue
}
b := bs.getColumnsHeaderBlock()
if cr.offset > uint64(len(b)) {
logger.Panicf("FATAL: %s: header offset for const column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset)
}
b = b[cr.offset:]
bs.ccsCache = slicesutil.SetLength(bs.ccsCache, len(bs.ccsCache)+1)
cc := &bs.ccsCache[len(bs.ccsCache)-1]
if _, err := cc.unmarshalNoArena(b, false); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal header for const column %q: %s", bs.bsw.p.path, name, err)
}
cc.Name = strings.Clone(name)
return cc.Value
}
return ""
}
@ -209,46 +287,158 @@ func (bs *blockSearch) getColumnHeader(name string) *columnHeader {
name = ""
}
csh := bs.getColumnsHeader()
chs := csh.columnHeaders
for i := range chs {
ch := &chs[i]
if ch.name == name {
return ch
if bs.partFormatVersion() < 1 {
csh := bs.getColumnsHeaderV0()
chs := csh.columnHeaders
for i := range chs {
ch := &chs[i]
if ch.name == name {
return ch
}
}
return nil
}
columnNameID, ok := bs.getColumnNameID(name)
if !ok {
return nil
}
for i := range bs.chsCache {
if bs.chsCache[i].name == name {
return &bs.chsCache[i]
}
}
cshIndex := bs.getColumnsHeaderIndex()
for _, cr := range cshIndex.columnHeadersRefs {
if cr.columnNameID != columnNameID {
continue
}
b := bs.getColumnsHeaderBlock()
if cr.offset > uint64(len(b)) {
logger.Panicf("FATAL: %s: header offset for column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset)
}
b = b[cr.offset:]
bs.chsCache = slicesutil.SetLength(bs.chsCache, len(bs.chsCache)+1)
ch := &bs.chsCache[len(bs.chsCache)-1]
if _, err := ch.unmarshalNoArena(b, partFormatLatestVersion); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal header for column %q: %s", bs.bsw.p.path, name, err)
}
ch.name = strings.Clone(name)
return ch
}
return nil
}
func (bs *blockSearch) getColumnNameID(name string) (uint64, bool) {
id, ok := bs.bsw.p.columnNameIDs[name]
return id, ok
}
func (bs *blockSearch) getColumnNameByID(id uint64) (string, bool) {
columnNames := bs.bsw.p.columnNames
if id >= uint64(len(columnNames)) {
return "", false
}
return columnNames[id], true
}
func (bs *blockSearch) getConstColumns() []Field {
csh := bs.getColumnsHeader()
return csh.constColumns
if bs.partFormatVersion() < 1 {
csh := bs.getColumnsHeaderV0()
return csh.constColumns
}
chsIndex := bs.getColumnsHeaderIndex()
for _, cr := range chsIndex.constColumnsRefs {
columnName, ok := bs.getColumnNameByID(cr.columnNameID)
if !ok {
logger.Panicf("FATAL: %s: missing column name for id=%d", bs.bsw.p.path, cr.columnNameID)
}
_ = bs.getConstColumnValue(columnName)
}
return bs.ccsCache
}
func (bs *blockSearch) getColumnHeaders() []columnHeader {
csh := bs.getColumnsHeader()
return csh.columnHeaders
if bs.partFormatVersion() < 1 {
csh := bs.getColumnsHeaderV0()
return csh.columnHeaders
}
chsIndex := bs.getColumnsHeaderIndex()
for _, cr := range chsIndex.columnHeadersRefs {
columnName, ok := bs.getColumnNameByID(cr.columnNameID)
if !ok {
logger.Panicf("FATAL: %s: missing column name for id=%d", bs.bsw.p.path, cr.columnNameID)
}
_ = bs.getColumnHeader(columnName)
}
return bs.chsCache
}
func (bs *blockSearch) getColumnsHeader() *columnsHeader {
func (bs *blockSearch) getColumnsHeaderIndex() *columnsHeaderIndex {
if bs.partFormatVersion() < 1 {
logger.Panicf("BUG: getColumnsHeaderIndex() can be called only for part encoding v1+, while it has been called for v%d", bs.partFormatVersion())
}
if bs.cshIndexCache == nil {
bs.cshIndexBlockCache = readColumnsHeaderIndexBlock(bs.cshIndexBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
bs.cshIndexCache = getColumnsHeaderIndex()
if err := bs.cshIndexCache.unmarshalNoArena(bs.cshIndexBlockCache); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columns header index: %s", bs.bsw.p.path, err)
}
}
return bs.cshIndexCache
}
func (bs *blockSearch) getColumnsHeaderV0() *columnsHeader {
if bs.partFormatVersion() >= 1 {
logger.Panicf("BUG: getColumnsHeaderV0() can be called only for part encoding v0, while it has been called for v%d", bs.partFormatVersion())
}
if bs.cshCache == nil {
bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
b := bs.getColumnsHeaderBlock()
bs.cshCache = getColumnsHeader()
if err := bs.cshCache.unmarshalNoArena(bs.cshBlockCache); err != nil {
if err := bs.cshCache.unmarshalNoArena(b, 0); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", bs.bsw.p.path, err)
}
}
return bs.cshCache
}
func (bs *blockSearch) getColumnsHeaderBlock() []byte {
if !bs.cshBlockInitialized {
bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
bs.cshBlockInitialized = true
}
return bs.cshBlockCache
}
func readColumnsHeaderIndexBlock(dst []byte, p *part, bh *blockHeader) []byte {
n := bh.columnsHeaderIndexSize
if n > maxColumnsHeaderIndexSize {
logger.Panicf("FATAL: %s: columns header index size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderIndexSize, n)
}
dstLen := len(dst)
dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(n)+dstLen)
p.columnsHeaderIndexFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderIndexOffset))
return dst
}
func readColumnsHeaderBlock(dst []byte, p *part, bh *blockHeader) []byte {
columnsHeaderSize := bh.columnsHeaderSize
if columnsHeaderSize > maxColumnsHeaderSize {
logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, columnsHeaderSize)
n := bh.columnsHeaderSize
if n > maxColumnsHeaderSize {
logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, n)
}
dstLen := len(dst)
dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(columnsHeaderSize)+dstLen)
dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(n)+dstLen)
p.columnsHeaderFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderOffset))
return dst
}
@ -263,11 +453,7 @@ func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter {
}
p := bs.bsw.p
bloomFilterFile := p.fieldBloomFilterFile
if ch.name == "" {
bloomFilterFile = p.messageBloomFilterFile
}
bloomValuesFile := p.getBloomValuesFileForColumnName(ch.name)
bb := longTermBufPool.Get()
bloomFilterSize := ch.bloomFilterSize
@ -275,7 +461,8 @@ func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter {
logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxBloomFilterBlockSize, bloomFilterSize)
}
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(bloomFilterSize))
bloomFilterFile.MustReadAt(bb.B, int64(ch.bloomFilterOffset))
bloomValuesFile.bloom.MustReadAt(bb.B, int64(ch.bloomFilterOffset))
bf = getBloomFilter()
if err := bf.unmarshal(bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal bloom filter: %s", bs.partPath(), err)
@ -299,11 +486,7 @@ func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string {
}
p := bs.bsw.p
valuesFile := p.fieldValuesFile
if ch.name == "" {
valuesFile = p.messageValuesFile
}
bloomValuesFile := p.getBloomValuesFileForColumnName(ch.name)
bb := longTermBufPool.Get()
valuesSize := ch.valuesSize
@ -311,7 +494,7 @@ func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string {
logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxValuesBlockSize, valuesSize)
}
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(valuesSize))
valuesFile.MustReadAt(bb.B, int64(ch.valuesOffset))
bloomValuesFile.values.MustReadAt(bb.B, int64(ch.valuesOffset))
values = getStringBucket()
var err error
@ -378,7 +561,7 @@ func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []b
logger.Panicf("FATAL: %s: cannot decompress indexBlock read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)
}
dst, err = unmarshalBlockHeaders(dst, bb.B)
dst, err = unmarshalBlockHeaders(dst, bb.B, p.ph.FormatVersion)
longTermBufPool.Put(bb)
if err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal block headers read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)

View File

@ -4,6 +4,9 @@ import (
"path/filepath"
"sync"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -43,68 +46,132 @@ func (r *readerWithStats) Read(p []byte) (int, error) {
}
func (r *readerWithStats) MustClose() {
r.r.MustClose()
r.r = nil
if r.r != nil {
r.r.MustClose()
r.r = nil
}
}
// streamReaders contains readers for blockStreamReader
type streamReaders struct {
columnNamesReader readerWithStats
metaindexReader readerWithStats
indexReader readerWithStats
columnsHeaderIndexReader readerWithStats
columnsHeaderReader readerWithStats
timestampsReader readerWithStats
fieldValuesReader readerWithStats
fieldBloomFilterReader readerWithStats
messageValuesReader readerWithStats
messageBloomFilterReader readerWithStats
messageBloomValuesReader bloomValuesReader
oldBloomValuesReader bloomValuesReader
bloomValuesShards [bloomValuesShardsCount]bloomValuesReader
}
type bloomValuesReader struct {
bloom readerWithStats
values readerWithStats
}
func (r *bloomValuesReader) reset() {
r.bloom.reset()
r.values.reset()
}
func (r *bloomValuesReader) init(sr bloomValuesStreamReader) {
r.bloom.init(sr.bloom)
r.values.init(sr.values)
}
func (r *bloomValuesReader) totalBytesRead() uint64 {
return r.bloom.bytesRead + r.values.bytesRead
}
func (r *bloomValuesReader) MustClose() {
r.bloom.MustClose()
r.values.MustClose()
}
type bloomValuesStreamReader struct {
bloom filestream.ReadCloser
values filestream.ReadCloser
}
func (sr *streamReaders) reset() {
sr.columnNamesReader.reset()
sr.metaindexReader.reset()
sr.indexReader.reset()
sr.columnsHeaderIndexReader.reset()
sr.columnsHeaderReader.reset()
sr.timestampsReader.reset()
sr.fieldValuesReader.reset()
sr.fieldBloomFilterReader.reset()
sr.messageValuesReader.reset()
sr.messageBloomFilterReader.reset()
sr.messageBloomValuesReader.reset()
sr.oldBloomValuesReader.reset()
for i := range sr.bloomValuesShards[:] {
sr.bloomValuesShards[i].reset()
}
}
func (sr *streamReaders) init(metaindexReader, indexReader, columnsHeaderReader, timestampsReader, fieldValuesReader, fieldBloomFilterReader,
messageValuesReader, messageBloomFilterReader filestream.ReadCloser,
func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser,
messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader,
) {
sr.columnNamesReader.init(columnNamesReader)
sr.metaindexReader.init(metaindexReader)
sr.indexReader.init(indexReader)
sr.columnsHeaderIndexReader.init(columnsHeaderIndexReader)
sr.columnsHeaderReader.init(columnsHeaderReader)
sr.timestampsReader.init(timestampsReader)
sr.fieldValuesReader.init(fieldValuesReader)
sr.fieldBloomFilterReader.init(fieldBloomFilterReader)
sr.messageValuesReader.init(messageValuesReader)
sr.messageBloomFilterReader.init(messageBloomFilterReader)
sr.messageBloomValuesReader.init(messageBloomValuesReader)
sr.oldBloomValuesReader.init(oldBloomValuesReader)
for i := range sr.bloomValuesShards[:] {
sr.bloomValuesShards[i].init(bloomValuesShards[i])
}
}
func (sr *streamReaders) totalBytesRead() uint64 {
n := uint64(0)
n += sr.columnNamesReader.bytesRead
n += sr.metaindexReader.bytesRead
n += sr.indexReader.bytesRead
n += sr.columnsHeaderIndexReader.bytesRead
n += sr.columnsHeaderReader.bytesRead
n += sr.timestampsReader.bytesRead
n += sr.fieldValuesReader.bytesRead
n += sr.fieldBloomFilterReader.bytesRead
n += sr.messageValuesReader.bytesRead
n += sr.messageBloomFilterReader.bytesRead
n += sr.messageBloomValuesReader.totalBytesRead()
n += sr.oldBloomValuesReader.totalBytesRead()
for i := range sr.bloomValuesShards[:] {
n += sr.bloomValuesShards[i].totalBytesRead()
}
return n
}
func (sr *streamReaders) MustClose() {
sr.columnNamesReader.MustClose()
sr.metaindexReader.MustClose()
sr.indexReader.MustClose()
sr.columnsHeaderIndexReader.MustClose()
sr.columnsHeaderReader.MustClose()
sr.timestampsReader.MustClose()
sr.fieldValuesReader.MustClose()
sr.fieldBloomFilterReader.MustClose()
sr.messageValuesReader.MustClose()
sr.messageBloomFilterReader.MustClose()
sr.messageBloomValuesReader.MustClose()
sr.oldBloomValuesReader.MustClose()
for i := range sr.bloomValuesShards[:] {
sr.bloomValuesShards[i].MustClose()
}
}
func (sr *streamReaders) getBloomValuesReaderForColumnName(name string, partFormatVersion uint) *bloomValuesReader {
if name == "" {
return &sr.messageBloomValuesReader
}
if partFormatVersion < 1 {
return &sr.oldBloomValuesReader
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
idx := h % uint64(len(sr.bloomValuesShards))
return &sr.bloomValuesShards[idx]
}
// blockStreamReader is used for reading blocks in streaming manner from a part.
@ -121,6 +188,12 @@ type blockStreamReader struct {
// streamReaders contains data readers in stream mode
streamReaders streamReaders
// columnNameIDs contains columnName->id mapping for all the column names seen in the part
columnNameIDs map[string]uint64
// columnNames constains id->columnName mapping for all the columns seen in the part
columnNames []string
// indexBlockHeaders contains the list of all the indexBlockHeader entries for the part
indexBlockHeaders []indexBlockHeader
@ -156,6 +229,9 @@ func (bsr *blockStreamReader) reset() {
bsr.ph.reset()
bsr.streamReaders.reset()
bsr.columnNameIDs = nil
bsr.columnNames = nil
ihs := bsr.indexBlockHeaders
if len(ihs) > 10e3 {
// The ihs len is unbound, so it is better to drop too long indexBlockHeaders in order to reduce memory usage
@ -195,17 +271,25 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
bsr.ph = mp.ph
// Initialize streamReaders
columnNamesReader := mp.columnNames.NewReader()
metaindexReader := mp.metaindex.NewReader()
indexReader := mp.index.NewReader()
columnsHeaderIndexReader := mp.columnsHeaderIndex.NewReader()
columnsHeaderReader := mp.columnsHeader.NewReader()
timestampsReader := mp.timestamps.NewReader()
fieldValuesReader := mp.fieldValues.NewReader()
fieldBloomFilterReader := mp.fieldBloomFilter.NewReader()
messageValuesReader := mp.messageValues.NewReader()
messageBloomFilterReader := mp.messageBloomFilter.NewReader()
bsr.streamReaders.init(metaindexReader, indexReader, columnsHeaderReader, timestampsReader,
fieldValuesReader, fieldBloomFilterReader, messageValuesReader, messageBloomFilterReader)
messageBloomValuesReader := mp.messageBloomValues.NewStreamReader()
var oldBloomValuesReader bloomValuesStreamReader
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader
for i := range bloomValuesShards[:] {
bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamReader()
}
bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards)
// Read columnNames data
bsr.columnNames, bsr.columnNameIDs = mustReadColumnNames(&bsr.streamReaders.columnNamesReader)
// Read metaindex data
bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader)
@ -219,30 +303,63 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// since they are usually deleted after the merge.
const nocache = true
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
fieldValuesPath := filepath.Join(path, fieldValuesFilename)
fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
bsr.ph.mustReadMetadata(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
// Open data readers
var columnNamesReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
columnNamesReader = filestream.MustOpen(columnNamesPath, nocache)
}
metaindexReader := filestream.MustOpen(metaindexPath, nocache)
indexReader := filestream.MustOpen(indexPath, nocache)
var columnsHeaderIndexReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
columnsHeaderIndexReader = filestream.MustOpen(columnsHeaderIndexPath, nocache)
}
columnsHeaderReader := filestream.MustOpen(columnsHeaderPath, nocache)
timestampsReader := filestream.MustOpen(timestampsPath, nocache)
fieldValuesReader := filestream.MustOpen(fieldValuesPath, nocache)
fieldBloomFilterReader := filestream.MustOpen(fieldBloomFilterPath, nocache)
messageValuesReader := filestream.MustOpen(messageValuesPath, nocache)
messageBloomFilterReader := filestream.MustOpen(messageBloomFilterPath, nocache)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomValuesReader := bloomValuesStreamReader{
bloom: filestream.MustOpen(messageBloomFilterPath, nocache),
values: filestream.MustOpen(messageValuesPath, nocache),
}
var oldBloomValuesReader bloomValuesStreamReader
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader
if bsr.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
oldBloomValuesReader.bloom = filestream.MustOpen(bloomPath, nocache)
valuesPath := filepath.Join(path, oldValuesFilename)
oldBloomValuesReader.values = filestream.MustOpen(valuesPath, nocache)
} else {
for i := range bloomValuesShards[:] {
shard := &bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = filestream.MustOpen(bloomPath, nocache)
valuesPath := getValuesFilePath(path, uint64(i))
shard.values = filestream.MustOpen(valuesPath, nocache)
}
}
// Initialize streamReaders
bsr.streamReaders.init(metaindexReader, indexReader, columnsHeaderReader, timestampsReader,
fieldValuesReader, fieldBloomFilterReader, messageValuesReader, messageBloomFilterReader)
bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards)
if bsr.ph.FormatVersion >= 1 {
// Read columnNames data
bsr.columnNames, bsr.columnNameIDs = mustReadColumnNames(&bsr.streamReaders.columnNamesReader)
}
// Read metaindex data
bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader)
@ -282,7 +399,7 @@ func (bsr *blockStreamReader) NextBlock() bool {
// Read bsr.blockData
bsr.a.reset()
bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders)
bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders, bsr.ph.FormatVersion, bsr.columnNames)
bsr.globalUncompressedSizeBytes += bh.uncompressedSizeBytes
bsr.globalRowsCount += bh.rowsCount
@ -342,7 +459,7 @@ func (bsr *blockStreamReader) nextIndexBlock() bool {
bb.B = ih.mustReadNextIndexBlock(bb.B[:0], &bsr.streamReaders)
bsr.blockHeaders = resetBlockHeaders(bsr.blockHeaders)
var err error
bsr.blockHeaders, err = unmarshalBlockHeaders(bsr.blockHeaders[:0], bb.B)
bsr.blockHeaders, err = unmarshalBlockHeaders(bsr.blockHeaders[:0], bb.B, bsr.ph.FormatVersion)
longTermBufPool.Put(bb)
if err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal blockHeader entries: %s", bsr.streamReaders.indexReader.Path(), err)

View File

@ -4,8 +4,9 @@ import (
"path/filepath"
"sync"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -44,62 +45,116 @@ func (w *writerWithStats) MustClose() {
// streamWriters contain writers for blockStreamWriter
type streamWriters struct {
columnNamesWriter writerWithStats
metaindexWriter writerWithStats
indexWriter writerWithStats
columnsHeaderIndexWriter writerWithStats
columnsHeaderWriter writerWithStats
timestampsWriter writerWithStats
fieldValuesWriter writerWithStats
fieldBloomFilterWriter writerWithStats
messageValuesWriter writerWithStats
messageBloomFilterWriter writerWithStats
messageBloomValuesWriter bloomValuesWriter
bloomValuesShards [bloomValuesShardsCount]bloomValuesWriter
}
type bloomValuesWriter struct {
bloom writerWithStats
values writerWithStats
}
func (w *bloomValuesWriter) reset() {
w.bloom.reset()
w.values.reset()
}
func (w *bloomValuesWriter) init(sw bloomValuesStreamWriter) {
w.bloom.init(sw.bloom)
w.values.init(sw.values)
}
func (w *bloomValuesWriter) totalBytesWritten() uint64 {
return w.bloom.bytesWritten + w.values.bytesWritten
}
func (w *bloomValuesWriter) MustClose() {
w.bloom.MustClose()
w.values.MustClose()
}
type bloomValuesStreamWriter struct {
bloom filestream.WriteCloser
values filestream.WriteCloser
}
func (sw *streamWriters) reset() {
sw.columnNamesWriter.reset()
sw.metaindexWriter.reset()
sw.indexWriter.reset()
sw.columnsHeaderIndexWriter.reset()
sw.columnsHeaderWriter.reset()
sw.timestampsWriter.reset()
sw.fieldValuesWriter.reset()
sw.fieldBloomFilterWriter.reset()
sw.messageValuesWriter.reset()
sw.messageBloomFilterWriter.reset()
sw.messageBloomValuesWriter.reset()
for i := range sw.bloomValuesShards[:] {
sw.bloomValuesShards[i].reset()
}
}
func (sw *streamWriters) init(metaindexWriter, indexWriter, columnsHeaderWriter, timestampsWriter, fieldValuesWriter, fieldBloomFilterWriter,
messageValuesWriter, messageBloomFilterWriter filestream.WriteCloser,
func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter filestream.WriteCloser,
messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter,
) {
sw.columnNamesWriter.init(columnNamesWriter)
sw.metaindexWriter.init(metaindexWriter)
sw.indexWriter.init(indexWriter)
sw.columnsHeaderIndexWriter.init(columnsHeaderIndexWriter)
sw.columnsHeaderWriter.init(columnsHeaderWriter)
sw.timestampsWriter.init(timestampsWriter)
sw.fieldValuesWriter.init(fieldValuesWriter)
sw.fieldBloomFilterWriter.init(fieldBloomFilterWriter)
sw.messageValuesWriter.init(messageValuesWriter)
sw.messageBloomFilterWriter.init(messageBloomFilterWriter)
sw.messageBloomValuesWriter.init(messageBloomValuesWriter)
for i := range sw.bloomValuesShards[:] {
sw.bloomValuesShards[i].init(bloomValuesShards[i])
}
}
func (sw *streamWriters) totalBytesWritten() uint64 {
n := uint64(0)
n += sw.columnNamesWriter.bytesWritten
n += sw.metaindexWriter.bytesWritten
n += sw.indexWriter.bytesWritten
n += sw.columnsHeaderIndexWriter.bytesWritten
n += sw.columnsHeaderWriter.bytesWritten
n += sw.timestampsWriter.bytesWritten
n += sw.fieldValuesWriter.bytesWritten
n += sw.fieldBloomFilterWriter.bytesWritten
n += sw.messageValuesWriter.bytesWritten
n += sw.messageBloomFilterWriter.bytesWritten
n += sw.messageBloomValuesWriter.totalBytesWritten()
for i := range sw.bloomValuesShards[:] {
n += sw.bloomValuesShards[i].totalBytesWritten()
}
return n
}
func (sw *streamWriters) MustClose() {
sw.columnNamesWriter.MustClose()
sw.metaindexWriter.MustClose()
sw.indexWriter.MustClose()
sw.columnsHeaderIndexWriter.MustClose()
sw.columnsHeaderWriter.MustClose()
sw.timestampsWriter.MustClose()
sw.fieldValuesWriter.MustClose()
sw.fieldBloomFilterWriter.MustClose()
sw.messageValuesWriter.MustClose()
sw.messageBloomFilterWriter.MustClose()
sw.messageBloomValuesWriter.MustClose()
for i := range sw.bloomValuesShards[:] {
sw.bloomValuesShards[i].MustClose()
}
}
func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomValuesWriter {
if name == "" {
return &sw.messageBloomValuesWriter
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
idx := h % uint64(len(sw.bloomValuesShards))
return &sw.bloomValuesShards[idx]
}
// blockStreamWriter is used for writing blocks into the underlying storage in streaming manner.
@ -148,6 +203,9 @@ type blockStreamWriter struct {
// indexBlockHeader is used for marshaling the data to metaindexData
indexBlockHeader indexBlockHeader
// columnNameIDGenerator is used for generating columnName->id mapping for all the columns seen in bsw
columnNameIDGenerator columnNameIDGenerator
}
// reset resets bsw for subsequent re-use.
@ -175,12 +233,22 @@ func (bsw *blockStreamWriter) reset() {
}
bsw.indexBlockHeader.reset()
bsw.columnNameIDGenerator.reset()
}
// MustInitForInmemoryPart initializes bsw from mp
func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) {
bsw.reset()
bsw.streamWriters.init(&mp.metaindex, &mp.index, &mp.columnsHeader, &mp.timestamps, &mp.fieldValues, &mp.fieldBloomFilter, &mp.messageValues, &mp.messageBloomFilter)
messageBloomValues := mp.messageBloomValues.NewStreamWriter()
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter
for i := range bloomValuesShards[:] {
bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamWriter()
}
bsw.streamWriters.init(&mp.columnNames, &mp.metaindex, &mp.index, &mp.columnsHeaderIndex, &mp.columnsHeader, &mp.timestamps, messageBloomValues, bloomValuesShards)
}
// MustInitForFilePart initializes bsw for writing data to file part located at path.
@ -191,28 +259,43 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
fs.MustMkdirFailIfExist(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
fieldValuesPath := filepath.Join(path, fieldValuesFilename)
fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
// Always cache metaindex file, since it it re-read immediately after part creation
// Always cache columnNames files, since it is re-read immediately after part creation
columnNamesWriter := filestream.MustCreate(columnNamesPath, false)
// Always cache metaindex file, since it is re-read immediately after part creation
metaindexWriter := filestream.MustCreate(metaindexPath, false)
indexWriter := filestream.MustCreate(indexPath, nocache)
columnsHeaderIndexWriter := filestream.MustCreate(columnsHeaderIndexPath, nocache)
columnsHeaderWriter := filestream.MustCreate(columnsHeaderPath, nocache)
timestampsWriter := filestream.MustCreate(timestampsPath, nocache)
fieldValuesWriter := filestream.MustCreate(fieldValuesPath, nocache)
fieldBloomFilterWriter := filestream.MustCreate(fieldBloomFilterPath, nocache)
messageValuesWriter := filestream.MustCreate(messageValuesPath, nocache)
messageBloomFilterWriter := filestream.MustCreate(messageBloomFilterPath, nocache)
bsw.streamWriters.init(metaindexWriter, indexWriter, columnsHeaderWriter, timestampsWriter,
fieldValuesWriter, fieldBloomFilterWriter, messageValuesWriter, messageBloomFilterWriter)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomValuesWriter := bloomValuesStreamWriter{
bloom: filestream.MustCreate(messageBloomFilterPath, nocache),
values: filestream.MustCreate(messageValuesPath, nocache),
}
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter
for i := range bloomValuesShards[:] {
shard := &bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = filestream.MustCreate(bloomPath, nocache)
valuesPath := getValuesFilePath(path, uint64(i))
shard.values = filestream.MustCreate(valuesPath, nocache)
}
bsw.streamWriters.init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter, messageBloomValuesWriter, bloomValuesShards)
}
// MustWriteRows writes timestamps with rows under the given sid to bsw.
@ -266,9 +349,9 @@ func (bsw *blockStreamWriter) mustWriteBlockInternal(sid *streamID, b *block, bd
bh := getBlockHeader()
if b != nil {
b.mustWriteTo(sid, bh, &bsw.streamWriters)
b.mustWriteTo(sid, bh, &bsw.streamWriters, &bsw.columnNameIDGenerator)
} else {
bd.mustWriteTo(bh, &bsw.streamWriters)
bd.mustWriteTo(bh, &bsw.streamWriters, &bsw.columnNameIDGenerator)
}
th := &bh.timestampsHeader
if bsw.globalRowsCount == 0 || th.minTimestamp < bsw.globalMinTimestamp {
@ -318,6 +401,7 @@ func (bsw *blockStreamWriter) mustFlushIndexBlock(data []byte) {
//
// bsw can be re-used after calling Finalize().
func (bsw *blockStreamWriter) Finalize(ph *partHeader) {
ph.FormatVersion = partFormatLatestVersion
ph.UncompressedSizeBytes = bsw.globalUncompressedSizeBytes
ph.RowsCount = bsw.globalRowsCount
ph.BlocksCount = bsw.globalBlocksCount
@ -326,13 +410,11 @@ func (bsw *blockStreamWriter) Finalize(ph *partHeader) {
bsw.mustFlushIndexBlock(bsw.indexBlockData)
// Write columnNames data
mustWriteColumnNames(&bsw.streamWriters.columnNamesWriter, bsw.columnNameIDGenerator.columnNames)
// Write metaindex data
bb := longTermBufPool.Get()
bb.B = encoding.CompressZSTDLevel(bb.B[:0], bsw.metaindexData, 1)
bsw.streamWriters.metaindexWriter.MustWrite(bb.B)
if len(bb.B) < 1024*1024 {
longTermBufPool.Put(bb)
}
mustWriteIndexBlockHeaders(&bsw.streamWriters.metaindexWriter, bsw.metaindexData)
ph.CompressedSizeBytes = bsw.streamWriters.totalBytesWritten()

View File

@ -18,10 +18,19 @@ const bloomFilterHashesCount = 6
// bloomFilterBitsPerItem is the number of bits to use per each token.
const bloomFilterBitsPerItem = 16
// bloomFilterMarshal appends marshaled bloom filter for tokens to dst and returns the result.
func bloomFilterMarshal(dst []byte, tokens []string) []byte {
// bloomFilterMarshalTokens appends marshaled bloom filter for tokens to dst and returns the result.
func bloomFilterMarshalTokens(dst []byte, tokens []string) []byte {
bf := getBloomFilter()
bf.mustInit(tokens)
bf.mustInitTokens(tokens)
dst = bf.marshal(dst)
putBloomFilter(bf)
return dst
}
// bloomFilterMarshalHashes appends marshaled bloom filter for hashes to dst and returns the result.
func bloomFilterMarshalHashes(dst []byte, hashes []uint64) []byte {
bf := getBloomFilter()
bf.mustInitHashes(hashes)
dst = bf.marshal(dst)
putBloomFilter(bf)
return dst
@ -61,23 +70,45 @@ func (bf *bloomFilter) unmarshal(src []byte) error {
return nil
}
// mustInit initializes bf with the given tokens
func (bf *bloomFilter) mustInit(tokens []string) {
// mustInitTokens initializes bf with the given tokens
func (bf *bloomFilter) mustInitTokens(tokens []string) {
bitsCount := len(tokens) * bloomFilterBitsPerItem
wordsCount := (bitsCount + 63) / 64
bits := slicesutil.SetLength(bf.bits, wordsCount)
bloomFilterAdd(bits, tokens)
bloomFilterAddTokens(bits, tokens)
bf.bits = bits
}
// bloomFilterAdd adds the given tokens to the bloom filter bits
func bloomFilterAdd(bits []uint64, tokens []string) {
// mustInitHashes initializes bf with the given hashes
func (bf *bloomFilter) mustInitHashes(hashes []uint64) {
bitsCount := len(hashes) * bloomFilterBitsPerItem
wordsCount := (bitsCount + 63) / 64
bits := slicesutil.SetLength(bf.bits, wordsCount)
bloomFilterAddHashes(bits, hashes)
bf.bits = bits
}
// bloomFilterAddTokens adds the given tokens to the bloom filter bits
func bloomFilterAddTokens(bits []uint64, tokens []string) {
hashesCount := len(tokens) * bloomFilterHashesCount
a := encoding.GetUint64s(hashesCount)
a.A = appendTokensHashes(a.A[:0], tokens)
initBloomFilter(bits, a.A)
encoding.PutUint64s(a)
}
// bloomFilterAddHashes adds the given haehs to the bloom filter bits
func bloomFilterAddHashes(bits, hashes []uint64) {
hashesCount := len(hashes) * bloomFilterHashesCount
a := encoding.GetUint64s(hashesCount)
a.A = appendHashesHashes(a.A[:0], hashes)
initBloomFilter(bits, a.A)
encoding.PutUint64s(a)
}
func initBloomFilter(bits, hashes []uint64) {
maxBits := uint64(len(bits)) * 64
for _, h := range a.A {
for _, h := range hashes {
idx := h % maxBits
i := idx / 64
j := idx % 64
@ -87,8 +118,6 @@ func bloomFilterAdd(bits []uint64, tokens []string) {
bits[i] = w | mask
}
}
encoding.PutUint64s(a)
}
// appendTokensHashes appends hashes for the given tokens to dst and returns the result.
@ -114,6 +143,29 @@ func appendTokensHashes(dst []uint64, tokens []string) []uint64 {
return dst
}
// appendHashesHashes appends hashes for the given hashes to dst and returns the result.
//
// the appended hashes can be then passed to bloomFilter.containsAll().
func appendHashesHashes(dst, hashes []uint64) []uint64 {
dstLen := len(dst)
hashesCount := len(hashes) * bloomFilterHashesCount
dst = slicesutil.SetLength(dst, dstLen+hashesCount)
dst = dst[:dstLen]
var buf [8]byte
hp := (*uint64)(unsafe.Pointer(&buf[0]))
for _, h := range hashes {
*hp = h
for i := 0; i < bloomFilterHashesCount; i++ {
h := xxhash.Sum64(buf[:])
(*hp)++
dst = append(dst, h)
}
}
return dst
}
// containsAll returns true if bf contains all the given tokens hashes generated by appendTokensHashes.
func (bf *bloomFilter) containsAll(hashes []uint64) bool {
bits := bf.bits

View File

@ -8,10 +8,16 @@ import (
func TestBloomFilter(t *testing.T) {
f := func(tokens []string) {
t.Helper()
data := bloomFilterMarshal(nil, tokens)
dataTokens := bloomFilterMarshalTokens(nil, tokens)
hashes := tokenizeHashes(nil, tokens)
dataHashes := bloomFilterMarshalHashes(nil, hashes)
if string(dataTokens) != string(dataHashes) {
t.Fatalf("unexpected marshaled bloom filters from hashes\ngot\n%X\nwant\n%X", dataHashes, dataTokens)
}
bf := getBloomFilter()
defer putBloomFilter(bf)
if err := bf.unmarshal(data); err != nil {
if err := bf.unmarshal(dataTokens); err != nil {
t.Fatalf("unexpected error when unmarshaling bloom filter: %s", err)
}
tokensHashes := appendTokensHashes(nil, tokens)
@ -57,7 +63,7 @@ func TestBloomFilterFalsePositive(t *testing.T) {
for i := range tokens {
tokens[i] = fmt.Sprintf("token_%d", i)
}
data := bloomFilterMarshal(nil, tokens)
data := bloomFilterMarshalTokens(nil, tokens)
bf := getBloomFilter()
defer putBloomFilter(bf)
if err := bf.unmarshal(data); err != nil {
@ -79,3 +85,35 @@ func TestBloomFilterFalsePositive(t *testing.T) {
t.Fatalf("too high false positive rate; got %.4f; want %.4f max", p, maxFalsePositive)
}
}
func TestBloomFilterMarshal_TokensVSHashes(t *testing.T) {
tokens := make([]string, 100)
for i := range tokens {
tokens[i] = fmt.Sprintf("token_%d", i)
}
dataTokens := bloomFilterMarshalTokens(nil, tokens)
hashes := tokenizeHashes(nil, tokens)
dataHashes := bloomFilterMarshalHashes(nil, hashes)
if string(dataTokens) != string(dataHashes) {
t.Fatalf("unexpected bloom filter obtained from hashes\ngot\n%X\nwant\n%X", dataHashes, dataTokens)
}
}
func TestBloomFilterMarshalTokens(t *testing.T) {
f := func(tokens []string, resultExpected string) {
t.Helper()
result := bloomFilterMarshalTokens(nil, tokens)
if string(result) != resultExpected {
t.Fatalf("unexpected result\ngot\n%X\nwant\n%X", result, resultExpected)
}
}
f([]string{}, "")
f([]string{"foo"}, "\x00\x00\x00\x82\x40\x18\x00\x04")
f([]string{"foo", "bar", "baz"}, "\x00\x00\x81\xA3\x48\x5C\x10\x26")
f([]string{"foo", "bar", "baz", "foo"}, "\x00\x00\x81\xA3\x48\x5C\x10\x26")
}

View File

@ -0,0 +1,127 @@
package logstorage
import (
"fmt"
"io"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
func mustWriteColumnNames(w *writerWithStats, columnNames []string) {
data := marshalColumnNames(nil, columnNames)
w.MustWrite(data)
}
func mustReadColumnNames(r filestream.ReadCloser) ([]string, map[string]uint64) {
src, err := io.ReadAll(r)
if err != nil {
logger.Panicf("FATAL: %s: cannot read colum names: %s", r.Path(), err)
}
columnNames, err := unmarshalColumnNames(src)
if err != nil {
logger.Panicf("FATAL: %s: %s", r.Path(), err)
}
columnNameIDs, err := getColumnNameIDs(columnNames)
if err != nil {
logger.Panicf("BUG: %s: %s; columnNames=%v", r.Path(), err, columnNameIDs)
}
return columnNames, columnNameIDs
}
func getColumnNameIDs(columnNames []string) (map[string]uint64, error) {
m := make(map[uint64]string, len(columnNames))
columnNameIDs := make(map[string]uint64, len(columnNames))
for i, name := range columnNames {
id := uint64(i)
if prevName, ok := m[id]; ok {
return nil, fmt.Errorf("duplicate column name id=%d for columns %q and %q", id, prevName, name)
}
m[id] = name
columnNameIDs[name] = id
}
return columnNameIDs, nil
}
func marshalColumnNames(dst []byte, columnNames []string) []byte {
data := encoding.MarshalVarUint64(nil, uint64(len(columnNames)))
for _, name := range columnNames {
data = encoding.MarshalBytes(data, bytesutil.ToUnsafeBytes(name))
}
dst = encoding.CompressZSTDLevel(dst, data, 1)
return dst
}
func unmarshalColumnNames(src []byte) ([]string, error) {
data, err := encoding.DecompressZSTD(nil, src)
if err != nil {
return nil, fmt.Errorf("cannot decompress column names from len(src)=%d: %w", len(src), err)
}
src = data
n, nBytes := encoding.UnmarshalVarUint64(src)
if nBytes <= 0 {
return nil, fmt.Errorf("cannot parse the number of column names for len(src)=%d", len(src))
}
src = src[nBytes:]
m := make(map[string]uint64, n)
columnNames := make([]string, n)
for id := uint64(0); id < n; id++ {
name, nBytes := encoding.UnmarshalBytes(src)
if nBytes <= 0 {
return nil, fmt.Errorf("cannot parse colum name number %d out of %d", id, n)
}
src = src[nBytes:]
if idPrev, ok := m[string(name)]; ok {
return nil, fmt.Errorf("duplicate ids for column name %q: %d and %d", name, idPrev, id)
}
m[string(name)] = id
columnNames[id] = string(name)
}
if len(src) > 0 {
return nil, fmt.Errorf("unexpected non-empty tail left after unmarshaling column name ids; len(tail)=%d", len(src))
}
return columnNames, nil
}
type columnNameIDGenerator struct {
// columnNameIDs contains columnName->id mapping for already seen columns
columnNameIDs map[string]uint64
// columnNames contains id->columnName mapping for already seen columns
columnNames []string
}
func (g *columnNameIDGenerator) reset() {
g.columnNameIDs = nil
g.columnNames = nil
}
func (g *columnNameIDGenerator) getColumnNameID(name string) uint64 {
id, ok := g.columnNameIDs[name]
if !ok {
if g.columnNameIDs == nil {
g.columnNameIDs = make(map[string]uint64)
}
id = uint64(len(g.columnNames))
nameCopy := strings.Clone(name)
g.columnNameIDs[nameCopy] = id
g.columnNames = append(g.columnNames, nameCopy)
}
return id
}

View File

@ -0,0 +1,54 @@
package logstorage
import (
"reflect"
"testing"
)
func TestMarshalUnmarshalColumnNames(t *testing.T) {
f := func(columnNames []string) {
t.Helper()
data := marshalColumnNames(nil, columnNames)
result, err := unmarshalColumnNames(data)
if err != nil {
t.Fatalf("unexpected error when unmarshaling columnNames: %s", err)
}
if !reflect.DeepEqual(columnNames, result) {
t.Fatalf("unexpected umarshaled columnNames\ngot\n%v\nwant\n%v", result, columnNames)
}
}
f([]string{})
f([]string{"", "foo", "bar"})
f([]string{
"asdf.sdf.dsfds.f fds. fds ",
"foo",
"bar.sdfsdf.fd",
"",
"aso apaa",
})
}
func TestColumnNameIDGenerator(t *testing.T) {
a := []string{"", "foo", "bar.baz", "asdf dsf dfs"}
g := &columnNameIDGenerator{}
for i, s := range a {
id := g.getColumnNameID(s)
if id != uint64(i) {
t.Fatalf("first run: unexpected id generated for s=%q; got %d; want %d; g=%v", s, id, i, g)
}
}
// Repeat the loop
for i, s := range a {
id := g.getColumnNameID(s)
if id != uint64(i) {
t.Fatalf("second run: unexpected id generated for s=%q; got %d; want %d; g=%v", s, id, i, g)
}
}
}

View File

@ -1,5 +1,15 @@
package logstorage
// partFormatLatestVersion is the latest format version for parts.
//
// See partHeader.FormatVersion for details.
const partFormatLatestVersion = 1
// bloomValuesShardsCount is the number of shards for bloomFilename and valuesFilename files.
//
// The partHeader.FormatVersion must be updated when this number changes.
const bloomValuesShardsCount = 8
// maxUncompressedIndexBlockSize contains the maximum length of uncompressed block with blockHeader entries aka index block.
//
// The real block length can exceed this value by a small percentage because of the block write details.
@ -46,6 +56,9 @@ const maxBloomFilterBlockSize = 8 * 1024 * 1024
// maxColumnsHeaderSize is the maximum size of columnsHeader block
const maxColumnsHeaderSize = 8 * 1024 * 1024
// maxColumnsHeaderIndexSize is the maximum size of columnsHeaderIndex block
const maxColumnsHeaderIndexSize = 8 * 1024 * 1024
// maxDictSizeBytes is the maximum length of all the keys in the valuesDict.
//
// Dict is stored in columnsHeader, which is read every time the corresponding block is scanned during search qieries.

View File

@ -1,14 +1,18 @@
package logstorage
const (
metaindexFilename = "metaindex.bin"
indexFilename = "index.bin"
columnsHeaderFilename = "columns_header.bin"
timestampsFilename = "timestamps.bin"
fieldValuesFilename = "field_values.bin"
fieldBloomFilename = "field_bloom.bin"
messageValuesFilename = "message_values.bin"
messageBloomFilename = "message_bloom.bin"
columnNamesFilename = "column_names.bin"
metaindexFilename = "metaindex.bin"
indexFilename = "index.bin"
columnsHeaderIndexFilename = "columns_header_index.bin"
columnsHeaderFilename = "columns_header.bin"
timestampsFilename = "timestamps.bin"
oldValuesFilename = "field_values.bin"
oldBloomFilename = "field_bloom.bin"
valuesFilename = "values.bin"
bloomFilename = "bloom.bin"
messageValuesFilename = "message_values.bin"
messageBloomFilename = "message_bloom.bin"
metadataFilename = "metadata.json"
partsFilename = "parts.json"

View File

@ -0,0 +1,180 @@
package logstorage
import (
"sync"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// tokenizeHashes extracts word tokens from a, hashes them, appends hashes to dst and returns the result.
//
// The returned hashes can be used for building bloom filters.
func tokenizeHashes(dst []uint64, a []string) []uint64 {
t := getHashTokenizer()
for i, s := range a {
if i > 0 && s == a[i-1] {
// This string has been already tokenized
continue
}
dst = t.tokenizeString(dst, s)
}
putHashTokenizer(t)
return dst
}
const hashTokenizerBucketsCount = 1024
type hashTokenizer struct {
buckets [hashTokenizerBucketsCount]hashTokenizerBucket
bm bitmap
}
type hashTokenizerBucket struct {
v uint64
overflow []uint64
}
func (b *hashTokenizerBucket) reset() {
// do not spend CPU time on clearing v and b.overflow items,
// since they'll be overwritten with new items.
b.overflow = b.overflow[:0]
}
func newHashTokenizer() *hashTokenizer {
var t hashTokenizer
t.bm.init(len(t.buckets))
return &t
}
func (t *hashTokenizer) reset() {
if t.bm.onesCount() <= len(t.buckets)/4 {
t.bm.forEachSetBit(func(idx int) bool {
t.buckets[idx].reset()
return false
})
} else {
buckets := t.buckets[:]
for i := range buckets {
buckets[i].reset()
}
t.bm.init(len(t.buckets))
}
}
func (t *hashTokenizer) tokenizeString(dst []uint64, s string) []uint64 {
if !isASCII(s) {
// Slow path - s contains unicode chars
return t.tokenizeStringUnicode(dst, s)
}
// Fast path for ASCII s
i := 0
for i < len(s) {
// Search for the next token.
start := len(s)
for i < len(s) {
if !isTokenChar(s[i]) {
i++
continue
}
start = i
i++
break
}
// Search for the end of the token.
end := len(s)
for i < len(s) {
if isTokenChar(s[i]) {
i++
continue
}
end = i
i++
break
}
if end <= start {
break
}
// Register the token.
token := s[start:end]
if h, ok := t.addToken(token); ok {
dst = append(dst, h)
}
}
return dst
}
func (t *hashTokenizer) tokenizeStringUnicode(dst []uint64, s string) []uint64 {
for len(s) > 0 {
// Search for the next token.
n := len(s)
for offset, r := range s {
if isTokenRune(r) {
n = offset
break
}
}
s = s[n:]
// Search for the end of the token.
n = len(s)
for offset, r := range s {
if !isTokenRune(r) {
n = offset
break
}
}
if n == 0 {
break
}
// Register the token
token := s[:n]
s = s[n:]
if h, ok := t.addToken(token); ok {
dst = append(dst, h)
}
}
return dst
}
func (t *hashTokenizer) addToken(token string) (uint64, bool) {
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(token))
idx := int(h % uint64(len(t.buckets)))
b := &t.buckets[idx]
if !t.bm.isSetBit(idx) {
b.v = h
t.bm.setBit(idx)
return h, true
}
if b.v == h {
return h, false
}
for _, v := range b.overflow {
if v == h {
return h, false
}
}
b.overflow = append(b.overflow, h)
return h, true
}
func getHashTokenizer() *hashTokenizer {
v := hashTokenizerPool.Get()
if v == nil {
return newHashTokenizer()
}
return v.(*hashTokenizer)
}
func putHashTokenizer(t *hashTokenizer) {
t.reset()
hashTokenizerPool.Put(t)
}
var hashTokenizerPool sync.Pool

View File

@ -0,0 +1,24 @@
package logstorage
import (
"reflect"
"testing"
)
func TestTokenizeHashes(t *testing.T) {
f := func(a []string, hashesExpected []uint64) {
t.Helper()
hashes := tokenizeHashes(nil, a)
if !reflect.DeepEqual(hashes, hashesExpected) {
t.Fatalf("unexpected hashes\ngot\n%X\nwant\n%X", hashes, hashesExpected)
}
}
f(nil, nil)
f([]string{""}, nil)
f([]string{"foo"}, []uint64{0x33BF00A859C4BA3F})
f([]string{"foo foo", "!!foo //"}, []uint64{0x33BF00A859C4BA3F})
f([]string{"foo bar---.!!([baz]!!! %$# TaSte"}, []uint64{0x33BF00A859C4BA3F, 0x48A37C90AD27A659, 0x42598CF26A247404, 0x34709F40A3286E46})
f([]string{"foo bar---.!!([baz]!!! %$# baz foo TaSte"}, []uint64{0x33BF00A859C4BA3F, 0x48A37C90AD27A659, 0x42598CF26A247404, 0x34709F40A3286E46})
f([]string{"теСТ 1234 f12.34", "34 f12 AS"}, []uint64{0xFE846FA145CEABD1, 0xD8316E61D84F6BA4, 0x6D67BA71C4E03D10, 0x5E8D522CA93563ED, 0xED80AED10E029FC8})
}

View File

@ -0,0 +1,19 @@
package logstorage
import (
"strings"
"testing"
)
func BenchmarkTokenizeHashes(b *testing.B) {
a := strings.Split(benchLogs, "\n")
b.ReportAllocs()
b.SetBytes(int64(len(benchLogs)))
b.RunParallel(func(pb *testing.PB) {
var hashes []uint64
for pb.Next() {
hashes = tokenizeHashes(hashes[:0], a)
}
})
}

View File

@ -110,25 +110,36 @@ func (ih *indexBlockHeader) unmarshal(src []byte) ([]byte, error) {
return src[32:], nil
}
// mustWriteIndexBlockHeaders writes metaindexData to w.
func mustWriteIndexBlockHeaders(w *writerWithStats, metaindexData []byte) {
bb := longTermBufPool.Get()
bb.B = encoding.CompressZSTDLevel(bb.B[:0], metaindexData, 1)
w.MustWrite(bb.B)
if len(bb.B) < 1024*1024 {
longTermBufPool.Put(bb)
}
}
// mustReadIndexBlockHeaders reads indexBlockHeader entries from r, appends them to dst and returns the result.
func mustReadIndexBlockHeaders(dst []indexBlockHeader, r *readerWithStats) []indexBlockHeader {
data, err := io.ReadAll(r)
if err != nil {
logger.Panicf("FATAL: cannot read indexBlockHeader entries from %s: %s", r.Path(), err)
logger.Panicf("FATAL: %s: cannot read indexBlockHeader entries: %s", r.Path(), err)
}
bb := longTermBufPool.Get()
bb.B, err = encoding.DecompressZSTD(bb.B[:0], data)
if err != nil {
logger.Panicf("FATAL: cannot decompress indexBlockHeader entries from %s: %s", r.Path(), err)
logger.Panicf("FATAL: %s: cannot decompress indexBlockHeader entries: %s", r.Path(), err)
}
dst, err = unmarshalIndexBlockHeaders(dst, bb.B)
if len(bb.B) < 1024*1024 {
longTermBufPool.Put(bb)
}
if err != nil {
logger.Panicf("FATAL: cannot parse indexBlockHeader entries from %s: %s", r.Path(), err)
logger.Panicf("FATAL: %s: cannot parse indexBlockHeader entries: %s", r.Path(), err)
}
return dst
}

View File

@ -14,38 +14,62 @@ type inmemoryPart struct {
// ph contains partHeader information for the given in-memory part.
ph partHeader
columnNames bytesutil.ByteBuffer
metaindex bytesutil.ByteBuffer
index bytesutil.ByteBuffer
columnsHeaderIndex bytesutil.ByteBuffer
columnsHeader bytesutil.ByteBuffer
timestamps bytesutil.ByteBuffer
fieldValues bytesutil.ByteBuffer
fieldBloomFilter bytesutil.ByteBuffer
messageValues bytesutil.ByteBuffer
messageBloomFilter bytesutil.ByteBuffer
messageBloomValues bloomValuesBuffer
bloomValuesShards [bloomValuesShardsCount]bloomValuesBuffer
}
type bloomValuesBuffer struct {
bloom bytesutil.ByteBuffer
values bytesutil.ByteBuffer
}
func (b *bloomValuesBuffer) reset() {
b.bloom.Reset()
b.values.Reset()
}
func (b *bloomValuesBuffer) NewStreamReader() bloomValuesStreamReader {
return bloomValuesStreamReader{
bloom: b.bloom.NewReader(),
values: b.values.NewReader(),
}
}
func (b *bloomValuesBuffer) NewStreamWriter() bloomValuesStreamWriter {
return bloomValuesStreamWriter{
bloom: &b.bloom,
values: &b.values,
}
}
// reset resets mp, so it can be re-used
func (mp *inmemoryPart) reset() {
mp.ph.reset()
mp.columnNames.Reset()
mp.metaindex.Reset()
mp.index.Reset()
mp.columnsHeaderIndex.Reset()
mp.columnsHeader.Reset()
mp.timestamps.Reset()
mp.fieldValues.Reset()
mp.fieldBloomFilter.Reset()
mp.messageValues.Reset()
mp.messageBloomFilter.Reset()
mp.messageBloomValues.reset()
for i := range mp.bloomValuesShards[:] {
mp.bloomValuesShards[i].reset()
}
}
// mustInitFromRows initializes mp from lr.
func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) {
mp.reset()
if len(lr.timestamps) == 0 {
return
}
sort.Sort(lr)
bsw := getBlockStreamWriter()
@ -75,6 +99,7 @@ func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) {
}
bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows)
putTmpRows(trs)
bsw.Finalize(&mp.ph)
putBlockStreamWriter(bsw)
}
@ -83,23 +108,34 @@ func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) {
func (mp *inmemoryPart) MustStoreToDisk(path string) {
fs.MustMkdirFailIfExist(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
fieldValuesPath := filepath.Join(path, fieldValuesFilename)
fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
fs.MustWriteSync(columnNamesPath, mp.columnNames.B)
fs.MustWriteSync(metaindexPath, mp.metaindex.B)
fs.MustWriteSync(indexPath, mp.index.B)
fs.MustWriteSync(columnsHeaderIndexPath, mp.columnsHeaderIndex.B)
fs.MustWriteSync(columnsHeaderPath, mp.columnsHeader.B)
fs.MustWriteSync(timestampsPath, mp.timestamps.B)
fs.MustWriteSync(fieldValuesPath, mp.fieldValues.B)
fs.MustWriteSync(fieldBloomFilterPath, mp.fieldBloomFilter.B)
fs.MustWriteSync(messageValuesPath, mp.messageValues.B)
fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomFilter.B)
fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomValues.bloom.B)
fs.MustWriteSync(messageValuesPath, mp.messageBloomValues.values.B)
for i := range mp.bloomValuesShards[:] {
shard := &mp.bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
fs.MustWriteSync(bloomPath, shard.bloom.B)
valuesPath := getValuesFilePath(path, uint64(i))
fs.MustWriteSync(valuesPath, shard.values.B)
}
mp.ph.mustWriteMetadata(path)

View File

@ -75,7 +75,7 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) {
f(GetLogRows(nil, nil), 0, 0)
// Check how inmemoryPart works with a single stream
f(newTestLogRows(1, 1, 0), 1, 0.8)
f(newTestLogRows(1, 1, 0), 1, 0.7)
f(newTestLogRows(1, 2, 0), 1, 0.9)
f(newTestLogRows(1, 10, 0), 1, 2.0)
f(newTestLogRows(1, 1000, 0), 1, 7.1)
@ -83,9 +83,9 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) {
// Check how inmemoryPart works with multiple streams
f(newTestLogRows(2, 1, 0), 2, 0.8)
f(newTestLogRows(10, 1, 0), 10, 0.9)
f(newTestLogRows(100, 1, 0), 100, 1.0)
f(newTestLogRows(10, 5, 0), 10, 1.4)
f(newTestLogRows(10, 1, 0), 10, 1.1)
f(newTestLogRows(100, 1, 0), 100, 1.2)
f(newTestLogRows(10, 5, 0), 10, 1.5)
f(newTestLogRows(10, 1000, 0), 10, 7.2)
f(newTestLogRows(100, 100, 0), 100, 5.0)
}
@ -192,14 +192,14 @@ func TestInmemoryPartInitFromBlockStreamReaders(t *testing.T) {
f([]*LogRows{GetLogRows(nil, nil), GetLogRows(nil, nil)}, 0, 0)
// Check merge with a single reader
f([]*LogRows{newTestLogRows(1, 1, 0)}, 1, 0.8)
f([]*LogRows{newTestLogRows(1, 1, 0)}, 1, 0.7)
f([]*LogRows{newTestLogRows(1, 10, 0)}, 1, 2.0)
f([]*LogRows{newTestLogRows(1, 100, 0)}, 1, 4.9)
f([]*LogRows{newTestLogRows(1, 1000, 0)}, 1, 7.1)
f([]*LogRows{newTestLogRows(1, 10000, 0)}, 1, 7.4)
f([]*LogRows{newTestLogRows(10, 1, 0)}, 10, 0.9)
f([]*LogRows{newTestLogRows(100, 1, 0)}, 100, 1.0)
f([]*LogRows{newTestLogRows(1000, 1, 0)}, 1000, 1.0)
f([]*LogRows{newTestLogRows(10, 1, 0)}, 10, 1.1)
f([]*LogRows{newTestLogRows(100, 1, 0)}, 100, 1.3)
f([]*LogRows{newTestLogRows(1000, 1, 0)}, 1000, 1.2)
f([]*LogRows{newTestLogRows(10, 10, 0)}, 10, 2.1)
f([]*LogRows{newTestLogRows(10, 100, 0)}, 10, 4.9)

View File

@ -1,8 +1,12 @@
package logstorage
import (
"fmt"
"path/filepath"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
@ -19,16 +23,35 @@ type part struct {
// ph contains partHeader for the given part.
ph partHeader
// columnNameIDs is a mapping from column names seen in the given part to internal IDs.
// The internal IDs are used in columnHeaderRef.
columnNameIDs map[string]uint64
// columnNames is a mapping from internal IDs to column names.
// The internal IDs are used in columnHeaderRef.
columnNames []string
// indexBlockHeaders contains a list of indexBlockHeader entries for the given part.
indexBlockHeaders []indexBlockHeader
indexFile fs.MustReadAtCloser
columnsHeaderIndexFile fs.MustReadAtCloser
columnsHeaderFile fs.MustReadAtCloser
timestampsFile fs.MustReadAtCloser
fieldValuesFile fs.MustReadAtCloser
fieldBloomFilterFile fs.MustReadAtCloser
messageValuesFile fs.MustReadAtCloser
messageBloomFilterFile fs.MustReadAtCloser
messageBloomValues bloomValuesReaderAt
oldBloomValues bloomValuesReaderAt
bloomValuesShards [bloomValuesShardsCount]bloomValuesReaderAt
}
type bloomValuesReaderAt struct {
bloom fs.MustReadAtCloser
values fs.MustReadAtCloser
}
func (r *bloomValuesReaderAt) MustClose() {
r.bloom.MustClose()
r.values.MustClose()
}
func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
@ -37,6 +60,10 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
p.path = ""
p.ph = mp.ph
// Read columnNames
columnNamesReader := mp.columnNames.NewReader()
p.columnNames, p.columnNameIDs = mustReadColumnNames(columnNamesReader)
// Read metaindex
metaindexReader := mp.metaindex.NewReader()
var mrs readerWithStats
@ -45,12 +72,19 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
// Open data files
p.indexFile = &mp.index
p.columnsHeaderIndexFile = &mp.columnsHeaderIndex
p.columnsHeaderFile = &mp.columnsHeader
p.timestampsFile = &mp.timestamps
p.fieldValuesFile = &mp.fieldValues
p.fieldBloomFilterFile = &mp.fieldBloomFilter
p.messageValuesFile = &mp.messageValues
p.messageBloomFilterFile = &mp.messageBloomFilter
p.messageBloomValues.bloom = &mp.messageBloomValues.bloom
p.messageBloomValues.values = &mp.messageBloomValues.values
// Open files with bloom filters and column values
for i := range p.bloomValuesShards[:] {
shard := &p.bloomValuesShards[i]
shard.bloom = &mp.bloomValuesShards[i].bloom
shard.values = &mp.bloomValuesShards[i].values
}
return &p
}
@ -61,14 +95,21 @@ func mustOpenFilePart(pt *partition, path string) *part {
p.path = path
p.ph.mustReadMetadata(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
fieldValuesPath := filepath.Join(path, fieldValuesFilename)
fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
// Read columnNames
if p.ph.FormatVersion >= 1 {
columnNamesReader := filestream.MustOpen(columnNamesPath, true)
var crs readerWithStats
crs.init(columnNamesReader)
p.columnNames, p.columnNameIDs = mustReadColumnNames(columnNamesReader)
crs.MustClose()
}
// Read metaindex
metaindexReader := filestream.MustOpen(metaindexPath, true)
@ -79,24 +120,78 @@ func mustOpenFilePart(pt *partition, path string) *part {
// Open data files
p.indexFile = fs.MustOpenReaderAt(indexPath)
if p.ph.FormatVersion >= 1 {
p.columnsHeaderIndexFile = fs.MustOpenReaderAt(columnsHeaderIndexPath)
}
p.columnsHeaderFile = fs.MustOpenReaderAt(columnsHeaderPath)
p.timestampsFile = fs.MustOpenReaderAt(timestampsPath)
p.fieldValuesFile = fs.MustOpenReaderAt(fieldValuesPath)
p.fieldBloomFilterFile = fs.MustOpenReaderAt(fieldBloomFilterPath)
p.messageValuesFile = fs.MustOpenReaderAt(messageValuesPath)
p.messageBloomFilterFile = fs.MustOpenReaderAt(messageBloomFilterPath)
// Open files with bloom filters and column values
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
p.messageBloomValues.bloom = fs.MustOpenReaderAt(messageBloomFilterPath)
messageValuesPath := filepath.Join(path, messageValuesFilename)
p.messageBloomValues.values = fs.MustOpenReaderAt(messageValuesPath)
if p.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
p.oldBloomValues.bloom = fs.MustOpenReaderAt(bloomPath)
valuesPath := filepath.Join(path, oldValuesFilename)
p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath)
} else {
for i := range p.bloomValuesShards[:] {
shard := &p.bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = fs.MustOpenReaderAt(bloomPath)
valuesPath := getValuesFilePath(path, uint64(i))
shard.values = fs.MustOpenReaderAt(valuesPath)
}
}
return &p
}
func mustClosePart(p *part) {
p.indexFile.MustClose()
if p.ph.FormatVersion >= 1 {
p.columnsHeaderIndexFile.MustClose()
}
p.columnsHeaderFile.MustClose()
p.timestampsFile.MustClose()
p.fieldValuesFile.MustClose()
p.fieldBloomFilterFile.MustClose()
p.messageValuesFile.MustClose()
p.messageBloomFilterFile.MustClose()
p.messageBloomValues.MustClose()
if p.ph.FormatVersion < 1 {
p.oldBloomValues.MustClose()
} else {
for i := range p.bloomValuesShards[:] {
p.bloomValuesShards[i].MustClose()
}
}
p.pt = nil
}
func (p *part) getBloomValuesFileForColumnName(name string) *bloomValuesReaderAt {
if name == "" {
return &p.messageBloomValues
}
if p.ph.FormatVersion < 1 {
return &p.oldBloomValues
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
idx := h % uint64(len(p.bloomValuesShards))
return &p.bloomValuesShards[idx]
}
func getBloomFilePath(partPath string, shardNum uint64) string {
return filepath.Join(partPath, bloomFilename) + fmt.Sprintf("%d", shardNum)
}
func getValuesFilePath(partPath string, shardNum uint64) string {
return filepath.Join(partPath, valuesFilename) + fmt.Sprintf("%d", shardNum)
}

View File

@ -14,6 +14,9 @@ import (
// partHeader contains the information about a single part
type partHeader struct {
// FormatVersion is the version of the part format
FormatVersion uint
// CompressedSizeBytes is physical size of the part
CompressedSizeBytes uint64
@ -35,6 +38,7 @@ type partHeader struct {
// reset resets ph for subsequent re-use
func (ph *partHeader) reset() {
ph.FormatVersion = 0
ph.CompressedSizeBytes = 0
ph.UncompressedSizeBytes = 0
ph.RowsCount = 0
@ -45,8 +49,8 @@ func (ph *partHeader) reset() {
// String returns string represenation for ph.
func (ph *partHeader) String() string {
return fmt.Sprintf("{CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, MinTimestamp=%s, MaxTimestamp=%s}",
ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp))
return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, MinTimestamp=%s, MaxTimestamp=%s}",
ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp))
}
func (ph *partHeader) mustReadMetadata(partPath string) {
@ -62,9 +66,15 @@ func (ph *partHeader) mustReadMetadata(partPath string) {
}
// Perform various checks
if ph.FormatVersion > partFormatLatestVersion {
logger.Panicf("FATAL: unsupported part format version; got %d; mustn't exceed %d", partFormatLatestVersion)
}
if ph.MinTimestamp > ph.MaxTimestamp {
logger.Panicf("FATAL: MinTimestamp cannot exceed MaxTimestamp; got %d vs %d", ph.MinTimestamp, ph.MaxTimestamp)
}
if ph.BlocksCount > ph.RowsCount {
logger.Panicf("FATAL: BlocksCount=%d cannot exceed RowsCount=%d", ph.BlocksCount, ph.RowsCount)
}
}
func (ph *partHeader) mustWriteMetadata(partPath string) {

View File

@ -30,30 +30,34 @@ func (f *Field) String() string {
return string(x)
}
func (f *Field) marshal(dst []byte) []byte {
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Name))
func (f *Field) marshal(dst []byte, marshalFieldName bool) []byte {
if marshalFieldName {
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Name))
}
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Value))
return dst
}
func (f *Field) unmarshalNoArena(src []byte) ([]byte, error) {
func (f *Field) unmarshalNoArena(src []byte, unmarshalFieldName bool) ([]byte, error) {
srcOrig := src
// Unmarshal field name
b, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal field name")
if unmarshalFieldName {
name, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal field name")
}
src = src[nSize:]
f.Name = bytesutil.ToUnsafeString(name)
}
src = src[nSize:]
f.Name = bytesutil.ToUnsafeString(b)
// Unmarshal field value
b, nSize = encoding.UnmarshalBytes(src)
value, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal field value")
}
src = src[nSize:]
f.Value = bytesutil.ToUnsafeString(b)
f.Value = bytesutil.ToUnsafeString(value)
return src, nil
}

View File

@ -8,7 +8,7 @@ import (
// tokenizeStrings extracts word tokens from a, appends them to dst and returns the result.
//
// the order of returned tokens is unspecified.
// The order of returned tokens equals the order of tokens seen in a.
func tokenizeStrings(dst, a []string) []string {
t := getTokenizer()
for i, s := range a {
@ -145,27 +145,3 @@ func putTokenizer(t *tokenizer) {
}
var tokenizerPool sync.Pool
type tokensBuf struct {
A []string
}
func (tb *tokensBuf) reset() {
clear(tb.A)
tb.A = tb.A[:0]
}
func getTokensBuf() *tokensBuf {
v := tokensBufPool.Get()
if v == nil {
return &tokensBuf{}
}
return v.(*tokensBuf)
}
func putTokensBuf(tb *tokensBuf) {
tb.reset()
tokensBufPool.Put(tb)
}
var tokensBufPool sync.Pool