lib/storage: gradually pre-populate per-day inverted index for the next day

This should prevent from CPU usage spikes at 00:00 UTC every day when
inverted index for new day must be quickly created for all the active time series.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430
This commit is contained in:
Aliaksandr Valialkin 2020-05-12 01:06:17 +03:00
parent 8c77cb436a
commit f7753b1469
2 changed files with 215 additions and 44 deletions

View File

@ -400,6 +400,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_entries{type="storage/hour_metric_ids"}`, func() float64 {
return float64(m().HourMetricIDCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="storage/next_day_metric_ids"}`, func() float64 {
return float64(m().NextDayMetricIDCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="storage/bigIndexBlocks"}`, func() float64 {
return float64(tm().BigIndexBlocksCacheSize)
})
@ -440,6 +443,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_bytes{type="storage/hour_metric_ids"}`, func() float64 {
return float64(m().HourMetricIDCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/next_day_metric_ids"}`, func() float64 {
return float64(m().NextDayMetricIDCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagCacheSizeBytes)
})

View File

@ -68,16 +68,27 @@ type Storage struct {
// Fast cache for MetricID values occurred during the previous hour.
prevHourMetricIDs atomic.Value
// Fast cache for pre-populating per-day inverted index for the next day.
// This is needed in order to remove CPU usage spikes at 00:00 UTC
// due to creation of per-day inverted index for active time series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details.
nextDayMetricIDs atomic.Value
// Pending MetricID values to be added to currHourMetricIDs.
pendingHourEntriesLock sync.Mutex
pendingHourEntries []pendingHourMetricIDEntry
// Pending MetricIDs to be added to nextDayMetricIDs.
pendingNextDayMetricIDsLock sync.Mutex
pendingNextDayMetricIDs *uint64set.Set
// metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
prefetchedMetricIDs atomic.Value
stop chan struct{}
currHourMetricIDsUpdaterWG sync.WaitGroup
nextDayMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup
prefetchedMetricIDsCleanerWG sync.WaitGroup
@ -148,6 +159,11 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
s.currHourMetricIDs.Store(hmCurr)
s.prevHourMetricIDs.Store(hmPrev)
date := uint64(timestampFromTime(time.Now())) / msecPerDay
nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date)
s.nextDayMetricIDs.Store(nextDayMetricIDs)
s.pendingNextDayMetricIDs = &uint64set.Set{}
s.prefetchedMetricIDs.Store(&uint64set.Set{})
// Load indexdb
@ -173,6 +189,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
s.tb = tb
s.startCurrHourMetricIDsUpdater()
s.startNextDayMetricIDsUpdater()
s.startRetentionWatcher()
s.startPrefetchedMetricIDsCleaner()
@ -342,6 +359,9 @@ type Metrics struct {
HourMetricIDCacheSize uint64
HourMetricIDCacheSizeBytes uint64
NextDayMetricIDCacheSize uint64
NextDayMetricIDCacheSizeBytes uint64
PrefetchedMetricIDsSize uint64
PrefetchedMetricIDsSizeBytes uint64
@ -406,6 +426,10 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes()
m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes()
nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v
m.NextDayMetricIDCacheSize += uint64(nextDayMetricIDs.Len())
m.NextDayMetricIDCacheSizeBytes += nextDayMetricIDs.SizeBytes()
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
@ -463,6 +487,14 @@ func (s *Storage) startCurrHourMetricIDsUpdater() {
}()
}
func (s *Storage) startNextDayMetricIDsUpdater() {
s.nextDayMetricIDsUpdaterWG.Add(1)
go func() {
s.nextDayMetricIDsUpdater()
s.nextDayMetricIDsUpdaterWG.Done()
}()
}
var currHourMetricIDsUpdateInterval = time.Second * 10
func (s *Storage) currHourMetricIDsUpdater() {
@ -479,6 +511,22 @@ func (s *Storage) currHourMetricIDsUpdater() {
}
}
var nextDayMetricIDsUpdateInterval = time.Second * 11
func (s *Storage) nextDayMetricIDsUpdater() {
ticker := time.NewTicker(nextDayMetricIDsUpdateInterval)
defer ticker.Stop()
for {
select {
case <-s.stop:
s.updateNextDayMetricIDs()
return
case <-ticker.C:
s.updateNextDayMetricIDs()
}
}
}
func (s *Storage) mustRotateIndexDB() {
// Create new indexdb table.
newTableName := nextIndexDBTableName()
@ -518,6 +566,7 @@ func (s *Storage) MustClose() {
s.retentionWatcherWG.Wait()
s.currHourMetricIDsUpdaterWG.Wait()
s.nextDayMetricIDsUpdaterWG.Wait()
s.tb.MustClose()
s.idb().MustClose()
@ -532,21 +581,70 @@ func (s *Storage) MustClose() {
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
s.mustSaveHourMetricIDs(hmPrev, "prev_hour_metric_ids")
nextDayMetricIDs := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry)
s.mustSaveNextDayMetricIDs(nextDayMetricIDs)
// Release lock file.
if err := s.flockF.Close(); err != nil {
logger.Panicf("FATAL: cannot close lock file %q: %s", s.flockF.Name(), err)
}
}
func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs {
func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
e := &byDateMetricIDEntry{
date: date,
}
name := "next_day_metric_ids"
path := s.cachePath + "/" + name
logger.Infof("loading %s from %q...", name, path)
startTime := time.Now()
if !fs.IsPathExist(path) {
logger.Infof("nothing to load from %q", path)
return &hourMetricIDs{
hour: hour,
}
return e
}
src, err := ioutil.ReadFile(path)
if err != nil {
logger.Panicf("FATAL: cannot read %s: %s", path, err)
}
srcOrigLen := len(src)
if len(src) < 16 {
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 16)
return e
}
// Unmarshal header
dateLoaded := encoding.UnmarshalUint64(src)
src = src[8:]
if dateLoaded != date {
logger.Infof("discarding %s, since it contains data for stale date; got %d; want %d", path, dateLoaded, date)
return e
}
// Unmarshal uint64set
m, tail, err := unmarshalUint64Set(src)
if err != nil {
logger.Infof("discarding %s because cannot load uint64set: %s", path, err)
return e
}
if len(tail) > 0 {
logger.Infof("discarding %s because non-empty tail left; len(tail)=%d", path, len(tail))
return e
}
e.v = *m
logger.Infof("loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), m.Len(), srcOrigLen)
return e
}
func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs {
hm := &hourMetricIDs{
hour: hour,
}
path := s.cachePath + "/" + name
logger.Infof("loading %s from %q...", name, path)
startTime := time.Now()
if !fs.IsPathExist(path) {
logger.Infof("nothing to load from %q", path)
return hm
}
src, err := ioutil.ReadFile(path)
if err != nil {
@ -555,9 +653,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
srcOrigLen := len(src)
if len(src) < 24 {
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
return &hourMetricIDs{
hour: hour,
}
return hm
}
// Unmarshal header
@ -566,32 +662,22 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
hourLoaded := encoding.UnmarshalUint64(src)
src = src[8:]
if hourLoaded != hour {
logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour)
return &hourMetricIDs{
hour: hour,
}
logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", path, hourLoaded, hour)
return hm
}
// Unmarshal hm.m
hmLen := encoding.UnmarshalUint64(src)
src = src[8:]
if uint64(len(src)) < 8*hmLen {
logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes", path, len(src), 8*hmLen)
return &hourMetricIDs{
hour: hour,
}
}
m := &uint64set.Set{}
for i := uint64(0); i < hmLen; i++ {
metricID := encoding.UnmarshalUint64(src)
src = src[8:]
m.Add(metricID)
// Unmarshal uint64set
m, tail, err := unmarshalUint64Set(src)
if err != nil {
logger.Infof("discarding %s because cannot load uint64set: %s", path, err)
return hm
}
src = tail
// Unmarshal hm.byTenant
if len(src) < 8 {
logger.Errorf("discarding %s, since it has broken hm.byTenant header; got %d bytes; want %d bytes", path, len(src), 8)
return &hourMetricIDs{}
return hm
}
byTenantLen := encoding.UnmarshalUint64(src)
src = src[8:]
@ -599,7 +685,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
for i := uint64(0); i < byTenantLen; i++ {
if len(src) < 16 {
logger.Errorf("discarding %s, since it has broken accountID:projectID prefix; got %d bytes; want %d bytes", path, len(src), 16)
return &hourMetricIDs{}
return hm
}
accountID := encoding.UnmarshalUint32(src)
src = src[4:]
@ -609,7 +695,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
src = src[8:]
if uint64(len(src)) < 8*mLen {
logger.Errorf("discarding %s, since it has borken accountID:projectID entry; got %d bytes; want %d bytes", path, len(src), 8*mLen)
return &hourMetricIDs{}
return hm
}
m := &uint64set.Set{}
for j := uint64(0); j < mLen; j++ {
@ -624,13 +710,30 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
byTenant[k] = m
}
logger.Infof("loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), hmLen, srcOrigLen)
return &hourMetricIDs{
m: m,
byTenant: byTenant,
hour: hourLoaded,
isFull: isFull != 0,
hm.m = m
hm.byTenant = byTenant
hm.isFull = isFull != 0
logger.Infof("loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), m.Len(), srcOrigLen)
return hm
}
func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
name := "next_day_metric_ids"
path := s.cachePath + "/" + name
logger.Infof("saving %s to %q...", name, path)
startTime := time.Now()
dst := make([]byte, 0, e.v.Len()*8+16)
// Marshal header
dst = encoding.MarshalUint64(dst, e.date)
// Marshal e.v
dst = marshalUint64Set(dst, &e.v)
if err := ioutil.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
}
logger.Infof("saved %s to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), e.v.Len(), len(dst))
}
func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
@ -648,13 +751,7 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
dst = encoding.MarshalUint64(dst, hm.hour)
// Marshal hm.m
dst = encoding.MarshalUint64(dst, uint64(hm.m.Len()))
hm.m.ForEach(func(part []uint64) bool {
for _, metricID := range part {
dst = encoding.MarshalUint64(dst, metricID)
}
return true
})
dst = marshalUint64Set(dst, hm.m)
// Marshal hm.byTenant
var metricIDs []uint64
@ -675,6 +772,32 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
logger.Infof("saved %s to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), hm.m.Len(), len(dst))
}
func unmarshalUint64Set(src []byte) (*uint64set.Set, []byte, error) {
mLen := encoding.UnmarshalUint64(src)
src = src[8:]
if uint64(len(src)) < 8*mLen {
return nil, nil, fmt.Errorf("cannot unmarshal uint64set; got %d bytes; want at least %d bytes", len(src), 8*mLen)
}
m := &uint64set.Set{}
for i := uint64(0); i < mLen; i++ {
metricID := encoding.UnmarshalUint64(src)
src = src[8:]
m.Add(metricID)
}
return m, src, nil
}
func marshalUint64Set(dst []byte, m *uint64set.Set) []byte {
dst = encoding.MarshalUint64(dst, uint64(m.Len()))
m.ForEach(func(part []uint64) bool {
for _, metricID := range part {
dst = encoding.MarshalUint64(dst, metricID)
}
return true
})
return dst
}
func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache {
path := s.cachePath + "/" + name
logger.Infof("loading %s cache from %q...", info, path)
@ -1058,7 +1181,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
if err := s.tb.AddRows(rows); err != nil {
lastError = fmt.Errorf("cannot add rows to table: %s", err)
}
if err := s.updatePerDateData(rows, lastError); err != nil && lastError == nil {
if err := s.updatePerDateData(rows); err != nil && lastError == nil {
lastError = fmt.Errorf("cannot update per-date data: %s", err)
}
if lastError != nil {
@ -1067,7 +1190,8 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
return rows, nil
}
func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
func (s *Storage) updatePerDateData(rows []rawRow) error {
var lastError error
var date uint64
var hour uint64
var prevTimestamp int64
@ -1079,6 +1203,8 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
)
idb := s.idb()
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v
todayShare16bit := uint64((float64(uint64(time.Now().UnixNano()/1e9)%(3600*24)) / (3600 * 24)) * (1 << 16))
for i := range rows {
r := &rows[i]
if r.Timestamp != prevTimestamp {
@ -1092,6 +1218,21 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
if hm.m.Has(metricID) {
// Fast path: the metricID is in the current hour cache.
// This means the metricID has been already added to per-day inverted index.
// Gradually pre-populate per-day inverted index for the next day
// during the current day.
// This should reduce CPU usage spike and slowdown at the beginning of the next day
// when entries for all the active time series must be added to the index.
// This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 .
if todayShare16bit > (metricID&(1<<16-1)) && !nextDayMetricIDs.Has(metricID) {
if err := idb.storeDateMetricID(date+1, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil && lastError == nil {
lastError = err
continue
}
s.pendingNextDayMetricIDsLock.Lock()
s.pendingNextDayMetricIDs.Add(metricID)
s.pendingNextDayMetricIDsLock.Unlock()
}
continue
}
s.pendingHourEntriesLock.Lock()
@ -1119,7 +1260,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
// Slow path: store the (date, metricID) entry in the indexDB.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil {
if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil && lastError == nil {
lastError = err
continue
}
@ -1280,6 +1421,30 @@ type byDateMetricIDEntry struct {
v uint64set.Set
}
func (s *Storage) updateNextDayMetricIDs() {
date := uint64(timestampFromTime(time.Now())) / msecPerDay
e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry)
s.pendingNextDayMetricIDsLock.Lock()
pendingMetricIDs := s.pendingNextDayMetricIDs
s.pendingNextDayMetricIDs = &uint64set.Set{}
s.pendingNextDayMetricIDsLock.Unlock()
if pendingMetricIDs.Len() == 0 && e.date == date {
// Fast path: nothing to update.
return
}
// Slow path: union pendingMetricIDs with e.v
if e.date == date {
pendingMetricIDs.Union(&e.v)
}
eNew := &byDateMetricIDEntry{
date: date,
v: *pendingMetricIDs,
}
s.nextDayMetricIDs.Store(eNew)
}
func (s *Storage) updateCurrHourMetricIDs() {
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.pendingHourEntriesLock.Lock()