VictoriaMetrics/app/vmbackup/main.go
Nikolay ebebaecd94
lib/netutil: init implimentation of proxy protocol (#3687)
* lib/netutil: init implimentation of proxy protocol
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-01-26 23:25:22 -08:00

205 lines
7.2 KiB
Go

package main
import (
"flag"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/actions"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot"
)
var (
httpListenAddr = flag.String("httpListenAddr", ":8420", "TCP address for exporting metrics at /metrics page")
storageDataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to VictoriaMetrics data. Must match -storageDataPath from VictoriaMetrics or vmstorage")
snapshotName = flag.String("snapshotName", "", "Name for the snapshot to backup. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-work-with-snapshots. There is no need in setting -snapshotName if -snapshot.createURL is set")
snapshotCreateURL = flag.String("snapshot.createURL", "", "VictoriaMetrics create snapshot url. When this is given a snapshot will automatically be created during backup. "+
"Example: http://victoriametrics:8428/snapshot/create . There is no need in setting -snapshotName if -snapshot.createURL is set")
snapshotDeleteURL = flag.String("snapshot.deleteURL", "", "VictoriaMetrics delete snapshot url. Optional. Will be generated from -snapshot.createURL if not provided. "+
"All created snapshots will be automatically deleted. Example: http://victoriametrics:8428/snapshot/delete")
dst = flag.String("dst", "", "Where to put the backup on the remote storage. "+
"Example: gs://bucket/path/to/backup, s3://bucket/path/to/backup, azblob://container/path/to/backup or fs:///path/to/local/backup/dir\n"+
"-dst can point to the previous backup. In this case incremental backup is performed, i.e. only changed data is uploaded")
origin = flag.String("origin", "", "Optional origin directory on the remote storage with old backup for server-side copying when performing full backup. This speeds up full backups")
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce backup duration")
maxBytesPerSecond = flagutil.NewBytes("maxBytesPerSecond", 0, "The maximum upload speed. There is no limit if it is set to 0")
)
func main() {
// Write flags and help message to stdout, since it is easier to grep or pipe.
flag.CommandLine.SetOutput(os.Stdout)
flag.Usage = usage
flagutil.RegisterSecretFlag("snapshot.createURL")
flagutil.RegisterSecretFlag("snapshot.deleteURL")
envflag.Parse()
buildinfo.Init()
logger.Init()
pushmetrics.Init()
if len(*snapshotCreateURL) > 0 {
// create net/url object
createUrl, err := url.Parse(*snapshotCreateURL)
if err != nil {
logger.Fatalf("cannot parse snapshotCreateURL: %s", err)
}
if len(*snapshotName) > 0 {
logger.Fatalf("-snapshotName shouldn't be set if -snapshot.createURL is set, since snapshots are created automatically in this case")
}
logger.Infof("Snapshot create url %s", createUrl.Redacted())
if len(*snapshotDeleteURL) <= 0 {
err := flag.Set("snapshot.deleteURL", strings.Replace(*snapshotCreateURL, "/create", "/delete", 1))
if err != nil {
logger.Fatalf("Failed to set snapshot.deleteURL flag: %v", err)
}
}
deleteUrl, err := url.Parse(*snapshotCreateURL)
if err != nil {
logger.Fatalf("cannot parse snapshotDeleteURL: %s", err)
}
logger.Infof("Snapshot delete url %s", deleteUrl.Redacted())
name, err := snapshot.Create(createUrl.String())
if err != nil {
logger.Fatalf("cannot create snapshot: %s", err)
}
err = flag.Set("snapshotName", name)
if err != nil {
logger.Fatalf("cannot set snapshotName flag: %v", err)
}
defer func() {
err := snapshot.Delete(deleteUrl.String(), name)
if err != nil {
logger.Fatalf("cannot delete snapshot: %s", err)
}
}()
} else if len(*snapshotName) == 0 {
logger.Fatalf("`-snapshotName` or `-snapshot.createURL` must be provided")
}
if err := snapshot.Validate(*snapshotName); err != nil {
logger.Fatalf("invalid -snapshotName=%q: %s", *snapshotName, err)
}
go httpserver.Serve(*httpListenAddr, false, nil)
srcFS, err := newSrcFS()
if err != nil {
logger.Fatalf("%s", err)
}
dstFS, err := newDstFS()
if err != nil {
logger.Fatalf("%s", err)
}
originFS, err := newOriginFS()
if err != nil {
logger.Fatalf("%s", err)
}
a := &actions.Backup{
Concurrency: *concurrency,
Src: srcFS,
Dst: dstFS,
Origin: originFS,
}
if err := a.Run(); err != nil {
logger.Fatalf("cannot create backup: %s", err)
}
srcFS.MustStop()
dstFS.MustStop()
originFS.MustStop()
startTime := time.Now()
logger.Infof("gracefully shutting down http server for metrics at %q", *httpListenAddr)
if err := httpserver.Stop(*httpListenAddr); err != nil {
logger.Fatalf("cannot stop http server for metrics: %s", err)
}
logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
}
func usage() {
const s = `
vmbackup performs backups for VictoriaMetrics data from instant snapshots to gcs, s3, azblob
or local filesystem. Backed up data can be restored with vmrestore.
See the docs at https://docs.victoriametrics.com/vmbackup.html .
`
flagutil.Usage(s)
}
func newSrcFS() (*fslocal.FS, error) {
snapshotPath := *storageDataPath + "/snapshots/" + *snapshotName
// Verify the snapshot exists.
f, err := os.Open(snapshotPath)
if err != nil {
return nil, fmt.Errorf("cannot open snapshot at %q: %w", snapshotPath, err)
}
fi, err := f.Stat()
_ = f.Close()
if err != nil {
return nil, fmt.Errorf("cannot stat %q: %w", snapshotPath, err)
}
if !fi.IsDir() {
return nil, fmt.Errorf("snapshot %q must be a directory", snapshotPath)
}
fs := &fslocal.FS{
Dir: snapshotPath,
MaxBytesPerSecond: maxBytesPerSecond.IntN(),
}
if err := fs.Init(); err != nil {
return nil, fmt.Errorf("cannot initialize fs: %w", err)
}
return fs, nil
}
func newDstFS() (common.RemoteFS, error) {
fs, err := actions.NewRemoteFS(*dst)
if err != nil {
return nil, fmt.Errorf("cannot parse `-dst`=%q: %w", *dst, err)
}
if hasFilepathPrefix(*dst, *storageDataPath) {
return nil, fmt.Errorf("-dst=%q can not point to the directory with VictoriaMetrics data (aka -storageDataPath=%q)", *dst, *storageDataPath)
}
return fs, nil
}
func hasFilepathPrefix(path, prefix string) bool {
if !strings.HasPrefix(path, "fs://") {
return false
}
path = path[len("fs://"):]
pathAbs, err := filepath.Abs(path)
if err != nil {
return false
}
prefixAbs, err := filepath.Abs(prefix)
if err != nil {
return false
}
return strings.HasPrefix(pathAbs, prefixAbs)
}
func newOriginFS() (common.OriginFS, error) {
if len(*origin) == 0 {
return &fsnil.FS{}, nil
}
fs, err := actions.NewRemoteFS(*origin)
if err != nil {
return nil, fmt.Errorf("cannot parse `-origin`=%q: %w", *origin, err)
}
return fs, nil
}