mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 14:22:15 +01:00
ae91a6883c
Previously such blocks were cleaned up after they hadn't been accessed for 10 minutes. Now they are cleaned up after one minute without access. This should reduce memory usage in the general case.
449 lines
10 KiB
Go
449 lines
10 KiB
Go
package mergeset
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
)
|
|
|
|
func getMaxCachedIndexBlocksPerPart() int {
|
|
maxCachedIndexBlocksPerPartOnce.Do(func() {
|
|
n := memory.Allowed() / 1024 / 1024 / 4
|
|
if n == 0 {
|
|
n = 10
|
|
}
|
|
maxCachedIndexBlocksPerPart = n
|
|
})
|
|
return maxCachedIndexBlocksPerPart
|
|
}
|
|
|
|
// maxCachedIndexBlocksPerPart is the lazily calculated limit returned
// by getMaxCachedIndexBlocksPerPart.
var maxCachedIndexBlocksPerPart int

// maxCachedIndexBlocksPerPartOnce guards the one-time initialization
// of maxCachedIndexBlocksPerPart.
var maxCachedIndexBlocksPerPartOnce sync.Once
|
|
|
|
func getMaxCachedInmemoryBlocksPerPart() int {
|
|
maxCachedInmemoryBlocksPerPartOnce.Do(func() {
|
|
n := memory.Allowed() / 1024 / 1024 / 4
|
|
if n == 0 {
|
|
n = 10
|
|
}
|
|
maxCachedInmemoryBlocksPerPart = n
|
|
})
|
|
return maxCachedInmemoryBlocksPerPart
|
|
}
|
|
|
|
// maxCachedInmemoryBlocksPerPart is the lazily calculated limit returned
// by getMaxCachedInmemoryBlocksPerPart.
var maxCachedInmemoryBlocksPerPart int

