From 53c2135d2a690e542b4f281f4705cc716956815c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 12 Feb 2022 16:28:46 +0200 Subject: [PATCH] lib/storage: tune the logic for pre-populating of the per-day inverted index for the next day - Postpone the pre-population to the last hour of the current day. This should reduce the number of useless entries in the next per-day index, which shouldn't be created there when the corresponding time series stop being pushed during the current day. - Make the pre-population more smooth in time by using the hash of MetricID instead of MetricID itself when calculating the need for the given MetricID pre-population. - Sync the logic for pre-population of the next day inverted index with the logic of pre-populating tsid cache after indexdb rotation. This should improve code maintainability. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 --- lib/storage/index_db.go | 11 ++++++----- lib/storage/storage.go | 36 ++++++++++++++++++++++++------------ 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 71394a28cd..2bff658946 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -378,12 +378,13 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) { // // It returns true if new index entry was created, and false if it was skipped. func (db *indexDB) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte) (bool, error) { - h := xxhash.Sum64(metricNameRaw) - p := float64(uint32(h)) / (1 << 32) pMin := float64(fasttime.UnixTimestamp()-db.rotationTimestamp) / 3600 - if p > pMin { - // Fast path: there is no need creating indexes for metricNameRaw yet. 
- return false, nil + if pMin < 1 { + p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32) + if p > pMin { + // Fast path: there is no need creating indexes for metricNameRaw yet. + return false, nil + } } // Slow path: create indexes for (tsid, metricNameRaw) at db. mn := GetMetricName() diff --git a/lib/storage/storage.go b/lib/storage/storage.go index e05283848d..f8afceb6c1 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -2099,7 +2099,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) hmPrevDate := hmPrev.hour / 24 nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v - todayShare16bit := uint64((float64(fasttime.UnixTimestamp()%(3600*24)) / (3600 * 24)) * (1 << 16)) + ts := fasttime.UnixTimestamp() + // Start pre-populating the next per-day inverted index during the last hour of the current day. + // pMin linearly increases from 0 to 1 during the last hour of the day. + pMin := (float64(ts%(3600*24)) / 3600) - 23 type pendingDateMetricID struct { date uint64 metricID uint64 @@ -2130,20 +2133,22 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { // Fast path: the metricID is in the current hour cache. // This means the metricID has been already added to per-day inverted index. - // Gradually pre-populate per-day inverted index for the next day - // during the current day. + // Gradually pre-populate per-day inverted index for the next day during the last hour of the current day. // This should reduce CPU usage spike and slowdown at the beginning of the next day // when entries for all the active time series must be added to the index. // This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 . 
- if todayShare16bit > (metricID&(1<<16-1)) && !nextDayMetricIDs.Has(metricID) { - pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{ - date: date + 1, - metricID: metricID, - accountID: r.TSID.AccountID, - projectID: r.TSID.ProjectID, - mr: mrs[i], - }) - pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID) + if pMin > 0 { + p := float64(uint32(fastHashUint64(metricID))) / (1 << 32) + if p < pMin && !nextDayMetricIDs.Has(metricID) { + pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{ + date: date + 1, + metricID: metricID, + accountID: r.TSID.AccountID, + projectID: r.TSID.ProjectID, + mr: mrs[i], + }) + pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID) + } } continue } @@ -2253,6 +2258,13 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { return firstError } +func fastHashUint64(x uint64) uint64 { + x ^= x >> 12 // a + x ^= x << 25 // b + x ^= x >> 27 // c + return x * 2685821657736338717 +} + // dateMetricIDCache is fast cache for holding (date, metricID) entries. // // It should be faster than map[date]*uint64set.Set on multicore systems.