mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-16 00:41:24 +01:00
347 lines
12 KiB
Go
347 lines
12 KiB
Go
// Copyright 2022 Google LLC
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
|
|
gapic "cloud.google.com/go/storage/internal/apiv2"
|
|
"google.golang.org/api/option"
|
|
iampb "google.golang.org/genproto/googleapis/iam/v1"
|
|
storagepb "google.golang.org/genproto/googleapis/storage/v2"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
const (
|
|
// defaultConnPoolSize is the default number of connections
|
|
// to initialize in the GAPIC gRPC connection pool. A larger
|
|
// connection pool may be necessary for jobs that require
|
|
// high throughput and/or leverage many concurrent streams.
|
|
//
|
|
// This is an experimental API and not intended for public use.
|
|
defaultConnPoolSize = 4
|
|
|
|
// globalProjectAlias is the project ID alias used for global buckets.
|
|
//
|
|
// This is only used for the gRPC API.
|
|
globalProjectAlias = "_"
|
|
)
|
|
|
|
// defaultGRPCOptions returns a set of the default client options
|
|
// for gRPC client initialization.
|
|
//
|
|
// This is an experimental API and not intended for public use.
|
|
func defaultGRPCOptions() []option.ClientOption {
|
|
defaults := []option.ClientOption{
|
|
option.WithGRPCConnectionPool(defaultConnPoolSize),
|
|
}
|
|
|
|
// Set emulator options for gRPC if an emulator was specified. Note that in a
|
|
// hybrid client, STORAGE_EMULATOR_HOST will set the host to use for HTTP and
|
|
// STORAGE_EMULATOR_HOST_GRPC will set the host to use for gRPC (when using a
|
|
// local emulator, HTTP and gRPC must use different ports, so this is
|
|
// necessary).
|
|
//
|
|
// TODO: When the newHybridClient is not longer used, remove
|
|
// STORAGE_EMULATOR_HOST_GRPC and use STORAGE_EMULATOR_HOST for both the
|
|
// HTTP and gRPC based clients.
|
|
if host := os.Getenv("STORAGE_EMULATOR_HOST_GRPC"); host != "" {
|
|
// Strip the scheme from the emulator host. WithEndpoint does not take a
|
|
// scheme for gRPC.
|
|
host = stripScheme(host)
|
|
|
|
defaults = append(defaults,
|
|
option.WithEndpoint(host),
|
|
option.WithGRPCDialOption(grpc.WithInsecure()),
|
|
option.WithoutAuthentication(),
|
|
)
|
|
}
|
|
|
|
return defaults
|
|
}
|
|
|
|
// grpcStorageClient is the gRPC API implementation of the transport-agnostic
|
|
// storageClient interface.
|
|
//
|
|
// This is an experimental API and not intended for public use.
|
|
type grpcStorageClient struct {
|
|
raw *gapic.Client
|
|
settings *settings
|
|
}
|
|
|
|
// newGRPCStorageClient initializes a new storageClient that uses the gRPC
|
|
// Storage API.
|
|
//
|
|
// This is an experimental API and not intended for public use.
|
|
func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) {
|
|
s := initSettings(opts...)
|
|
s.clientOption = append(defaultGRPCOptions(), s.clientOption...)
|
|
|
|
g, err := gapic.NewClient(ctx, s.clientOption...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &grpcStorageClient{
|
|
raw: g,
|
|
settings: s,
|
|
}, nil
|
|
}
|
|
|
|
func (c *grpcStorageClient) Close() error {
|
|
return c.raw.Close()
|
|
}
|
|
|
|
// Top-level methods.
|
|
|
|
func (c *grpcStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) {
|
|
return "", errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) CreateBucket(ctx context.Context, project string, attrs *BucketAttrs, opts ...storageOption) (*BucketAttrs, error) {
|
|
s := callSettings(c.settings, opts...)
|
|
b := attrs.toProtoBucket()
|
|
|
|
// If there is lifecycle information but no location, explicitly set
|
|
// the location. This is a GCS quirk/bug.
|
|
if b.GetLocation() == "" && b.GetLifecycle() != nil {
|
|
b.Location = "US"
|
|
}
|
|
|
|
req := &storagepb.CreateBucketRequest{
|
|
Parent: toProjectResource(project),
|
|
Bucket: b,
|
|
BucketId: b.GetName(),
|
|
// TODO(noahdietz): This will be switched to a string.
|
|
//
|
|
// PredefinedAcl: attrs.PredefinedACL,
|
|
// PredefinedDefaultObjectAcl: attrs.PredefinedDefaultObjectACL,
|
|
}
|
|
|
|
var battrs *BucketAttrs
|
|
err := run(ctx, func() error {
|
|
res, err := c.raw.CreateBucket(ctx, req, s.gax...)
|
|
|
|
battrs = newBucketFromProto(res)
|
|
|
|
return err
|
|
}, s.retry, s.idempotent)
|
|
|
|
return battrs, err
|
|
}
|
|
|
|
func (c *grpcStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) (*BucketIterator, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
|
|
// Bucket methods.
|
|
|
|
func (c *grpcStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
|
|
s := callSettings(c.settings, opts...)
|
|
req := &storagepb.DeleteBucketRequest{
|
|
Name: bucketResourceName(globalProjectAlias, bucket),
|
|
}
|
|
if err := applyBucketCondsProto("grpcStorageClient.DeleteBucket", conds, req); err != nil {
|
|
return err
|
|
}
|
|
if s.userProject != "" {
|
|
req.CommonRequestParams = &storagepb.CommonRequestParams{
|
|
UserProject: toProjectResource(s.userProject),
|
|
}
|
|
}
|
|
|
|
return run(ctx, func() error {
|
|
return c.raw.DeleteBucket(ctx, req, s.gax...)
|
|
}, s.retry, s.idempotent)
|
|
}
|
|
|
|
func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
|
|
s := callSettings(c.settings, opts...)
|
|
req := &storagepb.GetBucketRequest{
|
|
Name: bucketResourceName(globalProjectAlias, bucket),
|
|
}
|
|
if err := applyBucketCondsProto("grpcStorageClient.GetBucket", conds, req); err != nil {
|
|
return nil, err
|
|
}
|
|
if s.userProject != "" {
|
|
req.CommonRequestParams = &storagepb.CommonRequestParams{
|
|
UserProject: toProjectResource(s.userProject),
|
|
}
|
|
}
|
|
|
|
var battrs *BucketAttrs
|
|
err := run(ctx, func() error {
|
|
res, err := c.raw.GetBucket(ctx, req, s.gax...)
|
|
|
|
battrs = newBucketFromProto(res)
|
|
|
|
return err
|
|
}, s.retry, s.idempotent)
|
|
|
|
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
|
|
return nil, ErrBucketNotExist
|
|
}
|
|
|
|
return battrs, err
|
|
}
|
|
func (c *grpcStorageClient) UpdateBucket(ctx context.Context, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
|
|
return errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) (*ObjectIterator, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
|
|
// Object metadata methods.
|
|
|
|
func (c *grpcStorageClient) DeleteObject(ctx context.Context, bucket, object string, conds *Conditions, opts ...storageOption) error {
|
|
return errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) GetObject(ctx context.Context, bucket, object string, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) UpdateObject(ctx context.Context, bucket, object string, uattrs *ObjectAttrsToUpdate, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
|
|
// Default Object ACL methods.
|
|
|
|
func (c *grpcStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
|
|
return errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) UpdateDefaultObjectACL(ctx context.Context, opts ...storageOption) (*ACLRule, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
|
|
// Bucket ACL methods.
|
|
|
|
func (c *grpcStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
|
|
return errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) (*ACLRule, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
|
|
// Object ACL methods.
|
|
|
|
func (c *grpcStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error {
|
|
return errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) (*ACLRule, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
|
|
// Media operations.
|
|
|
|
func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
|
|
func (c *grpcStorageClient) OpenReader(ctx context.Context, r *Reader, opts ...storageOption) error {
|
|
return errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) OpenWriter(ctx context.Context, w *Writer, opts ...storageOption) error {
|
|
return errMethodNotSupported
|
|
}
|
|
|
|
// IAM methods.
|
|
|
|
func (c *grpcStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) {
|
|
// TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
|
|
s := callSettings(c.settings, opts...)
|
|
req := &iampb.GetIamPolicyRequest{
|
|
Resource: bucketResourceName(globalProjectAlias, resource),
|
|
Options: &iampb.GetPolicyOptions{
|
|
RequestedPolicyVersion: version,
|
|
},
|
|
}
|
|
var rp *iampb.Policy
|
|
err := run(ctx, func() error {
|
|
var err error
|
|
rp, err = c.raw.GetIamPolicy(ctx, req, s.gax...)
|
|
return err
|
|
}, s.retry, s.idempotent)
|
|
|
|
return rp, err
|
|
}
|
|
|
|
func (c *grpcStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error {
|
|
// TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
|
|
s := callSettings(c.settings, opts...)
|
|
|
|
req := &iampb.SetIamPolicyRequest{
|
|
Resource: bucketResourceName(globalProjectAlias, resource),
|
|
Policy: policy,
|
|
}
|
|
|
|
return run(ctx, func() error {
|
|
_, err := c.raw.SetIamPolicy(ctx, req, s.gax...)
|
|
return err
|
|
}, s.retry, s.idempotent)
|
|
}
|
|
|
|
func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) {
|
|
// TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
|
|
s := callSettings(c.settings, opts...)
|
|
req := &iampb.TestIamPermissionsRequest{
|
|
Resource: bucketResourceName(globalProjectAlias, resource),
|
|
Permissions: permissions,
|
|
}
|
|
var res *iampb.TestIamPermissionsResponse
|
|
err := run(ctx, func() error {
|
|
var err error
|
|
res, err = c.raw.TestIamPermissions(ctx, req, s.gax...)
|
|
return err
|
|
}, s.retry, s.idempotent)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return res.Permissions, nil
|
|
}
|
|
|
|
// HMAC Key methods.
|
|
|
|
func (c *grpcStorageClient) GetHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) (*HMACKey, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) ListHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) *HMACKeysIterator {
|
|
return &HMACKeysIterator{}
|
|
}
|
|
func (c *grpcStorageClient) UpdateHMACKey(ctx context.Context, desc *hmacKeyDesc, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) CreateHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) (*HMACKey, error) {
|
|
return nil, errMethodNotSupported
|
|
}
|
|
func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) error {
|
|
return errMethodNotSupported
|
|
}
|