mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 16:30:55 +01:00
lib/workingsetcache: use per-bucket stats counters instead of global stats counters for cache hits/misses
This should improve cache scalability on systems with many CPU cores.
This commit is contained in:
parent
f22bea242f
commit
42cda38dbc
@ -27,8 +27,8 @@ type Cache struct {
|
|||||||
curr atomic.Value
|
curr atomic.Value
|
||||||
prev atomic.Value
|
prev atomic.Value
|
||||||
|
|
||||||
// cs holds cache stats
|
// csHistory holds cache stats history
|
||||||
cs fastcache.Stats
|
csHistory fastcache.Stats
|
||||||
|
|
||||||
// mode indicates whether to use only curr and skip prev.
|
// mode indicates whether to use only curr and skip prev.
|
||||||
//
|
//
|
||||||
@ -148,7 +148,13 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
|
|||||||
// Reset prev cache and swap it with the curr cache.
|
// Reset prev cache and swap it with the curr cache.
|
||||||
prev := c.prev.Load().(*fastcache.Cache)
|
prev := c.prev.Load().(*fastcache.Cache)
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
|
|
||||||
c.prev.Store(curr)
|
c.prev.Store(curr)
|
||||||
|
|
||||||
|
var cs fastcache.Stats
|
||||||
|
prev.UpdateStats(&cs)
|
||||||
|
updateCacheStatsHistory(&c.csHistory, &cs)
|
||||||
|
|
||||||
prev.Reset()
|
prev.Reset()
|
||||||
c.curr.Store(prev)
|
c.curr.Store(prev)
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
@ -197,6 +203,11 @@ func (c *Cache) cacheSizeWatcher() {
|
|||||||
prev := c.prev.Load().(*fastcache.Cache)
|
prev := c.prev.Load().(*fastcache.Cache)
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
c.prev.Store(curr)
|
c.prev.Store(curr)
|
||||||
|
|
||||||
|
var cs fastcache.Stats
|
||||||
|
prev.UpdateStats(&cs)
|
||||||
|
updateCacheStatsHistory(&c.csHistory, &cs)
|
||||||
|
|
||||||
prev.Reset()
|
prev.Reset()
|
||||||
// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
|
// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
|
||||||
// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
|
// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
|
||||||
@ -221,6 +232,11 @@ func (c *Cache) cacheSizeWatcher() {
|
|||||||
c.setMode(whole)
|
c.setMode(whole)
|
||||||
prev = c.prev.Load().(*fastcache.Cache)
|
prev = c.prev.Load().(*fastcache.Cache)
|
||||||
c.prev.Store(fastcache.New(1024))
|
c.prev.Store(fastcache.New(1024))
|
||||||
|
|
||||||
|
cs.Reset()
|
||||||
|
prev.UpdateStats(&cs)
|
||||||
|
updateCacheStatsHistory(&c.csHistory, &cs)
|
||||||
|
|
||||||
prev.Reset()
|
prev.Reset()
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
@ -262,30 +278,43 @@ func (c *Cache) loadMode() int {
|
|||||||
|
|
||||||
// UpdateStats updates fcs with cache stats.
|
// UpdateStats updates fcs with cache stats.
|
||||||
func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
|
func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
|
||||||
|
updateCacheStatsHistory(fcs, &c.csHistory)
|
||||||
|
|
||||||
var cs fastcache.Stats
|
var cs fastcache.Stats
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
curr.UpdateStats(&cs)
|
curr.UpdateStats(&cs)
|
||||||
fcs.Collisions += cs.Collisions
|
updateCacheStats(fcs, &cs)
|
||||||
fcs.Corruptions += cs.Corruptions
|
|
||||||
fcs.EntriesCount += cs.EntriesCount
|
|
||||||
fcs.BytesSize += cs.BytesSize
|
|
||||||
fcs.MaxBytesSize += cs.MaxBytesSize
|
|
||||||
|
|
||||||
fcs.GetCalls += atomic.LoadUint64(&c.cs.GetCalls)
|
|
||||||
fcs.SetCalls += atomic.LoadUint64(&c.cs.SetCalls)
|
|
||||||
fcs.Misses += atomic.LoadUint64(&c.cs.Misses)
|
|
||||||
|
|
||||||
prev := c.prev.Load().(*fastcache.Cache)
|
prev := c.prev.Load().(*fastcache.Cache)
|
||||||
cs.Reset()
|
cs.Reset()
|
||||||
prev.UpdateStats(&cs)
|
prev.UpdateStats(&cs)
|
||||||
fcs.EntriesCount += cs.EntriesCount
|
updateCacheStats(fcs, &cs)
|
||||||
fcs.BytesSize += cs.BytesSize
|
}
|
||||||
fcs.MaxBytesSize += cs.MaxBytesSize
|
|
||||||
|
func updateCacheStats(dst, src *fastcache.Stats) {
|
||||||
|
dst.GetCalls += src.GetCalls
|
||||||
|
dst.SetCalls += src.SetCalls
|
||||||
|
dst.Misses += src.Misses
|
||||||
|
dst.Collisions += src.Collisions
|
||||||
|
dst.Corruptions += src.Corruptions
|
||||||
|
dst.EntriesCount += src.EntriesCount
|
||||||
|
dst.BytesSize += src.BytesSize
|
||||||
|
dst.MaxBytesSize += src.MaxBytesSize
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateCacheStatsHistory(dst, src *fastcache.Stats) {
|
||||||
|
atomic.AddUint64(&dst.GetCalls, atomic.LoadUint64(&src.GetCalls))
|
||||||
|
atomic.AddUint64(&dst.SetCalls, atomic.LoadUint64(&src.SetCalls))
|
||||||
|
atomic.AddUint64(&dst.Misses, atomic.LoadUint64(&src.Misses))
|
||||||
|
atomic.AddUint64(&dst.Collisions, atomic.LoadUint64(&src.Collisions))
|
||||||
|
atomic.AddUint64(&dst.Corruptions, atomic.LoadUint64(&src.Corruptions))
|
||||||
|
|
||||||
|
// Do not add EntriesCount, BytesSize and MaxBytesSize, since these metrics
|
||||||
|
// are calculated from c.curr and c.prev caches.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get appends the found value for the given key to dst and returns the result.
|
// Get appends the found value for the given key to dst and returns the result.
|
||||||
func (c *Cache) Get(dst, key []byte) []byte {
|
func (c *Cache) Get(dst, key []byte) []byte {
|
||||||
atomic.AddUint64(&c.cs.GetCalls, 1)
|
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
result := curr.Get(dst, key)
|
result := curr.Get(dst, key)
|
||||||
if len(result) > len(dst) {
|
if len(result) > len(dst) {
|
||||||
@ -294,7 +323,6 @@ func (c *Cache) Get(dst, key []byte) []byte {
|
|||||||
}
|
}
|
||||||
if c.loadMode() == whole {
|
if c.loadMode() == whole {
|
||||||
// Nothing found.
|
// Nothing found.
|
||||||
atomic.AddUint64(&c.cs.Misses, 1)
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -303,7 +331,6 @@ func (c *Cache) Get(dst, key []byte) []byte {
|
|||||||
result = prev.Get(dst, key)
|
result = prev.Get(dst, key)
|
||||||
if len(result) <= len(dst) {
|
if len(result) <= len(dst) {
|
||||||
// Nothing found.
|
// Nothing found.
|
||||||
atomic.AddUint64(&c.cs.Misses, 1)
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
// Cache the found entry in the current cache.
|
// Cache the found entry in the current cache.
|
||||||
@ -313,18 +340,15 @@ func (c *Cache) Get(dst, key []byte) []byte {
|
|||||||
|
|
||||||
// Has verifies whether the cache contains the given key.
|
// Has verifies whether the cache contains the given key.
|
||||||
func (c *Cache) Has(key []byte) bool {
|
func (c *Cache) Has(key []byte) bool {
|
||||||
atomic.AddUint64(&c.cs.GetCalls, 1)
|
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
if curr.Has(key) {
|
if curr.Has(key) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if c.loadMode() == whole {
|
if c.loadMode() == whole {
|
||||||
atomic.AddUint64(&c.cs.Misses, 1)
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
prev := c.prev.Load().(*fastcache.Cache)
|
prev := c.prev.Load().(*fastcache.Cache)
|
||||||
if !prev.Has(key) {
|
if !prev.Has(key) {
|
||||||
atomic.AddUint64(&c.cs.Misses, 1)
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
// Cache the found entry in the current cache.
|
// Cache the found entry in the current cache.
|
||||||
@ -339,14 +363,12 @@ var tmpBufPool bytesutil.ByteBufferPool
|
|||||||
|
|
||||||
// Set sets the given value for the given key.
|
// Set sets the given value for the given key.
|
||||||
func (c *Cache) Set(key, value []byte) {
|
func (c *Cache) Set(key, value []byte) {
|
||||||
atomic.AddUint64(&c.cs.SetCalls, 1)
|
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
curr.Set(key, value)
|
curr.Set(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBig appends the found value for the given key to dst and returns the result.
|
// GetBig appends the found value for the given key to dst and returns the result.
|
||||||
func (c *Cache) GetBig(dst, key []byte) []byte {
|
func (c *Cache) GetBig(dst, key []byte) []byte {
|
||||||
atomic.AddUint64(&c.cs.GetCalls, 1)
|
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
result := curr.GetBig(dst, key)
|
result := curr.GetBig(dst, key)
|
||||||
if len(result) > len(dst) {
|
if len(result) > len(dst) {
|
||||||
@ -355,7 +377,6 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
|
|||||||
}
|
}
|
||||||
if c.loadMode() == whole {
|
if c.loadMode() == whole {
|
||||||
// Nothing found.
|
// Nothing found.
|
||||||
atomic.AddUint64(&c.cs.Misses, 1)
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -364,7 +385,6 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
|
|||||||
result = prev.GetBig(dst, key)
|
result = prev.GetBig(dst, key)
|
||||||
if len(result) <= len(dst) {
|
if len(result) <= len(dst) {
|
||||||
// Nothing found.
|
// Nothing found.
|
||||||
atomic.AddUint64(&c.cs.Misses, 1)
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
// Cache the found entry in the current cache.
|
// Cache the found entry in the current cache.
|
||||||
@ -374,7 +394,6 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
|
|||||||
|
|
||||||
// SetBig sets the given value for the given key.
|
// SetBig sets the given value for the given key.
|
||||||
func (c *Cache) SetBig(key, value []byte) {
|
func (c *Cache) SetBig(key, value []byte) {
|
||||||
atomic.AddUint64(&c.cs.SetCalls, 1)
|
|
||||||
curr := c.curr.Load().(*fastcache.Cache)
|
curr := c.curr.Load().(*fastcache.Cache)
|
||||||
curr.SetBig(key, value)
|
curr.SetBig(key, value)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user