diff --git a/lib/fs/dir_remover.go b/lib/fs/dir_remover.go index ba3e8d52ef..79b3e429d9 100644 --- a/lib/fs/dir_remover.go +++ b/lib/fs/dir_remover.go @@ -10,9 +10,13 @@ import ( "github.com/VictoriaMetrics/metrics" ) -func mustRemoveAll(path string, done func()) { +// MustRemoveAll removes path with all the contents. +// +// It properly fsyncs the parent directory after path removal. +// +// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . +func MustRemoveAll(path string) { if tryRemoveAll(path) { - done() return } select { @@ -29,7 +33,6 @@ func mustRemoveAll(path string, done func()) { for { time.Sleep(time.Second) if tryRemoveAll(path) { - done() return } } diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 702cb0c9ec..10b9e0cc97 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -248,27 +248,6 @@ func MustRemoveTemporaryDirs(dir string) { MustSyncPath(dir) } -// MustRemoveAll removes path with all the contents. -// -// It properly fsyncs the parent directory after path removal. -// -// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . -func MustRemoveAll(path string) { - mustRemoveAll(path, func() {}) -} - -// MustRemoveAllWithDoneCallback removes path with all the contents. -// -// It properly fsyncs the parent directory after path removal. -// -// done is called after the path is successfully removed. -// -// done may be called after the function returns for NFS path. -// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61. -func MustRemoveAllWithDoneCallback(path string, done func()) { - mustRemoveAll(path, done) -} - // HardLinkFiles makes hard links for all the files from srcDir in dstDir. func HardLinkFiles(srcDir, dstDir string) error { if err := mkdirSync(dstDir); err != nil { diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index 46bac85a65..b25e473257 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -94,7 +94,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre metaindexPath := path + "/metaindex.bin" metaindexFile, err := filestream.Create(metaindexPath, false) if err != nil { - fs.MustRemoveAll(path) + fs.MustRemoveDirAtomic(path) return fmt.Errorf("cannot create metaindex file: %w", err) } @@ -102,7 +102,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre indexFile, err := filestream.Create(indexPath, nocache) if err != nil { metaindexFile.MustClose() - fs.MustRemoveAll(path) + fs.MustRemoveDirAtomic(path) return fmt.Errorf("cannot create index file: %w", err) } @@ -111,7 +111,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre if err != nil { metaindexFile.MustClose() indexFile.MustClose() - fs.MustRemoveAll(path) + fs.MustRemoveDirAtomic(path) return fmt.Errorf("cannot create items file: %w", err) } @@ -121,7 +121,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre metaindexFile.MustClose() indexFile.MustClose() itemsFile.MustClose() - fs.MustRemoveAll(path) + fs.MustRemoveDirAtomic(path) return fmt.Errorf("cannot create lens file: %w", err) } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index e614b7ad26..3b3ceb8f92 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1059,6 +1059,7 @@ func openParts(path string) ([]*partWrapper, error) { if err := fs.MkdirAllIfNotExist(path); err != nil { return nil, err } + fs.MustRemoveTemporaryDirs(path) d, err := os.Open(path) if err != nil { return nil, fmt.Errorf("cannot open difrectory: %w", err) @@ -1073,13 +1074,13 @@ func openParts(path string) ([]*partWrapper, error) { } txnDir := path + "/txn" - fs.MustRemoveAll(txnDir) + fs.MustRemoveDirAtomic(txnDir) if err := fs.MkdirAllFailIfExist(txnDir); err != nil { return nil, fmt.Errorf("cannot create %q: %w", txnDir, err) } tmpDir := path + "/tmp" - fs.MustRemoveAll(tmpDir) + fs.MustRemoveDirAtomic(tmpDir) if err := fs.MkdirAllFailIfExist(tmpDir); err != nil { return nil, fmt.Errorf("cannot create %q: %w", tmpDir, err) } @@ -1106,7 +1107,7 @@ func openParts(path string) ([]*partWrapper, error) { if fs.IsEmptyDir(partPath) { // Remove empty directory, which can be left after unclean shutdown on NFS. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142 - fs.MustRemoveAll(partPath) + fs.MustRemoveDirAtomic(partPath) continue } p, err := openFilePart(partPath) @@ -1277,14 +1278,12 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error { } // Remove old paths. It is OK if certain paths don't exist. - var removeWG sync.WaitGroup for _, path := range rmPaths { path, err := validatePath(pathPrefix, path) if err != nil { return fmt.Errorf("invalid path to remove: %w", err) } - removeWG.Add(1) - fs.MustRemoveAllWithDoneCallback(path, removeWG.Done) + fs.MustRemoveDirAtomic(path) } // Move the new part to new directory. @@ -1316,9 +1315,6 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error { pendingTxnDeletionsWG.Add(1) go func() { defer pendingTxnDeletionsWG.Done() - // Remove the transaction file only after all the source paths are deleted. - // This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . - removeWG.Wait() if err := os.Remove(txnPath); err != nil { logger.Errorf("cannot remove transaction file %q: %s", txnPath, err) }