mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-13 13:11:37 +01:00
lib/storage: speed up checking metricID existence in the list for the current date
This commit is contained in:
parent
d7bafde77e
commit
e27fd5148a
@ -50,8 +50,15 @@ type Storage struct {
|
||||
// dateMetricIDCache is (Date, MetricID) cache.
|
||||
dateMetricIDCache *fastcache.Cache
|
||||
|
||||
stop chan struct{}
|
||||
retentionWatcherWG sync.WaitGroup
|
||||
// Fast cache for today MetricID values.
|
||||
todayMetricIDs atomic.Value
|
||||
pendingTodayMetricIDsLock sync.Mutex
|
||||
pendingTodayMetricIDs map[uint64]struct{}
|
||||
|
||||
stop chan struct{}
|
||||
|
||||
todayMetricIDsUpdaterWG sync.WaitGroup
|
||||
retentionWatcherWG sync.WaitGroup
|
||||
}
|
||||
|
||||
// OpenStorage opens storage on the given path with the given number of retention months.
|
||||
@ -122,6 +129,10 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
|
||||
}
|
||||
s.tb = tb
|
||||
|
||||
s.todayMetricIDs.Store(&todayMetricIDs{})
|
||||
s.pendingTodayMetricIDs = make(map[uint64]struct{})
|
||||
s.startTodayMetricIDsUpdater()
|
||||
|
||||
s.startRetentionWatcher()
|
||||
|
||||
return s, nil
|
||||
@ -341,6 +352,27 @@ func (s *Storage) retentionWatcher() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Storage) startTodayMetricIDsUpdater() {
|
||||
s.todayMetricIDsUpdaterWG.Add(1)
|
||||
go func() {
|
||||
s.todayMetricIDsUpdater()
|
||||
s.todayMetricIDsUpdaterWG.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Storage) todayMetricIDsUpdater() {
|
||||
t := time.NewTimer(time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-s.stop:
|
||||
return
|
||||
case <-t.C:
|
||||
s.updateTodayMetricIDs()
|
||||
t.Reset(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Storage) mustRotateIndexDB() {
|
||||
// Create new indexdb table.
|
||||
newTableName := nextIndexDBTableName()
|
||||
@ -379,6 +411,7 @@ func (s *Storage) MustClose() {
|
||||
close(s.stop)
|
||||
|
||||
s.retentionWatcherWG.Wait()
|
||||
s.todayMetricIDsUpdaterWG.Wait()
|
||||
|
||||
s.tb.MustClose()
|
||||
s.idb().MustClose()
|
||||
@ -683,14 +716,25 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error
|
||||
prevTimestamp = r.Timestamp
|
||||
}
|
||||
metricID := r.TSID.MetricID
|
||||
tm := s.todayMetricIDs.Load().(*todayMetricIDs)
|
||||
if date == tm.date {
|
||||
if _, ok := tm.m[metricID]; ok {
|
||||
// Fast path: the (date, metricID) entry is in the fast cache with today's metric ids.
|
||||
continue
|
||||
}
|
||||
s.pendingTodayMetricIDsLock.Lock()
|
||||
s.pendingTodayMetricIDs[metricID] = struct{}{}
|
||||
s.pendingTodayMetricIDsLock.Unlock()
|
||||
}
|
||||
|
||||
// Slower path: check global cache for (date, metricID) entry.
|
||||
a[0] = date
|
||||
a[1] = metricID
|
||||
if s.dateMetricIDCache.Has(keyBuf) {
|
||||
// Fast path: the (date, metricID) entry is in the cache.
|
||||
continue
|
||||
}
|
||||
|
||||
// Slow path: store the entry in the cache and in the indexDB.
|
||||
// Slow path: store the entry in the (date, metricID) cache and in the indexDB.
|
||||
// It is OK if the (date, metricID) entry is added multiple times to db
|
||||
// by concurrent goroutines.
|
||||
s.dateMetricIDCache.Set(keyBuf, nil)
|
||||
@ -702,6 +746,53 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error
|
||||
return errors
|
||||
}
|
||||
|
||||
func (s *Storage) updateTodayMetricIDs() {
|
||||
tm := s.todayMetricIDs.Load().(*todayMetricIDs)
|
||||
s.pendingTodayMetricIDsLock.Lock()
|
||||
newMetricIDsLen := len(s.pendingTodayMetricIDs)
|
||||
s.pendingTodayMetricIDsLock.Unlock()
|
||||
today := uint64(timestampFromTime(time.Now())) / msecPerDay
|
||||
if newMetricIDsLen == 0 {
|
||||
if tm.date == today {
|
||||
// Fast path: nothing to update.
|
||||
return
|
||||
}
|
||||
// Reset tm because of new day.
|
||||
tmNew := &todayMetricIDs{
|
||||
m: make(map[uint64]struct{}, len(tm.m)),
|
||||
date: today,
|
||||
}
|
||||
s.todayMetricIDs.Store(tmNew)
|
||||
return
|
||||
}
|
||||
|
||||
// Slow path tm.m must be updated with non-empty s.pendingTodayMetricIDs.
|
||||
m := make(map[uint64]struct{}, len(tm.m)+newMetricIDsLen)
|
||||
if tm.date == today {
|
||||
for metricID := range tm.m {
|
||||
m[metricID] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
s.pendingTodayMetricIDsLock.Lock()
|
||||
for metricID := range s.pendingTodayMetricIDs {
|
||||
m[metricID] = struct{}{}
|
||||
}
|
||||
s.pendingTodayMetricIDs = make(map[uint64]struct{}, len(s.pendingTodayMetricIDs))
|
||||
s.pendingTodayMetricIDsLock.Unlock()
|
||||
|
||||
tmNew := &todayMetricIDs{
|
||||
m: m,
|
||||
date: today,
|
||||
}
|
||||
s.todayMetricIDs.Store(tmNew)
|
||||
}
|
||||
|
||||
type todayMetricIDs struct {
|
||||
m map[uint64]struct{}
|
||||
date uint64
|
||||
}
|
||||
|
||||
func (s *Storage) getTSIDFromCache(dst *TSID, metricName []byte) bool {
|
||||
buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))[:]
|
||||
buf = s.tsidCache.Get(buf[:0], metricName)
|
||||
|
Loading…
Reference in New Issue
Block a user