lib/storage: optimize data ingestion in the beginning of every hour

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1046
This commit is contained in:
Aliaksandr Valialkin 2021-02-08 12:00:44 +02:00
parent 8f28a578d3
commit 2242647a04

View File

@ -1582,6 +1582,8 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
metricID uint64
}
var pendingDateMetricIDs []pendingDateMetricID
var pendingNextDayMetricIDs []uint64
var pendingHourEntries []uint64
for i := range rows {
r := &rows[i]
if r.Timestamp != prevTimestamp {
@ -1606,15 +1608,11 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
date: date + 1,
metricID: metricID,
})
s.pendingNextDayMetricIDsLock.Lock()
s.pendingNextDayMetricIDs.Add(metricID)
s.pendingNextDayMetricIDsLock.Unlock()
pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
}
continue
}
s.pendingHourEntriesLock.Lock()
s.pendingHourEntries.Add(metricID)
s.pendingHourEntriesLock.Unlock()
pendingHourEntries = append(pendingHourEntries, metricID)
if date == hmPrevDate && hmPrev.m.Has(metricID) {
// The metricID is already registered for the current day on the previous hour.
continue
@ -1639,6 +1637,16 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
})
}
}
if len(pendingNextDayMetricIDs) > 0 {
s.pendingNextDayMetricIDsLock.Lock()
s.pendingNextDayMetricIDs.AddMulti(pendingNextDayMetricIDs)
s.pendingNextDayMetricIDsLock.Unlock()
}
if len(pendingHourEntries) > 0 {
s.pendingHourEntriesLock.Lock()
s.pendingHourEntries.AddMulti(pendingHourEntries)
s.pendingHourEntriesLock.Unlock()
}
if len(pendingDateMetricIDs) == 0 {
// Fast path - there are no new (date, metricID) entires in rows.
return nil