mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-02 01:00:07 +01:00
636c55b526
This should also reduce the CPU time spent by GC, since inmemoryBlock.items no longer contains pointers, so the GC doesn't need to visit them.
113 lines
3.3 KiB
Go
113 lines
3.3 KiB
Go
package mergeset
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
type inmemoryPart struct {
|
|
ph partHeader
|
|
sb storageBlock
|
|
bh blockHeader
|
|
mr metaindexRow
|
|
|
|
unpackedIndexBlockBuf []byte
|
|
packedIndexBlockBuf []byte
|
|
|
|
unpackedMetaindexBuf []byte
|
|
packedMetaindexBuf []byte
|
|
|
|
metaindexData bytesutil.ByteBuffer
|
|
indexData bytesutil.ByteBuffer
|
|
itemsData bytesutil.ByteBuffer
|
|
lensData bytesutil.ByteBuffer
|
|
}
|
|
|
|
func (ip *inmemoryPart) Reset() {
|
|
ip.ph.Reset()
|
|
ip.sb.Reset()
|
|
ip.bh.Reset()
|
|
ip.mr.Reset()
|
|
|
|
ip.unpackedIndexBlockBuf = ip.unpackedIndexBlockBuf[:0]
|
|
ip.packedIndexBlockBuf = ip.packedIndexBlockBuf[:0]
|
|
|
|
ip.unpackedMetaindexBuf = ip.unpackedMetaindexBuf[:0]
|
|
ip.packedMetaindexBuf = ip.packedMetaindexBuf[:0]
|
|
|
|
ip.metaindexData.Reset()
|
|
ip.indexData.Reset()
|
|
ip.itemsData.Reset()
|
|
ip.lensData.Reset()
|
|
}
|
|
|
|
// Init initializes ip from ib.
|
|
func (ip *inmemoryPart) Init(ib *inmemoryBlock) {
|
|
ip.Reset()
|
|
|
|
// Use the minimum possible compressLevel for compressing inmemoryPart,
|
|
// since it will be merged into file part soon.
|
|
compressLevel := 0
|
|
ip.bh.firstItem, ip.bh.commonPrefix, ip.bh.itemsCount, ip.bh.marshalType = ib.MarshalUnsortedData(&ip.sb, ip.bh.firstItem[:0], ip.bh.commonPrefix[:0], compressLevel)
|
|
|
|
ip.ph.itemsCount = uint64(len(ib.items))
|
|
ip.ph.blocksCount = 1
|
|
ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0].String(ib.data)...)
|
|
ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
|
|
|
|
fs.MustWriteData(&ip.itemsData, ip.sb.itemsData)
|
|
ip.bh.itemsBlockOffset = 0
|
|
ip.bh.itemsBlockSize = uint32(len(ip.sb.itemsData))
|
|
|
|
fs.MustWriteData(&ip.lensData, ip.sb.lensData)
|
|
ip.bh.lensBlockOffset = 0
|
|
ip.bh.lensBlockSize = uint32(len(ip.sb.lensData))
|
|
|
|
ip.unpackedIndexBlockBuf = ip.bh.Marshal(ip.unpackedIndexBlockBuf[:0])
|
|
ip.packedIndexBlockBuf = encoding.CompressZSTDLevel(ip.packedIndexBlockBuf[:0], ip.unpackedIndexBlockBuf, 0)
|
|
fs.MustWriteData(&ip.indexData, ip.packedIndexBlockBuf)
|
|
|
|
ip.mr.firstItem = append(ip.mr.firstItem[:0], ip.bh.firstItem...)
|
|
ip.mr.blockHeadersCount = 1
|
|
ip.mr.indexBlockOffset = 0
|
|
ip.mr.indexBlockSize = uint32(len(ip.packedIndexBlockBuf))
|
|
ip.unpackedMetaindexBuf = ip.mr.Marshal(ip.unpackedMetaindexBuf[:0])
|
|
ip.packedMetaindexBuf = encoding.CompressZSTDLevel(ip.packedMetaindexBuf[:0], ip.unpackedMetaindexBuf, 0)
|
|
fs.MustWriteData(&ip.metaindexData, ip.packedMetaindexBuf)
|
|
}
|
|
|
|
// It is safe calling NewPart multiple times.
|
|
// It is unsafe re-using ip while the returned part is in use.
|
|
func (ip *inmemoryPart) NewPart() *part {
|
|
ph := ip.ph
|
|
size := ip.size()
|
|
p, err := newPart(&ph, "", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
|
|
if err != nil {
|
|
logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
|
|
}
|
|
return p
|
|
}
|
|
|
|
func (ip *inmemoryPart) size() uint64 {
|
|
return uint64(len(ip.metaindexData.B) + len(ip.indexData.B) + len(ip.itemsData.B) + len(ip.lensData.B))
|
|
}
|
|
|
|
func getInmemoryPart() *inmemoryPart {
|
|
v := ipPool.Get()
|
|
if v == nil {
|
|
return &inmemoryPart{}
|
|
}
|
|
return v.(*inmemoryPart)
|
|
}
|
|
|
|
func putInmemoryPart(ip *inmemoryPart) {
|
|
ip.Reset()
|
|
ipPool.Put(ip)
|
|
}
|
|
|
|
// ipPool holds inmemoryPart objects for re-use via getInmemoryPart and putInmemoryPart.
var ipPool sync.Pool
|