mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
all: try hard removing directory with contents
Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61
This commit is contained in:
parent
cd1bc32158
commit
ac7b186f13
@ -22,7 +22,7 @@ func InitTmpBlocksDir(tmpDirPath string) {
|
||||
tmpDirPath = os.TempDir()
|
||||
}
|
||||
tmpBlocksDir = tmpDirPath + "/searchResults"
|
||||
if err := os.RemoveAll(tmpBlocksDir); err != nil {
|
||||
if err := fs.RemoveAllHard(tmpBlocksDir); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", tmpBlocksDir, err)
|
||||
}
|
||||
if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
|
||||
|
64
lib/fs/fs.go
64
lib/fs/fs.go
@ -5,6 +5,8 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
@ -159,7 +161,7 @@ func RemoveDirContents(dir string) {
|
||||
continue
|
||||
}
|
||||
fullPath := dir + "/" + name
|
||||
if err := os.RemoveAll(fullPath); err != nil {
|
||||
if err := RemoveAllHard(fullPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", fullPath, err)
|
||||
}
|
||||
}
|
||||
@ -188,10 +190,66 @@ func IsPathExist(path string) bool {
|
||||
// MustRemoveAllSynced removes path with all the contents
|
||||
// and syncs the parent directory, so it no longer contains the path.
|
||||
func MustRemoveAllSynced(path string) {
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
MustRemoveAll(path)
|
||||
SyncPath(filepath.Dir(path))
|
||||
}
|
||||
|
||||
// MustRemoveAll removes path with all the contents.
|
||||
func MustRemoveAll(path string) {
|
||||
if err := RemoveAllHard(path); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||
}
|
||||
SyncPath(filepath.Dir(path))
|
||||
}
|
||||
|
||||
// RemoveAllHard removes path with all the contents.
|
||||
//
|
||||
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
func RemoveAllHard(path string) error {
|
||||
err := os.RemoveAll(path)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if !strings.Contains(err.Error(), "directory not empty") {
|
||||
return err
|
||||
}
|
||||
// This may be NFS-related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
// Schedule for later directory removal.
|
||||
select {
|
||||
case removeDirCh <- path:
|
||||
default:
|
||||
return fmt.Errorf("cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var removeDirCh = make(chan string, 1024)
|
||||
|
||||
func dirRemover() {
|
||||
for path := range removeDirCh {
|
||||
attempts := 0
|
||||
for {
|
||||
err := os.RemoveAll(path)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if !strings.Contains(err.Error(), "directory not empty") {
|
||||
logger.Errorf("cannot remove %q: %s", path, err)
|
||||
break
|
||||
}
|
||||
// NFS-related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
// Sleep for a while and try again.
|
||||
attempts++
|
||||
if attempts > 50 {
|
||||
logger.Errorf("cannot remove %q in %d attempts: %s", path, attempts, err)
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
go dirRemover()
|
||||
}
|
||||
|
||||
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
||||
|
@ -2,7 +2,6 @@ package mergeset
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
@ -92,7 +91,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
metaindexPath := path + "/metaindex.bin"
|
||||
metaindexFile, err := filestream.Create(metaindexPath, false)
|
||||
if err != nil {
|
||||
_ = os.RemoveAll(path)
|
||||
fs.MustRemoveAll(path)
|
||||
return fmt.Errorf("cannot create metaindex file: %s", err)
|
||||
}
|
||||
|
||||
@ -100,7 +99,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
indexFile, err := filestream.Create(indexPath, nocache)
|
||||
if err != nil {
|
||||
metaindexFile.MustClose()
|
||||
_ = os.RemoveAll(path)
|
||||
fs.MustRemoveAll(path)
|
||||
return fmt.Errorf("cannot create index file: %s", err)
|
||||
}
|
||||
|
||||
@ -109,7 +108,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
if err != nil {
|
||||
metaindexFile.MustClose()
|
||||
indexFile.MustClose()
|
||||
_ = os.RemoveAll(path)
|
||||
fs.MustRemoveAll(path)
|
||||
return fmt.Errorf("cannot create items file: %s", err)
|
||||
}
|
||||
|
||||
@ -119,7 +118,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
metaindexFile.MustClose()
|
||||
indexFile.MustClose()
|
||||
itemsFile.MustClose()
|
||||
_ = os.RemoveAll(path)
|
||||
fs.MustRemoveAll(path)
|
||||
return fmt.Errorf("cannot create lens file: %s", err)
|
||||
}
|
||||
|
||||
|
@ -846,7 +846,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
||||
}
|
||||
|
||||
txnDir := path + "/txn"
|
||||
if err := os.RemoveAll(txnDir); err != nil {
|
||||
if err := fs.RemoveAllHard(txnDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot remove %q: %s", txnDir, err)
|
||||
}
|
||||
if err := fs.MkdirAllFailIfExist(txnDir); err != nil {
|
||||
@ -854,7 +854,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
||||
}
|
||||
|
||||
tmpDir := path + "/tmp"
|
||||
if err := os.RemoveAll(tmpDir); err != nil {
|
||||
if err := fs.RemoveAllHard(tmpDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot remove %q: %s", tmpDir, err)
|
||||
}
|
||||
if err := fs.MkdirAllFailIfExist(tmpDir); err != nil {
|
||||
@ -1033,7 +1033,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid path to remove: %s", err)
|
||||
}
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
if err := fs.RemoveAllHard(path); err != nil {
|
||||
return fmt.Errorf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -85,7 +84,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
timestampsPath := path + "/timestamps.bin"
|
||||
timestampsFile, err := filestream.Create(timestampsPath, nocache)
|
||||
if err != nil {
|
||||
_ = os.RemoveAll(path)
|
||||
fs.MustRemoveAll(path)
|
||||
return fmt.Errorf("cannot create timestamps file: %s", err)
|
||||
}
|
||||
|
||||
@ -93,7 +92,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
valuesFile, err := filestream.Create(valuesPath, nocache)
|
||||
if err != nil {
|
||||
timestampsFile.MustClose()
|
||||
_ = os.RemoveAll(path)
|
||||
fs.MustRemoveAll(path)
|
||||
return fmt.Errorf("cannot create values file: %s", err)
|
||||
}
|
||||
|
||||
@ -102,7 +101,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
if err != nil {
|
||||
timestampsFile.MustClose()
|
||||
valuesFile.MustClose()
|
||||
_ = os.RemoveAll(path)
|
||||
fs.MustRemoveAll(path)
|
||||
return fmt.Errorf("cannot create index file: %s", err)
|
||||
}
|
||||
|
||||
@ -114,7 +113,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
|
||||
timestampsFile.MustClose()
|
||||
valuesFile.MustClose()
|
||||
indexFile.MustClose()
|
||||
_ = os.RemoveAll(path)
|
||||
fs.MustRemoveAll(path)
|
||||
return fmt.Errorf("cannot create metaindex file: %s", err)
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
@ -15,6 +14,7 @@ import (
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
|
||||
@ -266,7 +266,7 @@ func (db *indexDB) decRef() {
|
||||
}
|
||||
|
||||
logger.Infof("dropping indexDB %q", tbPath)
|
||||
if err := os.RemoveAll(tbPath); err != nil {
|
||||
if err := fs.RemoveAllHard(tbPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", tbPath, err)
|
||||
}
|
||||
logger.Infof("indexDB %q has been dropped", tbPath)
|
||||
|
@ -213,10 +213,10 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
|
||||
func (pt *partition) Drop() {
|
||||
logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath)
|
||||
|
||||
if err := os.RemoveAll(pt.smallPartsPath); err != nil {
|
||||
if err := fs.RemoveAllHard(pt.smallPartsPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove small parts directory %q: %s", pt.smallPartsPath, err)
|
||||
}
|
||||
if err := os.RemoveAll(pt.bigPartsPath); err != nil {
|
||||
if err := fs.RemoveAllHard(pt.bigPartsPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove big parts directory %q: %s", pt.bigPartsPath, err)
|
||||
}
|
||||
|
||||
@ -1223,11 +1223,11 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
|
||||
}
|
||||
|
||||
txnDir := path + "/txn"
|
||||
if err := os.RemoveAll(txnDir); err != nil {
|
||||
if err := fs.RemoveAllHard(txnDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot delete transaction directory %q: %s", txnDir, err)
|
||||
}
|
||||
tmpDir := path + "/tmp"
|
||||
if err := os.RemoveAll(tmpDir); err != nil {
|
||||
if err := fs.RemoveAllHard(tmpDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot remove temporary directory %q: %s", tmpDir, err)
|
||||
}
|
||||
if err := createPartitionDirs(path); err != nil {
|
||||
@ -1408,7 +1408,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid path to remove: %s", err)
|
||||
}
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
if err := fs.RemoveAllHard(path); err != nil {
|
||||
return fmt.Errorf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
@ -1438,7 +1438,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
|
||||
}
|
||||
} else {
|
||||
// Just remove srcPath.
|
||||
if err := os.RemoveAll(srcPath); err != nil {
|
||||
if err := fs.RemoveAllHard(srcPath); err != nil {
|
||||
return fmt.Errorf("cannot remove %q: %s", srcPath, err)
|
||||
}
|
||||
}
|
||||
|
@ -893,7 +893,7 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache *fastcache.Ca
|
||||
for _, tn := range tableNames[:len(tableNames)-2] {
|
||||
pathToRemove := path + "/" + tn
|
||||
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
|
||||
if err := os.RemoveAll(pathToRemove); err != nil {
|
||||
if err := fs.RemoveAllHard(pathToRemove); err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot remove obsolete indexdb dir %q: %s", pathToRemove, err)
|
||||
}
|
||||
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
|
||||
|
Loading…
Reference in New Issue
Block a user