diff --git a/lib/blockcache/blockcache.go b/lib/blockcache/blockcache.go index 8c8cafc56..f7801081f 100644 --- a/lib/blockcache/blockcache.go +++ b/lib/blockcache/blockcache.go @@ -4,9 +4,11 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + xxhash "github.com/cespare/xxhash/v2" ) // Cache caches Block entries. @@ -14,13 +16,25 @@ import ( // Call NewCache() for creating new Cache. type Cache struct { shards []*cache + + cleanerMustStopCh chan struct{} + cleanerStoppedCh chan struct{} } // NewCache creates new cache. // // Cache size in bytes is limited by the value returned by getMaxSizeBytes() callback. +// Call MustStop() in order to free up resources occupied by Cache. func NewCache(getMaxSizeBytes func() int) *Cache { + cpusCount := cgroup.AvailableCPUs() shardsCount := cgroup.AvailableCPUs() + // Increase the number of shards with the increased number of available CPU cores. + // This should reduce contention on per-shard mutexes. + multiplier := cpusCount + if multiplier > 16 { + multiplier = 16 + } + shardsCount *= multiplier shards := make([]*cache, shardsCount) getMaxShardBytes := func() int { n := getMaxSizeBytes() @@ -29,9 +43,19 @@ func NewCache(getMaxSizeBytes func() int) *Cache { for i := range shards { shards[i] = newCache(getMaxShardBytes) } - return &Cache{ - shards: shards, + c := &Cache{ + shards: shards, + cleanerMustStopCh: make(chan struct{}), + cleanerStoppedCh: make(chan struct{}), } + go c.cleaner() + return c +} + +// MustStop frees up resources occupied by c. +func (c *Cache) MustStop() { + close(c.cleanerMustStopCh) + <-c.cleanerStoppedCh } // RemoveBlocksForPart removes all the blocks for the given part from the cache. @@ -43,16 +67,22 @@ func (c *Cache) RemoveBlocksForPart(p interface{}) { // GetBlock returns a block for the given key k from c. func (c *Cache) GetBlock(k Key) Block { - h := fastHashUint64(k.Offset) - idx := h % uint64(len(c.shards)) + idx := uint64(0) + if len(c.shards) > 1 { + h := k.hashUint64() + idx = h % uint64(len(c.shards)) + } shard := c.shards[idx] return shard.GetBlock(k) } // PutBlock puts the given block b under the given key k into c. func (c *Cache) PutBlock(k Key, b Block) { - h := fastHashUint64(k.Offset) - idx := h % uint64(len(c.shards)) + idx := uint64(0) + if len(c.shards) > 1 { + h := k.hashUint64() + idx = h % uint64(len(c.shards)) + } shard := c.shards[idx] shard.PutBlock(k, b) } @@ -102,11 +132,34 @@ func (c *Cache) Misses() uint64 { return n } -func fastHashUint64(x uint64) uint64 { - x ^= x >> 12 // a - x ^= x << 25 // b - x ^= x >> 27 // c - return x * 2685821657736338717 +func (c *Cache) cleaner() { + ticker := time.NewTicker(57 * time.Second) + defer ticker.Stop() + perKeyMissesTicker := time.NewTicker(7 * time.Minute) + defer perKeyMissesTicker.Stop() + for { + select { + case <-c.cleanerMustStopCh: + close(c.cleanerStoppedCh) + return + case <-ticker.C: + c.cleanByTimeout() + case <-perKeyMissesTicker.C: + c.cleanPerKeyMisses() + } + } +} + +func (c *Cache) cleanByTimeout() { + for _, shard := range c.shards { + shard.cleanByTimeout() + } +} + +func (c *Cache) cleanPerKeyMisses() { + for _, shard := range c.shards { + shard.cleanPerKeyMisses() + } } type cache struct { @@ -143,6 +196,11 @@ type Key struct { Offset uint64 } +func (k *Key) hashUint64() uint64 { + buf := (*[unsafe.Sizeof(*k)]byte)(unsafe.Pointer(k)) + return xxhash.Sum64(buf[:]) +} + // Block is an item, which may be cached in the Cache. type Block interface { // SizeBytes must return the approximate size of the given block in bytes @@ -164,7 +222,6 @@ func newCache(getMaxSizeBytes func() int) *cache { c.getMaxSizeBytes = getMaxSizeBytes c.m = make(map[interface{}]map[uint64]*cacheEntry) c.perKeyMisses = make(map[Key]int) - go c.cleaner() return &c } @@ -184,21 +241,10 @@ func (c *cache) updateSizeBytes(n int) { atomic.AddInt64(&c.sizeBytes, int64(n)) } -func (c *cache) cleaner() { - ticker := time.NewTicker(57 * time.Second) - defer ticker.Stop() - perKeyMissesTicker := time.NewTicker(2 * time.Minute) - defer perKeyMissesTicker.Stop() - for { - select { - case <-ticker.C: - c.cleanByTimeout() - case <-perKeyMissesTicker.C: - c.mu.Lock() - c.perKeyMisses = make(map[Key]int, len(c.perKeyMisses)) - c.mu.Unlock() - } - } +func (c *cache) cleanPerKeyMisses() { + c.mu.Lock() + c.perKeyMisses = make(map[Key]int, len(c.perKeyMisses)) + c.mu.Unlock() } func (c *cache) cleanByTimeout() { diff --git a/lib/blockcache/blockcache_test.go b/lib/blockcache/blockcache_test.go new file mode 100644 index 000000000..adbb4a760 --- /dev/null +++ b/lib/blockcache/blockcache_test.go @@ -0,0 +1,159 @@ +package blockcache + +import ( + "fmt" + "sync" + "testing" +) + +func TestCache(t *testing.T) { + const sizeMaxBytes = 1024 * 1024 + getMaxSize := func() int { + return sizeMaxBytes + } + c := NewCache(getMaxSize) + defer c.MustStop() + if n := c.SizeBytes(); n != 0 { + t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0) + } + if n := c.SizeMaxBytes(); n != sizeMaxBytes { + t.Fatalf("unexpected SizeMaxBytes(); got %d; want %d", n, sizeMaxBytes) + } + offset := uint64(1234) + part := (interface{})("foobar") + k := Key{ + Offset: offset, + Part: part, + } + var b testBlock + blockSize := b.SizeBytes() + // Put a single entry into cache + c.PutBlock(k, &b) + if n := c.Len(); n != 1 { + t.Fatalf("unexpected number of items in the cache; got %d; want %d", n, 1) + } + if n := c.SizeBytes(); n != blockSize { + t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, blockSize) + } + if n := c.Requests(); n != 0 { + t.Fatalf("unexpected number of requests; got %d; want %d", n, 0) + } + if n := c.Misses(); n != 0 { + t.Fatalf("unexpected number of misses; got %d; want %d", n, 0) + } + // Obtain this entry from the cache + if b1 := c.GetBlock(k); b1 != &b { + t.Fatalf("unexpected block obtained; got %v; want %v", b1, &b) + } + if n := c.Requests(); n != 1 { + t.Fatalf("unexpected number of requests; got %d; want %d", n, 1) + } + if n := c.Misses(); n != 0 { + t.Fatalf("unexpected number of misses; got %d; want %d", n, 0) + } + // Obtain non-existing entry from the cache + if b1 := c.GetBlock(Key{Offset: offset + 1}); b1 != nil { + t.Fatalf("unexpected non-nil block obtained for non-existing key: %v", b1) + } + if n := c.Requests(); n != 2 { + t.Fatalf("unexpected number of requests; got %d; want %d", n, 2) + } + if n := c.Misses(); n != 1 { + t.Fatalf("unexpected number of misses; got %d; want %d", n, 1) + } + // Remove entries for the given part from the cache + c.RemoveBlocksForPart(part) + if n := c.SizeBytes(); n != 0 { + t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0) + } + // Verify that the entry has been removed from the cache + if b1 := c.GetBlock(k); b1 != nil { + t.Fatalf("unexpected non-nil block obtained after removing all the blocks for the part; got %v", b1) + } + if n := c.Requests(); n != 3 { + t.Fatalf("unexpected number of requests; got %d; want %d", n, 3) + } + if n := c.Misses(); n != 2 { + t.Fatalf("unexpected number of misses; got %d; want %d", n, 2) + } + // Store the missed entry to the cache. It shouldn't be stored because of the previous cache miss + c.PutBlock(k, &b) + if n := c.SizeBytes(); n != 0 { + t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0) + } + // Verify that the entry wasn't stored to the cache. + if b1 := c.GetBlock(k); b1 != nil { + t.Fatalf("unexpected non-nil block obtained after removing all the blocks for the part; got %v", b1) + } + if n := c.Requests(); n != 4 { + t.Fatalf("unexpected number of requests; got %d; want %d", n, 4) + } + if n := c.Misses(); n != 3 { + t.Fatalf("unexpected number of misses; got %d; want %d", n, 3) + } + // Store the entry again. Now it must be stored because of the second cache miss. + c.PutBlock(k, &b) + if n := c.SizeBytes(); n != blockSize { + t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, blockSize) + } + if b1 := c.GetBlock(k); b1 != &b { + t.Fatalf("unexpected block obtained; got %v; want %v", b1, &b) + } + if n := c.Requests(); n != 5 { + t.Fatalf("unexpected number of requests; got %d; want %d", n, 5) + } + if n := c.Misses(); n != 3 { + t.Fatalf("unexpected number of misses; got %d; want %d", n, 3) + } + + // Manually clean the cache. The entry shouldn't be deleted because it was recently accessed. + c.cleanPerKeyMisses() + c.cleanByTimeout() + if n := c.SizeBytes(); n != blockSize { + t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, blockSize) + } +} + +func TestCacheConcurrentAccess(t *testing.T) { + const sizeMaxBytes = 16 * 1024 * 1024 + getMaxSize := func() int { + return sizeMaxBytes + } + c := NewCache(getMaxSize) + defer c.MustStop() + + workers := 5 + var wg sync.WaitGroup + wg.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer wg.Done() + testCacheSetGet(c) + }() + } + wg.Wait() +} + +func testCacheSetGet(c *Cache) { + for i := 0; i < 1000; i++ { + part := (interface{})(i) + b := testBlock{} + k := Key{ + Offset: uint64(i), + Part: part, + } + c.PutBlock(k, &b) + if b1 := c.GetBlock(k); b1 != &b { + panic(fmt.Errorf("unexpected block obtained; got %v; want %v", b1, &b)) + } + if b1 := c.GetBlock(Key{}); b1 != nil { + panic(fmt.Errorf("unexpected non-nil block obtained: %v", b1)) + } + } +} + +type testBlock struct{} + +func (tb *testBlock) SizeBytes() int { + return 42 +} diff --git a/lib/blockcache/blockcache_timing_test.go b/lib/blockcache/blockcache_timing_test.go new file mode 100644 index 000000000..776a29585 --- /dev/null +++ b/lib/blockcache/blockcache_timing_test.go @@ -0,0 +1,50 @@ +package blockcache + +import ( + "fmt" + "sync/atomic" + "testing" +) + +func BenchmarkKeyHashUint64(b *testing.B) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var hSum uint64 + var k Key + for pb.Next() { + k.Offset++ + h := k.hashUint64() + hSum += h + } + atomic.AddUint64(&BenchSink, hSum) + }) +} + +var BenchSink uint64 + +func BenchmarkCacheGet(b *testing.B) { + c := NewCache(func() int { + return 1024 * 1024 * 16 + }) + defer c.MustStop() + const blocksCount = 10000 + blocks := make([]*testBlock, blocksCount) + for i := 0; i < blocksCount; i++ { + blocks[i] = &testBlock{} + c.PutBlock(Key{Offset: uint64(i)}, blocks[i]) + } + b.ReportAllocs() + b.SetBytes(int64(len(blocks))) + b.RunParallel(func(pb *testing.PB) { + var k Key + for pb.Next() { + for i := 0; i < blocksCount; i++ { + k.Offset = uint64(i) + b := c.GetBlock(k) + if b != blocks[i] { + panic(fmt.Errorf("unexpected block obtained from the cache; got %v; want %v", b, blocks[i])) + } + } + } + }) +}