mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
lib/storage: reduce memory allocations when syncing dateMetricIDCache
This commit is contained in:
parent
8d2553e6a1
commit
fc2565b4ee
@ -2031,24 +2031,28 @@ type dateMetricIDCache struct {
|
||||
byDate atomic.Value
|
||||
|
||||
// Contains mutable map protected by mu
|
||||
byDateMutable *byDateMetricIDMap
|
||||
lastSyncTime uint64
|
||||
mu sync.Mutex
|
||||
byDateMutable *byDateMetricIDMap
|
||||
nextSyncDeadline uint64
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newDateMetricIDCache() *dateMetricIDCache {
|
||||
var dmc dateMetricIDCache
|
||||
dmc.Reset()
|
||||
dmc.resetLocked()
|
||||
return &dmc
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) Reset() {
|
||||
dmc.mu.Lock()
|
||||
dmc.resetLocked()
|
||||
dmc.mu.Unlock()
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) resetLocked() {
|
||||
// Do not reset syncsCount and resetsCount
|
||||
dmc.byDate.Store(newByDateMetricIDMap())
|
||||
dmc.byDateMutable = newByDateMetricIDMap()
|
||||
dmc.lastSyncTime = fasttime.UnixTimestamp()
|
||||
dmc.mu.Unlock()
|
||||
dmc.nextSyncDeadline = 10 + fasttime.UnixTimestamp()
|
||||
|
||||
atomic.AddUint64(&dmc.resetsCount, 1)
|
||||
}
|
||||
@ -2081,20 +2085,12 @@ func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
|
||||
}
|
||||
|
||||
// Slow path. Check mutable map.
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
dmc.mu.Lock()
|
||||
v = dmc.byDateMutable.get(date)
|
||||
ok := v.Has(metricID)
|
||||
mustSync := false
|
||||
if currentTime-dmc.lastSyncTime > 10 {
|
||||
mustSync = true
|
||||
dmc.lastSyncTime = currentTime
|
||||
}
|
||||
dmc.syncLockedIfNeeded()
|
||||
dmc.mu.Unlock()
|
||||
|
||||
if mustSync {
|
||||
dmc.sync()
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
@ -2133,21 +2129,47 @@ func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
|
||||
dmc.mu.Unlock()
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) sync() {
|
||||
dmc.mu.Lock()
|
||||
func (dmc *dateMetricIDCache) syncLockedIfNeeded() {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
if currentTime >= dmc.nextSyncDeadline {
|
||||
dmc.nextSyncDeadline = currentTime + 10
|
||||
dmc.syncLocked()
|
||||
}
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) syncLocked() {
|
||||
if len(dmc.byDateMutable.m) == 0 {
|
||||
// Nothing to sync.
|
||||
return
|
||||
}
|
||||
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
|
||||
for date, e := range dmc.byDateMutable.m {
|
||||
byDateMutable := dmc.byDateMutable
|
||||
for date, e := range byDateMutable.m {
|
||||
v := byDate.get(date)
|
||||
e.v.Union(v)
|
||||
if v == nil {
|
||||
continue
|
||||
}
|
||||
v = v.Clone()
|
||||
v.Union(&e.v)
|
||||
byDateMutable.m[date] = &byDateMetricIDEntry{
|
||||
date: date,
|
||||
v: *v,
|
||||
}
|
||||
}
|
||||
for date, e := range byDate.m {
|
||||
v := byDateMutable.get(date)
|
||||
if v != nil {
|
||||
continue
|
||||
}
|
||||
byDateMutable.m[date] = e
|
||||
}
|
||||
dmc.byDate.Store(dmc.byDateMutable)
|
||||
dmc.byDateMutable = newByDateMetricIDMap()
|
||||
dmc.mu.Unlock()
|
||||
|
||||
atomic.AddUint64(&dmc.syncsCount, 1)
|
||||
|
||||
if dmc.EntriesCount() > memory.Allowed()/128 {
|
||||
dmc.Reset()
|
||||
dmc.resetLocked()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -89,7 +89,9 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
||||
return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID)
|
||||
}
|
||||
if i%11234 == 0 {
|
||||
c.sync()
|
||||
c.mu.Lock()
|
||||
c.syncLocked()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
if i%34323 == 0 {
|
||||
c.Reset()
|
||||
@ -103,7 +105,9 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
||||
metricID := uint64(i) % 123
|
||||
c.Set(date, metricID)
|
||||
}
|
||||
c.sync()
|
||||
c.mu.Lock()
|
||||
c.syncLocked()
|
||||
c.mu.Unlock()
|
||||
for i := 0; i < 1e5; i++ {
|
||||
date := uint64(i) % 3
|
||||
metricID := uint64(i) % 123
|
||||
|
Loading…
Reference in New Issue
Block a user