mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
app/{vmbackup,vmrestore}: add -maxBytesPerSecond
command-line flag for limiting the used network bandwidth during backup / restore
This commit is contained in:
parent
2db06f0ef8
commit
443189fb0a
@ -115,7 +115,7 @@ See [this article](https://medium.com/@valyala/speeding-up-backups-for-big-time-
|
||||
### Troubleshooting
|
||||
|
||||
* If the backup is slow, then try setting higher value for `-concurrency` flag. This will increase the number of concurrent workers that upload data to backup storage.
|
||||
* If `vmbackup` eats all the network bandwidth, then set `-concurrency` to 1. This should reduce network bandwidth usage.
|
||||
* If `vmbackup` eats all the network bandwidth, then set `-maxBytesPerSecond` to the desired value.
|
||||
* If `vmbackup` has been interrupted due to temporary error, then just restart it with the same args. It will resume the backup process.
|
||||
|
||||
|
||||
@ -137,6 +137,8 @@ Run `vmbackup -help` in order to see all the available options:
|
||||
-dst can point to the previous backup. In this case incremental backup is performed, i.e. only changed data is uploaded
|
||||
-loggerLevel string
|
||||
Minimum level of errors to log. Possible values: INFO, ERROR, FATAL, PANIC (default "INFO")
|
||||
-maxBytesPerSecond int
|
||||
The maximum upload speed. There is no limit if it is set to 0
|
||||
-memory.allowedPercent float
|
||||
Allowed percent of system memory VictoriaMetrics caches may occupy (default 60)
|
||||
-origin string
|
||||
|
@ -18,8 +18,9 @@ var (
|
||||
dst = flag.String("dst", "", "Where to put the backup on the remote storage. "+
|
||||
"Example: gcs://bucket/path/to/backup/dir, s3://bucket/path/to/backup/dir or fs:///path/to/local/backup/dir\n"+
|
||||
"-dst can point to the previous backup. In this case incremental backup is performed, i.e. only changed data is uploaded")
|
||||
origin = flag.String("origin", "", "Optional origin directory on the remote storage with old backup for server-side copying when performing full backup. This speeds up full backups")
|
||||
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce backup duration")
|
||||
origin = flag.String("origin", "", "Optional origin directory on the remote storage with old backup for server-side copying when performing full backup. This speeds up full backups")
|
||||
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce backup duration")
|
||||
maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum upload speed. There is no limit if it is set to 0")
|
||||
)
|
||||
|
||||
func main() {
|
||||
@ -84,7 +85,11 @@ func newSrcFS() (*fslocal.FS, error) {
|
||||
}
|
||||
|
||||
fs := &fslocal.FS{
|
||||
Dir: snapshotPath,
|
||||
Dir: snapshotPath,
|
||||
MaxBytesPerSecond: *maxBytesPerSecond,
|
||||
}
|
||||
if err := fs.Init(); err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize fs: %s", err)
|
||||
}
|
||||
return fs, nil
|
||||
}
|
||||
|
@ -24,15 +24,17 @@ vmrestore -src=gcs://<bucket>/<path/to/backup> -storageDataPath=<local/path/to/r
|
||||
The original `-storageDataPath` directory may contain old files. They will be susbstituted by the files from backup.
|
||||
|
||||
|
||||
### Troubleshooting
|
||||
|
||||
* If `vmrestore` eats all the network bandwidth, then set `-maxBytesPerSecond` to the desired value.
|
||||
* If `vmrestore` has been interrupted due to temporary error, then just restart it with the same args. It will resume the restore process.
|
||||
|
||||
|
||||
### Advanced usage
|
||||
|
||||
Run `vmrestore -help` in order to see all the available options:
|
||||
|
||||
```
|
||||
vmrestore restores VictoriaMetrics data from backups made by vmbackup.
|
||||
|
||||
See the docs at https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmrestore/README.md .
|
||||
|
||||
-concurrency int
|
||||
The number of concurrent workers. Higher concurrency may reduce restore duration (default 10)
|
||||
-configFilePath string
|
||||
@ -43,6 +45,8 @@ See the docs at https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/a
|
||||
See https://cloud.google.com/iam/docs/creating-managing-service-account-keys and https://docs.aws.amazon.com/general/latest/gr/aws-security-credentials.html
|
||||
-loggerLevel string
|
||||
Minimum level of errors to log. Possible values: INFO, ERROR, FATAL, PANIC (default "INFO")
|
||||
-maxBytesPerSecond int
|
||||
The maximum download speed. There is no limit if it is set to 0
|
||||
-memory.allowedPercent float
|
||||
Allowed percent of system memory VictoriaMetrics caches may occupy (default 60)
|
||||
-src string
|
||||
|
@ -16,7 +16,8 @@ var (
|
||||
"Example: gcs://bucket/path/to/backup/dir, s3://bucket/path/to/backup/dir or fs:///path/to/local/backup/dir")
|
||||
storageDataPath = flag.String("storageDataPath", "victoria-metrics-data", "Destination path where backup must be restored. "+
|
||||
"VictoriaMetrics must be stopped when restoring from backup. -storageDataPath dir can be non-empty. In this case only missing data is downloaded from backup")
|
||||
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration")
|
||||
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration")
|
||||
maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0")
|
||||
)
|
||||
|
||||
func main() {
|
||||
@ -59,7 +60,11 @@ func newDstFS() (*fslocal.FS, error) {
|
||||
return nil, fmt.Errorf("`-storageDataPath` cannot be empty")
|
||||
}
|
||||
fs := &fslocal.FS{
|
||||
Dir: *storageDataPath,
|
||||
Dir: *storageDataPath,
|
||||
MaxBytesPerSecond: *maxBytesPerSecond,
|
||||
}
|
||||
if err := fs.Init(); err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize local fs: %s", err)
|
||||
}
|
||||
return fs, nil
|
||||
}
|
||||
|
116
lib/backup/fslocal/bandwidth_limiter.go
Normal file
116
lib/backup/fslocal/bandwidth_limiter.go
Normal file
@ -0,0 +1,116 @@
|
||||
package fslocal
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
type bandwidthLimiter struct {
|
||||
perSecondLimit int
|
||||
|
||||
c *sync.Cond
|
||||
|
||||
// quota for the current second
|
||||
quota int
|
||||
}
|
||||
|
||||
func newBandwidthLimiter(perSecondLimit int) *bandwidthLimiter {
|
||||
if perSecondLimit <= 0 {
|
||||
logger.Panicf("BUG: perSecondLimit must be positive; got %d", perSecondLimit)
|
||||
}
|
||||
var bl bandwidthLimiter
|
||||
bl.perSecondLimit = perSecondLimit
|
||||
var mu sync.Mutex
|
||||
bl.c = sync.NewCond(&mu)
|
||||
go bl.perSecondUpdater()
|
||||
return &bl
|
||||
}
|
||||
|
||||
func (bl *bandwidthLimiter) NewReadCloser(rc io.ReadCloser) *bandwidthLimitedReader {
|
||||
return &bandwidthLimitedReader{
|
||||
rc: rc,
|
||||
bl: bl,
|
||||
}
|
||||
}
|
||||
|
||||
func (bl *bandwidthLimiter) NewWriteCloser(wc io.WriteCloser) *bandwidthLimitedWriter {
|
||||
return &bandwidthLimitedWriter{
|
||||
wc: wc,
|
||||
bl: bl,
|
||||
}
|
||||
}
|
||||
|
||||
type bandwidthLimitedReader struct {
|
||||
rc io.ReadCloser
|
||||
bl *bandwidthLimiter
|
||||
}
|
||||
|
||||
func (blr *bandwidthLimitedReader) Read(p []byte) (int, error) {
|
||||
quota := blr.bl.GetQuota(len(p))
|
||||
return blr.rc.Read(p[:quota])
|
||||
}
|
||||
|
||||
func (blr *bandwidthLimitedReader) Close() error {
|
||||
return blr.rc.Close()
|
||||
}
|
||||
|
||||
type bandwidthLimitedWriter struct {
|
||||
wc io.WriteCloser
|
||||
bl *bandwidthLimiter
|
||||
}
|
||||
|
||||
func (blw *bandwidthLimitedWriter) Write(p []byte) (int, error) {
|
||||
nn := 0
|
||||
for len(p) > 0 {
|
||||
quota := blw.bl.GetQuota(len(p))
|
||||
n, err := blw.wc.Write(p[:quota])
|
||||
nn += n
|
||||
if err != nil {
|
||||
return nn, err
|
||||
}
|
||||
p = p[quota:]
|
||||
}
|
||||
return nn, nil
|
||||
}
|
||||
|
||||
func (blw *bandwidthLimitedWriter) Close() error {
|
||||
return blw.wc.Close()
|
||||
}
|
||||
|
||||
func (bl *bandwidthLimiter) perSecondUpdater() {
|
||||
tc := time.NewTicker(time.Second)
|
||||
c := bl.c
|
||||
for range tc.C {
|
||||
c.L.Lock()
|
||||
bl.quota = bl.perSecondLimit
|
||||
c.Signal()
|
||||
c.L.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// GetQuota returns the number in the range [1..n] - the allowed quota for now.
|
||||
//
|
||||
// The function blocks until at least 1 can be returned from it.
|
||||
func (bl *bandwidthLimiter) GetQuota(n int) int {
|
||||
if n <= 0 {
|
||||
logger.Panicf("BUG: n must be positive; got %d", n)
|
||||
}
|
||||
c := bl.c
|
||||
c.L.Lock()
|
||||
for bl.quota <= 0 {
|
||||
c.Wait()
|
||||
}
|
||||
quota := bl.quota
|
||||
if quota > n {
|
||||
quota = n
|
||||
}
|
||||
bl.quota -= quota
|
||||
if bl.quota > 0 {
|
||||
c.Signal()
|
||||
}
|
||||
c.L.Unlock()
|
||||
return quota
|
||||
}
|
@ -20,6 +20,19 @@ import (
|
||||
type FS struct {
|
||||
// Dir is a path to local directory to work with.
|
||||
Dir string
|
||||
|
||||
// MaxBytesPerSecond is the maximum bandwidth usage during backups or restores.
|
||||
MaxBytesPerSecond int
|
||||
|
||||
bl *bandwidthLimiter
|
||||
}
|
||||
|
||||
// Init initializes fs
|
||||
func (fs *FS) Init() error {
|
||||
if fs.MaxBytesPerSecond > 0 {
|
||||
fs.bl = newBandwidthLimiter(fs.MaxBytesPerSecond)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns user-readable representation for the fs.
|
||||
@ -93,7 +106,11 @@ func (fs *FS) NewReadCloser(p common.Part) (io.ReadCloser, error) {
|
||||
r: r,
|
||||
n: p.Size,
|
||||
}
|
||||
return lrc, nil
|
||||
if fs.bl == nil {
|
||||
return lrc, nil
|
||||
}
|
||||
blrc := fs.bl.NewReadCloser(lrc)
|
||||
return blrc, nil
|
||||
}
|
||||
|
||||
// NewWriteCloser returns io.WriteCloser for the given part p located in fs.
|
||||
@ -108,9 +125,14 @@ func (fs *FS) NewWriteCloser(p common.Part) (io.WriteCloser, error) {
|
||||
}
|
||||
wc := &writeCloser{
|
||||
w: w,
|
||||
n: p.Size,
|
||||
path: path,
|
||||
}
|
||||
return wc, nil
|
||||
if fs.bl == nil {
|
||||
return wc, nil
|
||||
}
|
||||
blwc := fs.bl.NewWriteCloser(wc)
|
||||
return blwc, nil
|
||||
}
|
||||
|
||||
// DeletePath deletes the given path from fs and returns the size
|
||||
@ -188,14 +210,23 @@ func (lrc *limitedReadCloser) Close() error {
|
||||
|
||||
type writeCloser struct {
|
||||
w *filestream.Writer
|
||||
n uint64
|
||||
path string
|
||||
}
|
||||
|
||||
func (wc *writeCloser) Write(p []byte) (int, error) {
|
||||
return wc.w.Write(p)
|
||||
n, err := wc.w.Write(p)
|
||||
if uint64(n) > wc.n {
|
||||
return n, fmt.Errorf("too much data written; got %d bytes; want %d bytes", n, wc.n)
|
||||
}
|
||||
wc.n -= uint64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (wc *writeCloser) Close() error {
|
||||
wc.w.MustClose()
|
||||
if wc.n != 0 {
|
||||
return fmt.Errorf("missing data writes for %d bytes", wc.n)
|
||||
}
|
||||
return fscommon.FsyncFile(wc.path)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user