2019-11-07 20:05:39 +01:00
package actions
import (
2023-04-12 18:51:27 +02:00
"encoding/json"
2019-11-07 20:05:39 +01:00
"fmt"
"io"
2023-05-03 10:48:53 +02:00
"path/filepath"
2019-11-07 20:05:39 +01:00
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
2020-01-09 14:24:26 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
2019-11-07 20:05:39 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2023-04-12 18:51:27 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot"
2022-12-20 23:12:04 +01:00
"github.com/VictoriaMetrics/metrics"
)
var (
bytesUploadedTotal = uint64 ( 0 )
bytesUploadedTotalMetric = metrics . NewCounter ( ` vm_backups_uploaded_bytes_total ` )
2019-11-07 20:05:39 +01:00
)
// Backup performs backup according to the provided settings.
//
// Note that the backup works only for VictoriaMetrics snapshots
// made via `/snapshot/create`. It works improperly on mutable files.
type Backup struct {
// Concurrency is the number of concurrent workers during the backup.
// Concurrency=1 by default.
Concurrency int
// Src is backup source
Src * fslocal . FS
// Dst is backup destination.
//
// If dst contains the previous backup data, then incremental backup
// is made, i.e. only the changed data is uploaded.
//
// If dst points to empty dir, then full backup is made.
// Origin can be set to the previous backup in order to reduce backup duration
// and reduce network bandwidth usage.
Dst common . RemoteFS
// Origin is optional origin for speeding up full backup if Dst points
// to empty dir.
Origin common . OriginFS
}
2023-04-12 18:51:27 +02:00
// BackupMetadata contains metadata about the backup.
// Note that CreatedAt and CompletedAt are in RFC3339 format.
type BackupMetadata struct {
CreatedAt string ` json:"created_at" `
CompletedAt string ` json:"completed_at" `
}
2019-11-07 20:05:39 +01:00
// Run runs b with the provided settings.
func ( b * Backup ) Run ( ) error {
concurrency := b . Concurrency
src := b . Src
dst := b . Dst
origin := b . Origin
2020-01-09 14:24:26 +01:00
if origin != nil && origin . String ( ) == dst . String ( ) {
origin = nil
}
2019-11-07 20:05:39 +01:00
if origin == nil {
origin = & fsnil . FS { }
}
2020-01-09 14:24:26 +01:00
if err := dst . DeleteFile ( fscommon . BackupCompleteFilename ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot delete `backup complete` file at %s: %w" , dst , err )
2020-01-09 14:24:26 +01:00
}
if err := runBackup ( src , dst , origin , concurrency ) ; err != nil {
return err
}
2023-04-12 18:51:27 +02:00
if err := storeMetadata ( src , dst ) ; err != nil {
return fmt . Errorf ( "cannot store backup metadata: %w" , err )
}
2023-05-25 15:56:18 +02:00
if err := dst . CreateFile ( fscommon . BackupCompleteFilename , [ ] byte ( "ok" ) ) ; err != nil {
2023-05-16 14:21:56 +02:00
return fmt . Errorf ( "cannot create `backup complete` file at %s: %w" , dst , err )
}
2023-04-12 18:51:27 +02:00
return nil
}
func storeMetadata ( src * fslocal . FS , dst common . RemoteFS ) error {
2023-05-03 10:48:53 +02:00
snapshotName := filepath . Base ( src . Dir )
2023-04-12 18:51:27 +02:00
snapshotTime , err := snapshot . Time ( snapshotName )
if err != nil {
return fmt . Errorf ( "cannot decode snapshot name %q: %w" , snapshotName , err )
}
d := BackupMetadata {
CreatedAt : snapshotTime . Format ( time . RFC3339 ) ,
CompletedAt : time . Now ( ) . Format ( time . RFC3339 ) ,
}
metadata , err := json . Marshal ( d )
if err != nil {
return fmt . Errorf ( "cannot marshal metadata: %w" , err )
}
2023-05-16 14:21:56 +02:00
if err := dst . CreateFile ( fscommon . BackupMetadataFilename , metadata ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot create `backup complete` file at %s: %w" , dst , err )
2020-01-09 14:24:26 +01:00
}
2023-04-12 18:51:27 +02:00
2020-01-09 14:24:26 +01:00
return nil
}
func runBackup ( src * fslocal . FS , dst common . RemoteFS , origin common . OriginFS , concurrency int ) error {
startTime := time . Now ( )
2019-11-07 20:05:39 +01:00
logger . Infof ( "starting backup from %s to %s using origin %s" , src , dst , origin )
srcParts , err := src . ListParts ( )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot list src parts: %w" , err )
2019-11-07 20:05:39 +01:00
}
2020-10-08 13:22:50 +02:00
logger . Infof ( "obtained %d parts from src %s" , len ( srcParts ) , src )
2019-11-07 20:05:39 +01:00
dstParts , err := dst . ListParts ( )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot list dst parts: %w" , err )
2019-11-07 20:05:39 +01:00
}
2020-10-08 13:22:50 +02:00
logger . Infof ( "obtained %d parts from dst %s" , len ( dstParts ) , dst )
2019-11-07 20:05:39 +01:00
originParts , err := origin . ListParts ( )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot list origin parts: %w" , err )
2019-11-07 20:05:39 +01:00
}
2020-10-08 13:22:50 +02:00
logger . Infof ( "obtained %d parts from origin %s" , len ( originParts ) , origin )
2019-11-07 20:05:39 +01:00
backupSize := getPartsSize ( srcParts )
partsToDelete := common . PartsDifference ( dstParts , srcParts )
deleteSize := getPartsSize ( partsToDelete )
if len ( partsToDelete ) > 0 {
2020-10-08 13:22:50 +02:00
logger . Infof ( "deleting %d parts from dst %s" , len ( partsToDelete ) , dst )
2019-11-07 20:05:39 +01:00
deletedParts := uint64 ( 0 )
err = runParallel ( concurrency , partsToDelete , func ( p common . Part ) error {
2020-10-08 13:22:50 +02:00
logger . Infof ( "deleting %s from dst %s" , & p , dst )
2019-11-07 20:05:39 +01:00
if err := dst . DeletePart ( p ) ; err != nil {
2020-10-08 13:22:50 +02:00
return fmt . Errorf ( "cannot delete %s from dst %s: %w" , & p , dst , err )
2019-11-07 20:05:39 +01:00
}
atomic . AddUint64 ( & deletedParts , 1 )
return nil
} , func ( elapsed time . Duration ) {
n := atomic . LoadUint64 ( & deletedParts )
2020-10-08 13:22:50 +02:00
logger . Infof ( "deleted %d out of %d parts from dst %s in %s" , n , len ( partsToDelete ) , dst , elapsed )
2019-11-07 20:05:39 +01:00
} )
if err != nil {
return err
}
if err := dst . RemoveEmptyDirs ( ) ; err != nil {
2020-10-08 13:22:50 +02:00
return fmt . Errorf ( "cannot remove empty directories at dst %s: %w" , dst , err )
2019-11-07 20:05:39 +01:00
}
}
partsToCopy := common . PartsDifference ( srcParts , dstParts )
originCopyParts := common . PartsIntersect ( originParts , partsToCopy )
copySize := getPartsSize ( originCopyParts )
if len ( originCopyParts ) > 0 {
2020-10-08 13:22:50 +02:00
logger . Infof ( "server-side copying %d parts from origin %s to dst %s" , len ( originCopyParts ) , origin , dst )
2019-11-07 20:05:39 +01:00
copiedParts := uint64 ( 0 )
err = runParallel ( concurrency , originCopyParts , func ( p common . Part ) error {
2020-10-08 13:22:50 +02:00
logger . Infof ( "server-side copying %s from origin %s to dst %s" , & p , origin , dst )
2019-11-07 20:05:39 +01:00
if err := dst . CopyPart ( origin , p ) ; err != nil {
2020-10-08 13:22:50 +02:00
return fmt . Errorf ( "cannot copy %s from origin %s to dst %s: %w" , & p , origin , dst , err )
2019-11-07 20:05:39 +01:00
}
atomic . AddUint64 ( & copiedParts , 1 )
return nil
} , func ( elapsed time . Duration ) {
n := atomic . LoadUint64 ( & copiedParts )
2020-10-08 13:22:50 +02:00
logger . Infof ( "server-side copied %d out of %d parts from origin %s to dst %s in %s" , n , len ( originCopyParts ) , origin , dst , elapsed )
2019-11-07 20:05:39 +01:00
} )
if err != nil {
return err
}
}
srcCopyParts := common . PartsDifference ( partsToCopy , originParts )
uploadSize := getPartsSize ( srcCopyParts )
if len ( srcCopyParts ) > 0 {
2020-10-08 13:22:50 +02:00
logger . Infof ( "uploading %d parts from src %s to dst %s" , len ( srcCopyParts ) , src , dst )
2019-11-07 20:05:39 +01:00
bytesUploaded := uint64 ( 0 )
err = runParallel ( concurrency , srcCopyParts , func ( p common . Part ) error {
2020-10-08 13:22:50 +02:00
logger . Infof ( "uploading %s from src %s to dst %s" , & p , src , dst )
2019-11-07 20:05:39 +01:00
rc , err := src . NewReadCloser ( p )
if err != nil {
2020-10-08 13:22:50 +02:00
return fmt . Errorf ( "cannot create reader for %s from src %s: %w" , & p , src , err )
2019-11-07 20:05:39 +01:00
}
sr := & statReader {
r : rc ,
bytesRead : & bytesUploaded ,
}
if err := dst . UploadPart ( p , sr ) ; err != nil {
2020-10-08 13:22:50 +02:00
return fmt . Errorf ( "cannot upload %s to dst %s: %w" , & p , dst , err )
2019-11-07 20:05:39 +01:00
}
if err = rc . Close ( ) ; err != nil {
2020-10-08 13:22:50 +02:00
return fmt . Errorf ( "cannot close reader for %s from src %s: %w" , & p , src , err )
2019-11-07 20:05:39 +01:00
}
return nil
} , func ( elapsed time . Duration ) {
n := atomic . LoadUint64 ( & bytesUploaded )
2020-10-08 13:22:50 +02:00
logger . Infof ( "uploaded %d out of %d bytes from src %s to dst %s in %s" , n , uploadSize , src , dst , elapsed )
2019-11-07 20:05:39 +01:00
} )
2022-12-20 23:12:04 +01:00
atomic . AddUint64 ( & bytesUploadedTotal , bytesUploaded )
bytesUploadedTotalMetric . Set ( bytesUploadedTotal )
2019-11-07 20:05:39 +01:00
if err != nil {
return err
}
}
2020-10-08 13:22:50 +02:00
logger . Infof ( "backup from src %s to dst %s with origin %s is complete; backed up %d bytes in %.3f seconds; deleted %d bytes; server-side copied %d bytes; uploaded %d bytes" ,
src , dst , origin , backupSize , time . Since ( startTime ) . Seconds ( ) , deleteSize , copySize , uploadSize )
2019-11-07 20:05:39 +01:00
return nil
}
type statReader struct {
r io . Reader
bytesRead * uint64
}
func ( sr * statReader ) Read ( p [ ] byte ) ( int , error ) {
n , err := sr . r . Read ( p )
atomic . AddUint64 ( sr . bytesRead , uint64 ( n ) )
return n , err
}