mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/storage: optimize bulk import performance when multiple data points are inserted for the same time series
This should speed up `/api/v1/import` and make it more scalable on multi-core systems.
This commit is contained in:
parent
2fba7b6f35
commit
97f70ccda7
@ -803,6 +803,11 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
||||
}
|
||||
rows = rows[:rowsLen+len(mrs)]
|
||||
j := 0
|
||||
var (
|
||||
// These vars are used for speeding up bulk imports of multiple adjancent rows for the same metricName.
|
||||
prevTSID TSID
|
||||
prevMetricNameRaw []byte
|
||||
)
|
||||
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
|
||||
// Return only the last error, since it has no sense in returning all errors.
|
||||
var lastWarn error
|
||||
@ -830,9 +835,17 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
||||
r.Timestamp = mr.Timestamp
|
||||
r.Value = mr.Value
|
||||
r.PrecisionBits = precisionBits
|
||||
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
|
||||
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
|
||||
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
|
||||
r.TSID = prevTSID
|
||||
continue
|
||||
}
|
||||
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
|
||||
if !dmis.Has(r.TSID.MetricID) {
|
||||
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
|
||||
prevTSID = r.TSID
|
||||
prevMetricNameRaw = mr.MetricNameRaw
|
||||
continue
|
||||
}
|
||||
}
|
||||
@ -890,6 +903,12 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
|
||||
var date uint64
|
||||
var hour uint64
|
||||
var prevTimestamp int64
|
||||
var (
|
||||
// These vars are used for speeding up bulk imports when multiple adjancent rows
|
||||
// contain the same (metricID, date) pairs.
|
||||
prevMatchedDate uint64
|
||||
prevMatchedMetricID uint64
|
||||
)
|
||||
idb := s.idb()
|
||||
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
for i := range rows {
|
||||
@ -913,8 +932,14 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
|
||||
}
|
||||
|
||||
// Slower path: check global cache for (date, metricID) entry.
|
||||
if metricID == prevMatchedMetricID && date == prevMatchedDate {
|
||||
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
|
||||
continue
|
||||
}
|
||||
if s.dateMetricIDCache.Has(date, metricID) {
|
||||
// The metricID has been already added to per-day inverted index.
|
||||
prevMatchedDate = date
|
||||
prevMatchedMetricID = metricID
|
||||
continue
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user