diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 1205f9304c..48e76d87a0 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -526,6 +526,18 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheSizeBytes) }) + metrics.NewGauge(`vm_cache_size_bytes{type="storage/bigIndexBlocks"}`, func() float64 { + return float64(tm().BigIndexBlocksCacheSizeBytes) + }) + metrics.NewGauge(`vm_cache_size_bytes{type="storage/smallIndexBlocks"}`, func() float64 { + return float64(tm().SmallIndexBlocksCacheSizeBytes) + }) + metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/dataBlocks"}`, func() float64 { + return float64(idbm().DataBlocksCacheSizeBytes) + }) + metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/indexBlocks"}`, func() float64 { + return float64(idbm().IndexBlocksCacheSizeBytes) + }) metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 { return float64(m().DateMetricIDCacheSizeBytes) }) diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index 9a48a8c02a..647b0a6745 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -6,6 +6,7 @@ import ( "sort" "strings" "sync" + "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -28,6 +29,10 @@ type inmemoryBlock struct { items byteSliceSorter } +func (ib *inmemoryBlock) SizeBytes() int { + return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof([]byte{})) +} + func (ib *inmemoryBlock) Reset() { ib.commonPrefix = ib.commonPrefix[:0] ib.data = ib.data[:0] diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index ed4d41c9c9..300f5d8a97 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -6,6 +6,7 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" @@ -136,6 +137,10 @@ type indexBlock struct { bhs []blockHeader } +func (idxb *indexBlock) SizeBytes() int { + return cap(idxb.bhs) * int(unsafe.Sizeof(blockHeader{})) +} + func getIndexBlock() *indexBlock { v := indexBlockPool.Get() if v == nil { @@ -200,7 +205,7 @@ func (idxbc *indexBlockCache) MustClose() { // cleaner periodically cleans least recently used items. func (idxbc *indexBlockCache) cleaner() { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { @@ -216,8 +221,8 @@ func (idxbc *indexBlockCache) cleanByTimeout() { currentTime := fasttime.UnixTimestamp() idxbc.mu.Lock() for k, idxbe := range idxbc.m { - // Delete items accessed more than a minute ago. - if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 60 { + // Delete items accessed more than two minutes ago. + if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 2*60 { delete(idxbc.m, k) } } @@ -276,6 +281,16 @@ func (idxbc *indexBlockCache) Len() uint64 { return uint64(n) } +func (idxbc *indexBlockCache) SizeBytes() uint64 { + n := 0 + idxbc.mu.RLock() + for _, e := range idxbc.m { + n += e.idxb.SizeBytes() + } + idxbc.mu.RUnlock() + return uint64(n) +} + func (idxbc *indexBlockCache) Requests() uint64 { return atomic.LoadUint64(&idxbc.requests) } @@ -347,7 +362,7 @@ func (ibc *inmemoryBlockCache) MustClose() { // cleaner periodically cleans least recently used items. func (ibc *inmemoryBlockCache) cleaner() { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { @@ -363,8 +378,10 @@ func (ibc *inmemoryBlockCache) cleanByTimeout() { currentTime := fasttime.UnixTimestamp() ibc.mu.Lock() for k, ibe := range ibc.m { - // Delete items accessed more than a minute ago. - if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 60 { + // Delete items accessed more than a two minutes ago. + if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 { + // do not call putInmemoryBlock(ibc.m[k]), since it + // may be used by concurrent goroutines. delete(ibc.m, k) } } @@ -424,6 +441,16 @@ func (ibc *inmemoryBlockCache) Len() uint64 { return uint64(n) } +func (ibc *inmemoryBlockCache) SizeBytes() uint64 { + n := 0 + ibc.mu.RLock() + for _, e := range ibc.m { + n += e.ib.SizeBytes() + } + ibc.mu.RUnlock() + return uint64(n) +} + func (ibc *inmemoryBlockCache) Requests() uint64 { return atomic.LoadUint64(&ibc.requests) } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index a58c9067c5..d666392536 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -294,13 +294,15 @@ type TableMetrics struct { ItemsCount uint64 SizeBytes uint64 - DataBlocksCacheSize uint64 - DataBlocksCacheRequests uint64 - DataBlocksCacheMisses uint64 + DataBlocksCacheSize uint64 + DataBlocksCacheSizeBytes uint64 + DataBlocksCacheRequests uint64 + DataBlocksCacheMisses uint64 - IndexBlocksCacheSize uint64 - IndexBlocksCacheRequests uint64 - IndexBlocksCacheMisses uint64 + IndexBlocksCacheSize uint64 + IndexBlocksCacheSizeBytes uint64 + IndexBlocksCacheRequests uint64 + IndexBlocksCacheMisses uint64 PartsRefCount uint64 } @@ -328,10 +330,12 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { m.SizeBytes += p.size m.DataBlocksCacheSize += p.ibCache.Len() + m.DataBlocksCacheSizeBytes += p.ibCache.SizeBytes() m.DataBlocksCacheRequests += p.ibCache.Requests() m.DataBlocksCacheMisses += p.ibCache.Misses() m.IndexBlocksCacheSize += p.idxbCache.Len() + m.IndexBlocksCacheSizeBytes += p.idxbCache.SizeBytes() m.IndexBlocksCacheRequests += p.idxbCache.Requests() m.IndexBlocksCacheMisses += p.idxbCache.Misses() diff --git a/lib/storage/part.go b/lib/storage/part.go index 653a193117..52b0ec41a9 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -6,6 +6,7 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" @@ -140,6 +141,10 @@ type indexBlock struct { bhs []blockHeader } +func (idxb *indexBlock) SizeBytes() int { + return cap(idxb.bhs) * int(unsafe.Sizeof(blockHeader{})) +} + func getIndexBlock() *indexBlock { v := indexBlockPool.Get() if v == nil { @@ -204,7 +209,7 @@ func (ibc *indexBlockCache) MustClose(isBig bool) { // cleaner periodically cleans least recently used items. func (ibc *indexBlockCache) cleaner() { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { @@ -220,8 +225,8 @@ func (ibc *indexBlockCache) cleanByTimeout() { currentTime := fasttime.UnixTimestamp() ibc.mu.Lock() for k, ibe := range ibc.m { - // Delete items accessed more than a minute ago. - if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 60 { + // Delete items accessed more than two minutes ago. + if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 { delete(ibc.m, k) } } @@ -286,3 +291,13 @@ func (ibc *indexBlockCache) Len() uint64 { ibc.mu.Unlock() return n } + +func (ibc *indexBlockCache) SizeBytes() uint64 { + n := 0 + ibc.mu.Lock() + for _, e := range ibc.m { + n += e.ib.SizeBytes() + } + ibc.mu.Unlock() + return uint64(n) +} diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 8c05d9f818..ff2ca08ab8 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -307,13 +307,15 @@ func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs type partitionMetrics struct { PendingRows uint64 - BigIndexBlocksCacheSize uint64 - BigIndexBlocksCacheRequests uint64 - BigIndexBlocksCacheMisses uint64 + BigIndexBlocksCacheSize uint64 + BigIndexBlocksCacheSizeBytes uint64 + BigIndexBlocksCacheRequests uint64 + BigIndexBlocksCacheMisses uint64 - SmallIndexBlocksCacheSize uint64 - SmallIndexBlocksCacheRequests uint64 - SmallIndexBlocksCacheMisses uint64 + SmallIndexBlocksCacheSize uint64 + SmallIndexBlocksCacheSizeBytes uint64 + SmallIndexBlocksCacheRequests uint64 + SmallIndexBlocksCacheMisses uint64 BigSizeBytes uint64 SmallSizeBytes uint64 @@ -360,6 +362,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { p := pw.p m.BigIndexBlocksCacheSize += p.ibCache.Len() + m.BigIndexBlocksCacheSizeBytes += p.ibCache.SizeBytes() m.BigIndexBlocksCacheRequests += p.ibCache.Requests() m.BigIndexBlocksCacheMisses += p.ibCache.Misses() m.BigRowsCount += p.ph.RowsCount @@ -372,6 +375,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { p := pw.p m.SmallIndexBlocksCacheSize += p.ibCache.Len() + m.SmallIndexBlocksCacheSizeBytes += p.ibCache.SizeBytes() m.SmallIndexBlocksCacheRequests += p.ibCache.Requests() m.SmallIndexBlocksCacheMisses += p.ibCache.Misses() m.SmallRowsCount += p.ph.RowsCount