mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/workingsetcache: move the cleaner for the prev cache into a separate goroutine
This makes the code more clear after d906d8573e
This commit is contained in:
parent
4cd173bbaa
commit
08ca45d238
@ -42,7 +42,7 @@ type Cache struct {
|
|||||||
maxBytes int
|
maxBytes int
|
||||||
|
|
||||||
// mu serializes access to curr, prev and mode
|
// mu serializes access to curr, prev and mode
|
||||||
// in expirationWatcher and cacheSizeWatcher.
|
// in expirationWatcher, prevCacheWatcher and cacheSizeWatcher.
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
@ -122,6 +122,11 @@ func (c *Cache) runWatchers(expireDuration time.Duration) {
|
|||||||
c.expirationWatcher(expireDuration)
|
c.expirationWatcher(expireDuration)
|
||||||
}()
|
}()
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
c.prevCacheWatcher()
|
||||||
|
}()
|
||||||
|
c.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer c.wg.Done()
|
defer c.wg.Done()
|
||||||
c.cacheSizeWatcher()
|
c.cacheSizeWatcher()
|
||||||
@ -132,7 +137,6 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
|
|||||||
expireDuration += timeJitter(expireDuration / 10)
|
expireDuration += timeJitter(expireDuration / 10)
|
||||||
t := time.NewTicker(expireDuration)
|
t := time.NewTicker(expireDuration)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
var csCurr, csPrev fastcache.Stats
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.stopCh:
|
case <-c.stopCh:
|
||||||
@ -148,52 +152,61 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
|
|||||||
// Reset prev cache and swap it with the curr cache.
|
// Reset prev cache and swap it with the curr cache.
|
||||||
prev := c.prev.Load().(*fastcache.Cache)
|
prev := c.prev.Load().(*fastcache.Cache)
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
csCurr.Reset()
|
|
||||||
curr.UpdateStats(&csCurr)
|
|
||||||
csPrev.Reset()
|
|
||||||
prev.UpdateStats(&csPrev)
|
|
||||||
|
|
||||||
c.prev.Store(curr)
|
c.prev.Store(curr)
|
||||||
|
var cs fastcache.Stats
|
||||||
prevGetCalls := csCurr.GetCalls
|
prev.UpdateStats(&cs)
|
||||||
updateCacheStatsHistory(&c.csHistory, &csPrev)
|
updateCacheStatsHistory(&c.csHistory, &cs)
|
||||||
|
|
||||||
prev.Reset()
|
prev.Reset()
|
||||||
c.curr.Store(prev)
|
c.curr.Store(prev)
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Watch for the usage of the prev cache and drop it whenever it receives
|
func (c *Cache) prevCacheWatcher() {
|
||||||
// less than 5% of get calls comparing to the curr cache.
|
// Watch for the usage of the prev cache and drop it whenever it receives
|
||||||
// This allows saving memory.
|
// less than 5% of requests comparing to the curr cache during the last 10 seconds.
|
||||||
prev, curr = curr, prev
|
checkInterval := 10 * time.Second
|
||||||
checkInterval := 10 * time.Second
|
checkInterval += timeJitter(checkInterval / 10)
|
||||||
checkerT := time.NewTicker(checkInterval)
|
t := time.NewTicker(checkInterval)
|
||||||
checkerDeadline := time.Now().Add(expireDuration - checkInterval)
|
defer t.Stop()
|
||||||
for time.Now().Before(checkerDeadline) {
|
prevGetCalls := uint64(0)
|
||||||
select {
|
currGetCalls := uint64(0)
|
||||||
case <-c.stopCh:
|
for {
|
||||||
break
|
select {
|
||||||
case <-checkerT.C:
|
case <-c.stopCh:
|
||||||
}
|
return
|
||||||
c.mu.Lock()
|
case <-t.C:
|
||||||
if atomic.LoadUint32(&c.mode) != split {
|
}
|
||||||
// Do nothing in non-split mode.
|
c.mu.Lock()
|
||||||
c.mu.Unlock()
|
if atomic.LoadUint32(&c.mode) != split {
|
||||||
break
|
// Do nothing in non-split mode.
|
||||||
}
|
c.mu.Unlock()
|
||||||
csCurr.Reset()
|
return
|
||||||
curr.UpdateStats(&csCurr)
|
}
|
||||||
csPrev.Reset()
|
prev := c.prev.Load().(*fastcache.Cache)
|
||||||
prev.UpdateStats(&csPrev)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
getCalls := csPrev.GetCalls - prevGetCalls
|
var csCurr, csPrev fastcache.Stats
|
||||||
if float64(getCalls) < 0.05*float64(csCurr.GetCalls) {
|
curr.UpdateStats(&csCurr)
|
||||||
// The majority of requests are served from the curr cache,
|
prev.UpdateStats(&csPrev)
|
||||||
// so the prev cache can be deleted.
|
currRequests := csCurr.GetCalls
|
||||||
|
if currRequests >= currGetCalls {
|
||||||
|
currRequests -= currGetCalls
|
||||||
|
}
|
||||||
|
prevRequests := csPrev.GetCalls
|
||||||
|
if prevRequests >= prevGetCalls {
|
||||||
|
prevRequests -= prevGetCalls
|
||||||
|
}
|
||||||
|
currGetCalls = csCurr.GetCalls
|
||||||
|
prevGetCalls = csPrev.GetCalls
|
||||||
|
if currRequests >= 20 && float64(prevRequests)/float64(currRequests) < 0.05 {
|
||||||
|
// The majority of requests are served from the curr cache,
|
||||||
|
// so the prev cache can be deleted in order to free up memory.
|
||||||
|
if csPrev.EntriesCount > 0 {
|
||||||
|
updateCacheStatsHistory(&c.csHistory, &csPrev)
|
||||||
prev.Reset()
|
prev.Reset()
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
checkerT.Stop()
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -239,11 +252,9 @@ func (c *Cache) cacheSizeWatcher() {
|
|||||||
prev := c.prev.Load().(*fastcache.Cache)
|
prev := c.prev.Load().(*fastcache.Cache)
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
c.prev.Store(curr)
|
c.prev.Store(curr)
|
||||||
|
|
||||||
var cs fastcache.Stats
|
var cs fastcache.Stats
|
||||||
prev.UpdateStats(&cs)
|
prev.UpdateStats(&cs)
|
||||||
updateCacheStatsHistory(&c.csHistory, &cs)
|
updateCacheStatsHistory(&c.csHistory, &cs)
|
||||||
|
|
||||||
prev.Reset()
|
prev.Reset()
|
||||||
// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
|
// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
|
||||||
// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
|
// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
|
||||||
@ -268,11 +279,9 @@ func (c *Cache) cacheSizeWatcher() {
|
|||||||
c.setMode(whole)
|
c.setMode(whole)
|
||||||
prev = c.prev.Load().(*fastcache.Cache)
|
prev = c.prev.Load().(*fastcache.Cache)
|
||||||
c.prev.Store(fastcache.New(1024))
|
c.prev.Store(fastcache.New(1024))
|
||||||
|
|
||||||
cs.Reset()
|
cs.Reset()
|
||||||
prev.UpdateStats(&cs)
|
prev.UpdateStats(&cs)
|
||||||
updateCacheStatsHistory(&c.csHistory, &cs)
|
updateCacheStatsHistory(&c.csHistory, &cs)
|
||||||
|
|
||||||
prev.Reset()
|
prev.Reset()
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
@ -296,9 +305,13 @@ func (c *Cache) Stop() {
|
|||||||
|
|
||||||
// Reset resets the cache.
|
// Reset resets the cache.
|
||||||
func (c *Cache) Reset() {
|
func (c *Cache) Reset() {
|
||||||
|
var cs fastcache.Stats
|
||||||
prev := c.prev.Load().(*fastcache.Cache)
|
prev := c.prev.Load().(*fastcache.Cache)
|
||||||
|
prev.UpdateStats(&cs)
|
||||||
prev.Reset()
|
prev.Reset()
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
|
curr.UpdateStats(&cs)
|
||||||
|
updateCacheStatsHistory(&c.csHistory, &cs)
|
||||||
curr.Reset()
|
curr.Reset()
|
||||||
// Reset the mode to `split` in the hope the working set size becomes smaller after the reset.
|
// Reset the mode to `split` in the hope the working set size becomes smaller after the reset.
|
||||||
c.setMode(split)
|
c.setMode(split)
|
||||||
|
Loading…
Reference in New Issue
Block a user