lib/storage: tune the logic for pre-populating of the per-day inverted index for the next day

- Postpone the pre-population to the last hour of the current day. This should reduce the number
  of useless entries in the next per-day index, which shouldn't be created there
  when the corresponding time series stop being pushed during the current day.

- Make the pre-population smoother in time by using the hash of MetricID instead of MetricID itself
  when deciding whether the given MetricID needs pre-population.

- Sync the logic for pre-population of the next day inverted index with the logic of pre-populating tsid cache
  after indexdb rotation. This should improve code maintainability.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
This commit is contained in:
Aliaksandr Valialkin 2022-02-12 16:28:46 +02:00
parent 989668beba
commit 53c2135d2a
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 30 additions and 17 deletions

View File

@ -378,13 +378,14 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
// //
// It returns true if new index entry was created, and false if it was skipped. // It returns true if new index entry was created, and false if it was skipped.
func (db *indexDB) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte) (bool, error) { func (db *indexDB) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte) (bool, error) {
h := xxhash.Sum64(metricNameRaw)
p := float64(uint32(h)) / (1 << 32)
pMin := float64(fasttime.UnixTimestamp()-db.rotationTimestamp) / 3600 pMin := float64(fasttime.UnixTimestamp()-db.rotationTimestamp) / 3600
if pMin < 1 {
p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32)
if p > pMin { if p > pMin {
// Fast path: there is no need creating indexes for metricNameRaw yet. // Fast path: there is no need creating indexes for metricNameRaw yet.
return false, nil return false, nil
} }
}
// Slow path: create indexes for (tsid, metricNameRaw) at db. // Slow path: create indexes for (tsid, metricNameRaw) at db.
mn := GetMetricName() mn := GetMetricName()
if err := mn.UnmarshalRaw(metricNameRaw); err != nil { if err := mn.UnmarshalRaw(metricNameRaw); err != nil {

View File

@ -2099,7 +2099,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
hmPrevDate := hmPrev.hour / 24 hmPrevDate := hmPrev.hour / 24
nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v
todayShare16bit := uint64((float64(fasttime.UnixTimestamp()%(3600*24)) / (3600 * 24)) * (1 << 16)) ts := fasttime.UnixTimestamp()
// Start pre-populating the next per-day inverted index during the last hour of the current day.
// pMin linearly increases from 0 to 1 during the last hour of the day.
pMin := (float64(ts%(3600*24)) / 3600) - 23
type pendingDateMetricID struct { type pendingDateMetricID struct {
date uint64 date uint64
metricID uint64 metricID uint64
@ -2130,12 +2133,13 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
// Fast path: the metricID is in the current hour cache. // Fast path: the metricID is in the current hour cache.
// This means the metricID has been already added to per-day inverted index. // This means the metricID has been already added to per-day inverted index.
// Gradually pre-populate per-day inverted index for the next day // Gradually pre-populate per-day inverted index for the next day during the last hour of the current day.
// during the current day.
// This should reduce CPU usage spike and slowdown at the beginning of the next day // This should reduce CPU usage spike and slowdown at the beginning of the next day
// when entries for all the active time series must be added to the index. // when entries for all the active time series must be added to the index.
// This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 . // This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 .
if todayShare16bit > (metricID&(1<<16-1)) && !nextDayMetricIDs.Has(metricID) { if pMin > 0 {
p := float64(uint32(fastHashUint64(metricID))) / (1 << 32)
if p < pMin && !nextDayMetricIDs.Has(metricID) {
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{ pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date + 1, date: date + 1,
metricID: metricID, metricID: metricID,
@ -2145,6 +2149,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
}) })
pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID) pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
} }
}
continue continue
} }
e := pendingHourMetricIDEntry{ e := pendingHourMetricIDEntry{
@ -2253,6 +2258,13 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
return firstError return firstError
} }
// fastHashUint64 returns a scrambled 64-bit value derived from x.
//
// The input is mixed with three xor-shift steps (right by 12, left by 25,
// right by 27) and the result is multiplied by a fixed odd constant;
// the multiplication wraps around modulo 2^64.
//
// The function is deterministic and returns 0 for x == 0.
func fastHashUint64(x uint64) uint64 {
	const multiplier = 2685821657736338717

	h := x
	h ^= h >> 12
	h ^= h << 25
	h ^= h >> 27
	return h * multiplier
}
// dateMetricIDCache is fast cache for holding (date, metricID) entries. // dateMetricIDCache is fast cache for holding (date, metricID) entries.
// //
// It should be faster than map[date]*uint64set.Set on multicore systems. // It should be faster than map[date]*uint64set.Set on multicore systems.