mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
lib/mergeset: atomically remove part dirs
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3038
This commit is contained in:
parent
fe52378f45
commit
5b488a339d
@ -10,9 +10,13 @@ import (
|
|||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
func mustRemoveAll(path string, done func()) {
|
// MustRemoveAll removes path with all the contents.
|
||||||
|
//
|
||||||
|
// It properly fsyncs the parent directory after path removal.
|
||||||
|
//
|
||||||
|
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||||
|
func MustRemoveAll(path string) {
|
||||||
if tryRemoveAll(path) {
|
if tryRemoveAll(path) {
|
||||||
done()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
@ -29,7 +33,6 @@ func mustRemoveAll(path string, done func()) {
|
|||||||
for {
|
for {
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
if tryRemoveAll(path) {
|
if tryRemoveAll(path) {
|
||||||
done()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
21
lib/fs/fs.go
21
lib/fs/fs.go
@ -248,27 +248,6 @@ func MustRemoveTemporaryDirs(dir string) {
|
|||||||
MustSyncPath(dir)
|
MustSyncPath(dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MustRemoveAll removes path with all the contents.
|
|
||||||
//
|
|
||||||
// It properly fsyncs the parent directory after path removal.
|
|
||||||
//
|
|
||||||
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
|
||||||
func MustRemoveAll(path string) {
|
|
||||||
mustRemoveAll(path, func() {})
|
|
||||||
}
|
|
||||||
|
|
||||||
// MustRemoveAllWithDoneCallback removes path with all the contents.
|
|
||||||
//
|
|
||||||
// It properly fsyncs the parent directory after path removal.
|
|
||||||
//
|
|
||||||
// done is called after the path is successfully removed.
|
|
||||||
//
|
|
||||||
// done may be called after the function returns for NFS path.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61.
|
|
||||||
func MustRemoveAllWithDoneCallback(path string, done func()) {
|
|
||||||
mustRemoveAll(path, done)
|
|
||||||
}
|
|
||||||
|
|
||||||
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
||||||
func HardLinkFiles(srcDir, dstDir string) error {
|
func HardLinkFiles(srcDir, dstDir string) error {
|
||||||
if err := mkdirSync(dstDir); err != nil {
|
if err := mkdirSync(dstDir); err != nil {
|
||||||
|
@ -94,7 +94,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
|||||||
metaindexPath := path + "/metaindex.bin"
|
metaindexPath := path + "/metaindex.bin"
|
||||||
metaindexFile, err := filestream.Create(metaindexPath, false)
|
metaindexFile, err := filestream.Create(metaindexPath, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.MustRemoveAll(path)
|
fs.MustRemoveDirAtomic(path)
|
||||||
return fmt.Errorf("cannot create metaindex file: %w", err)
|
return fmt.Errorf("cannot create metaindex file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,7 +102,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
|||||||
indexFile, err := filestream.Create(indexPath, nocache)
|
indexFile, err := filestream.Create(indexPath, nocache)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metaindexFile.MustClose()
|
metaindexFile.MustClose()
|
||||||
fs.MustRemoveAll(path)
|
fs.MustRemoveDirAtomic(path)
|
||||||
return fmt.Errorf("cannot create index file: %w", err)
|
return fmt.Errorf("cannot create index file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,7 +111,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
metaindexFile.MustClose()
|
metaindexFile.MustClose()
|
||||||
indexFile.MustClose()
|
indexFile.MustClose()
|
||||||
fs.MustRemoveAll(path)
|
fs.MustRemoveDirAtomic(path)
|
||||||
return fmt.Errorf("cannot create items file: %w", err)
|
return fmt.Errorf("cannot create items file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,7 +121,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
|||||||
metaindexFile.MustClose()
|
metaindexFile.MustClose()
|
||||||
indexFile.MustClose()
|
indexFile.MustClose()
|
||||||
itemsFile.MustClose()
|
itemsFile.MustClose()
|
||||||
fs.MustRemoveAll(path)
|
fs.MustRemoveDirAtomic(path)
|
||||||
return fmt.Errorf("cannot create lens file: %w", err)
|
return fmt.Errorf("cannot create lens file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1059,6 +1059,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||||||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
fs.MustRemoveTemporaryDirs(path)
|
||||||
d, err := os.Open(path)
|
d, err := os.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot open difrectory: %w", err)
|
return nil, fmt.Errorf("cannot open difrectory: %w", err)
|
||||||
@ -1073,13 +1074,13 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
txnDir := path + "/txn"
|
txnDir := path + "/txn"
|
||||||
fs.MustRemoveAll(txnDir)
|
fs.MustRemoveDirAtomic(txnDir)
|
||||||
if err := fs.MkdirAllFailIfExist(txnDir); err != nil {
|
if err := fs.MkdirAllFailIfExist(txnDir); err != nil {
|
||||||
return nil, fmt.Errorf("cannot create %q: %w", txnDir, err)
|
return nil, fmt.Errorf("cannot create %q: %w", txnDir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpDir := path + "/tmp"
|
tmpDir := path + "/tmp"
|
||||||
fs.MustRemoveAll(tmpDir)
|
fs.MustRemoveDirAtomic(tmpDir)
|
||||||
if err := fs.MkdirAllFailIfExist(tmpDir); err != nil {
|
if err := fs.MkdirAllFailIfExist(tmpDir); err != nil {
|
||||||
return nil, fmt.Errorf("cannot create %q: %w", tmpDir, err)
|
return nil, fmt.Errorf("cannot create %q: %w", tmpDir, err)
|
||||||
}
|
}
|
||||||
@ -1106,7 +1107,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||||||
if fs.IsEmptyDir(partPath) {
|
if fs.IsEmptyDir(partPath) {
|
||||||
// Remove empty directory, which can be left after unclean shutdown on NFS.
|
// Remove empty directory, which can be left after unclean shutdown on NFS.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
|
||||||
fs.MustRemoveAll(partPath)
|
fs.MustRemoveDirAtomic(partPath)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
p, err := openFilePart(partPath)
|
p, err := openFilePart(partPath)
|
||||||
@ -1277,14 +1278,12 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove old paths. It is OK if certain paths don't exist.
|
// Remove old paths. It is OK if certain paths don't exist.
|
||||||
var removeWG sync.WaitGroup
|
|
||||||
for _, path := range rmPaths {
|
for _, path := range rmPaths {
|
||||||
path, err := validatePath(pathPrefix, path)
|
path, err := validatePath(pathPrefix, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("invalid path to remove: %w", err)
|
return fmt.Errorf("invalid path to remove: %w", err)
|
||||||
}
|
}
|
||||||
removeWG.Add(1)
|
fs.MustRemoveDirAtomic(path)
|
||||||
fs.MustRemoveAllWithDoneCallback(path, removeWG.Done)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move the new part to new directory.
|
// Move the new part to new directory.
|
||||||
@ -1316,9 +1315,6 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
|
|||||||
pendingTxnDeletionsWG.Add(1)
|
pendingTxnDeletionsWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer pendingTxnDeletionsWG.Done()
|
defer pendingTxnDeletionsWG.Done()
|
||||||
// Remove the transaction file only after all the source paths are deleted.
|
|
||||||
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
|
||||||
removeWG.Wait()
|
|
||||||
if err := os.Remove(txnPath); err != nil {
|
if err := os.Remove(txnPath); err != nil {
|
||||||
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
|
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user