mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
lib/storage: atomically remove snapshot directories
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3038
This commit is contained in:
parent
57ea8dbb36
commit
5f28ca1f42
50
lib/fs/fs.go
50
lib/fs/fs.go
@ -8,8 +8,10 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
@ -193,6 +195,54 @@ func IsEmptyDir(path string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// MustRemoveDirAtomic removes the given dir atomically.
|
||||
//
|
||||
// It uses the following algorithm:
|
||||
//
|
||||
// 1. Atomically rename the "<dir>" to "<dir>.must-remove.<XYZ>",
|
||||
// where <XYZ> is an unique number.
|
||||
// 2. Remove the "<dir>.must-remove.XYZ" in background.
|
||||
//
|
||||
// If the process crashes after the step 1, then the directory must be removed
|
||||
// on the next process start by calling MustRemoveTemporaryDirs.
|
||||
func MustRemoveDirAtomic(dir string) {
|
||||
n := atomic.AddUint64(&atomicDirRemoveCounter, 1)
|
||||
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
|
||||
if err := os.Rename(dir, tmpDir); err != nil {
|
||||
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)
|
||||
}
|
||||
MustRemoveAll(tmpDir)
|
||||
}
|
||||
|
||||
var atomicDirRemoveCounter = uint64(time.Now().UnixNano())
|
||||
|
||||
// MustRemoveTemporaryDirs removes all the subdirectories with ".must-remove.<XYZ>" suffix.
|
||||
//
|
||||
// Such directories may be left on unclean shutdown during MustRemoveDirAtomic.
|
||||
func MustRemoveTemporaryDirs(dir string) {
|
||||
d, err := os.Open(dir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open dir %q: %s", dir, err)
|
||||
}
|
||||
defer MustClose(d)
|
||||
fis, err := d.Readdir(-1)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read dir %q: %s", dir, err)
|
||||
}
|
||||
for _, fi := range fis {
|
||||
if !IsDirOrSymlink(fi) {
|
||||
// Skip non-directories
|
||||
continue
|
||||
}
|
||||
dirName := fi.Name()
|
||||
if strings.Contains(dirName, ".must-remove.") {
|
||||
fullPath := dir + "/" + dirName
|
||||
MustRemoveAll(fullPath)
|
||||
}
|
||||
}
|
||||
MustSyncPath(dir)
|
||||
}
|
||||
|
||||
// MustRemoveAll removes path with all the contents.
|
||||
//
|
||||
// It properly fsyncs the parent directory after path removal.
|
||||
|
@ -195,6 +195,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
||||
if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err)
|
||||
}
|
||||
fs.MustRemoveTemporaryDirs(snapshotsPath)
|
||||
|
||||
// Initialize series cardinality limiter.
|
||||
if maxHourlySeries > 0 {
|
||||
@ -239,6 +240,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
||||
if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
|
||||
}
|
||||
fs.MustRemoveTemporaryDirs(idbSnapshotsPath)
|
||||
idbCurr, idbPrev, err := s.openIndexDBTables(idbPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err)
|
||||
@ -411,8 +413,8 @@ func (s *Storage) DeleteSnapshot(snapshotName string) error {
|
||||
|
||||
s.tb.MustDeleteSnapshot(snapshotName)
|
||||
idbPath := fmt.Sprintf("%s/indexdb/snapshots/%s", s.path, snapshotName)
|
||||
fs.MustRemoveAll(idbPath)
|
||||
fs.MustRemoveAll(snapshotPath)
|
||||
fs.MustRemoveDirAtomic(idbPath)
|
||||
fs.MustRemoveDirAtomic(snapshotPath)
|
||||
|
||||
logger.Infof("deleted snapshot %q in %.3f seconds", snapshotPath, time.Since(startTime).Seconds())
|
||||
|
||||
@ -2449,6 +2451,7 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
|
||||
d, err := os.Open(path)
|
||||
if err != nil {
|
||||
@ -2494,7 +2497,7 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
|
||||
for _, tn := range tableNames[:len(tableNames)-2] {
|
||||
pathToRemove := path + "/" + tn
|
||||
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
|
||||
fs.MustRemoveAll(pathToRemove)
|
||||
fs.MustRemoveDirAtomic(pathToRemove)
|
||||
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
|
||||
}
|
||||
|
||||
|
@ -107,6 +107,8 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
|
||||
if err := fs.MkdirAllIfNotExist(smallSnapshotsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", smallSnapshotsPath, err)
|
||||
}
|
||||
fs.MustRemoveTemporaryDirs(smallSnapshotsPath)
|
||||
|
||||
bigPartitionsPath := path + "/big"
|
||||
if err := fs.MkdirAllIfNotExist(bigPartitionsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directory for big partitions %q: %w", bigPartitionsPath, err)
|
||||
@ -115,6 +117,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
|
||||
if err := fs.MkdirAllIfNotExist(bigSnapshotsPath); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", bigSnapshotsPath, err)
|
||||
}
|
||||
fs.MustRemoveTemporaryDirs(bigSnapshotsPath)
|
||||
|
||||
// Open partitions.
|
||||
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
|
||||
@ -179,9 +182,9 @@ func (tb *table) CreateSnapshot(snapshotName string) (string, string, error) {
|
||||
// MustDeleteSnapshot deletes snapshot with the given snapshotName.
|
||||
func (tb *table) MustDeleteSnapshot(snapshotName string) {
|
||||
smallDir := fmt.Sprintf("%s/small/snapshots/%s", tb.path, snapshotName)
|
||||
fs.MustRemoveAll(smallDir)
|
||||
fs.MustRemoveDirAtomic(smallDir)
|
||||
bigDir := fmt.Sprintf("%s/big/snapshots/%s", tb.path, snapshotName)
|
||||
fs.MustRemoveAll(bigDir)
|
||||
fs.MustRemoveDirAtomic(bigDir)
|
||||
}
|
||||
|
||||
func (tb *table) addPartitionNolock(pt *partition) {
|
||||
|
Loading…
Reference in New Issue
Block a user