diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 4e3e350b5d..6b851bebed 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -50,8 +50,15 @@ type Storage struct { // dateMetricIDCache is (Date, MetricID) cache. dateMetricIDCache *fastcache.Cache - stop chan struct{} - retentionWatcherWG sync.WaitGroup + // Fast cache for today MetricID values. + todayMetricIDs atomic.Value + pendingTodayMetricIDsLock sync.Mutex + pendingTodayMetricIDs map[uint64]struct{} + + stop chan struct{} + + todayMetricIDsUpdaterWG sync.WaitGroup + retentionWatcherWG sync.WaitGroup } // OpenStorage opens storage on the given path with the given number of retention months. @@ -122,6 +129,10 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { } s.tb = tb + s.todayMetricIDs.Store(&todayMetricIDs{}) + s.pendingTodayMetricIDs = make(map[uint64]struct{}) + s.startTodayMetricIDsUpdater() + s.startRetentionWatcher() return s, nil @@ -341,6 +352,27 @@ func (s *Storage) retentionWatcher() { } } +func (s *Storage) startTodayMetricIDsUpdater() { + s.todayMetricIDsUpdaterWG.Add(1) + go func() { + s.todayMetricIDsUpdater() + s.todayMetricIDsUpdaterWG.Done() + }() +} + +func (s *Storage) todayMetricIDsUpdater() { + t := time.NewTimer(time.Second) + for { + select { + case <-s.stop: + return + case <-t.C: + s.updateTodayMetricIDs() + t.Reset(time.Second) + } + } +} + func (s *Storage) mustRotateIndexDB() { // Create new indexdb table. newTableName := nextIndexDBTableName() @@ -379,6 +411,7 @@ func (s *Storage) MustClose() { close(s.stop) s.retentionWatcherWG.Wait() + s.todayMetricIDsUpdaterWG.Wait() s.tb.MustClose() s.idb().MustClose() @@ -683,14 +716,25 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error prevTimestamp = r.Timestamp } metricID := r.TSID.MetricID + tm := s.todayMetricIDs.Load().(*todayMetricIDs) + if date == tm.date { + if _, ok := tm.m[metricID]; ok { + // Fast path: the (date, metricID) entry is in the fast cache with today's metric ids. + continue + } + s.pendingTodayMetricIDsLock.Lock() + s.pendingTodayMetricIDs[metricID] = struct{}{} + s.pendingTodayMetricIDsLock.Unlock() + } + + // Slower path: check global cache for (date, metricID) entry. a[0] = date a[1] = metricID if s.dateMetricIDCache.Has(keyBuf) { - // Fast path: the (date, metricID) entry is in the cache. continue } - // Slow path: store the entry in the cache and in the indexDB. + // Slow path: store the entry in the (date, metricID) cache and in the indexDB. // It is OK if the (date, metricID) entry is added multiple times to db // by concurrent goroutines. s.dateMetricIDCache.Set(keyBuf, nil) @@ -702,6 +746,53 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error return errors } +func (s *Storage) updateTodayMetricIDs() { + tm := s.todayMetricIDs.Load().(*todayMetricIDs) + s.pendingTodayMetricIDsLock.Lock() + newMetricIDsLen := len(s.pendingTodayMetricIDs) + s.pendingTodayMetricIDsLock.Unlock() + today := uint64(timestampFromTime(time.Now())) / msecPerDay + if newMetricIDsLen == 0 { + if tm.date == today { + // Fast path: nothing to update. + return + } + // Reset tm because of new day. + tmNew := &todayMetricIDs{ + m: make(map[uint64]struct{}, len(tm.m)), + date: today, + } + s.todayMetricIDs.Store(tmNew) + return + } + + // Slow path tm.m must be updated with non-empty s.pendingTodayMetricIDs. + m := make(map[uint64]struct{}, len(tm.m)+newMetricIDsLen) + if tm.date == today { + for metricID := range tm.m { + m[metricID] = struct{}{} + } + } + + s.pendingTodayMetricIDsLock.Lock() + for metricID := range s.pendingTodayMetricIDs { + m[metricID] = struct{}{} + } + s.pendingTodayMetricIDs = make(map[uint64]struct{}, len(s.pendingTodayMetricIDs)) + s.pendingTodayMetricIDsLock.Unlock() + + tmNew := &todayMetricIDs{ + m: m, + date: today, + } + s.todayMetricIDs.Store(tmNew) +} + +type todayMetricIDs struct { + m map[uint64]struct{} + date uint64 +} + func (s *Storage) getTSIDFromCache(dst *TSID, metricName []byte) bool { buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))[:] buf = s.tsidCache.Get(buf[:0], metricName)