mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
lib/workingsetcache: drop the previous cache whenever it recieves less than 5% of requests comparing to the current cache
This means that the majority of requests are successfully served from the current cache, so the previous cache can be reset in order to free up memory.
This commit is contained in:
parent
42cda38dbc
commit
526bc8a8b0
@ -131,14 +131,14 @@ func (c *Cache) runWatchers(expireDuration time.Duration) {
|
|||||||
func (c *Cache) expirationWatcher(expireDuration time.Duration) {
|
func (c *Cache) expirationWatcher(expireDuration time.Duration) {
|
||||||
expireDuration += timeJitter(expireDuration / 10)
|
expireDuration += timeJitter(expireDuration / 10)
|
||||||
t := time.NewTicker(expireDuration)
|
t := time.NewTicker(expireDuration)
|
||||||
|
defer t.Stop()
|
||||||
|
var csCurr, csPrev fastcache.Stats
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.stopCh:
|
case <-c.stopCh:
|
||||||
t.Stop()
|
|
||||||
return
|
return
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
}
|
}
|
||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
if atomic.LoadUint32(&c.mode) != split {
|
if atomic.LoadUint32(&c.mode) != split {
|
||||||
// Stop the expirationWatcher on non-split mode.
|
// Stop the expirationWatcher on non-split mode.
|
||||||
@ -148,16 +148,52 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
|
|||||||
// Reset prev cache and swap it with the curr cache.
|
// Reset prev cache and swap it with the curr cache.
|
||||||
prev := c.prev.Load().(*fastcache.Cache)
|
prev := c.prev.Load().(*fastcache.Cache)
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
|
csCurr.Reset()
|
||||||
|
curr.UpdateStats(&csCurr)
|
||||||
|
csPrev.Reset()
|
||||||
|
prev.UpdateStats(&csPrev)
|
||||||
|
|
||||||
c.prev.Store(curr)
|
c.prev.Store(curr)
|
||||||
|
|
||||||
var cs fastcache.Stats
|
prevGetCalls := csCurr.GetCalls
|
||||||
prev.UpdateStats(&cs)
|
updateCacheStatsHistory(&c.csHistory, &csPrev)
|
||||||
updateCacheStatsHistory(&c.csHistory, &cs)
|
|
||||||
|
|
||||||
prev.Reset()
|
prev.Reset()
|
||||||
c.curr.Store(prev)
|
c.curr.Store(prev)
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
// Watch for the usage of the prev cache and drop it whenever it receives
|
||||||
|
// less than 5% of get calls comparing to the curr cache.
|
||||||
|
// This allows saving memory.
|
||||||
|
prev, curr = curr, prev
|
||||||
|
checkInterval := 10 * time.Second
|
||||||
|
checkerT := time.NewTicker(checkInterval)
|
||||||
|
checkerDeadline := time.Now().Add(expireDuration - checkInterval)
|
||||||
|
for time.Now().Before(checkerDeadline) {
|
||||||
|
select {
|
||||||
|
case <-c.stopCh:
|
||||||
|
break
|
||||||
|
case <-checkerT.C:
|
||||||
|
}
|
||||||
|
c.mu.Lock()
|
||||||
|
if atomic.LoadUint32(&c.mode) != split {
|
||||||
|
// Do nothing in non-split mode.
|
||||||
|
c.mu.Unlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
csCurr.Reset()
|
||||||
|
curr.UpdateStats(&csCurr)
|
||||||
|
csPrev.Reset()
|
||||||
|
prev.UpdateStats(&csPrev)
|
||||||
|
getCalls := csPrev.GetCalls - prevGetCalls
|
||||||
|
if float64(getCalls) < 0.05*float64(csCurr.GetCalls) {
|
||||||
|
// The majority of requests are served from the curr cache,
|
||||||
|
// so the prev cache can be deleted.
|
||||||
|
prev.Reset()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkerT.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user