mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-02 09:10:40 +01:00
d128a5bf99
This should reduce the maximum memory usage for the cache in `whole` state
385 lines
10 KiB
Go
385 lines
10 KiB
Go
package workingsetcache
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/fastcache"
|
|
)
|
|
|
|
// Cache modes.
|
|
const (
|
|
split = 0
|
|
switching = 1
|
|
whole = 2
|
|
)
|
|
|
|
const defaultExpireDuration = 20 * time.Minute
|
|
|
|
// Cache is a cache for working set entries.
|
|
//
|
|
// The cache evicts inactive entries after the given expireDuration.
|
|
// Recently accessed entries survive expireDuration.
|
|
type Cache struct {
|
|
curr atomic.Value
|
|
prev atomic.Value
|
|
|
|
// cs holds cache stats
|
|
cs fastcache.Stats
|
|
|
|
// mode indicates whether to use only curr and skip prev.
|
|
//
|
|
// This flag is set to switching if curr is filled for more than 50% space.
|
|
// In this case using prev would result in RAM waste,
|
|
// it is better to use only curr cache with doubled size.
|
|
// After the process of switching, this flag will be set to whole.
|
|
mode uint32
|
|
|
|
// The maxBytes value passed to New() or to Load().
|
|
maxBytes int
|
|
|
|
// mu serializes access to curr, prev and mode
|
|
// in expirationWatcher and cacheSizeWatcher.
|
|
mu sync.Mutex
|
|
|
|
wg sync.WaitGroup
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
// Load loads the cache from filePath and limits its size to maxBytes
|
|
// and evicts inactive entries in 20 minutes.
|
|
//
|
|
// Stop must be called on the returned cache when it is no longer needed.
|
|
func Load(filePath string, maxBytes int) *Cache {
|
|
return LoadWithExpire(filePath, maxBytes, defaultExpireDuration)
|
|
}
|
|
|
|
// LoadWithExpire loads the cache from filePath and limits its size to maxBytes
|
|
// and evicts inactive entires after expireDuration.
|
|
//
|
|
// Stop must be called on the returned cache when it is no longer needed.
|
|
func LoadWithExpire(filePath string, maxBytes int, expireDuration time.Duration) *Cache {
|
|
curr := fastcache.LoadFromFileOrNew(filePath, maxBytes)
|
|
var cs fastcache.Stats
|
|
curr.UpdateStats(&cs)
|
|
if cs.EntriesCount == 0 {
|
|
curr.Reset()
|
|
// The cache couldn't be loaded with maxBytes size.
|
|
// This may mean that the cache is split into curr and prev caches.
|
|
// Try loading it again with maxBytes / 2 size.
|
|
// Put the loaded cache into `prev` instead of `curr`
|
|
// in order to limit the growth of the cache for the current period of time.
|
|
prev := fastcache.LoadFromFileOrNew(filePath, maxBytes/2)
|
|
curr := fastcache.New(maxBytes / 2)
|
|
c := newCacheInternal(curr, prev, split, maxBytes)
|
|
c.runWatchers(expireDuration)
|
|
return c
|
|
}
|
|
|
|
// The cache has been successfully loaded in full.
|
|
// Set its' mode to `whole`.
|
|
// There is no need in runWatchers call.
|
|
prev := fastcache.New(1024)
|
|
return newCacheInternal(curr, prev, whole, maxBytes)
|
|
}
|
|
|
|
// New creates new cache with the given maxBytes capacity.
|
|
//
|
|
// Stop must be called on the returned cache when it is no longer needed.
|
|
func New(maxBytes int) *Cache {
|
|
return NewWithExpire(maxBytes, defaultExpireDuration)
|
|
}
|
|
|
|
// NewWithExpire creates new cache with the given maxBytes capacity and the given expireDuration
|
|
// for inactive entries.
|
|
//
|
|
// Stop must be called on the returned cache when it is no longer needed.
|
|
func NewWithExpire(maxBytes int, expireDuration time.Duration) *Cache {
|
|
curr := fastcache.New(maxBytes / 2)
|
|
prev := fastcache.New(1024)
|
|
c := newCacheInternal(curr, prev, split, maxBytes)
|
|
c.runWatchers(expireDuration)
|
|
return c
|
|
}
|
|
|
|
func newCacheInternal(curr, prev *fastcache.Cache, mode, maxBytes int) *Cache {
|
|
var c Cache
|
|
c.maxBytes = maxBytes
|
|
c.curr.Store(curr)
|
|
c.prev.Store(prev)
|
|
c.stopCh = make(chan struct{})
|
|
c.setMode(mode)
|
|
return &c
|
|
}
|
|
|
|
func (c *Cache) runWatchers(expireDuration time.Duration) {
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.expirationWatcher(expireDuration)
|
|
}()
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.cacheSizeWatcher()
|
|
}()
|
|
}
|
|
|
|
func (c *Cache) expirationWatcher(expireDuration time.Duration) {
|
|
t := time.NewTicker(expireDuration / 2)
|
|
for {
|
|
select {
|
|
case <-c.stopCh:
|
|
t.Stop()
|
|
return
|
|
case <-t.C:
|
|
}
|
|
|
|
c.mu.Lock()
|
|
if atomic.LoadUint32(&c.mode) != split {
|
|
// Stop the expirationWatcher on non-split mode.
|
|
c.mu.Unlock()
|
|
return
|
|
}
|
|
// Expire prev cache and create fresh curr cache with the same capacity.
|
|
// Do not reuse prev cache, since it can occupy too big amounts of memory.
|
|
prev := c.prev.Load().(*fastcache.Cache)
|
|
prev.Reset()
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
var cs fastcache.Stats
|
|
curr.UpdateStats(&cs)
|
|
c.prev.Store(curr)
|
|
// Use c.maxBytes/2 instead of cs.MaxBytesSize for creating new cache,
|
|
// since cs.MaxBytesSize may not match c.maxBytes/2, so the created cache
|
|
// couldn't be loaded from file with c.maxBytes/2 limit after saving with cs.MaxBytesSize size.
|
|
curr = fastcache.New(c.maxBytes / 2)
|
|
c.curr.Store(curr)
|
|
c.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (c *Cache) cacheSizeWatcher() {
|
|
t := time.NewTicker(time.Minute)
|
|
defer t.Stop()
|
|
|
|
var maxBytesSize uint64
|
|
for {
|
|
select {
|
|
case <-c.stopCh:
|
|
return
|
|
case <-t.C:
|
|
}
|
|
if c.loadMode() != split {
|
|
continue
|
|
}
|
|
var cs fastcache.Stats
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
curr.UpdateStats(&cs)
|
|
if cs.BytesSize >= uint64(0.9*float64(cs.MaxBytesSize)) {
|
|
maxBytesSize = cs.MaxBytesSize
|
|
break
|
|
}
|
|
}
|
|
|
|
// curr cache size exceeds 90% of its capacity. It is better
|
|
// to double the size of curr cache and stop using prev cache,
|
|
// since this will result in higher summary cache capacity.
|
|
//
|
|
// Do this in the following steps:
|
|
// 1) switch to mode=switching
|
|
// 2) move curr cache to prev
|
|
// 3) create curr cache with doubled size
|
|
// 4) wait until curr cache size exceeds maxBytesSize, i.e. it is populated with new data
|
|
// 5) switch to mode=whole
|
|
// 6) drop prev cache
|
|
|
|
c.mu.Lock()
|
|
c.setMode(switching)
|
|
prev := c.prev.Load().(*fastcache.Cache)
|
|
prev.Reset()
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
c.prev.Store(curr)
|
|
// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
|
|
// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
|
|
c.curr.Store(fastcache.New(c.maxBytes))
|
|
c.mu.Unlock()
|
|
|
|
for {
|
|
select {
|
|
case <-c.stopCh:
|
|
return
|
|
case <-t.C:
|
|
}
|
|
var cs fastcache.Stats
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
curr.UpdateStats(&cs)
|
|
if cs.BytesSize >= maxBytesSize {
|
|
break
|
|
}
|
|
}
|
|
|
|
c.mu.Lock()
|
|
c.setMode(whole)
|
|
prev = c.prev.Load().(*fastcache.Cache)
|
|
prev.Reset()
|
|
c.prev.Store(fastcache.New(1024))
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// Save saves the cache to filePath.
|
|
func (c *Cache) Save(filePath string) error {
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
concurrency := cgroup.AvailableCPUs()
|
|
return curr.SaveToFileConcurrent(filePath, concurrency)
|
|
}
|
|
|
|
// Stop stops the cache.
|
|
//
|
|
// The cache cannot be used after the Stop call.
|
|
func (c *Cache) Stop() {
|
|
close(c.stopCh)
|
|
c.wg.Wait()
|
|
|
|
c.Reset()
|
|
}
|
|
|
|
// Reset resets the cache.
|
|
func (c *Cache) Reset() {
|
|
prev := c.prev.Load().(*fastcache.Cache)
|
|
prev.Reset()
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
curr.Reset()
|
|
// Reset the mode to `split` in the hope the working set size becomes smaller after the reset.
|
|
c.setMode(split)
|
|
}
|
|
|
|
func (c *Cache) setMode(mode int) {
|
|
atomic.StoreUint32(&c.mode, uint32(mode))
|
|
}
|
|
|
|
func (c *Cache) loadMode() int {
|
|
return int(atomic.LoadUint32(&c.mode))
|
|
}
|
|
|
|
// UpdateStats updates fcs with cache stats.
|
|
func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
|
|
var cs fastcache.Stats
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
curr.UpdateStats(&cs)
|
|
fcs.Collisions += cs.Collisions
|
|
fcs.Corruptions += cs.Corruptions
|
|
fcs.EntriesCount += cs.EntriesCount
|
|
fcs.BytesSize += cs.BytesSize
|
|
fcs.MaxBytesSize += cs.MaxBytesSize
|
|
|
|
fcs.GetCalls += atomic.LoadUint64(&c.cs.GetCalls)
|
|
fcs.SetCalls += atomic.LoadUint64(&c.cs.SetCalls)
|
|
fcs.Misses += atomic.LoadUint64(&c.cs.Misses)
|
|
|
|
prev := c.prev.Load().(*fastcache.Cache)
|
|
cs.Reset()
|
|
prev.UpdateStats(&cs)
|
|
fcs.EntriesCount += cs.EntriesCount
|
|
fcs.BytesSize += cs.BytesSize
|
|
fcs.MaxBytesSize += cs.MaxBytesSize
|
|
}
|
|
|
|
// Get appends the found value for the given key to dst and returns the result.
|
|
func (c *Cache) Get(dst, key []byte) []byte {
|
|
atomic.AddUint64(&c.cs.GetCalls, 1)
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
result := curr.Get(dst, key)
|
|
if len(result) > len(dst) {
|
|
// Fast path - the entry is found in the current cache.
|
|
return result
|
|
}
|
|
if c.loadMode() == whole {
|
|
// Nothing found.
|
|
atomic.AddUint64(&c.cs.Misses, 1)
|
|
return result
|
|
}
|
|
|
|
// Search for the entry in the previous cache.
|
|
prev := c.prev.Load().(*fastcache.Cache)
|
|
result = prev.Get(dst, key)
|
|
if len(result) <= len(dst) {
|
|
// Nothing found.
|
|
atomic.AddUint64(&c.cs.Misses, 1)
|
|
return result
|
|
}
|
|
// Cache the found entry in the current cache.
|
|
curr.Set(key, result[len(dst):])
|
|
return result
|
|
}
|
|
|
|
// Has verifies whether the cache contains the given key.
|
|
func (c *Cache) Has(key []byte) bool {
|
|
atomic.AddUint64(&c.cs.GetCalls, 1)
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
if curr.Has(key) {
|
|
return true
|
|
}
|
|
if c.loadMode() == whole {
|
|
atomic.AddUint64(&c.cs.Misses, 1)
|
|
return false
|
|
}
|
|
prev := c.prev.Load().(*fastcache.Cache)
|
|
if !prev.Has(key) {
|
|
atomic.AddUint64(&c.cs.Misses, 1)
|
|
return false
|
|
}
|
|
// Cache the found entry in the current cache.
|
|
tmpBuf := tmpBufPool.Get()
|
|
tmpBuf.B = prev.Get(tmpBuf.B, key)
|
|
curr.Set(key, tmpBuf.B)
|
|
tmpBufPool.Put(tmpBuf)
|
|
return true
|
|
}
|
|
|
|
var tmpBufPool bytesutil.ByteBufferPool
|
|
|
|
// Set sets the given value for the given key.
|
|
func (c *Cache) Set(key, value []byte) {
|
|
atomic.AddUint64(&c.cs.SetCalls, 1)
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
curr.Set(key, value)
|
|
}
|
|
|
|
// GetBig appends the found value for the given key to dst and returns the result.
|
|
func (c *Cache) GetBig(dst, key []byte) []byte {
|
|
atomic.AddUint64(&c.cs.GetCalls, 1)
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
result := curr.GetBig(dst, key)
|
|
if len(result) > len(dst) {
|
|
// Fast path - the entry is found in the current cache.
|
|
return result
|
|
}
|
|
if c.loadMode() == whole {
|
|
// Nothing found.
|
|
atomic.AddUint64(&c.cs.Misses, 1)
|
|
return result
|
|
}
|
|
|
|
// Search for the entry in the previous cache.
|
|
prev := c.prev.Load().(*fastcache.Cache)
|
|
result = prev.GetBig(dst, key)
|
|
if len(result) <= len(dst) {
|
|
// Nothing found.
|
|
atomic.AddUint64(&c.cs.Misses, 1)
|
|
return result
|
|
}
|
|
// Cache the found entry in the current cache.
|
|
curr.SetBig(key, result[len(dst):])
|
|
return result
|
|
}
|
|
|
|
// SetBig sets the given value for the given key.
|
|
func (c *Cache) SetBig(key, value []byte) {
|
|
atomic.AddUint64(&c.cs.SetCalls, 1)
|
|
curr := c.curr.Load().(*fastcache.Cache)
|
|
curr.SetBig(key, value)
|
|
}
|