From b03ccbf6f7211973e5911930d00a34bf1cafedb4 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 14 Jan 2020 21:20:18 +0200
Subject: [PATCH] lib/{storage,mergeset}: gradually remove stale entries from
 block cache and index caches

This should reduce memory usage in the long run, when old blocks and
indexes are no longer accessed.
---
 lib/mergeset/part.go | 225 ++++++++++++++++++++++++++++---------------
 lib/storage/part.go  |  87 ++++++++++++++---
 2 files changed, 223 insertions(+), 89 deletions(-)

diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go
index 043f5c3358..7a544acb0f 100644
--- a/lib/mergeset/part.go
+++ b/lib/mergeset/part.go
@@ -5,6 +5,7 @@ import (
 	"path/filepath"
 	"sync"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
@@ -149,8 +150,8 @@ func (p *part) MustClose() {
 	p.itemsFile.MustClose()
 	p.lensFile.MustClose()
 
-	p.idxbCache.Reset()
-	p.ibCache.Reset()
+	p.idxbCache.MustClose()
+	p.ibCache.MustClose()
 }
 
 type indexBlock struct {
@@ -179,27 +180,69 @@ type indexBlockCache struct {
 	requests uint64
 	misses   uint64
 
-	m         map[uint64]*indexBlock
-	missesMap map[uint64]uint64
-	mu        sync.RWMutex
+	m  map[uint64]*indexBlockCacheEntry
+	mu sync.RWMutex
+
+	cleanerStopCh chan struct{}
+	cleanerWG     sync.WaitGroup
+}
+
+type indexBlockCacheEntry struct {
+	// lastAccessTime is updated atomically, so it is placed first in the
+	// struct in order to keep it 8-byte aligned on 32-bit architectures.
+	lastAccessTime uint64
+	idxb           *indexBlock
 }
 
 func (idxbc *indexBlockCache) Init() {
-	idxbc.m = make(map[uint64]*indexBlock)
-	idxbc.missesMap = make(map[uint64]uint64)
+	idxbc.m = make(map[uint64]*indexBlockCacheEntry)
 	idxbc.requests = 0
 	idxbc.misses = 0
+
+	idxbc.cleanerStopCh = make(chan struct{})
+	idxbc.cleanerWG.Add(1)
+	go func() {
+		defer idxbc.cleanerWG.Done()
+		idxbc.cleaner()
+	}()
 }
 
-func (idxbc *indexBlockCache) Reset() {
+func (idxbc *indexBlockCache) MustClose() {
+	close(idxbc.cleanerStopCh)
+	idxbc.cleanerWG.Wait()
+
 	atomic.AddUint64(&indexBlockCacheRequests, idxbc.requests)
 	atomic.AddUint64(&indexBlockCacheMisses, idxbc.misses)
-	// It is safe returning idxbc.m to pool, since the Reset must be called
-	// when the idxbc entries are no longer accessed by concurrent goroutines.
-	for _, idxb := range idxbc.m {
-		putIndexBlock(idxb)
+	// It is safe to return idxbc.m entries to the pool, since MustClose must be
+	// called only when the entries are no longer accessed by concurrent goroutines.
+	for _, idxbe := range idxbc.m {
+		putIndexBlock(idxbe.idxb)
 	}
-	idxbc.Init()
+	idxbc.m = nil
+}
+
+// cleaner periodically removes entries that haven't been accessed for a long time.
+func (idxbc *indexBlockCache) cleaner() {
+	t := time.NewTicker(5 * time.Second)
+	for {
+		select {
+		case <-t.C:
+			idxbc.cleanByTimeout()
+		case <-idxbc.cleanerStopCh:
+			t.Stop()
+			return
+		}
+	}
+}
+
+func (idxbc *indexBlockCache) cleanByTimeout() {
+	currentTime := atomic.LoadUint64(&currentTimestamp)
+	idxbc.mu.Lock()
+	for k, idxbe := range idxbc.m {
+		// Delete items that were last accessed more than 10 minutes ago.
+		if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 10*60 {
+			delete(idxbc.m, k)
+		}
+	}
+	idxbc.mu.Unlock()
+}
 
 var (
@@ -210,16 +253,17 @@ var (
 func (idxbc *indexBlockCache) Get(k uint64) *indexBlock {
 	atomic.AddUint64(&idxbc.requests, 1)
 	idxbc.mu.RLock()
-	idxb := idxbc.m[k]
+	idxbe, ok := idxbc.m[k]
 	idxbc.mu.RUnlock()
-	if idxb != nil {
-		return idxb
+	if ok {
+		// Update lastAccessTime only when the coarse clock has advanced, in order
+		// to avoid frequent atomic writes to the same cache line on hot entries.
+		currentTime := atomic.LoadUint64(&currentTimestamp)
+		if atomic.LoadUint64(&idxbe.lastAccessTime) != currentTime {
+			atomic.StoreUint64(&idxbe.lastAccessTime, currentTime)
+		}
+		return idxbe.idxb
 	}
 	atomic.AddUint64(&idxbc.misses, 1)
-	idxbc.mu.Lock()
-	idxbc.missesMap[k]++
-	idxbc.mu.Unlock()
 	return nil
 }
 
@@ -229,13 +273,6 @@ func (idxbc *indexBlockCache) Get(k uint64) *indexBlock {
 func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) bool {
 	idxbc.mu.Lock()
-	if idxbc.missesMap[k] < 2 {
-		// Do not pollute cache with infrequently accessed items, since they may
-		// evict frequently accessed items.
-		idxbc.mu.Unlock()
-		return false
-	}
-
-	// Remove superflouos entries.
+	// Remove superfluous entries.
 	if overflow := len(idxbc.m) - getMaxCachedIndexBlocksPerPart(); overflow > 0 {
 		// Remove 10% of items from the cache.
 		overflow = int(float64(len(idxbc.m)) * 0.1)
 		for k := range idxbc.m {
 			delete(idxbc.m, k)
 			overflow--
 			if overflow == 0 {
 				break
 			}
 		}
 	}
-	if overflow := len(idxbc.missesMap) - 8*getMaxCachedIndexBlocksPerPart(); overflow > 0 {
-		// Remove 10% of items from the cache.
-		overflow = int(float64(len(idxbc.missesMap)) * 0.1)
-		for k := range idxbc.missesMap {
-			delete(idxbc.missesMap, k)
-			overflow--
-			if overflow == 0 {
-				break
-			}
-		}
-	}
-
-	// Store the frequently accessed idxb in the cache.
-	delete(idxbc.missesMap, k)
-	idxbc.m[k] = idxb
+	// Store idxb in the cache.
+	idxbe := &indexBlockCacheEntry{
+		lastAccessTime: atomic.LoadUint64(&currentTimestamp),
+		idxb:           idxb,
+	}
+	idxbc.m[k] = idxbe
 	idxbc.mu.Unlock()
 	return true
 }
@@ -291,9 +320,11 @@ type inmemoryBlockCache struct {
 	requests uint64
 	misses   uint64
 
-	m         map[inmemoryBlockCacheKey]*inmemoryBlock
-	missesMap map[inmemoryBlockCacheKey]uint64
-	mu        sync.RWMutex
+	m  map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry
+	mu sync.RWMutex
+
+	cleanerStopCh chan struct{}
+	cleanerWG     sync.WaitGroup
 }
 
 type inmemoryBlockCacheKey struct {
@@ -309,22 +340,62 @@ func (ibck *inmemoryBlockCacheKey) Init(bh *blockHeader) {
 	ibck.itemsBlockOffset = bh.itemsBlockOffset
 }
 
-func (ibc *inmemoryBlockCache) Init() {
-	ibc.m = make(map[inmemoryBlockCacheKey]*inmemoryBlock)
-	ibc.missesMap = make(map[inmemoryBlockCacheKey]uint64)
-	ibc.requests = 0
-	ibc.misses = 0
+type inmemoryBlockCacheEntry struct {
+	// lastAccessTime is updated atomically, so it is placed first in the
+	// struct in order to keep it 8-byte aligned on 32-bit architectures.
+	lastAccessTime uint64
+	ib             *inmemoryBlock
 }
 
-func (ibc *inmemoryBlockCache) Reset() {
+func (ibc *inmemoryBlockCache) Init() {
+	ibc.m = make(map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry)
+	ibc.requests = 0
+	ibc.misses = 0
+
+	ibc.cleanerStopCh = make(chan struct{})
+	ibc.cleanerWG.Add(1)
+	go func() {
+		defer ibc.cleanerWG.Done()
+		ibc.cleaner()
+	}()
+}
+
+func (ibc *inmemoryBlockCache) MustClose() {
+	close(ibc.cleanerStopCh)
+	ibc.cleanerWG.Wait()
+
 	atomic.AddUint64(&inmemoryBlockCacheRequests, ibc.requests)
 	atomic.AddUint64(&inmemoryBlockCacheMisses, ibc.misses)
-	// It is safe returning ibc.m entries to pool, since the Reset function may be called
-	// only if no other goroutines access ibc entries.
-	for _, ib := range ibc.m {
-		putInmemoryBlock(ib)
+	// It is safe to return ibc.m entries to the pool, since MustClose may be
+	// called only when no other goroutines access ibc entries.
+	for _, ibe := range ibc.m {
+		putInmemoryBlock(ibe.ib)
 	}
-	ibc.Init()
+	ibc.m = nil
+}
+
+// cleaner periodically removes entries that haven't been accessed for a long time.
+func (ibc *inmemoryBlockCache) cleaner() {
+	t := time.NewTicker(5 * time.Second)
+	for {
+		select {
+		case <-t.C:
+			ibc.cleanByTimeout()
+		case <-ibc.cleanerStopCh:
+			t.Stop()
+			return
+		}
+	}
+}
+
+func (ibc *inmemoryBlockCache) cleanByTimeout() {
+	currentTime := atomic.LoadUint64(&currentTimestamp)
+	ibc.mu.Lock()
+	for k, ibe := range ibc.m {
+		// Delete items that were last accessed more than 10 minutes ago.
+		if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 10*60 {
+			delete(ibc.m, k)
+		}
+	}
+	ibc.mu.Unlock()
+}
 
 var (
@@ -336,16 +407,17 @@ var (
 
 func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock {
 	atomic.AddUint64(&ibc.requests, 1)
 	ibc.mu.RLock()
-	ib := ibc.m[k]
+	ibe, ok := ibc.m[k]
 	ibc.mu.RUnlock()
-	if ib != nil {
-		return ib
+	if ok {
+		currentTime := atomic.LoadUint64(&currentTimestamp)
+		if atomic.LoadUint64(&ibe.lastAccessTime) != currentTime {
+			atomic.StoreUint64(&ibe.lastAccessTime, currentTime)
+		}
+		return ibe.ib
 	}
 	atomic.AddUint64(&ibc.misses, 1)
-	ibc.mu.Lock()
-	ibc.missesMap[k]++
-	ibc.mu.Unlock()
 	return nil
 }
 
@@ -355,14 +427,7 @@ func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock {
 func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) bool {
 	ibc.mu.Lock()
-	if ibc.missesMap[k] < 2 {
-		// Do not cache entry with low number of accesses, since it may evict
-		// frequently accessed entries from the cache.
-		ibc.mu.Unlock()
-		return false
-	}
-
-	// Clean superflouos entries in ibc.m and ibc.missesMap.
+	// Clean superfluous entries in the cache.
 	if overflow := len(ibc.m) - getMaxCachedInmemoryBlocksPerPart(); overflow > 0 {
 		// Remove 10% of items from the cache.
 		overflow = int(float64(len(ibc.m)) * 0.1)
 		for k := range ibc.m {
 			delete(ibc.m, k)
 			overflow--
 			if overflow == 0 {
 				break
 			}
 		}
 	}
-	if overflow := len(ibc.missesMap) - 8*getMaxCachedInmemoryBlocksPerPart(); overflow > 0 {
-		// Remove 10% of items from the cache.
-		overflow = int(float64(len(ibc.missesMap)) * 0.1)
-		for k := range ibc.missesMap {
-			delete(ibc.missesMap, k)
-			overflow--
-			if overflow == 0 {
-				break
-			}
-		}
-	}
-
-	// The entry is frequently accessed, so store it in the cache.
-	delete(ibc.missesMap, k)
-	ibc.m[k] = ib
+	// Store ib in the cache.
+	ibe := &inmemoryBlockCacheEntry{
+		lastAccessTime: atomic.LoadUint64(&currentTimestamp),
+		ib:             ib,
+	}
+	ibc.m[k] = ibe
 	ibc.mu.Unlock()
 	return true
 }
@@ -409,3 +466,15 @@ func (ibc *inmemoryBlockCache) Requests() uint64 {
 func (ibc *inmemoryBlockCache) Misses() uint64 {
 	return atomic.LoadUint64(&ibc.misses)
 }
+
+func init() {
+	atomic.StoreUint64(&currentTimestamp, uint64(time.Now().Unix()))
+	go func() {
+		// Update currentTimestamp once per second instead of calling
+		// time.Now() on every cache access.
+		t := time.NewTicker(time.Second)
+		for tm := range t.C {
+			ts := uint64(tm.Unix())
+			atomic.StoreUint64(&currentTimestamp, ts)
+		}
+	}()
+}
+
+var currentTimestamp uint64
diff --git a/lib/storage/part.go b/lib/storage/part.go
index 19bee3e385..05ffc268ed 100644
--- a/lib/storage/part.go
+++ b/lib/storage/part.go
@@ -5,6 +5,7 @@ import (
 	"path/filepath"
 	"sync"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
@@ -152,7 +153,7 @@ func (p *part) MustClose() {
 	p.indexFile.MustClose()
 
 	isBig := p.ph.RowsCount > maxRowsPerSmallPart()
-	p.ibCache.Reset(isBig)
+	p.ibCache.MustClose(isBig)
 }
 
 type indexBlock struct {
@@ -180,19 +181,37 @@ type indexBlockCache struct {
 	requests uint64
 	misses   uint64
 
-	m         map[uint64]*indexBlock
+	m         map[uint64]*indexBlockCacheEntry
 	missesMap map[uint64]uint64
 	mu        sync.RWMutex
+
+	cleanerStopCh chan struct{}
+	cleanerWG     sync.WaitGroup
+}
+
+type indexBlockCacheEntry struct {
+	// lastAccessTime is updated atomically, so it is placed first in the
+	// struct in order to keep it 8-byte aligned on 32-bit architectures.
+	lastAccessTime uint64
+	ib             *indexBlock
 }
 
 func (ibc *indexBlockCache) Init() {
-	ibc.m = make(map[uint64]*indexBlock)
+	ibc.m = make(map[uint64]*indexBlockCacheEntry)
 	ibc.missesMap = make(map[uint64]uint64)
 	ibc.requests = 0
 	ibc.misses = 0
+
+	ibc.cleanerStopCh = make(chan struct{})
+	ibc.cleanerWG.Add(1)
+	go func() {
+		defer ibc.cleanerWG.Done()
+		ibc.cleaner()
+	}()
}
 
-func (ibc *indexBlockCache) Reset(isBig bool) {
+func (ibc *indexBlockCache) MustClose(isBig bool) {
+	close(ibc.cleanerStopCh)
+	ibc.cleanerWG.Wait()
+
 	if isBig {
 		atomic.AddUint64(&bigIndexBlockCacheRequests, ibc.requests)
 		atomic.AddUint64(&bigIndexBlockCacheMisses, ibc.misses)
@@ -202,10 +221,36 @@ func (ibc *indexBlockCache) MustClose(isBig bool) {
 	}
-	// It is safe returning ibc.m itemst to the pool, since Reset must
-	// be called only when no other goroutines access ibc entries.
-	for _, ib := range ibc.m {
-		putIndexBlock(ib)
+	// It is safe to return ibc.m items to the pool, since MustClose must
+	// be called only when no other goroutines access ibc entries.
+	for _, ibe := range ibc.m {
+		putIndexBlock(ibe.ib)
 	}
-	ibc.Init()
+	ibc.m = nil
+}
+
+// cleaner periodically removes entries that haven't been accessed for a long time.
+func (ibc *indexBlockCache) cleaner() {
+	t := time.NewTicker(5 * time.Second)
+	for {
+		select {
+		case <-t.C:
+			ibc.cleanByTimeout()
+		case <-ibc.cleanerStopCh:
+			t.Stop()
+			return
+		}
+	}
+}
+
+func (ibc *indexBlockCache) cleanByTimeout() {
+	currentTime := atomic.LoadUint64(&currentTimestamp)
+	ibc.mu.Lock()
+	for k, ibe := range ibc.m {
+		// Delete items that were last accessed more than 10 minutes ago.
+		if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 10*60 {
+			delete(ibc.m, k)
+		}
+	}
+	ibc.mu.Unlock()
+}
 
 var (
@@ -220,11 +265,15 @@ var (
 
 func (ibc *indexBlockCache) Get(k uint64) *indexBlock {
 	atomic.AddUint64(&ibc.requests, 1)
 	ibc.mu.RLock()
-	ib := ibc.m[k]
+	ibe, ok := ibc.m[k]
 	ibc.mu.RUnlock()
-	if ib != nil {
-		return ib
+	if ok {
+		currentTime := atomic.LoadUint64(&currentTimestamp)
+		if atomic.LoadUint64(&ibe.lastAccessTime) != currentTime {
+			atomic.StoreUint64(&ibe.lastAccessTime, currentTime)
+		}
+		return ibe.ib
 	}
 	atomic.AddUint64(&ibc.misses, 1)
 	ibc.mu.Lock()
 	ibc.missesMap[k]++
 	ibc.mu.Unlock()
@@ -270,7 +319,11 @@ func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) bool {
 	// Store frequently requested ib in the cache.
 	delete(ibc.missesMap, k)
-	ibc.m[k] = ib
+	ibe := &indexBlockCacheEntry{
+		lastAccessTime: atomic.LoadUint64(&currentTimestamp),
+		ib:             ib,
+	}
+	ibc.m[k] = ibe
 	ibc.mu.Unlock()
 	return true
 }
@@ -289,3 +342,15 @@ func (ibc *indexBlockCache) Len() uint64 {
 	ibc.mu.Unlock()
 	return n
 }
+
+func init() {
+	atomic.StoreUint64(&currentTimestamp, uint64(time.Now().Unix()))
+	go func() {
+		// Update currentTimestamp once per second instead of calling
+		// time.Now() on every cache access.
+		t := time.NewTicker(time.Second)
+		for tm := range t.C {
+			ts := uint64(tm.Unix())
+			atomic.StoreUint64(&currentTimestamp, ts)
+		}
+	}()
+}
+
+var currentTimestamp uint64
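
For readers who want the cleanup pattern in isolation: the sketch below condenses
what this patch does into a small standalone program. It is a minimal illustration,
not code from the patch; the names (staleCache, entry, coarseNow) are invented for
the example, and the part-specific details (block pooling, miss counters, overflow
trimming) are omitted.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// coarseNow is a unix timestamp refreshed once per second, so hot paths read
// a cached clock instead of calling time.Now() on every cache access.
var coarseNow uint64

func init() {
	atomic.StoreUint64(&coarseNow, uint64(time.Now().Unix()))
	go func() {
		for tm := range time.NewTicker(time.Second).C {
			atomic.StoreUint64(&coarseNow, uint64(tm.Unix()))
		}
	}()
}

type entry struct {
	// Updated atomically; kept first for 8-byte alignment on 32-bit systems.
	lastAccessTime uint64
	value          string
}

type staleCache struct {
	mu     sync.RWMutex
	m      map[string]*entry
	stopCh chan struct{}
	wg     sync.WaitGroup
}

func newStaleCache() *staleCache {
	c := &staleCache{
		m:      make(map[string]*entry),
		stopCh: make(chan struct{}),
	}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		c.cleaner()
	}()
	return c
}

// cleaner drops entries that haven't been accessed for over 10 minutes.
func (c *staleCache) cleaner() {
	t := time.NewTicker(5 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			now := atomic.LoadUint64(&coarseNow)
			c.mu.Lock()
			for k, e := range c.m {
				if now-atomic.LoadUint64(&e.lastAccessTime) > 10*60 {
					delete(c.m, k)
				}
			}
			c.mu.Unlock()
		case <-c.stopCh:
			return
		}
	}
}

func (c *staleCache) Get(k string) (string, bool) {
	c.mu.RLock()
	e, ok := c.m[k]
	c.mu.RUnlock()
	if !ok {
		return "", false
	}
	// Refresh the access time only when it changed, avoiding needless atomic
	// writes on entries that are hit many times per second.
	if now := atomic.LoadUint64(&coarseNow); atomic.LoadUint64(&e.lastAccessTime) != now {
		atomic.StoreUint64(&e.lastAccessTime, now)
	}
	return e.value, true
}

func (c *staleCache) Put(k, v string) {
	e := &entry{lastAccessTime: atomic.LoadUint64(&coarseNow), value: v}
	c.mu.Lock()
	c.m[k] = e
	c.mu.Unlock()
}

// MustClose stops the cleaner; the map becomes garbage-collectable.
func (c *staleCache) MustClose() {
	close(c.stopCh)
	c.wg.Wait()
	c.m = nil
}

func main() {
	c := newStaleCache()
	c.Put("block", "data")
	v, ok := c.Get("block")
	fmt.Println(v, ok)
	c.MustClose()
}

Two design choices carried over from the patch are worth noting: entries are stored
behind pointers so the lastAccessTime updates written in Get are visible to the
cleaner goroutine, and the clock is deliberately coarse (one-second resolution)
because eviction happens on a 10-minute scale anyway.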