mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-13 13:11:37 +01:00
lib/fs: consolidate *RemoveAll* funcs into a single MustRemoveAll func
The func syncs parent dir in order to persist directory removal in the event of power loss
This commit is contained in:
parent
28d9904efc
commit
18d6f293f7
@ -22,9 +22,7 @@ func InitTmpBlocksDir(tmpDirPath string) {
|
||||
tmpDirPath = os.TempDir()
|
||||
}
|
||||
tmpBlocksDir = tmpDirPath + "/searchResults"
|
||||
if err := fs.RemoveAllHard(tmpBlocksDir); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", tmpBlocksDir, err)
|
||||
}
|
||||
fs.MustRemoveAll(tmpBlocksDir)
|
||||
if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
|
||||
logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err)
|
||||
}
|
||||
|
36
lib/fs/fs.go
36
lib/fs/fs.go
@ -172,9 +172,7 @@ func RemoveDirContents(dir string) {
|
||||
continue
|
||||
}
|
||||
fullPath := dir + "/" + name
|
||||
if err := RemoveAllHard(fullPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", fullPath, err)
|
||||
}
|
||||
MustRemoveAll(fullPath)
|
||||
}
|
||||
MustSyncPath(dir)
|
||||
}
|
||||
@ -198,31 +196,20 @@ func IsPathExist(path string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// MustRemoveAllSynced removes path with all the contents
|
||||
// and syncs the parent directory, so it no longer contains the path.
|
||||
func MustRemoveAllSynced(path string) {
|
||||
MustRemoveAll(path)
|
||||
parentDirPath := filepath.Dir(path)
|
||||
MustSyncPath(parentDirPath)
|
||||
}
|
||||
|
||||
// MustRemoveAll removes path with all the contents.
|
||||
func MustRemoveAll(path string) {
|
||||
if err := RemoveAllHard(path); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveAllHard removes path with all the contents.
|
||||
//
|
||||
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
func RemoveAllHard(path string) error {
|
||||
func MustRemoveAll(path string) {
|
||||
err := os.RemoveAll(path)
|
||||
if err == nil {
|
||||
return nil
|
||||
// Make sure the parent directory doesn't contain references
|
||||
// to the current directory.
|
||||
parentDirPath := filepath.Dir(path)
|
||||
MustSyncPath(parentDirPath)
|
||||
return
|
||||
}
|
||||
if !isTemporaryNFSError(err) {
|
||||
return err
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||
}
|
||||
// NFS prevents from removing directories with open files.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
@ -230,9 +217,8 @@ func RemoveAllHard(path string) error {
|
||||
select {
|
||||
case removeDirCh <- path:
|
||||
default:
|
||||
return fmt.Errorf("cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var removeDirCh = make(chan string, 1024)
|
||||
@ -257,6 +243,10 @@ func dirRemover() {
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
// Make sure the parent directory doesn't contain references
|
||||
// to the current directory.
|
||||
parentDirPath := filepath.Dir(path)
|
||||
MustSyncPath(parentDirPath)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -846,17 +846,13 @@ func openParts(path string) ([]*partWrapper, error) {
|
||||
}
|
||||
|
||||
txnDir := path + "/txn"
|
||||
if err := fs.RemoveAllHard(txnDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot remove %q: %s", txnDir, err)
|
||||
}
|
||||
fs.MustRemoveAll(txnDir)
|
||||
if err := fs.MkdirAllFailIfExist(txnDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %s", txnDir, err)
|
||||
}
|
||||
|
||||
tmpDir := path + "/tmp"
|
||||
if err := fs.RemoveAllHard(tmpDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot remove %q: %s", tmpDir, err)
|
||||
}
|
||||
fs.MustRemoveAll(tmpDir)
|
||||
if err := fs.MkdirAllFailIfExist(tmpDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %s", tmpDir, err)
|
||||
}
|
||||
@ -1033,9 +1029,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid path to remove: %s", err)
|
||||
}
|
||||
if err := fs.RemoveAllHard(path); err != nil {
|
||||
return fmt.Errorf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
fs.MustRemoveAll(path)
|
||||
}
|
||||
|
||||
// Move the new part to new directory.
|
||||
|
@ -277,9 +277,7 @@ func (db *indexDB) decRef() {
|
||||
}
|
||||
|
||||
logger.Infof("dropping indexDB %q", tbPath)
|
||||
if err := fs.RemoveAllHard(tbPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", tbPath, err)
|
||||
}
|
||||
fs.MustRemoveAll(tbPath)
|
||||
logger.Infof("indexDB %q has been dropped", tbPath)
|
||||
}
|
||||
|
||||
|
@ -212,14 +212,8 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
|
||||
// The pt must be detached from table before calling pt.Drop.
|
||||
func (pt *partition) Drop() {
|
||||
logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath)
|
||||
|
||||
if err := fs.RemoveAllHard(pt.smallPartsPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove small parts directory %q: %s", pt.smallPartsPath, err)
|
||||
}
|
||||
if err := fs.RemoveAllHard(pt.bigPartsPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove big parts directory %q: %s", pt.bigPartsPath, err)
|
||||
}
|
||||
|
||||
fs.MustRemoveAll(pt.smallPartsPath)
|
||||
fs.MustRemoveAll(pt.bigPartsPath)
|
||||
logger.Infof("partition %q has been dropped", pt.name)
|
||||
}
|
||||
|
||||
@ -1223,13 +1217,9 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
|
||||
}
|
||||
|
||||
txnDir := path + "/txn"
|
||||
if err := fs.RemoveAllHard(txnDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot delete transaction directory %q: %s", txnDir, err)
|
||||
}
|
||||
fs.MustRemoveAll(txnDir)
|
||||
tmpDir := path + "/tmp"
|
||||
if err := fs.RemoveAllHard(tmpDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot remove temporary directory %q: %s", tmpDir, err)
|
||||
}
|
||||
fs.MustRemoveAll(tmpDir)
|
||||
if err := createPartitionDirs(path); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directories for partition %q: %s", path, err)
|
||||
}
|
||||
@ -1408,9 +1398,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid path to remove: %s", err)
|
||||
}
|
||||
if err := fs.RemoveAllHard(path); err != nil {
|
||||
return fmt.Errorf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
fs.MustRemoveAll(path)
|
||||
}
|
||||
|
||||
// Move the new part to new directory.
|
||||
@ -1438,9 +1426,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
|
||||
}
|
||||
} else {
|
||||
// Just remove srcPath.
|
||||
if err := fs.RemoveAllHard(srcPath); err != nil {
|
||||
return fmt.Errorf("cannot remove %q: %s", srcPath, err)
|
||||
}
|
||||
fs.MustRemoveAll(srcPath)
|
||||
}
|
||||
|
||||
// Flush pathPrefix* directory metadata to the underying storage.
|
||||
|
@ -246,8 +246,8 @@ func (s *Storage) DeleteSnapshot(snapshotName string) error {
|
||||
|
||||
s.tb.MustDeleteSnapshot(snapshotName)
|
||||
idbPath := fmt.Sprintf("%s/indexdb/snapshots/%s", s.path, snapshotName)
|
||||
fs.MustRemoveAllSynced(idbPath)
|
||||
fs.MustRemoveAllSynced(snapshotPath)
|
||||
fs.MustRemoveAll(idbPath)
|
||||
fs.MustRemoveAll(snapshotPath)
|
||||
|
||||
logger.Infof("deleted snapshot %q in %s", snapshotPath, time.Since(startTime))
|
||||
|
||||
@ -897,9 +897,7 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache *fastcache.Ca
|
||||
for _, tn := range tableNames[:len(tableNames)-2] {
|
||||
pathToRemove := path + "/" + tn
|
||||
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
|
||||
if err := fs.RemoveAllHard(pathToRemove); err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot remove obsolete indexdb dir %q: %s", pathToRemove, err)
|
||||
}
|
||||
fs.MustRemoveAll(pathToRemove)
|
||||
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
|
||||
}
|
||||
|
||||
|
@ -176,9 +176,9 @@ func (tb *table) CreateSnapshot(snapshotName string) (string, string, error) {
|
||||
// MustDeleteSnapshot deletes snapshot with the given snapshotName.
|
||||
func (tb *table) MustDeleteSnapshot(snapshotName string) {
|
||||
smallDir := fmt.Sprintf("%s/small/snapshots/%s", tb.path, snapshotName)
|
||||
fs.MustRemoveAllSynced(smallDir)
|
||||
fs.MustRemoveAll(smallDir)
|
||||
bigDir := fmt.Sprintf("%s/big/snapshots/%s", tb.path, snapshotName)
|
||||
fs.MustRemoveAllSynced(bigDir)
|
||||
fs.MustRemoveAll(bigDir)
|
||||
}
|
||||
|
||||
func (tb *table) addPartitionNolock(pt *partition) {
|
||||
|
Loading…
Reference in New Issue
Block a user