mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/mergeset: atomically remove part dirs
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3038
This commit is contained in:
parent
042a532f70
commit
ce2c07c5a7
@ -10,9 +10,13 @@ import (
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
func mustRemoveAll(path string, done func()) {
|
||||
// MustRemoveAll removes path with all the contents.
|
||||
//
|
||||
// It properly fsyncs the parent directory after path removal.
|
||||
//
|
||||
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
func MustRemoveAll(path string) {
|
||||
if tryRemoveAll(path) {
|
||||
done()
|
||||
return
|
||||
}
|
||||
select {
|
||||
@ -29,7 +33,6 @@ func mustRemoveAll(path string, done func()) {
|
||||
for {
|
||||
time.Sleep(time.Second)
|
||||
if tryRemoveAll(path) {
|
||||
done()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
21
lib/fs/fs.go
21
lib/fs/fs.go
@ -248,27 +248,6 @@ func MustRemoveTemporaryDirs(dir string) {
|
||||
MustSyncPath(dir)
|
||||
}
|
||||
|
||||
// MustRemoveAll removes path with all the contents.
|
||||
//
|
||||
// It properly fsyncs the parent directory after path removal.
|
||||
//
|
||||
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
func MustRemoveAll(path string) {
|
||||
mustRemoveAll(path, func() {})
|
||||
}
|
||||
|
||||
// MustRemoveAllWithDoneCallback removes path with all the contents.
|
||||
//
|
||||
// It properly fsyncs the parent directory after path removal.
|
||||
//
|
||||
// done is called after the path is successfully removed.
|
||||
//
|
||||
// done may be called after the function returns for NFS path.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61.
|
||||
func MustRemoveAllWithDoneCallback(path string, done func()) {
|
||||
mustRemoveAll(path, done)
|
||||
}
|
||||
|
||||
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
||||
func HardLinkFiles(srcDir, dstDir string) error {
|
||||
if err := mkdirSync(dstDir); err != nil {
|
||||
|
@ -94,7 +94,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
metaindexPath := path + "/metaindex.bin"
|
||||
metaindexFile, err := filestream.Create(metaindexPath, false)
|
||||
if err != nil {
|
||||
fs.MustRemoveAll(path)
|
||||
fs.MustRemoveDirAtomic(path)
|
||||
return fmt.Errorf("cannot create metaindex file: %w", err)
|
||||
}
|
||||
|
||||
@ -102,7 +102,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
indexFile, err := filestream.Create(indexPath, nocache)
|
||||
if err != nil {
|
||||
metaindexFile.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
fs.MustRemoveDirAtomic(path)
|
||||
return fmt.Errorf("cannot create index file: %w", err)
|
||||
}
|
||||
|
||||
@ -111,7 +111,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
if err != nil {
|
||||
metaindexFile.MustClose()
|
||||
indexFile.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
fs.MustRemoveDirAtomic(path)
|
||||
return fmt.Errorf("cannot create items file: %w", err)
|
||||
}
|
||||
|
||||
@ -121,7 +121,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
metaindexFile.MustClose()
|
||||
indexFile.MustClose()
|
||||
itemsFile.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
fs.MustRemoveDirAtomic(path)
|
||||
return fmt.Errorf("cannot create lens file: %w", err)
|
||||
}
|
||||
|
||||
|
@ -1059,6 +1059,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
d, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open difrectory: %w", err)
|
||||
@ -1073,13 +1074,13 @@ func openParts(path string) ([]*partWrapper, error) {
|
||||
}
|
||||
|
||||
txnDir := path + "/txn"
|
||||
fs.MustRemoveAll(txnDir)
|
||||
fs.MustRemoveDirAtomic(txnDir)
|
||||
if err := fs.MkdirAllFailIfExist(txnDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", txnDir, err)
|
||||
}
|
||||
|
||||
tmpDir := path + "/tmp"
|
||||
fs.MustRemoveAll(tmpDir)
|
||||
fs.MustRemoveDirAtomic(tmpDir)
|
||||
if err := fs.MkdirAllFailIfExist(tmpDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", tmpDir, err)
|
||||
}
|
||||
@ -1106,7 +1107,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
||||
if fs.IsEmptyDir(partPath) {
|
||||
// Remove empty directory, which can be left after unclean shutdown on NFS.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
|
||||
fs.MustRemoveAll(partPath)
|
||||
fs.MustRemoveDirAtomic(partPath)
|
||||
continue
|
||||
}
|
||||
p, err := openFilePart(partPath)
|
||||
@ -1277,14 +1278,12 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
|
||||
}
|
||||
|
||||
// Remove old paths. It is OK if certain paths don't exist.
|
||||
var removeWG sync.WaitGroup
|
||||
for _, path := range rmPaths {
|
||||
path, err := validatePath(pathPrefix, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid path to remove: %w", err)
|
||||
}
|
||||
removeWG.Add(1)
|
||||
fs.MustRemoveAllWithDoneCallback(path, removeWG.Done)
|
||||
fs.MustRemoveDirAtomic(path)
|
||||
}
|
||||
|
||||
// Move the new part to new directory.
|
||||
@ -1316,9 +1315,6 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
|
||||
pendingTxnDeletionsWG.Add(1)
|
||||
go func() {
|
||||
defer pendingTxnDeletionsWG.Done()
|
||||
// Remove the transaction file only after all the source paths are deleted.
|
||||
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
removeWG.Wait()
|
||||
if err := os.Remove(txnPath); err != nil {
|
||||
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user