// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package storage import ( "context" "os" gapic "cloud.google.com/go/storage/internal/apiv2" "google.golang.org/api/iterator" "google.golang.org/api/option" iampb "google.golang.org/genproto/googleapis/iam/v1" storagepb "google.golang.org/genproto/googleapis/storage/v2" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb" ) const ( // defaultConnPoolSize is the default number of connections // to initialize in the GAPIC gRPC connection pool. A larger // connection pool may be necessary for jobs that require // high throughput and/or leverage many concurrent streams. // // This is an experimental API and not intended for public use. defaultConnPoolSize = 4 // globalProjectAlias is the project ID alias used for global buckets. // // This is only used for the gRPC API. globalProjectAlias = "_" ) // defaultGRPCOptions returns a set of the default client options // for gRPC client initialization. // // This is an experimental API and not intended for public use. func defaultGRPCOptions() []option.ClientOption { defaults := []option.ClientOption{ option.WithGRPCConnectionPool(defaultConnPoolSize), } // Set emulator options for gRPC if an emulator was specified. Note that in a // hybrid client, STORAGE_EMULATOR_HOST will set the host to use for HTTP and // STORAGE_EMULATOR_HOST_GRPC will set the host to use for gRPC (when using a // local emulator, HTTP and gRPC must use different ports, so this is // necessary). // // TODO: When the newHybridClient is not longer used, remove // STORAGE_EMULATOR_HOST_GRPC and use STORAGE_EMULATOR_HOST for both the // HTTP and gRPC based clients. if host := os.Getenv("STORAGE_EMULATOR_HOST_GRPC"); host != "" { // Strip the scheme from the emulator host. WithEndpoint does not take a // scheme for gRPC. host = stripScheme(host) defaults = append(defaults, option.WithEndpoint(host), option.WithGRPCDialOption(grpc.WithInsecure()), option.WithoutAuthentication(), ) } return defaults } // grpcStorageClient is the gRPC API implementation of the transport-agnostic // storageClient interface. // // This is an experimental API and not intended for public use. type grpcStorageClient struct { raw *gapic.Client settings *settings } // newGRPCStorageClient initializes a new storageClient that uses the gRPC // Storage API. // // This is an experimental API and not intended for public use. func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) { s := initSettings(opts...) s.clientOption = append(defaultGRPCOptions(), s.clientOption...) g, err := gapic.NewClient(ctx, s.clientOption...) if err != nil { return nil, err } return &grpcStorageClient{ raw: g, settings: s, }, nil } func (c *grpcStorageClient) Close() error { return c.raw.Close() } // Top-level methods. func (c *grpcStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) { s := callSettings(c.settings, opts...) req := &storagepb.GetServiceAccountRequest{ Project: toProjectResource(project), } var resp *storagepb.ServiceAccount err := run(ctx, func() error { var err error resp, err = c.raw.GetServiceAccount(ctx, req, s.gax...) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if err != nil { return "", err } return resp.EmailAddress, err } func (c *grpcStorageClient) CreateBucket(ctx context.Context, project string, attrs *BucketAttrs, opts ...storageOption) (*BucketAttrs, error) { s := callSettings(c.settings, opts...) b := attrs.toProtoBucket() // If there is lifecycle information but no location, explicitly set // the location. This is a GCS quirk/bug. if b.GetLocation() == "" && b.GetLifecycle() != nil { b.Location = "US" } req := &storagepb.CreateBucketRequest{ Parent: toProjectResource(project), Bucket: b, BucketId: b.GetName(), PredefinedAcl: attrs.PredefinedACL, PredefinedDefaultObjectAcl: attrs.PredefinedDefaultObjectACL, } var battrs *BucketAttrs err := run(ctx, func() error { res, err := c.raw.CreateBucket(ctx, req, s.gax...) battrs = newBucketFromProto(res) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) return battrs, err } func (c *grpcStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator { s := callSettings(c.settings, opts...) it := &BucketIterator{ ctx: ctx, projectID: project, } var gitr *gapic.BucketIterator fetch := func(pageSize int, pageToken string) (token string, err error) { // Initialize GAPIC-based iterator when pageToken is empty, which // indicates that this fetch call is attempting to get the first page. // // Note: Initializing the GAPIC-based iterator lazily is necessary to // capture the BucketIterator.Prefix set by the user *after* the // BucketIterator is returned to them from the veneer. if pageToken == "" { req := &storagepb.ListBucketsRequest{ Parent: toProjectResource(it.projectID), Prefix: it.Prefix, } gitr = c.raw.ListBuckets(it.ctx, req, s.gax...) } var buckets []*storagepb.Bucket var next string err = run(it.ctx, func() error { buckets, next, err = gitr.InternalFetch(pageSize, pageToken) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if err != nil { return "", err } for _, bkt := range buckets { b := newBucketFromProto(bkt) it.buckets = append(it.buckets, b) } return next, nil } it.pageInfo, it.nextFunc = iterator.NewPageInfo( fetch, func() int { return len(it.buckets) }, func() interface{} { b := it.buckets; it.buckets = nil; return b }) return it } // Bucket methods. func (c *grpcStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error { s := callSettings(c.settings, opts...) req := &storagepb.DeleteBucketRequest{ Name: bucketResourceName(globalProjectAlias, bucket), } if err := applyBucketCondsProto("grpcStorageClient.DeleteBucket", conds, req); err != nil { return err } if s.userProject != "" { ctx = setUserProjectMetadata(ctx, s.userProject) } return run(ctx, func() error { return c.raw.DeleteBucket(ctx, req, s.gax...) }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) } func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) { s := callSettings(c.settings, opts...) req := &storagepb.GetBucketRequest{ Name: bucketResourceName(globalProjectAlias, bucket), } if err := applyBucketCondsProto("grpcStorageClient.GetBucket", conds, req); err != nil { return nil, err } if s.userProject != "" { ctx = setUserProjectMetadata(ctx, s.userProject) } var battrs *BucketAttrs err := run(ctx, func() error { res, err := c.raw.GetBucket(ctx, req, s.gax...) battrs = newBucketFromProto(res) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { return nil, ErrBucketNotExist } return battrs, err } func (c *grpcStorageClient) UpdateBucket(ctx context.Context, bucket string, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) { s := callSettings(c.settings, opts...) b := uattrs.toProtoBucket() b.Name = bucketResourceName(globalProjectAlias, bucket) req := &storagepb.UpdateBucketRequest{ Bucket: b, PredefinedAcl: uattrs.PredefinedACL, PredefinedDefaultObjectAcl: uattrs.PredefinedDefaultObjectACL, } if err := applyBucketCondsProto("grpcStorageClient.UpdateBucket", conds, req); err != nil { return nil, err } if s.userProject != "" { ctx = setUserProjectMetadata(ctx, s.userProject) } var paths []string fieldMask := &fieldmaskpb.FieldMask{ Paths: paths, } if uattrs.CORS != nil { fieldMask.Paths = append(fieldMask.Paths, "cors") } if uattrs.DefaultEventBasedHold != nil { fieldMask.Paths = append(fieldMask.Paths, "default_event_based_hold") } if uattrs.RetentionPolicy != nil { fieldMask.Paths = append(fieldMask.Paths, "retention_policy") } if uattrs.VersioningEnabled != nil { fieldMask.Paths = append(fieldMask.Paths, "versioning") } if uattrs.RequesterPays != nil { fieldMask.Paths = append(fieldMask.Paths, "billing") } if uattrs.BucketPolicyOnly != nil || uattrs.UniformBucketLevelAccess != nil || uattrs.PublicAccessPrevention != PublicAccessPreventionUnknown { fieldMask.Paths = append(fieldMask.Paths, "iam_config") } if uattrs.Encryption != nil { fieldMask.Paths = append(fieldMask.Paths, "encryption") } if uattrs.Lifecycle != nil { fieldMask.Paths = append(fieldMask.Paths, "lifecycle") } if uattrs.Logging != nil { fieldMask.Paths = append(fieldMask.Paths, "logging") } if uattrs.Website != nil { fieldMask.Paths = append(fieldMask.Paths, "website") } if uattrs.PredefinedACL != "" { // In cases where PredefinedACL is set, Acl is cleared. fieldMask.Paths = append(fieldMask.Paths, "acl") } if uattrs.PredefinedDefaultObjectACL != "" { // In cases where PredefinedDefaultObjectACL is set, DefaultObjectAcl is cleared. fieldMask.Paths = append(fieldMask.Paths, "default_object_acl") } if uattrs.acl != nil { // In cases where acl is set by UpdateBucketACL method. fieldMask.Paths = append(fieldMask.Paths, "acl") } if uattrs.defaultObjectACL != nil { // In cases where defaultObjectACL is set by UpdateBucketACL method. fieldMask.Paths = append(fieldMask.Paths, "default_object_acl") } if uattrs.StorageClass != "" { fieldMask.Paths = append(fieldMask.Paths, "storage_class") } if uattrs.RPO != RPOUnknown { fieldMask.Paths = append(fieldMask.Paths, "rpo") } // TODO(cathyo): Handle labels. Pending b/230510191. req.UpdateMask = fieldMask var battrs *BucketAttrs err := run(ctx, func() error { res, err := c.raw.UpdateBucket(ctx, req, s.gax...) battrs = newBucketFromProto(res) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) return battrs, err } func (c *grpcStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error { s := callSettings(c.settings, opts...) req := &storagepb.LockBucketRetentionPolicyRequest{ Bucket: bucketResourceName(globalProjectAlias, bucket), } if err := applyBucketCondsProto("grpcStorageClient.LockBucketRetentionPolicy", conds, req); err != nil { return err } return run(ctx, func() error { _, err := c.raw.LockBucketRetentionPolicy(ctx, req, s.gax...) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) } func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator { s := callSettings(c.settings, opts...) it := &ObjectIterator{ ctx: ctx, } if q != nil { it.query = *q } req := &storagepb.ListObjectsRequest{ Parent: bucketResourceName(globalProjectAlias, bucket), Prefix: it.query.Prefix, Delimiter: it.query.Delimiter, Versions: it.query.Versions, LexicographicStart: it.query.StartOffset, LexicographicEnd: it.query.EndOffset, // TODO(noahietz): Convert a projection to a FieldMask. // ReadMask: q.Projection, } if s.userProject != "" { ctx = setUserProjectMetadata(ctx, s.userProject) } gitr := c.raw.ListObjects(it.ctx, req, s.gax...) fetch := func(pageSize int, pageToken string) (token string, err error) { var objects []*storagepb.Object err = run(it.ctx, func() error { objects, token, err = gitr.InternalFetch(pageSize, pageToken) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if err != nil { if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { err = ErrBucketNotExist } return "", err } for _, obj := range objects { b := newObjectFromProto(obj) it.items = append(it.items, b) } return token, nil } it.pageInfo, it.nextFunc = iterator.NewPageInfo( fetch, func() int { return len(it.items) }, func() interface{} { b := it.items; it.items = nil; return b }) return it } // Object metadata methods. func (c *grpcStorageClient) DeleteObject(ctx context.Context, bucket, object string, conds *Conditions, opts ...storageOption) error { return errMethodNotSupported } func (c *grpcStorageClient) GetObject(ctx context.Context, bucket, object string, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) { return nil, errMethodNotSupported } func (c *grpcStorageClient) UpdateObject(ctx context.Context, bucket, object string, uattrs *ObjectAttrsToUpdate, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) { return nil, errMethodNotSupported } // Default Object ACL methods. func (c *grpcStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error { // There is no separate API for PATCH in gRPC. // Make a GET call first to retrieve BucketAttrs. attrs, err := c.GetBucket(ctx, bucket, nil, opts...) if err != nil { return err } // Delete the entity and copy other remaining ACL entities. var acl []ACLRule for _, a := range attrs.DefaultObjectACL { if a.Entity != entity { acl = append(acl, a) } } uattrs := &BucketAttrsToUpdate{defaultObjectACL: acl} // Call UpdateBucket with a MetagenerationMatch precondition set. if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil { return err } return nil } func (c *grpcStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) { attrs, err := c.GetBucket(ctx, bucket, nil, opts...) if err != nil { return nil, err } return attrs.DefaultObjectACL, nil } func (c *grpcStorageClient) UpdateDefaultObjectACL(ctx context.Context, opts ...storageOption) (*ACLRule, error) { return nil, errMethodNotSupported } // Bucket ACL methods. func (c *grpcStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error { // There is no separate API for PATCH in gRPC. // Make a GET call first to retrieve BucketAttrs. attrs, err := c.GetBucket(ctx, bucket, nil, opts...) if err != nil { return err } // Delete the entity and copy other remaining ACL entities. var acl []ACLRule for _, a := range attrs.ACL { if a.Entity != entity { acl = append(acl, a) } } uattrs := &BucketAttrsToUpdate{acl: acl} // Call UpdateBucket with a MetagenerationMatch precondition set. if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil { return err } return nil } func (c *grpcStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) { attrs, err := c.GetBucket(ctx, bucket, nil, opts...) if err != nil { return nil, err } return attrs.ACL, nil } func (c *grpcStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) (*ACLRule, error) { // There is no separate API for PATCH in gRPC. // Make a GET call first to retrieve BucketAttrs. attrs, err := c.GetBucket(ctx, bucket, nil, opts...) if err != nil { return nil, err } var acl []ACLRule aclRule := ACLRule{Entity: entity, Role: role} acl = append(attrs.ACL, aclRule) uattrs := &BucketAttrsToUpdate{acl: acl} // Call UpdateBucket with a MetagenerationMatch precondition set. _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...) if err != nil { return nil, err } return &aclRule, err } // Object ACL methods. func (c *grpcStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error { return errMethodNotSupported } func (c *grpcStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) { return nil, errMethodNotSupported } func (c *grpcStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) (*ACLRule, error) { return nil, errMethodNotSupported } // Media operations. func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) { return nil, errMethodNotSupported } func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) { return nil, errMethodNotSupported } func (c *grpcStorageClient) OpenReader(ctx context.Context, r *Reader, opts ...storageOption) error { return errMethodNotSupported } func (c *grpcStorageClient) OpenWriter(ctx context.Context, w *Writer, opts ...storageOption) error { return errMethodNotSupported } // IAM methods. func (c *grpcStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) { // TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter. s := callSettings(c.settings, opts...) req := &iampb.GetIamPolicyRequest{ Resource: bucketResourceName(globalProjectAlias, resource), Options: &iampb.GetPolicyOptions{ RequestedPolicyVersion: version, }, } var rp *iampb.Policy err := run(ctx, func() error { var err error rp, err = c.raw.GetIamPolicy(ctx, req, s.gax...) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) return rp, err } func (c *grpcStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error { // TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter. s := callSettings(c.settings, opts...) req := &iampb.SetIamPolicyRequest{ Resource: bucketResourceName(globalProjectAlias, resource), Policy: policy, } return run(ctx, func() error { _, err := c.raw.SetIamPolicy(ctx, req, s.gax...) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) } func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) { // TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter. s := callSettings(c.settings, opts...) req := &iampb.TestIamPermissionsRequest{ Resource: bucketResourceName(globalProjectAlias, resource), Permissions: permissions, } var res *iampb.TestIamPermissionsResponse err := run(ctx, func() error { var err error res, err = c.raw.TestIamPermissions(ctx, req, s.gax...) return err }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if err != nil { return nil, err } return res.Permissions, nil } // HMAC Key methods. func (c *grpcStorageClient) GetHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) (*HMACKey, error) { return nil, errMethodNotSupported } func (c *grpcStorageClient) ListHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) *HMACKeysIterator { return &HMACKeysIterator{} } func (c *grpcStorageClient) UpdateHMACKey(ctx context.Context, desc *hmacKeyDesc, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) { return nil, errMethodNotSupported } func (c *grpcStorageClient) CreateHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) (*HMACKey, error) { return nil, errMethodNotSupported } func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) error { return errMethodNotSupported } // setUserProjectMetadata appends a project ID to the outgoing Context metadata // via the x-goog-user-project system parameter defined at // https://cloud.google.com/apis/docs/system-parameters. This is only for // billing purposes, and is generally optional, except for requester-pays // buckets. func setUserProjectMetadata(ctx context.Context, project string) context.Context { return metadata.AppendToOutgoingContext(ctx, "x-goog-user-project", project) }