mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
squash commits (#4166)
This commit is contained in:
parent
4786f036de
commit
49b77ec01a
@ -24,6 +24,9 @@ var (
|
|||||||
"or if both not set, DefaultSharedConfigProfile is used")
|
"or if both not set, DefaultSharedConfigProfile is used")
|
||||||
customS3Endpoint = flag.String("customS3Endpoint", "", "Custom S3 endpoint for use with S3-compatible storages (e.g. MinIO). S3 is used if not set")
|
customS3Endpoint = flag.String("customS3Endpoint", "", "Custom S3 endpoint for use with S3-compatible storages (e.g. MinIO). S3 is used if not set")
|
||||||
s3ForcePathStyle = flag.Bool("s3ForcePathStyle", true, "Prefixing endpoint with bucket name when set false, true by default.")
|
s3ForcePathStyle = flag.Bool("s3ForcePathStyle", true, "Prefixing endpoint with bucket name when set false, true by default.")
|
||||||
|
s3StorageClass = flag.String("s3StorageClass", "", "The Storage Class applied to objects uploaded to AWS S3. Supported values are: GLACIER, "+
|
||||||
|
"DEEP_ARCHIVE, GLACIER_IR, INTELLIGENT_TIERING, ONEZONE_IA, OUTPOSTS, REDUCED_REDUNDANCY, STANDARD, STANDARD_IA.\n"+
|
||||||
|
"See https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-class-intro.html/")
|
||||||
)
|
)
|
||||||
|
|
||||||
func runParallel(concurrency int, parts []common.Part, f func(p common.Part) error, progress func(elapsed time.Duration)) error {
|
func runParallel(concurrency int, parts []common.Part, f func(p common.Part) error, progress func(elapsed time.Duration)) error {
|
||||||
@ -240,6 +243,7 @@ func NewRemoteFS(path string) (common.RemoteFS, error) {
|
|||||||
CredsFilePath: *credsFilePath,
|
CredsFilePath: *credsFilePath,
|
||||||
ConfigFilePath: *configFilePath,
|
ConfigFilePath: *configFilePath,
|
||||||
CustomEndpoint: *customS3Endpoint,
|
CustomEndpoint: *customS3Endpoint,
|
||||||
|
StorageClass: s3remote.StringToS3StorageClass(*s3StorageClass),
|
||||||
S3ForcePathStyle: *s3ForcePathStyle,
|
S3ForcePathStyle: *s3ForcePathStyle,
|
||||||
ProfileName: *configProfile,
|
ProfileName: *configProfile,
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
|
@ -11,12 +11,38 @@ import (
|
|||||||
"github.com/aws/aws-sdk-go-v2/config"
|
"github.com/aws/aws-sdk-go-v2/config"
|
||||||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||||
|
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
supportedStorageClasses = []s3types.StorageClass{s3types.StorageClassGlacier, s3types.StorageClassDeepArchive, s3types.StorageClassGlacierIr, s3types.StorageClassIntelligentTiering, s3types.StorageClassOnezoneIa, s3types.StorageClassOutposts, s3types.StorageClassReducedRedundancy, s3types.StorageClassStandard, s3types.StorageClassStandardIa}
|
||||||
|
)
|
||||||
|
|
||||||
|
func validateStorageClass(storageClass s3types.StorageClass) error {
|
||||||
|
// if no storageClass set, no need to validate against supported values
|
||||||
|
// backwards compatibility
|
||||||
|
if len(storageClass) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, supported := range supportedStorageClasses {
|
||||||
|
if supported == storageClass {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("unsupported S3 storage class: %s. Supported values: %v", storageClass, supportedStorageClasses)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StringToS3StorageClass converts string types to AWS S3 StorageClass type for value comparison
|
||||||
|
func StringToS3StorageClass(sc string) s3types.StorageClass {
|
||||||
|
return s3types.StorageClass(sc)
|
||||||
|
}
|
||||||
|
|
||||||
// FS represents filesystem for backups in S3.
|
// FS represents filesystem for backups in S3.
|
||||||
//
|
//
|
||||||
// Init must be called before calling other FS methods.
|
// Init must be called before calling other FS methods.
|
||||||
@ -39,6 +65,9 @@ type FS struct {
|
|||||||
// Force to use path style for s3, true by default.
|
// Force to use path style for s3, true by default.
|
||||||
S3ForcePathStyle bool
|
S3ForcePathStyle bool
|
||||||
|
|
||||||
|
// Object Storage Class: https://aws.amazon.com/s3/storage-classes/
|
||||||
|
StorageClass s3types.StorageClass
|
||||||
|
|
||||||
// The name of S3 config profile to use.
|
// The name of S3 config profile to use.
|
||||||
ProfileName string
|
ProfileName string
|
||||||
|
|
||||||
@ -77,6 +106,11 @@ func (fs *FS) Init() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot load S3 config: %w", err)
|
return fmt.Errorf("cannot load S3 config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = validateStorageClass(fs.StorageClass); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
var outerErr error
|
var outerErr error
|
||||||
fs.s3 = s3.NewFromConfig(cfg, func(o *s3.Options) {
|
fs.s3 = s3.NewFromConfig(cfg, func(o *s3.Options) {
|
||||||
if len(fs.CustomEndpoint) > 0 {
|
if len(fs.CustomEndpoint) > 0 {
|
||||||
@ -188,10 +222,12 @@ func (fs *FS) CopyPart(srcFS common.OriginFS, p common.Part) error {
|
|||||||
copySource := fmt.Sprintf("/%s/%s", src.Bucket, srcPath)
|
copySource := fmt.Sprintf("/%s/%s", src.Bucket, srcPath)
|
||||||
|
|
||||||
input := &s3.CopyObjectInput{
|
input := &s3.CopyObjectInput{
|
||||||
Bucket: aws.String(fs.Bucket),
|
Bucket: aws.String(fs.Bucket),
|
||||||
CopySource: aws.String(copySource),
|
CopySource: aws.String(copySource),
|
||||||
Key: aws.String(dstPath),
|
Key: aws.String(dstPath),
|
||||||
|
StorageClass: fs.StorageClass,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := fs.s3.CopyObject(context.Background(), input)
|
_, err := fs.s3.CopyObject(context.Background(), input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot copy %q from %s to %s (copySource %q): %w", p.Path, src, fs, copySource, err)
|
return fmt.Errorf("cannot copy %q from %s to %s (copySource %q): %w", p.Path, src, fs, copySource, err)
|
||||||
@ -231,10 +267,12 @@ func (fs *FS) UploadPart(p common.Part, r io.Reader) error {
|
|||||||
r: r,
|
r: r,
|
||||||
}
|
}
|
||||||
input := &s3.PutObjectInput{
|
input := &s3.PutObjectInput{
|
||||||
Bucket: aws.String(fs.Bucket),
|
Bucket: aws.String(fs.Bucket),
|
||||||
Key: aws.String(path),
|
Key: aws.String(path),
|
||||||
Body: sr,
|
Body: sr,
|
||||||
|
StorageClass: fs.StorageClass,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := fs.uploader.Upload(context.Background(), input)
|
_, err := fs.uploader.Upload(context.Background(), input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot upoad data to %q at %s (remote path %q): %w", p.Path, fs, path, err)
|
return fmt.Errorf("cannot upoad data to %q at %s (remote path %q): %w", p.Path, fs, path, err)
|
||||||
@ -282,9 +320,10 @@ func (fs *FS) CreateFile(filePath string, data []byte) error {
|
|||||||
r: bytes.NewReader(data),
|
r: bytes.NewReader(data),
|
||||||
}
|
}
|
||||||
input := &s3.PutObjectInput{
|
input := &s3.PutObjectInput{
|
||||||
Bucket: aws.String(fs.Bucket),
|
Bucket: aws.String(fs.Bucket),
|
||||||
Key: aws.String(path),
|
Key: aws.String(path),
|
||||||
Body: sr,
|
Body: sr,
|
||||||
|
StorageClass: fs.StorageClass,
|
||||||
}
|
}
|
||||||
_, err := fs.uploader.Upload(context.Background(), input)
|
_, err := fs.uploader.Upload(context.Background(), input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user