mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-29 23:30:04 +01:00
4786f036de
replaces custom fsync function with standard Fsync methods for files. fixes pattern matching for parts and properly generate backup path for local fs. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70
241 lines
6.2 KiB
Go
241 lines
6.2 KiB
Go
package fsremote
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
|
|
libfs "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// FS represents remote filesystem.
|
|
//
|
|
// Backups are uploaded there.
|
|
// Data is downloaded from there during restore.
|
|
type FS struct {
|
|
// Dir is a path to remote directory with backup data.
|
|
Dir string
|
|
}
|
|
|
|
// MustStop stops fs.
|
|
func (fs *FS) MustStop() {
|
|
// Nothing to do
|
|
}
|
|
|
|
// String returns human-readable string representation for fs.
|
|
func (fs *FS) String() string {
|
|
return fmt.Sprintf("fsremote %q", fs.Dir)
|
|
}
|
|
|
|
// ListParts returns all the parts from fs.
|
|
func (fs *FS) ListParts() ([]common.Part, error) {
|
|
dir := fs.Dir
|
|
if _, err := os.Stat(dir); err != nil {
|
|
if os.IsNotExist(err) {
|
|
// Return empty part list for non-existing directory.
|
|
// The directory will be created later.
|
|
return nil, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
files, err := fscommon.AppendFiles(nil, dir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var parts []common.Part
|
|
dir += string(filepath.Separator)
|
|
for _, file := range files {
|
|
if !strings.HasPrefix(file, dir) {
|
|
logger.Panicf("BUG: unexpected prefix for file %q; want %q", file, dir)
|
|
}
|
|
if fscommon.IgnorePath(file) {
|
|
continue
|
|
}
|
|
var p common.Part
|
|
if !p.ParseFromRemotePath(file[len(dir):]) {
|
|
logger.Infof("skipping unknown file %s", file)
|
|
continue
|
|
}
|
|
// Check for correct part size.
|
|
fi, err := os.Stat(file)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot stat file %q for part %q: %w", file, p.Path, err)
|
|
}
|
|
p.ActualSize = uint64(fi.Size())
|
|
parts = append(parts, p)
|
|
}
|
|
return parts, nil
|
|
}
|
|
|
|
// DeletePart deletes the given part p from fs.
|
|
func (fs *FS) DeletePart(p common.Part) error {
|
|
path := fs.path(p)
|
|
if err := os.Remove(path); err != nil {
|
|
return fmt.Errorf("cannot remove %q: %w", path, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RemoveEmptyDirs recursively removes all the empty directories in fs.
|
|
func (fs *FS) RemoveEmptyDirs() error {
|
|
return fscommon.RemoveEmptyDirs(fs.Dir)
|
|
}
|
|
|
|
// CopyPart copies the part p from srcFS to fs.
|
|
//
|
|
// srcFS must have *FS type.
|
|
func (fs *FS) CopyPart(srcFS common.OriginFS, p common.Part) error {
|
|
src, ok := srcFS.(*FS)
|
|
if !ok {
|
|
return fmt.Errorf("cannot perform server-side copying from %s to %s: both of them must be fsremote", srcFS, fs)
|
|
}
|
|
srcPath := src.path(p)
|
|
dstPath := fs.path(p)
|
|
if err := fs.mkdirAll(dstPath); err != nil {
|
|
return err
|
|
}
|
|
// Attempt to create hardlink from srcPath to dstPath.
|
|
if err := os.Link(srcPath, dstPath); err == nil {
|
|
libfs.MustSyncPath(dstPath)
|
|
return nil
|
|
}
|
|
|
|
// Cannot create hardlink. Just copy file contents
|
|
srcFile, err := os.Open(srcPath)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot open source file: %w", err)
|
|
}
|
|
dstFile, err := os.Create(dstPath)
|
|
if err != nil {
|
|
_ = srcFile.Close()
|
|
return fmt.Errorf("cannot create destination file: %w", err)
|
|
}
|
|
n, err := io.Copy(dstFile, srcFile)
|
|
if err := dstFile.Sync(); err != nil {
|
|
return fmt.Errorf("cannot fsync dstFile: %q: %w", dstFile.Name(), err)
|
|
}
|
|
if err1 := dstFile.Close(); err1 != nil {
|
|
err = err1
|
|
}
|
|
if err1 := srcFile.Close(); err1 != nil {
|
|
err = err1
|
|
}
|
|
if err != nil {
|
|
_ = os.RemoveAll(dstPath)
|
|
return err
|
|
}
|
|
if uint64(n) != p.Size {
|
|
_ = os.RemoveAll(dstPath)
|
|
return fmt.Errorf("unexpected number of bytes copied from %q to %q; got %d bytes; want %d bytes", srcPath, dstPath, n, p.Size)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DownloadPart download part p from fs to w.
|
|
func (fs *FS) DownloadPart(p common.Part, w io.Writer) error {
|
|
path := fs.path(p)
|
|
r, err := os.Open(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n, err := io.Copy(w, r)
|
|
if err1 := r.Close(); err1 != nil && err == nil {
|
|
err = err1
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("cannot download data from %q: %w", path, err)
|
|
}
|
|
if uint64(n) != p.Size {
|
|
return fmt.Errorf("wrong data size downloaded from %q; got %d bytes; want %d bytes", path, n, p.Size)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UploadPart uploads p from r to fs.
|
|
func (fs *FS) UploadPart(p common.Part, r io.Reader) error {
|
|
path := fs.path(p)
|
|
if err := fs.mkdirAll(path); err != nil {
|
|
return err
|
|
}
|
|
w, err := os.Create(path)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot create file %q: %w", path, err)
|
|
}
|
|
n, err := io.Copy(w, r)
|
|
if err := w.Sync(); err != nil {
|
|
return fmt.Errorf("cannot fsync file: %q: %w", w.Name(), err)
|
|
}
|
|
if err1 := w.Close(); err1 != nil && err == nil {
|
|
err = err1
|
|
}
|
|
if err != nil {
|
|
_ = os.RemoveAll(path)
|
|
return fmt.Errorf("cannot upload data to %q: %w", path, err)
|
|
}
|
|
if uint64(n) != p.Size {
|
|
_ = os.RemoveAll(path)
|
|
return fmt.Errorf("wrong data size uploaded to %q; got %d bytes; want %d bytes", path, n, p.Size)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (fs *FS) mkdirAll(filePath string) error {
|
|
dir := filepath.Dir(filePath)
|
|
if err := os.MkdirAll(dir, 0700); err != nil {
|
|
return fmt.Errorf("cannot create directory %q: %w", dir, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (fs *FS) path(p common.Part) string {
|
|
return filepath.Join(fs.Dir, p.Path, fmt.Sprintf("%016X_%016X_%016X", p.FileSize, p.Offset, p.Size))
|
|
}
|
|
|
|
// DeleteFile deletes filePath at fs.
|
|
//
|
|
// The function does nothing if the filePath doesn't exist.
|
|
func (fs *FS) DeleteFile(filePath string) error {
|
|
path := filepath.Join(fs.Dir, filePath)
|
|
err := os.Remove(path)
|
|
if err != nil && !os.IsNotExist(err) {
|
|
return fmt.Errorf("cannot remove %q: %w", path, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CreateFile creates filePath at fs and puts data into it.
|
|
//
|
|
// The file is overwritten if it exists.
|
|
func (fs *FS) CreateFile(filePath string, data []byte) error {
|
|
path := filepath.Join(fs.Dir, filePath)
|
|
if err := fs.mkdirAll(path); err != nil {
|
|
return err
|
|
}
|
|
if err := os.WriteFile(path, data, 0600); err != nil {
|
|
return fmt.Errorf("cannot write %d bytes to %q: %w", len(data), path, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// HasFile returns true if filePath exists at fs.
|
|
func (fs *FS) HasFile(filePath string) (bool, error) {
|
|
path := filepath.Join(fs.Dir, filePath)
|
|
fi, err := os.Stat(path)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return false, nil
|
|
}
|
|
return false, fmt.Errorf("cannot stat %q: %w", path, err)
|
|
}
|
|
if fi.IsDir() {
|
|
return false, fmt.Errorf("%q is directory, while file is needed", path)
|
|
}
|
|
return true, nil
|
|
}
|