diff --git a/lib/blockcache/blockcache.go b/lib/blockcache/blockcache.go
index f7801081f..1737e0f3c 100644
--- a/lib/blockcache/blockcache.go
+++ b/lib/blockcache/blockcache.go
@@ -1,6 +1,7 @@
 package blockcache
 
 import (
+	"container/heap"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -135,7 +136,7 @@ func (c *Cache) Misses() uint64 {
 func (c *Cache) cleaner() {
 	ticker := time.NewTicker(57 * time.Second)
 	defer ticker.Stop()
-	perKeyMissesTicker := time.NewTicker(7 * time.Minute)
+	perKeyMissesTicker := time.NewTicker(3 * time.Minute)
 	defer perKeyMissesTicker.Stop()
 	for {
 		select {
@@ -176,7 +177,7 @@ type cache struct {
 	getMaxSizeBytes func() int
 
 	// mu protects all the fields below.
-	mu sync.RWMutex
+	mu sync.Mutex
 
 	// m contains cached blocks keyed by Key.Part and then by Key.Offset
 	m map[interface{}]map[uint64]*cacheEntry
@@ -185,6 +186,9 @@ type cache struct {
 	//
 	// Blocks with less than 2 cache misses aren't stored in the cache in order to prevent from eviction for frequently accessed items.
 	perKeyMisses map[Key]int
+
+	// The heap for removing the least recently used entries from m.
+	lah lastAccessHeap
 }
 
 // Key represents a key, which uniquely identifies the Block.
@@ -208,13 +212,17 @@ type Block interface {
 }
 
 type cacheEntry struct {
-	// Atomically updated fields must go first in the struct, so they are properly
-	// aligned to 8 bytes on 32-bit architectures.
-	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+	// The timestamp in seconds for the last access to the given entry.
 	lastAccessTime uint64
 
-	// block contains the cached block.
-	block Block
+	// heapIdx is the index for the entry in lastAccessHeap.
+	heapIdx int
+
+	// k contains the associated key for the given block.
+	k Key
+
+	// b contains the cached block.
+	b Block
 }
 
 func newCache(getMaxSizeBytes func() int) *cache {
@@ -227,14 +235,16 @@ func newCache(getMaxSizeBytes func() int) *cache {
 
 func (c *cache) RemoveBlocksForPart(p interface{}) {
 	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	sizeBytes := 0
 	for _, e := range c.m[p] {
-		sizeBytes += e.block.SizeBytes()
+		sizeBytes += e.b.SizeBytes()
+		heap.Remove(&c.lah, e.heapIdx)
 		// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
 	}
 	c.updateSizeBytes(-sizeBytes)
 	delete(c.m, p)
-	c.mu.Unlock()
 }
 
 func (c *cache) updateSizeBytes(n int) {
@@ -248,55 +258,56 @@ func (c *cache) cleanPerKeyMisses() {
 }
 
 func (c *cache) cleanByTimeout() {
-	// Delete items accessed more than five minutes ago.
+	// Delete items accessed more than three minutes ago.
 	// This time should be enough for repeated queries.
-	lastAccessTime := fasttime.UnixTimestamp() - 5*60
+	lastAccessTime := fasttime.UnixTimestamp() - 3*60
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	for _, pes := range c.m {
-		for offset, e := range pes {
-			if lastAccessTime > atomic.LoadUint64(&e.lastAccessTime) {
-				c.updateSizeBytes(-e.block.SizeBytes())
-				delete(pes, offset)
-				// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
-			}
+	for len(c.lah) > 0 {
+		e := c.lah[0]
+		if lastAccessTime < e.lastAccessTime {
+			break
 		}
+		c.updateSizeBytes(-e.b.SizeBytes())
+		pes := c.m[e.k.Part]
+		delete(pes, e.k.Offset)
+		heap.Pop(&c.lah)
 	}
 }
 
 func (c *cache) GetBlock(k Key) Block {
 	atomic.AddUint64(&c.requests, 1)
 	var e *cacheEntry
-	c.mu.RLock()
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	pes := c.m[k.Part]
 	if pes != nil {
 		e = pes[k.Offset]
-	}
-	c.mu.RUnlock()
-	if e != nil {
-		// Fast path - the block already exists in the cache, so return it to the caller.
-		currentTime := fasttime.UnixTimestamp()
-		if atomic.LoadUint64(&e.lastAccessTime) != currentTime {
-			atomic.StoreUint64(&e.lastAccessTime, currentTime)
+		if e != nil {
+			// Fast path - the block already exists in the cache, so return it to the caller.
+			currentTime := fasttime.UnixTimestamp()
+			if e.lastAccessTime != currentTime {
+				e.lastAccessTime = currentTime
+				heap.Fix(&c.lah, e.heapIdx)
+			}
+			return e.b
 		}
-		return e.block
 	}
 
 	// Slow path - the entry is missing in the cache.
-	c.mu.Lock()
 	c.perKeyMisses[k]++
-	c.mu.Unlock()
 	atomic.AddUint64(&c.misses, 1)
 	return nil
 }
 
 func (c *cache) PutBlock(k Key, b Block) {
-	c.mu.RLock()
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	// If the entry wasn't accessed yet (e.g. c.perKeyMisses[k] == 0), then cache it, since it is likely it will be accessed soon.
 	// Do not cache the entry only if there was only a single unsuccessful attempt to access it.
 	// This may be one-time-wonders entry, which won't be accessed more, so there is no need in caching it.
 	doNotCache := c.perKeyMisses[k] == 1
-	c.mu.RUnlock()
 	if doNotCache {
 		// Do not cache b if it has been requested only once (aka one-time-wonders items).
 		// This should reduce memory usage for the cache.
@@ -304,9 +315,6 @@ func (c *cache) PutBlock(k Key, b Block) {
 	}
 
 	// Store b in the cache.
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
 	pes := c.m[k.Part]
 	if pes == nil {
 		pes = make(map[uint64]*cacheEntry)
@@ -317,33 +325,30 @@ func (c *cache) PutBlock(k Key, b Block) {
 	}
 	e := &cacheEntry{
 		lastAccessTime: fasttime.UnixTimestamp(),
-		block:          b,
+		k:              k,
+		b:              b,
 	}
+	heap.Push(&c.lah, e)
 	pes[k.Offset] = e
-	c.updateSizeBytes(e.block.SizeBytes())
+	c.updateSizeBytes(e.b.SizeBytes())
 
 	maxSizeBytes := c.getMaxSizeBytes()
-	if c.SizeBytes() > maxSizeBytes {
-		// Entries in the cache occupy too much space. Free up space by deleting some entries.
-		for _, pes := range c.m {
-			for offset, e := range pes {
-				c.updateSizeBytes(-e.block.SizeBytes())
-				delete(pes, offset)
-				// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
-				if c.SizeBytes() < maxSizeBytes {
-					return
-				}
-			}
-		}
+	for c.SizeBytes() > maxSizeBytes && len(c.lah) > 0 {
+		e := c.lah[0]
+		c.updateSizeBytes(-e.b.SizeBytes())
+		pes := c.m[e.k.Part]
+		delete(pes, e.k.Offset)
+		heap.Pop(&c.lah)
 	}
 }
 
 func (c *cache) Len() int {
-	c.mu.RLock()
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	n := 0
 	for _, m := range c.m {
 		n += len(m)
 	}
-	c.mu.RUnlock()
 	return n
 }
 
@@ -362,3 +367,37 @@
 func (c *cache) Misses() uint64 {
 	return atomic.LoadUint64(&c.misses)
 }
+
+// lastAccessHeap implements heap.Interface.
+type lastAccessHeap []*cacheEntry
+
+func (lah *lastAccessHeap) Len() int {
+	return len(*lah)
+}
+func (lah *lastAccessHeap) Swap(i, j int) {
+	h := *lah
+	a := h[i]
+	b := h[j]
+	a.heapIdx = j
+	b.heapIdx = i
+	h[i] = b
+	h[j] = a
+}
+func (lah *lastAccessHeap) Less(i, j int) bool {
+	h := *lah
+	return h[i].lastAccessTime < h[j].lastAccessTime
+}
+func (lah *lastAccessHeap) Push(x interface{}) {
+	e := x.(*cacheEntry)
+	h := *lah
+	e.heapIdx = len(h)
+	*lah = append(h, e)
+}
+func (lah *lastAccessHeap) Pop() interface{} {
+	h := *lah
+	e := h[len(h)-1]
+	// Clear the reference to the removed entry, so it can be garbage-collected.
+	h[len(h)-1] = nil
+	*lah = h[:len(h)-1]
+	return e
+}
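
Note on the data structure: container/heap maintains a min-heap ordered by lastAccessTime, so the least recently used entry is always at c.lah[0]. A cache hit in GetBlock bumps the entry's access time and restores the invariant with heap.Fix in O(log n), while the eviction loops in cleanByTimeout and PutBlock repeatedly inspect and pop the root. The following standalone sketch (not part of the patch; entry and lruHeap are illustrative stand-ins for cacheEntry and lastAccessHeap) shows the same technique in isolation:

package main

import (
	"container/heap"
	"fmt"
)

// entry is a cut-down stand-in for cacheEntry: just the access time,
// the heap index and a label for printing.
type entry struct {
	lastAccessTime uint64
	heapIdx        int
	name           string
}

// lruHeap mirrors lastAccessHeap: a min-heap ordered by lastAccessTime,
// so the least recently used entry is always at index 0.
type lruHeap []*entry

func (h *lruHeap) Len() int { return len(*h) }
func (h *lruHeap) Less(i, j int) bool {
	return (*h)[i].lastAccessTime < (*h)[j].lastAccessTime
}
func (h *lruHeap) Swap(i, j int) {
	s := *h
	s[i], s[j] = s[j], s[i]
	s[i].heapIdx = i
	s[j].heapIdx = j
}
func (h *lruHeap) Push(x interface{}) {
	e := x.(*entry)
	e.heapIdx = len(*h)
	*h = append(*h, e)
}
func (h *lruHeap) Pop() interface{} {
	s := *h
	e := s[len(s)-1]
	s[len(s)-1] = nil // clear the reference so the entry can be garbage-collected
	*h = s[:len(s)-1]
	return e
}

func main() {
	var h lruHeap
	heap.Push(&h, &entry{lastAccessTime: 30, name: "c"})
	heap.Push(&h, &entry{lastAccessTime: 10, name: "a"})
	heap.Push(&h, &entry{lastAccessTime: 20, name: "b"})

	// Simulate a cache hit on the oldest entry ("a"), like GetBlock does:
	// bump its access time and restore the heap invariant with heap.Fix.
	h[0].lastAccessTime = 40
	heap.Fix(&h, h[0].heapIdx)

	// Entries now pop in least-recently-used order: b(20), c(30), a(40).
	for h.Len() > 0 {
		e := heap.Pop(&h).(*entry)
		fmt.Printf("%s accessed at %d\n", e.name, e.lastAccessTime)
	}
}

Running the sketch prints b, c, a: the heap.Fix after the simulated hit moved "a" from the front of the eviction order to the back, which is exactly the effect the patch relies on to evict cold blocks first.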