mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/mergeset: eliminate copying of itemsData and lensData from storageBlock to inmemoryBlock
This should improve performance when registering new time series. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2247
This commit is contained in:
parent
7da4068f48
commit
c84a8b34cc
@ -148,21 +148,6 @@ func (sb *storageBlock) Reset() {
|
||||
sb.lensData = sb.lensData[:0]
|
||||
}
|
||||
|
||||
func getStorageBlock() *storageBlock {
|
||||
v := storageBlockPool.Get()
|
||||
if v == nil {
|
||||
return &storageBlock{}
|
||||
}
|
||||
return v.(*storageBlock)
|
||||
}
|
||||
|
||||
func putStorageBlock(sb *storageBlock) {
|
||||
sb.Reset()
|
||||
storageBlockPool.Put(sb)
|
||||
}
|
||||
|
||||
var storageBlockPool sync.Pool
|
||||
|
||||
type marshalType uint8
|
||||
|
||||
const (
|
||||
|
@ -1,8 +1,9 @@
|
||||
package mergeset
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
@ -45,8 +46,13 @@ func (mp *inmemoryPart) Reset() {
|
||||
// Init initializes mp from ib.
|
||||
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
|
||||
mp.Reset()
|
||||
sb := getStorageBlock()
|
||||
defer putStorageBlock(sb)
|
||||
|
||||
// Re-use mp.itemsData and mp.lensData in sb.
|
||||
// This eliminates copying itemsData and lensData from sb to mp later.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2247
|
||||
sb := &storageBlock{}
|
||||
sb.itemsData = mp.itemsData.B[:0]
|
||||
sb.lensData = mp.lensData.B[:0]
|
||||
|
||||
// Use the minimum possible compressLevel for compressing inmemoryPart,
|
||||
// since it will be merged into file part soon.
|
||||
@ -59,11 +65,11 @@ func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
|
||||
mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
|
||||
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
|
||||
|
||||
fs.MustWriteData(&mp.itemsData, sb.itemsData)
|
||||
mp.itemsData.B = sb.itemsData
|
||||
mp.bh.itemsBlockOffset = 0
|
||||
mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))
|
||||
|
||||
fs.MustWriteData(&mp.lensData, sb.lensData)
|
||||
mp.lensData.B = sb.lensData
|
||||
mp.bh.lensBlockOffset = 0
|
||||
mp.bh.lensBlockSize = uint32(len(mp.lensData.B))
|
||||
|
||||
@ -97,25 +103,16 @@ func (mp *inmemoryPart) size() uint64 {
|
||||
}
|
||||
|
||||
func getInmemoryPart() *inmemoryPart {
|
||||
select {
|
||||
case mp := <-mpPool:
|
||||
return mp
|
||||
default:
|
||||
v := inmemoryPartPool.Get()
|
||||
if v == nil {
|
||||
return &inmemoryPart{}
|
||||
}
|
||||
return v.(*inmemoryPart)
|
||||
}
|
||||
|
||||
func putInmemoryPart(mp *inmemoryPart) {
|
||||
mp.Reset()
|
||||
select {
|
||||
case mpPool <- mp:
|
||||
default:
|
||||
// Drop mp in order to reduce memory usage.
|
||||
}
|
||||
inmemoryPartPool.Put(mp)
|
||||
}
|
||||
|
||||
// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores,
|
||||
// since sync.Pool maintains per-CPU pool of inmemoryPart objects.
|
||||
//
|
||||
// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage.
|
||||
var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs())
|
||||
var inmemoryPartPool sync.Pool
|
||||
|
@ -227,7 +227,9 @@ func (pw *partWrapper) decRef() {
|
||||
}
|
||||
|
||||
if pw.mp != nil {
|
||||
putInmemoryPart(pw.mp)
|
||||
// Do not return pw.mp to pool via putInmemoryPart(),
|
||||
// since pw.mp size may be too big compared to other entries stored in the pool.
|
||||
// This may result in increased memory usage because of high fragmentation.
|
||||
pw.mp = nil
|
||||
}
|
||||
pw.p.MustClose()
|
||||
@ -740,7 +742,10 @@ func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {
|
||||
|
||||
// Prepare blockStreamWriter for destination part.
|
||||
bsw := getBlockStreamWriter()
|
||||
mpDst := getInmemoryPart()
|
||||
// Do not obtain mpDst via getInmemoryPart(), since its size
|
||||
// may be too big comparing to other entries in the pool.
|
||||
// This may result in increased memory usage because of high fragmentation.
|
||||
mpDst := &inmemoryPart{}
|
||||
bsw.InitFromInmemoryPart(mpDst)
|
||||
|
||||
// Merge parts.
|
||||
|
Loading…
Reference in New Issue
Block a user