// maxCachedInmemoryBlocksPerPartOnce guards the one-time initialization
// of maxCachedInmemoryBlocksPerPart.
var maxCachedInmemoryBlocksPerPartOnce sync.Once
|
|
|
|
type part struct {
|
|
ph partHeader
|
|
|
|
path string
|
|
|
|
size uint64
|
|
|
|
mrs []metaindexRow
|
|
|
|
indexFile fs.MustReadAtCloser
|
|
itemsFile fs.MustReadAtCloser
|
|
lensFile fs.MustReadAtCloser
|
|
|
|
idxbCache *indexBlockCache
|
|
ibCache *inmemoryBlockCache
|
|
}
|
|
|
|
func openFilePart(path string) (*part, error) {
|
|
path = filepath.Clean(path)
|
|
|
|
var ph partHeader
|
|
if err := ph.ParseFromPath(path); err != nil {
|
|
return nil, fmt.Errorf("cannot parse path to part: %w", err)
|
|
}
|
|
|
|
metaindexPath := path + "/metaindex.bin"
|
|
metaindexFile, err := filestream.Open(metaindexPath, true)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot open %q: %w", metaindexPath, err)
|
|
}
|
|
metaindexSize := fs.MustFileSize(metaindexPath)
|
|
|
|
indexPath := path + "/index.bin"
|
|
indexFile, err := fs.OpenReaderAt(indexPath)
|
|
if err != nil {
|
|
metaindexFile.MustClose()
|
|
return nil, fmt.Errorf("cannot open %q: %w", indexPath, err)
|
|
}
|
|
indexSize := fs.MustFileSize(indexPath)
|
|
|
|
itemsPath := path + "/items.bin"
|
|
itemsFile, err := fs.OpenReaderAt(itemsPath)
|
|
if err != nil {
|
|
metaindexFile.MustClose()
|
|
indexFile.MustClose()
|
|
return nil, fmt.Errorf("cannot open %q: %w", itemsPath, err)
|
|
}
|
|
itemsSize := fs.MustFileSize(itemsPath)
|
|
|
|
lensPath := path + "/lens.bin"
|
|
lensFile, err := fs.OpenReaderAt(lensPath)
|
|
if err != nil {
|
|
metaindexFile.MustClose()
|
|
indexFile.MustClose()
|
|
itemsFile.MustClose()
|
|
return nil, fmt.Errorf("cannot open %q: %w", lensPath, err)
|
|
}
|
|
lensSize := fs.MustFileSize(lensPath)
|
|
|
|
size := metaindexSize + indexSize + itemsSize + lensSize
|
|
return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
|
|
}
|
|
|
|
func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) (*part, error) {
|
|
var errors []error
|
|
mrs, err := unmarshalMetaindexRows(nil, metaindexReader)
|
|
if err != nil {
|
|
errors = append(errors, fmt.Errorf("cannot unmarshal metaindexRows: %w", err))
|
|
}
|
|
metaindexReader.MustClose()
|
|
|
|
var p part
|
|
p.path = path
|
|
p.size = size
|
|
p.mrs = mrs
|
|
|
|
p.indexFile = indexFile
|
|
p.itemsFile = itemsFile
|
|
p.lensFile = lensFile
|
|
|
|
p.ph.CopyFrom(ph)
|
|
p.idxbCache = newIndexBlockCache()
|
|
p.ibCache = newInmemoryBlockCache()
|
|
|
|
if len(errors) > 0 {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
err := fmt.Errorf("error opening part %s: %w", p.path, errors[0])
|
|
p.MustClose()
|
|
return nil, err
|
|
}
|
|
return &p, nil
|
|
}
|
|
|
|
func (p *part) MustClose() {
|
|
p.indexFile.MustClose()
|
|
p.itemsFile.MustClose()
|
|
p.lensFile.MustClose()
|
|
|
|
p.idxbCache.MustClose()
|
|
p.ibCache.MustClose()
|
|
}
|
|
|
|
type indexBlock struct {
|
|
bhs []blockHeader
|
|
}
|
|
|
|
func getIndexBlock() *indexBlock {
|
|
v := indexBlockPool.Get()
|
|
if v == nil {
|
|
return &indexBlock{}
|
|
}
|
|
return v.(*indexBlock)
|
|
}
|
|
|
|
func putIndexBlock(idxb *indexBlock) {
|
|
idxb.bhs = idxb.bhs[:0]
|
|
indexBlockPool.Put(idxb)
|
|
}
|
|
|
|
// indexBlockPool is a pool of reusable indexBlock objects.
// It is accessed via getIndexBlock and putIndexBlock.
var indexBlockPool sync.Pool
|
|
|
|
type indexBlockCache struct {
|
|
// Atomically updated counters must go first in the struct, so they are properly
|
|
// aligned to 8 bytes on 32-bit architectures.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
|
requests uint64
|
|
misses uint64
|
|
|
|
m map[uint64]*indexBlockCacheEntry
|
|
mu sync.RWMutex
|
|
|
|
cleanerStopCh chan struct{}
|
|
cleanerWG sync.WaitGroup
|
|
}
|
|
|
|
type indexBlockCacheEntry struct {
|
|
// Atomically updated counters must go first in the struct, so they are properly
|
|
// aligned to 8 bytes on 32-bit architectures.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
|
lastAccessTime uint64
|
|
|
|
idxb *indexBlock
|
|
}
|
|
|
|
func newIndexBlockCache() *indexBlockCache {
|
|
var idxbc indexBlockCache
|
|
idxbc.m = make(map[uint64]*indexBlockCacheEntry)
|
|
idxbc.cleanerStopCh = make(chan struct{})
|
|
idxbc.cleanerWG.Add(1)
|
|
go func() {
|
|
defer idxbc.cleanerWG.Done()
|
|
idxbc.cleaner()
|
|
}()
|
|
return &idxbc
|
|
}
|
|
|
|
func (idxbc *indexBlockCache) MustClose() {
|
|
close(idxbc.cleanerStopCh)
|
|
idxbc.cleanerWG.Wait()
|
|
|
|
// It is safe returning idxbc.m to pool, since the Reset must be called
|
|
// when the idxbc entries are no longer accessed by concurrent goroutines.
|
|
for _, idxbe := range idxbc.m {
|
|
putIndexBlock(idxbe.idxb)
|
|
}
|
|
idxbc.m = nil
|
|
}
|
|
|
|
// cleaner periodically cleans least recently used items.
|
|
func (idxbc *indexBlockCache) cleaner() {
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
idxbc.cleanByTimeout()
|
|
case <-idxbc.cleanerStopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (idxbc *indexBlockCache) cleanByTimeout() {
|
|
currentTime := fasttime.UnixTimestamp()
|
|
idxbc.mu.Lock()
|
|
for k, idxbe := range idxbc.m {
|
|
// Delete items accessed more than a minute ago.
|
|
if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 60 {
|
|
delete(idxbc.m, k)
|
|
}
|
|
}
|
|
idxbc.mu.Unlock()
|
|
}
|
|
|
|
func (idxbc *indexBlockCache) Get(k uint64) *indexBlock {
|
|
atomic.AddUint64(&idxbc.requests, 1)
|
|
idxbc.mu.RLock()
|
|
idxbe := idxbc.m[k]
|
|
idxbc.mu.RUnlock()
|
|
|
|
if idxbe != nil {
|
|
currentTime := fasttime.UnixTimestamp()
|
|
if atomic.LoadUint64(&idxbe.lastAccessTime) != currentTime {
|
|
atomic.StoreUint64(&idxbe.lastAccessTime, currentTime)
|
|
}
|
|
return idxbe.idxb
|
|
}
|
|
atomic.AddUint64(&idxbc.misses, 1)
|
|
return nil
|
|
}
|
|
|
|
// Put puts idxb under the key k into idxbc.
|
|
func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) {
|
|
idxbc.mu.Lock()
|
|
|
|
// Remove superflouos entries.
|
|
if overflow := len(idxbc.m) - getMaxCachedIndexBlocksPerPart(); overflow > 0 {
|
|
// Remove 10% of items from the cache.
|
|
overflow = int(float64(len(idxbc.m)) * 0.1)
|
|
for k := range idxbc.m {
|
|
// Do not return idxb to pool, since these entries may be used
|
|
// by concurrent goroutines.
|
|
delete(idxbc.m, k)
|
|
overflow--
|
|
if overflow == 0 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Store idxb in the cache.
|
|
idxbe := &indexBlockCacheEntry{
|
|
lastAccessTime: fasttime.UnixTimestamp(),
|
|
idxb: idxb,
|
|
}
|
|
idxbc.m[k] = idxbe
|
|
idxbc.mu.Unlock()
|
|
}
|
|
|
|
func (idxbc *indexBlockCache) Len() uint64 {
|
|
idxbc.mu.RLock()
|
|
n := len(idxbc.m)
|
|
idxbc.mu.RUnlock()
|
|
return uint64(n)
|
|
}
|
|
|
|
func (idxbc *indexBlockCache) Requests() uint64 {
|
|
return atomic.LoadUint64(&idxbc.requests)
|
|
}
|
|
|
|
func (idxbc *indexBlockCache) Misses() uint64 {
|
|
return atomic.LoadUint64(&idxbc.misses)
|
|
}
|
|
|
|
type inmemoryBlockCache struct {
|
|
// Atomically updated counters must go first in the struct, so they are properly
|
|
// aligned to 8 bytes on 32-bit architectures.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
|
requests uint64
|
|
misses uint64
|
|
|
|
m map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry
|
|
mu sync.RWMutex
|
|
|
|
cleanerStopCh chan struct{}
|
|
cleanerWG sync.WaitGroup
|
|
}
|
|
|
|
// inmemoryBlockCacheKey is the key used for entries in inmemoryBlockCache.
//
// The key is comparable, so it can be used directly as a map key.
type inmemoryBlockCacheKey struct {
	// firstItem is filled by Init only for block headers with zero itemsBlockSize.
	// It is empty for the remaining block headers.
	firstItem string

	// itemsBlockOffset is the items block offset taken from the block header.
	itemsBlockOffset uint64
}
|
|
|
|
func (ibck *inmemoryBlockCacheKey) Init(bh *blockHeader) {
|
|
ibck.firstItem = ""
|
|
if bh.itemsBlockSize == 0 {
|
|
ibck.firstItem = string(bh.firstItem)
|
|
}
|
|
ibck.itemsBlockOffset = bh.itemsBlockOffset
|
|
}
|
|
|
|
type inmemoryBlockCacheEntry struct {
|
|
// Atomically updated counters must go first in the struct, so they are properly
|
|
// aligned to 8 bytes on 32-bit architectures.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
|
lastAccessTime uint64
|
|
|
|
ib *inmemoryBlock
|
|
}
|
|
|
|
func newInmemoryBlockCache() *inmemoryBlockCache {
|
|
var ibc inmemoryBlockCache
|
|
ibc.m = make(map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry)
|
|
|
|
ibc.cleanerStopCh = make(chan struct{})
|
|
ibc.cleanerWG.Add(1)
|
|
go func() {
|
|
defer ibc.cleanerWG.Done()
|
|
ibc.cleaner()
|
|
}()
|
|
return &ibc
|
|
}
|
|
|
|
func (ibc *inmemoryBlockCache) MustClose() {
|
|
close(ibc.cleanerStopCh)
|
|
ibc.cleanerWG.Wait()
|
|
|
|
// It is safe returning ibc.m entries to pool, since the Reset function may be called
|
|
// only if no other goroutines access ibc entries.
|
|
for _, ibe := range ibc.m {
|
|
putInmemoryBlock(ibe.ib)
|
|
}
|
|
ibc.m = nil
|
|
}
|
|
|
|
// cleaner periodically cleans least recently used items.
|
|
func (ibc *inmemoryBlockCache) cleaner() {
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
ibc.cleanByTimeout()
|
|
case <-ibc.cleanerStopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ibc *inmemoryBlockCache) cleanByTimeout() {
|
|
currentTime := fasttime.UnixTimestamp()
|
|
ibc.mu.Lock()
|
|
for k, ibe := range ibc.m {
|
|
// Delete items accessed more than a minute ago.
|
|
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 60 {
|
|
delete(ibc.m, k)
|
|
}
|
|
}
|
|
ibc.mu.Unlock()
|
|
}
|
|
|
|
func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock {
|
|
atomic.AddUint64(&ibc.requests, 1)
|
|
|
|
ibc.mu.RLock()
|
|
ibe := ibc.m[k]
|
|
ibc.mu.RUnlock()
|
|
|
|
if ibe != nil {
|
|
currentTime := fasttime.UnixTimestamp()
|
|
if atomic.LoadUint64(&ibe.lastAccessTime) != currentTime {
|
|
atomic.StoreUint64(&ibe.lastAccessTime, currentTime)
|
|
}
|
|
return ibe.ib
|
|
}
|
|
atomic.AddUint64(&ibc.misses, 1)
|
|
return nil
|
|
}
|
|
|
|
// Put puts ib under key k into ibc.
|
|
func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) {
|
|
ibc.mu.Lock()
|
|
|
|
// Clean superflouos entries in cache.
|
|
if overflow := len(ibc.m) - getMaxCachedInmemoryBlocksPerPart(); overflow > 0 {
|
|
// Remove 10% of items from the cache.
|
|
overflow = int(float64(len(ibc.m)) * 0.1)
|
|
for k := range ibc.m {
|
|
// do not call putInmemoryBlock(ib), since the ib
|
|
// may be used by concurrent goroutines.
|
|
delete(ibc.m, k)
|
|
overflow--
|
|
if overflow == 0 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Store ib in the cache.
|
|
ibe := &inmemoryBlockCacheEntry{
|
|
lastAccessTime: fasttime.UnixTimestamp(),
|
|
ib: ib,
|
|
}
|
|
ibc.m[k] = ibe
|
|
ibc.mu.Unlock()
|
|
}
|
|
|
|
func (ibc *inmemoryBlockCache) Len() uint64 {
|
|
ibc.mu.RLock()
|
|
n := len(ibc.m)
|
|
ibc.mu.RUnlock()
|
|
return uint64(n)
|
|
}
|
|
|
|
func (ibc *inmemoryBlockCache) Requests() uint64 {
|
|
return atomic.LoadUint64(&ibc.requests)
|
|
}
|
|
|
|
func (ibc *inmemoryBlockCache) Misses() uint64 {
|
|
return atomic.LoadUint64(&ibc.misses)
|
|
}
|