diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index ef676b9282..5494956c2a 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -400,6 +400,9 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_entries{type="storage/hour_metric_ids"}`, func() float64 { return float64(m().HourMetricIDCacheSize) }) + metrics.NewGauge(`vm_cache_entries{type="storage/next_day_metric_ids"}`, func() float64 { + return float64(m().NextDayMetricIDCacheSize) + }) metrics.NewGauge(`vm_cache_entries{type="storage/bigIndexBlocks"}`, func() float64 { return float64(tm().BigIndexBlocksCacheSize) }) @@ -440,6 +443,9 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_size_bytes{type="storage/hour_metric_ids"}`, func() float64 { return float64(m().HourMetricIDCacheSizeBytes) }) + metrics.NewGauge(`vm_cache_size_bytes{type="storage/next_day_metric_ids"}`, func() float64 { + return float64(m().NextDayMetricIDCacheSizeBytes) + }) metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagCacheSizeBytes) }) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index ba38274d5e..9103aa4b04 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -68,16 +68,27 @@ type Storage struct { // Fast cache for MetricID values occurred during the previous hour. prevHourMetricIDs atomic.Value + // Fast cache for pre-populating per-day inverted index for the next day. + // This is needed in order to remove CPU usage spikes at 00:00 UTC + // due to creation of per-day inverted index for active time series. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details. + nextDayMetricIDs atomic.Value + // Pending MetricID values to be added to currHourMetricIDs. pendingHourEntriesLock sync.Mutex pendingHourEntries []pendingHourMetricIDEntry + // Pending MetricIDs to be added to nextDayMetricIDs. + pendingNextDayMetricIDsLock sync.Mutex + pendingNextDayMetricIDs *uint64set.Set + // metricIDs for pre-fetched metricNames in the prefetchMetricNames function. prefetchedMetricIDs atomic.Value stop chan struct{} currHourMetricIDsUpdaterWG sync.WaitGroup + nextDayMetricIDsUpdaterWG sync.WaitGroup retentionWatcherWG sync.WaitGroup prefetchedMetricIDsCleanerWG sync.WaitGroup @@ -148,6 +159,11 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { s.currHourMetricIDs.Store(hmCurr) s.prevHourMetricIDs.Store(hmPrev) + date := uint64(timestampFromTime(time.Now())) / msecPerDay + nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date) + s.nextDayMetricIDs.Store(nextDayMetricIDs) + s.pendingNextDayMetricIDs = &uint64set.Set{} + s.prefetchedMetricIDs.Store(&uint64set.Set{}) // Load indexdb @@ -173,6 +189,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { s.tb = tb s.startCurrHourMetricIDsUpdater() + s.startNextDayMetricIDsUpdater() s.startRetentionWatcher() s.startPrefetchedMetricIDsCleaner() @@ -342,6 +359,9 @@ type Metrics struct { HourMetricIDCacheSize uint64 HourMetricIDCacheSizeBytes uint64 + NextDayMetricIDCacheSize uint64 + NextDayMetricIDCacheSizeBytes uint64 + PrefetchedMetricIDsSize uint64 PrefetchedMetricIDsSizeBytes uint64 @@ -406,6 +426,10 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes() m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes() + nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v + m.NextDayMetricIDCacheSize += uint64(nextDayMetricIDs.Len()) + m.NextDayMetricIDCacheSizeBytes += nextDayMetricIDs.SizeBytes() + prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set) m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len()) m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes()) @@ -463,6 +487,14 @@ func (s *Storage) startCurrHourMetricIDsUpdater() { }() } +func (s *Storage) startNextDayMetricIDsUpdater() { + s.nextDayMetricIDsUpdaterWG.Add(1) + go func() { + s.nextDayMetricIDsUpdater() + s.nextDayMetricIDsUpdaterWG.Done() + }() +} + var currHourMetricIDsUpdateInterval = time.Second * 10 func (s *Storage) currHourMetricIDsUpdater() { @@ -479,6 +511,22 @@ func (s *Storage) currHourMetricIDsUpdater() { } } +var nextDayMetricIDsUpdateInterval = time.Second * 11 + +func (s *Storage) nextDayMetricIDsUpdater() { + ticker := time.NewTicker(nextDayMetricIDsUpdateInterval) + defer ticker.Stop() + for { + select { + case <-s.stop: + s.updateNextDayMetricIDs() + return + case <-ticker.C: + s.updateNextDayMetricIDs() + } + } +} + func (s *Storage) mustRotateIndexDB() { // Create new indexdb table. newTableName := nextIndexDBTableName() @@ -518,6 +566,7 @@ func (s *Storage) MustClose() { s.retentionWatcherWG.Wait() s.currHourMetricIDsUpdaterWG.Wait() + s.nextDayMetricIDsUpdaterWG.Wait() s.tb.MustClose() s.idb().MustClose() @@ -532,21 +581,70 @@ func (s *Storage) MustClose() { hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) s.mustSaveHourMetricIDs(hmPrev, "prev_hour_metric_ids") + nextDayMetricIDs := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry) + s.mustSaveNextDayMetricIDs(nextDayMetricIDs) + // Release lock file. if err := s.flockF.Close(); err != nil { logger.Panicf("FATAL: cannot close lock file %q: %s", s.flockF.Name(), err) } } -func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs { +func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry { + e := &byDateMetricIDEntry{ + date: date, + } + name := "next_day_metric_ids" path := s.cachePath + "/" + name logger.Infof("loading %s from %q...", name, path) startTime := time.Now() if !fs.IsPathExist(path) { logger.Infof("nothing to load from %q", path) - return &hourMetricIDs{ - hour: hour, - } + return e + } + src, err := ioutil.ReadFile(path) + if err != nil { + logger.Panicf("FATAL: cannot read %s: %s", path, err) + } + srcOrigLen := len(src) + if len(src) < 16 { + logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 16) + return e + } + + // Unmarshal header + dateLoaded := encoding.UnmarshalUint64(src) + src = src[8:] + if dateLoaded != date { + logger.Infof("discarding %s, since it contains data for stale date; got %d; want %d", path, dateLoaded, date) + return e + } + + // Unmarshal uint64set + m, tail, err := unmarshalUint64Set(src) + if err != nil { + logger.Infof("discarding %s because cannot load uint64set: %s", path, err) + return e + } + if len(tail) > 0 { + logger.Infof("discarding %s because non-empty tail left; len(tail)=%d", path, len(tail)) + return e + } + e.v = *m + logger.Infof("loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), m.Len(), srcOrigLen) + return e +} + +func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs { + hm := &hourMetricIDs{ + hour: hour, + } + path := s.cachePath + "/" + name + logger.Infof("loading %s from %q...", name, path) + startTime := time.Now() + if !fs.IsPathExist(path) { + logger.Infof("nothing to load from %q", path) + return hm } src, err := ioutil.ReadFile(path) if err != nil { @@ -555,9 +653,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs srcOrigLen := len(src) if len(src) < 24 { logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) - return &hourMetricIDs{ - hour: hour, - } + return hm } // Unmarshal header @@ -566,32 +662,22 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs hourLoaded := encoding.UnmarshalUint64(src) src = src[8:] if hourLoaded != hour { - logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour) - return &hourMetricIDs{ - hour: hour, - } + logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", path, hourLoaded, hour) + return hm } - // Unmarshal hm.m - hmLen := encoding.UnmarshalUint64(src) - src = src[8:] - if uint64(len(src)) < 8*hmLen { - logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes", path, len(src), 8*hmLen) - return &hourMetricIDs{ - hour: hour, - } - } - m := &uint64set.Set{} - for i := uint64(0); i < hmLen; i++ { - metricID := encoding.UnmarshalUint64(src) - src = src[8:] - m.Add(metricID) + // Unmarshal uint64set + m, tail, err := unmarshalUint64Set(src) + if err != nil { + logger.Infof("discarding %s because cannot load uint64set: %s", path, err) + return hm } + src = tail // Unmarshal hm.byTenant if len(src) < 8 { logger.Errorf("discarding %s, since it has broken hm.byTenant header; got %d bytes; want %d bytes", path, len(src), 8) - return &hourMetricIDs{} + return hm } byTenantLen := encoding.UnmarshalUint64(src) src = src[8:] @@ -599,7 +685,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs for i := uint64(0); i < byTenantLen; i++ { if len(src) < 16 { logger.Errorf("discarding %s, since it has broken accountID:projectID prefix; got %d bytes; want %d bytes", path, len(src), 16) - return &hourMetricIDs{} + return hm } accountID := encoding.UnmarshalUint32(src) src = src[4:] @@ -609,7 +695,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs src = src[8:] if uint64(len(src)) < 8*mLen { logger.Errorf("discarding %s, since it has borken accountID:projectID entry; got %d bytes; want %d bytes", path, len(src), 8*mLen) - return &hourMetricIDs{} + return hm } m := &uint64set.Set{} for j := uint64(0); j < mLen; j++ { @@ -624,13 +710,30 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs byTenant[k] = m } - logger.Infof("loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), hmLen, srcOrigLen) - return &hourMetricIDs{ - m: m, - byTenant: byTenant, - hour: hourLoaded, - isFull: isFull != 0, + hm.m = m + hm.byTenant = byTenant + hm.isFull = isFull != 0 + logger.Infof("loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), m.Len(), srcOrigLen) + return hm +} + +func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) { + name := "next_day_metric_ids" + path := s.cachePath + "/" + name + logger.Infof("saving %s to %q...", name, path) + startTime := time.Now() + dst := make([]byte, 0, e.v.Len()*8+16) + + // Marshal header + dst = encoding.MarshalUint64(dst, e.date) + + // Marshal e.v + dst = marshalUint64Set(dst, &e.v) + + if err := ioutil.WriteFile(path, dst, 0644); err != nil { + logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) } + logger.Infof("saved %s to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), e.v.Len(), len(dst)) } func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { @@ -648,13 +751,7 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { dst = encoding.MarshalUint64(dst, hm.hour) // Marshal hm.m - dst = encoding.MarshalUint64(dst, uint64(hm.m.Len())) - hm.m.ForEach(func(part []uint64) bool { - for _, metricID := range part { - dst = encoding.MarshalUint64(dst, metricID) - } - return true - }) + dst = marshalUint64Set(dst, hm.m) // Marshal hm.byTenant var metricIDs []uint64 @@ -675,6 +772,32 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { logger.Infof("saved %s to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), hm.m.Len(), len(dst)) } +func unmarshalUint64Set(src []byte) (*uint64set.Set, []byte, error) { + mLen := encoding.UnmarshalUint64(src) + src = src[8:] + if uint64(len(src)) < 8*mLen { + return nil, nil, fmt.Errorf("cannot unmarshal uint64set; got %d bytes; want at least %d bytes", len(src), 8*mLen) + } + m := &uint64set.Set{} + for i := uint64(0); i < mLen; i++ { + metricID := encoding.UnmarshalUint64(src) + src = src[8:] + m.Add(metricID) + } + return m, src, nil +} + +func marshalUint64Set(dst []byte, m *uint64set.Set) []byte { + dst = encoding.MarshalUint64(dst, uint64(m.Len())) + m.ForEach(func(part []uint64) bool { + for _, metricID := range part { + dst = encoding.MarshalUint64(dst, metricID) + } + return true + }) + return dst +} + func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache { path := s.cachePath + "/" + name logger.Infof("loading %s cache from %q...", info, path) @@ -1058,7 +1181,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra if err := s.tb.AddRows(rows); err != nil { lastError = fmt.Errorf("cannot add rows to table: %s", err) } - if err := s.updatePerDateData(rows, lastError); err != nil && lastError == nil { + if err := s.updatePerDateData(rows); err != nil && lastError == nil { lastError = fmt.Errorf("cannot update per-date data: %s", err) } if lastError != nil { @@ -1067,7 +1190,8 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra return rows, nil } -func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { +func (s *Storage) updatePerDateData(rows []rawRow) error { + var lastError error var date uint64 var hour uint64 var prevTimestamp int64 @@ -1079,6 +1203,8 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { ) idb := s.idb() hm := s.currHourMetricIDs.Load().(*hourMetricIDs) + nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v + todayShare16bit := uint64((float64(uint64(time.Now().UnixNano()/1e9)%(3600*24)) / (3600 * 24)) * (1 << 16)) for i := range rows { r := &rows[i] if r.Timestamp != prevTimestamp { @@ -1092,6 +1218,21 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { if hm.m.Has(metricID) { // Fast path: the metricID is in the current hour cache. // This means the metricID has been already added to per-day inverted index. + + // Gradually pre-populate per-day inverted index for the next day + // during the current day. + // This should reduce CPU usage spike and slowdown at the beginning of the next day + // when entries for all the active time series must be added to the index. + // This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 . + if todayShare16bit > (metricID&(1<<16-1)) && !nextDayMetricIDs.Has(metricID) { + if err := idb.storeDateMetricID(date+1, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil && lastError == nil { + lastError = err + continue + } + s.pendingNextDayMetricIDsLock.Lock() + s.pendingNextDayMetricIDs.Add(metricID) + s.pendingNextDayMetricIDsLock.Unlock() + } continue } s.pendingHourEntriesLock.Lock() @@ -1119,7 +1260,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { // Slow path: store the (date, metricID) entry in the indexDB. // It is OK if the (date, metricID) entry is added multiple times to db // by concurrent goroutines. - if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil { + if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil && lastError == nil { lastError = err continue } @@ -1280,6 +1421,30 @@ type byDateMetricIDEntry struct { v uint64set.Set } +func (s *Storage) updateNextDayMetricIDs() { + date := uint64(timestampFromTime(time.Now())) / msecPerDay + + e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry) + s.pendingNextDayMetricIDsLock.Lock() + pendingMetricIDs := s.pendingNextDayMetricIDs + s.pendingNextDayMetricIDs = &uint64set.Set{} + s.pendingNextDayMetricIDsLock.Unlock() + if pendingMetricIDs.Len() == 0 && e.date == date { + // Fast path: nothing to update. + return + } + + // Slow path: union pendingMetricIDs with e.v + if e.date == date { + pendingMetricIDs.Union(&e.v) + } + eNew := &byDateMetricIDEntry{ + date: date, + v: *pendingMetricIDs, + } + s.nextDayMetricIDs.Store(eNew) +} + func (s *Storage) updateCurrHourMetricIDs() { hm := s.currHourMetricIDs.Load().(*hourMetricIDs) s.pendingHourEntriesLock.Lock()