mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
lib/fs: atomically create file with the given contents on WriteFileAtomically
This should prevent from `transaction` and `metadata.json` files corruption on unclean shutdown such as OOM, `kill -9`, power loss, etc. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/148
This commit is contained in:
parent
0b488f1e37
commit
5d8d110010
39
lib/fs/fs.go
39
lib/fs/fs.go
@ -5,7 +5,9 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
@ -87,26 +89,42 @@ func MustSyncPath(path string) {
|
||||
}
|
||||
}
|
||||
|
||||
// WriteFile writes data to the given file path.
|
||||
var tmpFileNum uint64
|
||||
|
||||
// WriteFileAtomically atomically writes data to the given file path.
|
||||
//
|
||||
// WriteFile returns only after the file is fully written
|
||||
// WriteFile returns only after the file is fully written and synced
|
||||
// to the underlying storage.
|
||||
func WriteFile(path string, data []byte) error {
|
||||
func WriteFileAtomically(path string, data []byte) error {
|
||||
// Check for the existing file. It is expected that
|
||||
// the WriteFileAtomically function cannot be called concurrently
|
||||
// with the same `path`.
|
||||
if IsPathExist(path) {
|
||||
return fmt.Errorf("cannot create file %q, since it already exists", path)
|
||||
}
|
||||
f, err := filestream.Create(path, false)
|
||||
|
||||
n := atomic.AddUint64(&tmpFileNum, 1)
|
||||
tmpPath := fmt.Sprintf("%s.tmp.%d", path, n)
|
||||
f, err := filestream.Create(tmpPath, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create file %q: %s", path, err)
|
||||
return fmt.Errorf("cannot create file %q: %s", tmpPath, err)
|
||||
}
|
||||
if _, err := f.Write(data); err != nil {
|
||||
f.MustClose()
|
||||
return fmt.Errorf("cannot write %d bytes to file %q: %s", len(data), path, err)
|
||||
MustRemoveAll(tmpPath)
|
||||
return fmt.Errorf("cannot write %d bytes to file %q: %s", len(data), tmpPath, err)
|
||||
}
|
||||
|
||||
// Sync and close the file.
|
||||
f.MustClose()
|
||||
|
||||
// Atomically move the file from tmpPath to path.
|
||||
if err := os.Rename(tmpPath, path); err != nil {
|
||||
// do not call MustRemoveAll(tmpPath) here, so the user could inspect
|
||||
// the file contents during investigating the issue.
|
||||
return fmt.Errorf("cannot move %q to %q: %s", tmpPath, path, err)
|
||||
}
|
||||
|
||||
// Sync the containing directory, so the file is guaranteed to appear in the directory.
|
||||
// See https://www.quora.com/When-should-you-fsync-the-containing-directory-in-addition-to-the-file-itself
|
||||
absPath, err := filepath.Abs(path)
|
||||
@ -119,6 +137,15 @@ func WriteFile(path string, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsTemporaryFileName returns true if fn matches temporary file name pattern
|
||||
// from WriteFileAtomically.
|
||||
func IsTemporaryFileName(fn string) bool {
|
||||
return tmpFileNameRe.MatchString(fn)
|
||||
}
|
||||
|
||||
// tmpFileNameRe is regexp for temporary file name - see WriteFileAtomically for details.
|
||||
var tmpFileNameRe = regexp.MustCompile(`\.tmp\.\d+$`)
|
||||
|
||||
// MkdirAllIfNotExist creates the given path dir if it isn't exist.
|
||||
func MkdirAllIfNotExist(path string) error {
|
||||
if IsPathExist(path) {
|
||||
|
@ -164,7 +164,7 @@ func (ph *partHeader) WriteMetadata(partPath string) error {
|
||||
return fmt.Errorf("cannot marshal metadata: %s", err)
|
||||
}
|
||||
metadataPath := partPath + "/metadata.json"
|
||||
if err := fs.WriteFile(metadataPath, metadata); err != nil {
|
||||
if err := fs.WriteFileAtomically(metadataPath, metadata); err != nil {
|
||||
return fmt.Errorf("cannot create %q: %s", metadataPath, err)
|
||||
}
|
||||
return nil
|
||||
|
@ -715,7 +715,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
||||
dstPartPath := ph.Path(tb.path, mergeIdx)
|
||||
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
|
||||
txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx)
|
||||
if err := fs.WriteFile(txnPath, bb.B); err != nil {
|
||||
if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil {
|
||||
return fmt.Errorf("cannot create transaction file %q: %s", txnPath, err)
|
||||
}
|
||||
|
||||
@ -994,7 +994,12 @@ func runTransactions(txnLock *sync.RWMutex, path string) error {
|
||||
})
|
||||
|
||||
for _, fi := range fis {
|
||||
txnPath := txnDir + "/" + fi.Name()
|
||||
fn := fi.Name()
|
||||
if fs.IsTemporaryFileName(fn) {
|
||||
// Skip temporary files, which could be left after unclean shutdown.
|
||||
continue
|
||||
}
|
||||
txnPath := txnDir + "/" + fn
|
||||
if err := runTransaction(txnLock, path, txnPath); err != nil {
|
||||
return fmt.Errorf("cannot run transaction from %q: %s", txnPath, err)
|
||||
}
|
||||
|
@ -1008,7 +1008,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
||||
}
|
||||
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
|
||||
txnPath := fmt.Sprintf("%s/txn/%016X", ptPath, mergeIdx)
|
||||
if err := fs.WriteFile(txnPath, bb.B); err != nil {
|
||||
if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil {
|
||||
return fmt.Errorf("cannot create transaction file %q: %s", txnPath, err)
|
||||
}
|
||||
|
||||
@ -1367,7 +1367,12 @@ func runTransactions(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, path strin
|
||||
})
|
||||
|
||||
for _, fi := range fis {
|
||||
txnPath := txnDir + "/" + fi.Name()
|
||||
fn := fi.Name()
|
||||
if fs.IsTemporaryFileName(fn) {
|
||||
// Skip temporary files, which could be left after unclean shutdown.
|
||||
continue
|
||||
}
|
||||
txnPath := txnDir + "/" + fn
|
||||
if err := runTransaction(txnLock, pathPrefix1, pathPrefix2, txnPath); err != nil {
|
||||
return fmt.Errorf("cannot run transaction from %q: %s", txnPath, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user