mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 16:30:55 +01:00
lib/storage: reset MetricName->TSID
cache after marking metricIDs as deleted
This is a follow-up commit after 12b16077c4
,
which didn't reset the `tsidCache` in all the required places.
This could result in indefinite errors like:
missing metricName by metricID ...; this could be the case after unclean shutdown; deleting the metricID, so it could be re-created next time
Fix this by resetting the cache inside deleteMetricIDs function.
This commit is contained in:
parent
0e7b2008b2
commit
be0ab4fbfe
@ -128,6 +128,9 @@ type indexDB struct {
|
||||
// Cache for fast MetricID -> MetricName lookup.
|
||||
metricNameCache *workingsetcache.Cache
|
||||
|
||||
// Cache for fast MetricName -> TSID lookups.
|
||||
tsidCache *workingsetcache.Cache
|
||||
|
||||
// Cache for useless TagFilters entries, which have no tag filters
|
||||
// matching low number of metrics.
|
||||
uselessTagFiltersCache *workingsetcache.Cache
|
||||
@ -155,13 +158,16 @@ type indexDB struct {
|
||||
}
|
||||
|
||||
// openIndexDB opens index db from the given path with the given caches.
|
||||
func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
|
||||
func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
|
||||
if metricIDCache == nil {
|
||||
logger.Panicf("BUG: metricIDCache must be non-nil")
|
||||
}
|
||||
if metricNameCache == nil {
|
||||
logger.Panicf("BUG: metricNameCache must be non-nil")
|
||||
}
|
||||
if tsidCache == nil {
|
||||
logger.Panicf("BUG: tsidCache must be nin-nil")
|
||||
}
|
||||
if currHourMetricIDs == nil {
|
||||
logger.Panicf("BUG: currHourMetricIDs must be non-nil")
|
||||
}
|
||||
@ -187,6 +193,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
|
||||
tagCache: workingsetcache.New(mem/32, time.Hour),
|
||||
metricIDCache: metricIDCache,
|
||||
metricNameCache: metricNameCache,
|
||||
tsidCache: tsidCache,
|
||||
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
|
||||
metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
|
||||
|
||||
@ -360,6 +367,7 @@ func (db *indexDB) decRef() {
|
||||
db.tagCache = nil
|
||||
db.metricIDCache = nil
|
||||
db.metricNameCache = nil
|
||||
db.tsidCache = nil
|
||||
db.uselessTagFiltersCache = nil
|
||||
db.metricIDsPerDateTagFilterCache = nil
|
||||
|
||||
@ -1233,6 +1241,9 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
|
||||
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
|
||||
invalidateTagCache()
|
||||
|
||||
// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.
|
||||
db.tsidCache.Reset()
|
||||
|
||||
// Do not reset uselessTagFiltersCache, since the found metricIDs
|
||||
// on cache miss are filtered out later with deletedMetricIDs.
|
||||
return nil
|
||||
@ -1515,7 +1526,7 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
|
||||
tsids := make([]TSID, len(metricIDs))
|
||||
i := 0
|
||||
for _, metricID := range metricIDs {
|
||||
// Try obtaining TSIDs from db.tsidCache. This is much faster
|
||||
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
|
||||
// than scanning the mergeset if it contains a lot of metricIDs.
|
||||
tsid := &tsids[i]
|
||||
err := is.db.getFromMetricIDCache(tsid, metricID)
|
||||
|
@ -466,8 +466,10 @@ func TestMarshalUnmarshalTSIDs(t *testing.T) {
|
||||
func TestIndexDBOpenClose(t *testing.T) {
|
||||
metricIDCache := workingsetcache.New(1234, time.Hour)
|
||||
metricNameCache := workingsetcache.New(1234, time.Hour)
|
||||
tsidCache := workingsetcache.New(1234, time.Hour)
|
||||
defer metricIDCache.Stop()
|
||||
defer metricNameCache.Stop()
|
||||
defer tsidCache.Stop()
|
||||
|
||||
var hmCurr atomic.Value
|
||||
hmCurr.Store(&hourMetricIDs{})
|
||||
@ -475,7 +477,7 @@ func TestIndexDBOpenClose(t *testing.T) {
|
||||
hmPrev.Store(&hourMetricIDs{})
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, &hmCurr, &hmPrev)
|
||||
db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
@ -494,8 +496,10 @@ func TestIndexDB(t *testing.T) {
|
||||
t.Run("serial", func(t *testing.T) {
|
||||
metricIDCache := workingsetcache.New(1234, time.Hour)
|
||||
metricNameCache := workingsetcache.New(1234, time.Hour)
|
||||
tsidCache := workingsetcache.New(1234, time.Hour)
|
||||
defer metricIDCache.Stop()
|
||||
defer metricNameCache.Stop()
|
||||
defer tsidCache.Stop()
|
||||
|
||||
var hmCurr atomic.Value
|
||||
hmCurr.Store(&hourMetricIDs{})
|
||||
@ -503,7 +507,7 @@ func TestIndexDB(t *testing.T) {
|
||||
hmPrev.Store(&hourMetricIDs{})
|
||||
|
||||
dbName := "test-index-db-serial"
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
@ -533,7 +537,7 @@ func TestIndexDB(t *testing.T) {
|
||||
|
||||
// Re-open the db and verify it works as expected.
|
||||
db.MustClose()
|
||||
db, err = openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
|
||||
db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
@ -551,8 +555,10 @@ func TestIndexDB(t *testing.T) {
|
||||
t.Run("concurrent", func(t *testing.T) {
|
||||
metricIDCache := workingsetcache.New(1234, time.Hour)
|
||||
metricNameCache := workingsetcache.New(1234, time.Hour)
|
||||
tsidCache := workingsetcache.New(1234, time.Hour)
|
||||
defer metricIDCache.Stop()
|
||||
defer metricNameCache.Stop()
|
||||
defer tsidCache.Stop()
|
||||
|
||||
var hmCurr atomic.Value
|
||||
hmCurr.Store(&hourMetricIDs{})
|
||||
@ -560,7 +566,7 @@ func TestIndexDB(t *testing.T) {
|
||||
hmPrev.Store(&hourMetricIDs{})
|
||||
|
||||
dbName := "test-index-db-concurrent"
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
@ -1532,8 +1538,10 @@ func TestMatchTagFilters(t *testing.T) {
|
||||
func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||
metricIDCache := workingsetcache.New(1234, time.Hour)
|
||||
metricNameCache := workingsetcache.New(1234, time.Hour)
|
||||
tsidCache := workingsetcache.New(1234, time.Hour)
|
||||
defer metricIDCache.Stop()
|
||||
defer metricNameCache.Stop()
|
||||
defer tsidCache.Stop()
|
||||
|
||||
currMetricIDs := &hourMetricIDs{
|
||||
isFull: true,
|
||||
@ -1551,7 +1559,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||
hmPrev.Store(prevMetricIDs)
|
||||
|
||||
dbName := "test-index-db-ts-range"
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
@ -45,8 +45,10 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
||||
|
||||
metricIDCache := workingsetcache.New(1234, time.Hour)
|
||||
metricNameCache := workingsetcache.New(1234, time.Hour)
|
||||
tsidCache := workingsetcache.New(1234, time.Hour)
|
||||
defer metricIDCache.Stop()
|
||||
defer metricNameCache.Stop()
|
||||
defer tsidCache.Stop()
|
||||
|
||||
var hmCurr atomic.Value
|
||||
hmCurr.Store(&hourMetricIDs{})
|
||||
@ -54,7 +56,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
||||
hmPrev.Store(&hourMetricIDs{})
|
||||
|
||||
const dbName = "bench-index-db-add-tsids"
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
@ -116,8 +118,10 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
||||
// See https://www.robustperception.io/evaluating-performance-and-correctness for more details.
|
||||
metricIDCache := workingsetcache.New(1234, time.Hour)
|
||||
metricNameCache := workingsetcache.New(1234, time.Hour)
|
||||
tsidCache := workingsetcache.New(1234, time.Hour)
|
||||
defer metricIDCache.Stop()
|
||||
defer metricNameCache.Stop()
|
||||
defer tsidCache.Stop()
|
||||
|
||||
var hmCurr atomic.Value
|
||||
hmCurr.Store(&hourMetricIDs{})
|
||||
@ -125,7 +129,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
||||
hmPrev.Store(&hourMetricIDs{})
|
||||
|
||||
const dbName = "bench-head-posting-for-matchers"
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
@ -302,8 +306,10 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
||||
func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||
metricIDCache := workingsetcache.New(1234, time.Hour)
|
||||
metricNameCache := workingsetcache.New(1234, time.Hour)
|
||||
tsidCache := workingsetcache.New(1234, time.Hour)
|
||||
defer metricIDCache.Stop()
|
||||
defer metricNameCache.Stop()
|
||||
defer tsidCache.Stop()
|
||||
|
||||
var hmCurr atomic.Value
|
||||
hmCurr.Store(&hourMetricIDs{})
|
||||
@ -311,7 +317,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||
hmPrev.Store(&hourMetricIDs{})
|
||||
|
||||
const dbName = "bench-index-db-get-tsids"
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
|
||||
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
|
@ -176,7 +176,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
|
||||
if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
|
||||
}
|
||||
idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, &s.currHourMetricIDs, &s.prevHourMetricIDs)
|
||||
idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err)
|
||||
}
|
||||
@ -552,7 +552,7 @@ func (s *Storage) mustRotateIndexDB() {
|
||||
// Create new indexdb table.
|
||||
newTableName := nextIndexDBTableName()
|
||||
idbNewPath := s.path + "/indexdb/" + newTableName
|
||||
idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, &s.currHourMetricIDs, &s.prevHourMetricIDs)
|
||||
idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
|
||||
}
|
||||
@ -942,11 +942,10 @@ func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) {
|
||||
if err != nil {
|
||||
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
|
||||
}
|
||||
// Reset MetricName->TSID cache in order to prevent from adding new data points
|
||||
// to deleted time series in Storage.add.
|
||||
s.tsidCache.Reset()
|
||||
// Do not reset MetricName->TSID cache in order to prevent from adding new data points
|
||||
// to deleted time series in Storage.add, since it is already reset inside DeleteTSIDs.
|
||||
|
||||
// Do not reset MetricID -> MetricName cache, since it must be used only
|
||||
// Do not reset MetricID->MetricName cache, since it must be used only
|
||||
// after filtering out deleted metricIDs.
|
||||
|
||||
return deletedCount, nil
|
||||
@ -1730,7 +1729,8 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
|
||||
s.tsidCache.Set(metricName, buf)
|
||||
}
|
||||
|
||||
func openIndexDBTables(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) {
|
||||
func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache,
|
||||
currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) {
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
@ -1789,12 +1789,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache *workingsetca
|
||||
// Open the last two tables.
|
||||
currPath := path + "/" + tableNames[len(tableNames)-1]
|
||||
|
||||
curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, currHourMetricIDs, prevHourMetricIDs)
|
||||
curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
|
||||
}
|
||||
prevPath := path + "/" + tableNames[len(tableNames)-2]
|
||||
prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, currHourMetricIDs, prevHourMetricIDs)
|
||||
prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs)
|
||||
if err != nil {
|
||||
curr.MustClose()
|
||||
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)
|
||||
|
Loading…
Reference in New Issue
Block a user