mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
lib/blockcache: eliminate possible race when Cache.Put is called for the same entry from multiple goroutines
The race could result in incorrect cache size tracking, which, in turn, could result in too frequent cache cleaning
This commit is contained in:
parent
46bd2c4d6d
commit
d0f785defd
@ -109,20 +109,21 @@ func (c *Cache) cleaner() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cache) cleanByTimeout() {
|
func (c *Cache) cleanByTimeout() {
|
||||||
currentTime := fasttime.UnixTimestamp()
|
// Delete items accessed more than five minutes ago.
|
||||||
|
// This time should be enough for repeated queries.
|
||||||
|
lastAccessTime := fasttime.UnixTimestamp()-5*60
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
for _, pes := range c.m {
|
for _, pes := range c.m {
|
||||||
for offset, e := range pes {
|
for offset, e := range pes {
|
||||||
// Delete items accessed more than five minutes ago.
|
if lastAccessTime > atomic.LoadUint64(&e.lastAccessTime) {
|
||||||
// This time should be enough for repeated queries.
|
|
||||||
if currentTime-atomic.LoadUint64(&e.lastAccessTime) > 5*60 {
|
|
||||||
c.updateSizeBytes(-e.block.SizeBytes())
|
c.updateSizeBytes(-e.block.SizeBytes())
|
||||||
delete(pes, offset)
|
delete(pes, offset)
|
||||||
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
|
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBlock returns a block for the given key k from c.
|
// GetBlock returns a block for the given key k from c.
|
||||||
@ -167,14 +168,19 @@ func (c *Cache) PutBlock(k Key, b Block) {
|
|||||||
|
|
||||||
// Store b in the cache.
|
// Store b in the cache.
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
e := &cacheEntry{
|
defer c.mu.Unlock()
|
||||||
lastAccessTime: fasttime.UnixTimestamp(),
|
|
||||||
block: b,
|
|
||||||
}
|
|
||||||
pes := c.m[k.Part]
|
pes := c.m[k.Part]
|
||||||
if pes == nil {
|
if pes == nil {
|
||||||
pes = make(map[uint64]*cacheEntry)
|
pes = make(map[uint64]*cacheEntry)
|
||||||
c.m[k.Part] = pes
|
c.m[k.Part] = pes
|
||||||
|
} else if pes[k.Offset] != nil {
|
||||||
|
// The block has been already registered by concurrent goroutine.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
e := &cacheEntry{
|
||||||
|
lastAccessTime: fasttime.UnixTimestamp(),
|
||||||
|
block: b,
|
||||||
}
|
}
|
||||||
pes[k.Offset] = e
|
pes[k.Offset] = e
|
||||||
c.updateSizeBytes(e.block.SizeBytes())
|
c.updateSizeBytes(e.block.SizeBytes())
|
||||||
@ -187,13 +193,11 @@ func (c *Cache) PutBlock(k Key, b Block) {
|
|||||||
delete(pes, offset)
|
delete(pes, offset)
|
||||||
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
|
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
|
||||||
if c.SizeBytes() < maxSizeBytes {
|
if c.SizeBytes() < maxSizeBytes {
|
||||||
goto end
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
end:
|
|
||||||
c.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Len returns the number of blocks in the cache c.
|
// Len returns the number of blocks in the cache c.
|
||||||
|
Loading…
Reference in New Issue
Block a user