VictoriaMetrics/lib/blockcache/blockcache.go
2022-02-08 15:42:31 +02:00

233 lines
6.3 KiB
Go

package blockcache
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// Cache caches Block entries.
//
// Call NewCache() for creating new Cache.
type Cache struct {
// Atomically updated fields must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
// sizeBytes contains an approximate size for all the blocks stored in the cache.
sizeBytes int64
// getMaxSizeBytes() is a callback, which returns the maximum allowed cache size in bytes.
getMaxSizeBytes func() int
// mu protects all the fields below.
mu sync.RWMutex
// m contains cached blocks keyed by Key.Part and then by Key.Offset
m map[interface{}]map[uint64]*cacheEntry
// perKeyMisses contains per-block cache misses.
//
// Blocks with less than 2 cache misses aren't stored in the cache in order to prevent from eviction for frequently accessed items.
perKeyMisses map[Key]int
}
// Key represents a key, which uniquely identifies the Block.
type Key struct {
// Part must contain a pointer to part structure where the block belongs to.
Part interface{}
// Offset is the offset of the block in the part.
Offset uint64
}
// Block is an item, which may be cached in the Cache.
type Block interface {
// SizeBytes must return the approximate size of the given block in bytes
SizeBytes() int
}
type cacheEntry struct {
// Atomically updated fields must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
lastAccessTime uint64
// block contains the cached block.
block Block
}
// NewCache creates new cache.
//
// Cache size in bytes is limited by the value returned by getMaxSizeBytes() callback.
func NewCache(getMaxSizeBytes func() int) *Cache {
var c Cache
c.getMaxSizeBytes = getMaxSizeBytes
c.m = make(map[interface{}]map[uint64]*cacheEntry)
c.perKeyMisses = make(map[Key]int)
go c.cleaner()
return &c
}
// RemoveBlocksForPart removes all the blocks for the given part from the cache.
func (c *Cache) RemoveBlocksForPart(p interface{}) {
c.mu.Lock()
sizeBytes := 0
for _, e := range c.m[p] {
sizeBytes += e.block.SizeBytes()
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
}
c.updateSizeBytes(-sizeBytes)
delete(c.m, p)
c.mu.Unlock()
}
func (c *Cache) updateSizeBytes(n int) {
atomic.AddInt64(&c.sizeBytes, int64(n))
}
// cleaner periodically cleans least recently used entries in c.
func (c *Cache) cleaner() {
ticker := time.NewTicker(57 * time.Second)
defer ticker.Stop()
perKeyMissesTicker := time.NewTicker(2 * time.Minute)
defer perKeyMissesTicker.Stop()
for {
select {
case <-ticker.C:
c.cleanByTimeout()
case <-perKeyMissesTicker.C:
c.mu.Lock()
c.perKeyMisses = make(map[Key]int, len(c.perKeyMisses))
c.mu.Unlock()
}
}
}
func (c *Cache) cleanByTimeout() {
// Delete items accessed more than five minutes ago.
// This time should be enough for repeated queries.
lastAccessTime := fasttime.UnixTimestamp() - 5*60
c.mu.Lock()
defer c.mu.Unlock()
for _, pes := range c.m {
for offset, e := range pes {
if lastAccessTime > atomic.LoadUint64(&e.lastAccessTime) {
c.updateSizeBytes(-e.block.SizeBytes())
delete(pes, offset)
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
}
}
}
}
// GetBlock returns a block for the given key k from c.
func (c *Cache) GetBlock(k Key) Block {
atomic.AddUint64(&c.requests, 1)
var e *cacheEntry
c.mu.RLock()
pes := c.m[k.Part]
if pes != nil {
e = pes[k.Offset]
}
c.mu.RUnlock()
if e != nil {
// Fast path - the block already exists in the cache, so return it to the caller.
currentTime := fasttime.UnixTimestamp()
if atomic.LoadUint64(&e.lastAccessTime) != currentTime {
atomic.StoreUint64(&e.lastAccessTime, currentTime)
}
return e.block
}
// Slow path - the entry is missing in the cache.
c.mu.Lock()
c.perKeyMisses[k]++
c.mu.Unlock()
atomic.AddUint64(&c.misses, 1)
return nil
}
// PutBlock puts the given block b under the given key k into c.
func (c *Cache) PutBlock(k Key, b Block) {
c.mu.RLock()
// If the entry wasn't accessed yet (e.g. c.perKeyMisses[k] == 0), then cache it, since it is likely it will be accessed soon.
// Do not cache the entry only if there was only a single unsuccessful attempt to access it.
// This may be one-time-wonders entry, which won't be accessed more, so there is no need in caching it.
doNotCache := c.perKeyMisses[k] == 1
c.mu.RUnlock()
if doNotCache {
// Do not cache b if it has been requested only once (aka one-time-wonders items).
// This should reduce memory usage for the cache.
return
}
// Store b in the cache.
c.mu.Lock()
defer c.mu.Unlock()
pes := c.m[k.Part]
if pes == nil {
pes = make(map[uint64]*cacheEntry)
c.m[k.Part] = pes
} else if pes[k.Offset] != nil {
// The block has been already registered by concurrent goroutine.
return
}
e := &cacheEntry{
lastAccessTime: fasttime.UnixTimestamp(),
block: b,
}
pes[k.Offset] = e
c.updateSizeBytes(e.block.SizeBytes())
maxSizeBytes := c.getMaxSizeBytes()
if c.SizeBytes() > maxSizeBytes {
// Entries in the cache occupy too much space. Free up space by deleting some entries.
for _, pes := range c.m {
for offset, e := range pes {
c.updateSizeBytes(-e.block.SizeBytes())
delete(pes, offset)
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
if c.SizeBytes() < maxSizeBytes {
return
}
}
}
}
}
// Len returns the number of blocks in the cache c.
func (c *Cache) Len() int {
c.mu.RLock()
n := 0
for _, m := range c.m {
n += len(m)
}
c.mu.RUnlock()
return n
}
// SizeBytes returns an approximate size in bytes of all the blocks stored in the cache c.
func (c *Cache) SizeBytes() int {
return int(atomic.LoadInt64(&c.sizeBytes))
}
// SizeMaxBytes returns the max allowed size in bytes for c.
func (c *Cache) SizeMaxBytes() int {
return c.getMaxSizeBytes()
}
// Requests returns the number of requests served by c.
func (c *Cache) Requests() uint64 {
return atomic.LoadUint64(&c.requests)
}
// Misses returns the number of cache misses for c.
func (c *Cache) Misses() uint64 {
return atomic.LoadUint64(&c.misses)
}