lib/storage: optimize ingestion performance for new time series

This commit is contained in:
Aliaksandr Valialkin 2020-05-14 23:45:04 +03:00
parent 6838fa876c
commit 67e331ac62
3 changed files with 208 additions and 70 deletions

View File

@ -2568,19 +2568,7 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
return nil, false
}
func (db *indexDB) storeDateMetricID(date, metricID uint64, accountID, projectID uint32) error {
is := db.getIndexSearch()
ok, err := is.hasDateMetricID(date, metricID, accountID, projectID)
db.putIndexSearch(is)
if err != nil {
return err
}
if ok {
// Fast path: the (date, metricID) entry already exists in the db.
return nil
}
// Slow path: create (date, metricID) entries.
func (is *indexSearch) storeDateMetricID(date, metricID uint64, accountID, projectID uint32) error {
items := getIndexItems()
defer putIndexItems(items)
@ -2594,12 +2582,16 @@ func (db *indexDB) storeDateMetricID(date, metricID uint64, accountID, projectID
defer kbPool.Put(kb)
mn := GetMetricName()
defer PutMetricName(mn)
kb.B, err = db.searchMetricName(kb.B[:0], metricID, accountID, projectID)
var err error
// There is no need in searching for metric name in is.db.extDB,
// Since the storeDateMetricID function is called only after the metricID->metricName
// is added into the current is.db.
kb.B, err = is.searchMetricName(kb.B[:0], metricID, accountID, projectID)
if err != nil {
if err == io.EOF {
logger.Errorf("missing metricName by metricID %d; this could be the case after unclean shutdown; "+
"deleting the metricID, so it could be re-created next time", metricID)
if err := db.deleteMetricIDs([]uint64{metricID}); err != nil {
if err := is.db.deleteMetricIDs([]uint64{metricID}); err != nil {
return fmt.Errorf("cannot delete metricID %d after unclean shutdown: %s", metricID, err)
}
return nil
@ -2624,8 +2616,7 @@ func (db *indexDB) storeDateMetricID(date, metricID uint64, accountID, projectID
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
}
if err = db.tb.AddItems(items.Items); err != nil {
if err = is.db.tb.AddItems(items.Items); err != nil {
return fmt.Errorf("cannot add per-day entires for metricID %d: %s", metricID, err)
}
return nil

View File

@ -711,7 +711,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
date := uint64(timestampFromTime(time.Now())) / msecPerDay
for i := range tsids {
tsid := &tsids[i]
if err := db.storeDateMetricID(date, tsid.MetricID, tsid.AccountID, tsid.ProjectID); err != nil {
if err := is.storeDateMetricID(date, tsid.MetricID, tsid.AccountID, tsid.ProjectID); err != nil {
return nil, nil, fmt.Errorf("error in storeDateMetricID(%d, %d, %d, %d): %s", date, tsid.MetricID, tsid.AccountID, tsid.ProjectID, err)
}
}
@ -1555,7 +1555,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
date := baseDate - uint64(day*msecPerDay)
for i := range tsids {
tsid := &tsids[i]
if err := db.storeDateMetricID(date, tsid.MetricID, tsid.AccountID, tsid.ProjectID); err != nil {
if err := is.storeDateMetricID(date, tsid.MetricID, tsid.AccountID, tsid.ProjectID); err != nil {
t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
}
}

View File

@ -15,7 +15,6 @@ import (
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -559,6 +558,8 @@ func (s *Storage) mustRotateIndexDB() {
// Do not flush metricIDCache and metricNameCache, since all the metricIDs
// from prev idb remain valid after the rotation.
// There is no need in resetting nextDayMetricIDs, since it should be automatically reset every day.
}
// MustClose closes the storage.
@ -1083,10 +1084,6 @@ var (
)
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
var is *indexSearch
var mn *MetricName
var kb *bytesutil.ByteBuffer
idb := s.idb()
dmis := idb.getDeletedMetricIDs()
rowsLen := len(rows)
@ -1100,6 +1097,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
prevTSID TSID
prevMetricNameRaw []byte
)
var pmrs *pendingMetricRows
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
// Return only the first error, since it has no sense in returning all errors.
var firstWarn error
@ -1146,44 +1144,68 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
continue
}
// Slow path - the TSID is missing in the cache. Search for it in the index.
if is == nil {
is = idb.getIndexSearch()
mn = GetMetricName()
kb = kbPool.Get()
// Slow path - the TSID is missing in the cache.
// Postpone its search in the loop below.
j--
if pmrs == nil {
pmrs = getPendingMetricRows()
}
if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
if err := pmrs.addRow(mr); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err)
firstWarn = err
}
j--
continue
}
mn.sortTags()
kb.B = mn.Marshal(kb.B[:0])
if err := is.GetOrCreateTSIDByName(&r.TSID, kb.B); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err)
}
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs.pmrs
sort.Slice(pendingMetricRows, func(i, j int) bool {
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
})
is := idb.getIndexSearch()
prevMetricNameRaw = nil
for i := range pendingMetricRows {
pmr := &pendingMetricRows[i]
mr := &pmr.mr
r := &rows[rowsLen+j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r.TSID = prevTSID
continue
}
j--
continue
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) && !dmis.Has(r.TSID.MetricID) {
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
continue
}
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot obtain or create TSID for MetricName %q: %s", pmr.MetricName, err)
}
j--
continue
}
s.putTSIDToCache(&r.TSID, mr.MetricNameRaw)
}
s.putTSIDToCache(&r.TSID, mr.MetricNameRaw)
idb.putIndexSearch(is)
putPendingMetricRows(pmrs)
}
if firstWarn != nil {
logger.Errorf("warn occurred during rows addition: %s", firstWarn)
}
if is != nil {
kbPool.Put(kb)
PutMetricName(mn)
idb.putIndexSearch(is)
}
rows = rows[:rowsLen+j]
var firstError error
@ -1199,21 +1221,87 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
return rows, nil
}
// pendingMetricRow holds a single metric row whose TSID lookup has been
// postponed, together with the canonical (sorted, marshaled) metric name
// used for searching in the index.
type pendingMetricRow struct {
	// MetricName is the canonical marshaled metric name.
	// It points into pendingMetricRows.metricNamesBuf.
	MetricName []byte
	mr         MetricRow
}
// pendingMetricRows accumulates rows whose TSID was missing in the cache,
// so they can be processed in a single batch with pre-computed canonical
// metric names.
type pendingMetricRows struct {
	pmrs []pendingMetricRow

	// metricNamesBuf stores the marshaled canonical metric names;
	// the MetricName fields of pmrs entries point into this buffer.
	metricNamesBuf []byte

	// lastMetricNameRaw and lastMetricName cache the most recently
	// processed raw metric name and its canonical form, so addRow can
	// skip re-marshaling when consecutive rows share the same metric.
	lastMetricNameRaw []byte
	lastMetricName    []byte

	// mn is scratch space for unmarshaling and sorting metric names.
	mn MetricName
}
// reset clears pmrs for reuse, dropping all references held by its entries
// so the referenced buffers can be garbage-collected while pmrs sits in the
// pool.
func (pmrs *pendingMetricRows) reset() {
	// Iterate by index: ranging by value would mutate a copy of each
	// element, leaving the MetricName / MetricNameRaw references alive
	// in the retained backing array.
	for i := range pmrs.pmrs {
		pmr := &pmrs.pmrs[i]
		pmr.MetricName = nil
		pmr.mr.MetricNameRaw = nil
	}
	pmrs.pmrs = pmrs.pmrs[:0]
	pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0]
	pmrs.lastMetricNameRaw = nil
	pmrs.lastMetricName = nil
	pmrs.mn.Reset()
}
// addRow appends mr to the pending list together with its canonical metric
// name. When mr carries the same MetricNameRaw as the previously added row,
// the cached canonical name is reused instead of being re-computed — this
// speeds up bulk imports containing many rows for the same metric.
func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
	if string(mr.MetricNameRaw) == string(pmrs.lastMetricNameRaw) {
		// Fast path: reuse the canonical name computed for the previous row.
		pmrs.pmrs = append(pmrs.pmrs, pendingMetricRow{
			MetricName: pmrs.lastMetricName,
			mr:         *mr,
		})
		return nil
	}
	// Slow path: compute the canonical (sorted, marshaled) metric name.
	if err := pmrs.mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
		return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err)
	}
	pmrs.mn.sortTags()
	start := len(pmrs.metricNamesBuf)
	pmrs.metricNamesBuf = pmrs.mn.Marshal(pmrs.metricNamesBuf)
	pmrs.lastMetricName = pmrs.metricNamesBuf[start:]
	pmrs.lastMetricNameRaw = mr.MetricNameRaw
	pmrs.pmrs = append(pmrs.pmrs, pendingMetricRow{
		MetricName: pmrs.lastMetricName,
		mr:         *mr,
	})
	return nil
}
// getPendingMetricRows returns a pendingMetricRows instance from the pool,
// allocating a fresh one when the pool is empty.
func getPendingMetricRows() *pendingMetricRows {
	if v := pendingMetricRowsPool.Get(); v != nil {
		return v.(*pendingMetricRows)
	}
	return &pendingMetricRows{}
}
// putPendingMetricRows resets pmrs and returns it to the pool for reuse.
// pmrs must not be used after this call.
func putPendingMetricRows(pmrs *pendingMetricRows) {
	pmrs.reset()
	pendingMetricRowsPool.Put(pmrs)
}
// pendingMetricRowsPool recycles pendingMetricRows instances
// in order to reduce allocations on the data-ingestion path.
var pendingMetricRowsPool sync.Pool
func (s *Storage) updatePerDateData(rows []rawRow) error {
var firstError error
var date uint64
var hour uint64
var prevTimestamp int64
var (
// These vars are used for speeding up bulk imports when multiple adjancent rows
// contain the same (metricID, date) pairs.
prevMatchedDate uint64
prevMatchedMetricID uint64
prevDate uint64
prevMetricID uint64
)
idb := s.idb()
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v
todayShare16bit := uint64((float64(fasttime.UnixTimestamp()%(3600*24)) / (3600 * 24)) * (1 << 16))
type pendingDateMetricID struct {
date uint64
metricID uint64
accountID uint32
projectID uint32
}
var pendingDateMetricIDs []pendingDateMetricID
for i := range rows {
r := &rows[i]
if r.Timestamp != prevTimestamp {
@ -1234,12 +1322,12 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
// when entries for all the active time series must be added to the index.
// This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 .
if todayShare16bit > (metricID&(1<<16-1)) && !nextDayMetricIDs.Has(metricID) {
if err := idb.storeDateMetricID(date+1, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil {
if lastError == nil {
lastError = err
}
continue
}
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date + 1,
metricID: metricID,
accountID: r.TSID.AccountID,
projectID: r.TSID.ProjectID,
})
s.pendingNextDayMetricIDsLock.Lock()
s.pendingNextDayMetricIDs.Add(metricID)
s.pendingNextDayMetricIDsLock.Unlock()
@ -1257,27 +1345,86 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
}
// Slower path: check global cache for (date, metricID) entry.
if metricID == prevMatchedMetricID && date == prevMatchedDate {
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
if s.dateMetricIDCache.Has(date, metricID) {
// The metricID has been already added to per-day inverted index.
prevMatchedDate = date
prevMatchedMetricID = metricID
prevDate = date
prevMetricID = metricID
if !s.dateMetricIDCache.Has(date, metricID) {
// Slow path: store the (date, metricID) entry in the indexDB.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date,
metricID: metricID,
accountID: r.TSID.AccountID,
projectID: r.TSID.ProjectID,
})
}
}
if len(pendingDateMetricIDs) == 0 {
// Fast path - there are no new (date, metricID) entires in rows.
return nil
}
// Slow path - add new (date, metricID) entries to indexDB.
// Sort pendingDateMetricIDs by (accountID, projectID, date, metricID) in order to speed up `is` search in the loop below.
sort.Slice(pendingDateMetricIDs, func(i, j int) bool {
a := pendingDateMetricIDs[i]
b := pendingDateMetricIDs[j]
if a.accountID != b.projectID {
return a.accountID < b.accountID
}
if a.projectID != b.projectID {
return a.projectID < b.projectID
}
if a.date != b.date {
return a.date < b.date
}
return a.metricID < b.metricID
})
idb := s.idb()
is := idb.getIndexSearch()
defer idb.putIndexSearch(is)
var firstError error
prevMetricID = 0
prevDate = 0
for _, dateMetricID := range pendingDateMetricIDs {
date := dateMetricID.date
metricID := dateMetricID.metricID
accountID := dateMetricID.accountID
projectID := dateMetricID.projectID
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
prevDate = date
prevMetricID = metricID
// Slow path: store the (date, metricID) entry in the indexDB.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil {
if lastError == nil {
lastError = err
if s.dateMetricIDCache.Has(date, metricID) {
// The metricID has been already added to per-day inverted index.
continue
}
ok, err := is.hasDateMetricID(date, metricID, accountID, projectID)
if err != nil {
if firstError == nil {
firstError = fmt.Errorf("error when locating (date=%d, metricID=%d, accountID=%d, projectID=%d) in database: %s",
date, metricID, accountID, projectID, err)
}
continue
}
if !ok {
// The (date, metricID) entry is missing in the indexDB. Add it there.
if err := is.storeDateMetricID(date, metricID, accountID, projectID); err != nil {
if firstError == nil {
firstError = fmt.Errorf("error when storing (date=%d, metricID=%d) in database: %s", date, metricID, err)
}
continue
}
}
// The metric must be added to cache only after it has been successfully added to indexDB.
s.dateMetricIDCache.Set(date, metricID)
}