mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
adds restore.lock (#1988)
* adds restore.lock it must prevent from running storage after incomplete restore process https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1958 * return back flock file deletion * Apply suggestions from code review * wip * docs/CHANGELOG.md: document https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1958 Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
fcab2fc716
commit
6cdc934c3d
@ -7,6 +7,7 @@ sort: 15
|
||||
## tip
|
||||
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): log error message when remote storage returns 400 or 409 http errors. This should simplify detection and debugging of this case. See [this issue](vmagent_remotewrite_packets_dropped_total).
|
||||
* FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): store `restore-in-progress` file in `-dst` directory while `vmrestore` is running. This file is automatically deleted when `vmrestore` is successfully finished. This helps detecting incompletely restored data on VictoriaMetrics start. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1958).
|
||||
|
||||
|
||||
## [v1.71.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.71.0)
|
||||
|
@ -3,6 +3,8 @@ package actions
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -51,6 +53,9 @@ func (r *Restore) Run() error {
|
||||
}
|
||||
defer fs.MustClose(flockF)
|
||||
|
||||
if err := createRestoreLock(r.Dst.Dir); err != nil {
|
||||
return err
|
||||
}
|
||||
concurrency := r.Concurrency
|
||||
src := r.Src
|
||||
dst := r.Dst
|
||||
@ -189,7 +194,7 @@ func (r *Restore) Run() error {
|
||||
logger.Infof("restored %d bytes from backup in %.3f seconds; deleted %d bytes; downloaded %d bytes",
|
||||
backupSize, time.Since(startTime).Seconds(), deleteSize, downloadSize)
|
||||
|
||||
return nil
|
||||
return removeLockFile(r.Dst.Dir)
|
||||
}
|
||||
|
||||
type statWriter struct {
|
||||
@ -202,3 +207,20 @@ func (sw *statWriter) Write(p []byte) (int, error) {
|
||||
atomic.AddUint64(sw.bytesWritten, uint64(n))
|
||||
return n, err
|
||||
}
|
||||
|
||||
func createRestoreLock(dstDir string) error {
|
||||
lockF := path.Join(dstDir, "restore-in-progress")
|
||||
f, err := os.Create(lockF)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create restore lock file %q: %w", lockF, err)
|
||||
}
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
func removeLockFile(dstDir string) error {
|
||||
lockF := path.Join(dstDir, "restore-in-progress")
|
||||
if err := os.Remove(lockF); err != nil {
|
||||
return fmt.Errorf("cannote remove restore lock file %q: %w", lockF, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -72,9 +72,8 @@ func appendFilesInternal(dst []string, d *os.File) ([]string, error) {
|
||||
if name == "." || name == ".." {
|
||||
continue
|
||||
}
|
||||
if name == "flock.lock" {
|
||||
// Do not take into account flock.lock files, since they are used
|
||||
// for preventing from concurrent access.
|
||||
if isSpecialFile(name) {
|
||||
// Do not take into account special files.
|
||||
continue
|
||||
}
|
||||
path := dir + "/" + name
|
||||
@ -135,6 +134,10 @@ func appendFilesInternal(dst []string, d *os.File) ([]string, error) {
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
func isSpecialFile(name string) bool {
|
||||
return name == "flock.lock" || name == "restore-in-progress"
|
||||
}
|
||||
|
||||
// RemoveEmptyDirs recursively removes empty directories under the given dir.
|
||||
func RemoveEmptyDirs(dir string) error {
|
||||
_, err := removeEmptyDirs(dir)
|
||||
@ -173,7 +176,6 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
|
||||
return false, fmt.Errorf("cannot read directory contents in %q: %w", dir, err)
|
||||
}
|
||||
dirEntries := 0
|
||||
hasFlock := false
|
||||
for _, fi := range fis {
|
||||
name := fi.Name()
|
||||
if name == "." || name == ".." {
|
||||
@ -192,11 +194,10 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
|
||||
continue
|
||||
}
|
||||
if fi.Mode()&os.ModeSymlink != os.ModeSymlink {
|
||||
if name == "flock.lock" {
|
||||
hasFlock = true
|
||||
if isSpecialFile(name) {
|
||||
// Do not take into account special files
|
||||
continue
|
||||
}
|
||||
// Skip plain files.
|
||||
dirEntries++
|
||||
continue
|
||||
}
|
||||
@ -248,14 +249,9 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
|
||||
if dirEntries > 0 {
|
||||
return false, nil
|
||||
}
|
||||
logger.Infof("removing empty dir %q", dir)
|
||||
if hasFlock {
|
||||
flockFilepath := dir + "/flock.lock"
|
||||
if err := os.Remove(flockFilepath); err != nil {
|
||||
return false, fmt.Errorf("cannot remove %q: %w", flockFilepath, err)
|
||||
}
|
||||
}
|
||||
if err := os.Remove(dir); err != nil {
|
||||
// Use os.RemoveAll() instead of os.Remove(), since the dir may contain special files such as flock.lock and restore-in-progress,
|
||||
// which must be ingored.
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
return false, fmt.Errorf("cannot remove %q: %w", dir, err)
|
||||
}
|
||||
return true, nil
|
||||
|
@ -192,6 +192,12 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
||||
}
|
||||
s.flockF = flockF
|
||||
|
||||
// Check whether restore process finished successfully
|
||||
restoreLockF := path + "/restore-in-progress"
|
||||
if fs.IsPathExist(restoreLockF) {
|
||||
return nil, fmt.Errorf("restore lock file exists, incomplete vmrestore run. Run vmrestore again or remove lock file %q", restoreLockF)
|
||||
}
|
||||
|
||||
// Pre-create snapshots directory if it is missing.
|
||||
snapshotsPath := path + "/snapshots"
|
||||
if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user