lib/storage: periodically reset prefetchedMetricIDs cache in order to limit its size under high churn rate

This commit is contained in:
Aliaksandr Valialkin 2021-07-07 10:27:47 +03:00
parent ce3f087d46
commit 74ace9340d

View File

@ -103,9 +103,15 @@ type Storage struct {
pendingNextDayMetricIDsLock sync.Mutex
pendingNextDayMetricIDs *uint64set.Set
// metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
// prefetchedMetricIDs contains metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
prefetchedMetricIDs atomic.Value
// prefetchedMetricIDsDeadline is used for periodic reset of prefetchedMetricIDs in order to limit its size under high rate of creating new series.
prefetchedMetricIDsDeadline uint64
// prefetchedMetricIDsLock is used for serializing updates of prefetchedMetricIDs from concurrent goroutines.
prefetchedMetricIDsLock sync.Mutex
stop chan struct{}
currHourMetricIDsUpdaterWG sync.WaitGroup
@ -1149,14 +1155,22 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
}
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
prefetchedMetricIDsNew := prefetchedMetricIDs.Clone()
s.prefetchedMetricIDsLock.Lock()
var prefetchedMetricIDsNew *uint64set.Set
if fasttime.UnixTimestamp() < atomic.LoadUint64(&s.prefetchedMetricIDsDeadline) {
// Periodically reset the prefetchedMetricIDs in order to limit its size.
prefetchedMetricIDsNew = &uint64set.Set{}
atomic.StoreUint64(&s.prefetchedMetricIDsDeadline, fasttime.UnixTimestamp()+73*60)
} else {
prefetchedMetricIDsNew = prefetchedMetricIDs.Clone()
}
prefetchedMetricIDsNew.AddMulti(metricIDs)
if prefetchedMetricIDsNew.SizeBytes() > uint64(memory.Allowed())/32 {
// Reset prefetchedMetricIDsNew if it occupies too much space.
prefetchedMetricIDsNew = &uint64set.Set{}
}
s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew)
s.prefetchedMetricIDsLock.Unlock()
return nil
}