// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package storage import ( "context" "encoding/base64" "errors" "fmt" "io" "io/ioutil" "net/http" "net/url" "os" "reflect" "strconv" "strings" "time" "cloud.google.com/go/internal/optional" "cloud.google.com/go/internal/trace" "golang.org/x/oauth2/google" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" raw "google.golang.org/api/storage/v1" "google.golang.org/api/transport" htransport "google.golang.org/api/transport/http" iampb "google.golang.org/genproto/googleapis/iam/v1" ) // httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic // storageClient interface. // // This is an experimental API and not intended for public use. type httpStorageClient struct { creds *google.Credentials hc *http.Client readHost string raw *raw.Service scheme string settings *settings } // newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON // Storage API. // // This is an experimental API and not intended for public use. func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) { s := initSettings(opts...) o := s.clientOption var creds *google.Credentials // In general, it is recommended to use raw.NewService instead of htransport.NewClient // since raw.NewService configures the correct default endpoints when initializing the // internal http client. However, in our case, "NewRangeReader" in reader.go needs to // access the http client directly to make requests, so we create the client manually // here so it can be re-used by both reader.go and raw.NewService. This means we need to // manually configure the default endpoint options on the http client. Furthermore, we // need to account for STORAGE_EMULATOR_HOST override when setting the default endpoints. if host := os.Getenv("STORAGE_EMULATOR_HOST"); host == "" { // Prepend default options to avoid overriding options passed by the user. o = append([]option.ClientOption{option.WithScopes(ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"), option.WithUserAgent(userAgent)}, o...) o = append(o, internaloption.WithDefaultEndpoint("https://storage.googleapis.com/storage/v1/")) o = append(o, internaloption.WithDefaultMTLSEndpoint("https://storage.mtls.googleapis.com/storage/v1/")) // Don't error out here. The user may have passed in their own HTTP // client which does not auth with ADC or other common conventions. c, err := transport.Creds(ctx, o...) if err == nil { creds = c o = append(o, internaloption.WithCredentials(creds)) } } else { var hostURL *url.URL if strings.Contains(host, "://") { h, err := url.Parse(host) if err != nil { return nil, err } hostURL = h } else { // Add scheme for user if not supplied in STORAGE_EMULATOR_HOST // URL is only parsed correctly if it has a scheme, so we build it ourselves hostURL = &url.URL{Scheme: "http", Host: host} } hostURL.Path = "storage/v1/" endpoint := hostURL.String() // Append the emulator host as default endpoint for the user o = append([]option.ClientOption{option.WithoutAuthentication()}, o...) o = append(o, internaloption.WithDefaultEndpoint(endpoint)) o = append(o, internaloption.WithDefaultMTLSEndpoint(endpoint)) } s.clientOption = o // htransport selects the correct endpoint among WithEndpoint (user override), WithDefaultEndpoint, and WithDefaultMTLSEndpoint. hc, ep, err := htransport.NewClient(ctx, s.clientOption...) if err != nil { return nil, fmt.Errorf("dialing: %v", err) } // RawService should be created with the chosen endpoint to take account of user override. rawService, err := raw.NewService(ctx, option.WithEndpoint(ep), option.WithHTTPClient(hc)) if err != nil { return nil, fmt.Errorf("storage client: %v", err) } // Update readHost and scheme with the chosen endpoint. u, err := url.Parse(ep) if err != nil { return nil, fmt.Errorf("supplied endpoint %q is not valid: %v", ep, err) } return &httpStorageClient{ creds: creds, hc: hc, readHost: u.Host, raw: rawService, scheme: u.Scheme, settings: s, }, nil } func (c *httpStorageClient) Close() error { c.hc.CloseIdleConnections() return nil } // Top-level methods. func (c *httpStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) { s := callSettings(c.settings, opts...) call := c.raw.Projects.ServiceAccount.Get(project) var res *raw.ServiceAccount err := run(ctx, func() error { var err error res, err = call.Context(ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) if err != nil { return "", err } return res.EmailAddress, nil } func (c *httpStorageClient) CreateBucket(ctx context.Context, project string, attrs *BucketAttrs, opts ...storageOption) (*BucketAttrs, error) { s := callSettings(c.settings, opts...) var bkt *raw.Bucket if attrs != nil { bkt = attrs.toRawBucket() } else { bkt = &raw.Bucket{} } // If there is lifecycle information but no location, explicitly set // the location. This is a GCS quirk/bug. if bkt.Location == "" && bkt.Lifecycle != nil { bkt.Location = "US" } req := c.raw.Buckets.Insert(project, bkt) setClientHeader(req.Header()) if attrs != nil && attrs.PredefinedACL != "" { req.PredefinedAcl(attrs.PredefinedACL) } if attrs != nil && attrs.PredefinedDefaultObjectACL != "" { req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL) } var battrs *BucketAttrs err := run(ctx, func() error { b, err := req.Context(ctx).Do() if err != nil { return err } battrs, err = newBucket(b) return err }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) return battrs, err } func (c *httpStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator { s := callSettings(c.settings, opts...) it := &BucketIterator{ ctx: ctx, projectID: project, } fetch := func(pageSize int, pageToken string) (token string, err error) { req := c.raw.Buckets.List(it.projectID) setClientHeader(req.Header()) req.Projection("full") req.Prefix(it.Prefix) req.PageToken(pageToken) if pageSize > 0 { req.MaxResults(int64(pageSize)) } var resp *raw.Buckets err = run(it.ctx, func() error { resp, err = req.Context(it.ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) if err != nil { return "", err } for _, item := range resp.Items { b, err := newBucket(item) if err != nil { return "", err } it.buckets = append(it.buckets, b) } return resp.NextPageToken, nil } it.pageInfo, it.nextFunc = iterator.NewPageInfo( fetch, func() int { return len(it.buckets) }, func() interface{} { b := it.buckets; it.buckets = nil; return b }) return it } // Bucket methods. func (c *httpStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error { s := callSettings(c.settings, opts...) req := c.raw.Buckets.Delete(bucket) setClientHeader(req.Header()) if err := applyBucketConds("httpStorageClient.DeleteBucket", conds, req); err != nil { return err } if s.userProject != "" { req.UserProject(s.userProject) } return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) } func (c *httpStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) { s := callSettings(c.settings, opts...) req := c.raw.Buckets.Get(bucket).Projection("full") setClientHeader(req.Header()) err := applyBucketConds("httpStorageClient.GetBucket", conds, req) if err != nil { return nil, err } if s.userProject != "" { req.UserProject(s.userProject) } var resp *raw.Bucket err = run(ctx, func() error { resp, err = req.Context(ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { return nil, ErrBucketNotExist } if err != nil { return nil, err } return newBucket(resp) } func (c *httpStorageClient) UpdateBucket(ctx context.Context, bucket string, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) { s := callSettings(c.settings, opts...) rb := uattrs.toRawBucket() req := c.raw.Buckets.Patch(bucket, rb).Projection("full") setClientHeader(req.Header()) err := applyBucketConds("httpStorageClient.UpdateBucket", conds, req) if err != nil { return nil, err } if s.userProject != "" { req.UserProject(s.userProject) } if uattrs != nil && uattrs.PredefinedACL != "" { req.PredefinedAcl(uattrs.PredefinedACL) } if uattrs != nil && uattrs.PredefinedDefaultObjectACL != "" { req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL) } var rawBucket *raw.Bucket err = run(ctx, func() error { rawBucket, err = req.Context(ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) if err != nil { return nil, err } return newBucket(rawBucket) } func (c *httpStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error { s := callSettings(c.settings, opts...) var metageneration int64 if conds != nil { metageneration = conds.MetagenerationMatch } req := c.raw.Buckets.LockRetentionPolicy(bucket, metageneration) return run(ctx, func() error { _, err := req.Context(ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) } func (c *httpStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator { s := callSettings(c.settings, opts...) it := &ObjectIterator{ ctx: ctx, } if q != nil { it.query = *q } fetch := func(pageSize int, pageToken string) (string, error) { req := c.raw.Objects.List(bucket) setClientHeader(req.Header()) projection := it.query.Projection if projection == ProjectionDefault { projection = ProjectionFull } req.Projection(projection.String()) req.Delimiter(it.query.Delimiter) req.Prefix(it.query.Prefix) req.StartOffset(it.query.StartOffset) req.EndOffset(it.query.EndOffset) req.Versions(it.query.Versions) req.IncludeTrailingDelimiter(it.query.IncludeTrailingDelimiter) if len(it.query.fieldSelection) > 0 { req.Fields("nextPageToken", googleapi.Field(it.query.fieldSelection)) } req.PageToken(pageToken) if s.userProject != "" { req.UserProject(s.userProject) } if pageSize > 0 { req.MaxResults(int64(pageSize)) } var resp *raw.Objects var err error err = run(it.ctx, func() error { resp, err = req.Context(it.ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) if err != nil { var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { err = ErrBucketNotExist } return "", err } for _, item := range resp.Items { it.items = append(it.items, newObject(item)) } for _, prefix := range resp.Prefixes { it.items = append(it.items, &ObjectAttrs{Prefix: prefix}) } return resp.NextPageToken, nil } it.pageInfo, it.nextFunc = iterator.NewPageInfo( fetch, func() int { return len(it.items) }, func() interface{} { b := it.items; it.items = nil; return b }) return it } // Object metadata methods. func (c *httpStorageClient) DeleteObject(ctx context.Context, bucket, object string, gen int64, conds *Conditions, opts ...storageOption) error { s := callSettings(c.settings, opts...) req := c.raw.Objects.Delete(bucket, object).Context(ctx) if err := applyConds("Delete", gen, conds, req); err != nil { return err } if s.userProject != "" { req.UserProject(s.userProject) } err := run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { return ErrObjectNotExist } return err } func (c *httpStorageClient) GetObject(ctx context.Context, bucket, object string, gen int64, encryptionKey []byte, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) { s := callSettings(c.settings, opts...) req := c.raw.Objects.Get(bucket, object).Projection("full").Context(ctx) if err := applyConds("Attrs", gen, conds, req); err != nil { return nil, err } if s.userProject != "" { req.UserProject(s.userProject) } if err := setEncryptionHeaders(req.Header(), encryptionKey, false); err != nil { return nil, err } var obj *raw.Object var err error err = run(ctx, func() error { obj, err = req.Context(ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { return nil, ErrObjectNotExist } if err != nil { return nil, err } return newObject(obj), nil } func (c *httpStorageClient) UpdateObject(ctx context.Context, bucket, object string, uattrs *ObjectAttrsToUpdate, gen int64, encryptionKey []byte, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) { s := callSettings(c.settings, opts...) var attrs ObjectAttrs // Lists of fields to send, and set to null, in the JSON. var forceSendFields, nullFields []string if uattrs.ContentType != nil { attrs.ContentType = optional.ToString(uattrs.ContentType) // For ContentType, sending the empty string is a no-op. // Instead we send a null. if attrs.ContentType == "" { nullFields = append(nullFields, "ContentType") } else { forceSendFields = append(forceSendFields, "ContentType") } } if uattrs.ContentLanguage != nil { attrs.ContentLanguage = optional.ToString(uattrs.ContentLanguage) // For ContentLanguage it's an error to send the empty string. // Instead we send a null. if attrs.ContentLanguage == "" { nullFields = append(nullFields, "ContentLanguage") } else { forceSendFields = append(forceSendFields, "ContentLanguage") } } if uattrs.ContentEncoding != nil { attrs.ContentEncoding = optional.ToString(uattrs.ContentEncoding) forceSendFields = append(forceSendFields, "ContentEncoding") } if uattrs.ContentDisposition != nil { attrs.ContentDisposition = optional.ToString(uattrs.ContentDisposition) forceSendFields = append(forceSendFields, "ContentDisposition") } if uattrs.CacheControl != nil { attrs.CacheControl = optional.ToString(uattrs.CacheControl) forceSendFields = append(forceSendFields, "CacheControl") } if uattrs.EventBasedHold != nil { attrs.EventBasedHold = optional.ToBool(uattrs.EventBasedHold) forceSendFields = append(forceSendFields, "EventBasedHold") } if uattrs.TemporaryHold != nil { attrs.TemporaryHold = optional.ToBool(uattrs.TemporaryHold) forceSendFields = append(forceSendFields, "TemporaryHold") } if !uattrs.CustomTime.IsZero() { attrs.CustomTime = uattrs.CustomTime forceSendFields = append(forceSendFields, "CustomTime") } if uattrs.Metadata != nil { attrs.Metadata = uattrs.Metadata if len(attrs.Metadata) == 0 { // Sending the empty map is a no-op. We send null instead. nullFields = append(nullFields, "Metadata") } else { forceSendFields = append(forceSendFields, "Metadata") } } if uattrs.ACL != nil { attrs.ACL = uattrs.ACL // It's an error to attempt to delete the ACL, so // we don't append to nullFields here. forceSendFields = append(forceSendFields, "Acl") } rawObj := attrs.toRawObject(bucket) rawObj.ForceSendFields = forceSendFields rawObj.NullFields = nullFields call := c.raw.Objects.Patch(bucket, object, rawObj).Projection("full").Context(ctx) if err := applyConds("Update", gen, conds, call); err != nil { return nil, err } if s.userProject != "" { call.UserProject(s.userProject) } if uattrs.PredefinedACL != "" { call.PredefinedAcl(uattrs.PredefinedACL) } if err := setEncryptionHeaders(call.Header(), encryptionKey, false); err != nil { return nil, err } var obj *raw.Object var err error err = run(ctx, func() error { obj, err = call.Do(); return err }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) var e *googleapi.Error if errors.As(err, &e) && e.Code == http.StatusNotFound { return nil, ErrObjectNotExist } if err != nil { return nil, err } return newObject(obj), nil } // Default Object ACL methods. func (c *httpStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error { s := callSettings(c.settings, opts...) req := c.raw.DefaultObjectAccessControls.Delete(bucket, string(entity)) configureACLCall(ctx, s.userProject, req) return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) } func (c *httpStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) { s := callSettings(c.settings, opts...) var acls *raw.ObjectAccessControls var err error req := c.raw.DefaultObjectAccessControls.List(bucket) configureACLCall(ctx, s.userProject, req) err = run(ctx, func() error { acls, err = req.Do() return err }, s.retry, true, setRetryHeaderHTTP(req)) if err != nil { return nil, err } return toObjectACLRules(acls.Items), nil } func (c *httpStorageClient) UpdateDefaultObjectACL(ctx context.Context, opts ...storageOption) (*ACLRule, error) { return nil, errMethodNotSupported } // Bucket ACL methods. func (c *httpStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error { s := callSettings(c.settings, opts...) req := c.raw.BucketAccessControls.Delete(bucket, string(entity)) configureACLCall(ctx, s.userProject, req) return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) } func (c *httpStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) { s := callSettings(c.settings, opts...) var acls *raw.BucketAccessControls var err error req := c.raw.BucketAccessControls.List(bucket) configureACLCall(ctx, s.userProject, req) err = run(ctx, func() error { acls, err = req.Do() return err }, s.retry, true, setRetryHeaderHTTP(req)) if err != nil { return nil, err } return toBucketACLRules(acls.Items), nil } func (c *httpStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) (*ACLRule, error) { s := callSettings(c.settings, opts...) acl := &raw.BucketAccessControl{ Bucket: bucket, Entity: string(entity), Role: string(role), } req := c.raw.BucketAccessControls.Update(bucket, string(entity), acl) configureACLCall(ctx, s.userProject, req) var aclRule ACLRule var err error err = run(ctx, func() error { acl, err = req.Do() aclRule = toBucketACLRule(acl) return err }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) if err != nil { return nil, err } return &aclRule, nil } // configureACLCall sets the context, user project and headers on the apiary library call. // This will panic if the call does not have the correct methods. func configureACLCall(ctx context.Context, userProject string, call interface{ Header() http.Header }) { vc := reflect.ValueOf(call) vc.MethodByName("Context").Call([]reflect.Value{reflect.ValueOf(ctx)}) if userProject != "" { vc.MethodByName("UserProject").Call([]reflect.Value{reflect.ValueOf(userProject)}) } setClientHeader(call.Header()) } // Object ACL methods. func (c *httpStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error { return errMethodNotSupported } // ListObjectACLs retrieves object ACL entries. By default, it operates on the latest generation of this object. // Selecting a specific generation of this object is not currently supported by the client. func (c *httpStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) { s := callSettings(c.settings, opts...) var acls *raw.ObjectAccessControls var err error req := c.raw.ObjectAccessControls.List(bucket, object) configureACLCall(ctx, s.userProject, req) err = run(ctx, func() error { acls, err = req.Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) if err != nil { return nil, err } return toObjectACLRules(acls.Items), nil } func (c *httpStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) (*ACLRule, error) { return nil, errMethodNotSupported } // Media operations. func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) { return nil, errMethodNotSupported } func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) { return nil, errMethodNotSupported } func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.NewRangeReader") defer func() { trace.EndSpan(ctx, err) }() s := callSettings(c.settings, opts...) if params.offset < 0 && params.length >= 0 { return nil, fmt.Errorf("storage: invalid offset %d < 0 requires negative length", params.offset) } if params.conds != nil { if err := params.conds.validate("NewRangeReader"); err != nil { return nil, err } } u := &url.URL{ Scheme: c.scheme, Host: c.readHost, Path: fmt.Sprintf("/%s/%s", params.bucket, params.object), } verb := "GET" if params.length == 0 { verb = "HEAD" } req, err := http.NewRequest(verb, u.String(), nil) if err != nil { return nil, err } req = req.WithContext(ctx) if s.userProject != "" { req.Header.Set("X-Goog-User-Project", s.userProject) } // TODO(noahdietz): add option for readCompressed. // if o.readCompressed { // req.Header.Set("Accept-Encoding", "gzip") // } if err := setEncryptionHeaders(req.Header, params.encryptionKey, false); err != nil { return nil, err } // Define a function that initiates a Read with offset and length, assuming we // have already read seen bytes. reopen := func(seen int64) (*http.Response, error) { // If the context has already expired, return immediately without making a // call. if err := ctx.Err(); err != nil { return nil, err } start := params.offset + seen if params.length < 0 && start < 0 { req.Header.Set("Range", fmt.Sprintf("bytes=%d", start)) } else if params.length < 0 && start > 0 { req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start)) } else if params.length > 0 { // The end character isn't affected by how many bytes we've seen. req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, params.offset+params.length-1)) } // We wait to assign conditions here because the generation number can change in between reopen() runs. if err := setConditionsHeaders(req.Header, params.conds); err != nil { return nil, err } // If an object generation is specified, include generation as query string parameters. if params.gen >= 0 { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) } var res *http.Response err = run(ctx, func() error { res, err = c.hc.Do(req) if err != nil { return err } if res.StatusCode == http.StatusNotFound { res.Body.Close() return ErrObjectNotExist } if res.StatusCode < 200 || res.StatusCode > 299 { body, _ := ioutil.ReadAll(res.Body) res.Body.Close() return &googleapi.Error{ Code: res.StatusCode, Header: res.Header, Body: string(body), } } partialContentNotSatisfied := !decompressiveTranscoding(res) && start > 0 && params.length != 0 && res.StatusCode != http.StatusPartialContent if partialContentNotSatisfied { res.Body.Close() return errors.New("storage: partial request not satisfied") } // With "Content-Encoding": "gzip" aka decompressive transcoding, GCS serves // back the whole file regardless of the range count passed in as per: // https://cloud.google.com/storage/docs/transcoding#range, // thus we have to manually move the body forward by seen bytes. if decompressiveTranscoding(res) && seen > 0 { _, _ = io.CopyN(ioutil.Discard, res.Body, seen) } // If a generation hasn't been specified, and this is the first response we get, let's record the // generation. In future requests we'll use this generation as a precondition to avoid data races. if params.gen < 0 && res.Header.Get("X-Goog-Generation") != "" { gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64) if err != nil { return err } params.gen = gen64 } return nil }, s.retry, s.idempotent, setRetryHeaderHTTP(nil)) if err != nil { return nil, err } return res, nil } res, err := reopen(0) if err != nil { return nil, err } var ( size int64 // total size of object, even if a range was requested. checkCRC bool crc uint32 startOffset int64 // non-zero if range request. ) if res.StatusCode == http.StatusPartialContent { cr := strings.TrimSpace(res.Header.Get("Content-Range")) if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") { return nil, fmt.Errorf("storage: invalid Content-Range %q", cr) } // Content range is formatted -/. We take // the total size. size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64) if err != nil { return nil, fmt.Errorf("storage: invalid Content-Range %q", cr) } dashIndex := strings.Index(cr, "-") if dashIndex >= 0 { startOffset, err = strconv.ParseInt(cr[len("bytes="):dashIndex], 10, 64) if err != nil { return nil, fmt.Errorf("storage: invalid Content-Range %q: %v", cr, err) } } } else { size = res.ContentLength // Check the CRC iff all of the following hold: // - We asked for content (length != 0). // - We got all the content (status != PartialContent). // - The server sent a CRC header. // - The Go http stack did not uncompress the file. // - We were not served compressed data that was uncompressed on download. // The problem with the last two cases is that the CRC will not match -- GCS // computes it on the compressed contents, but we compute it on the // uncompressed contents. if params.length != 0 && !res.Uncompressed && !uncompressedByServer(res) { crc, checkCRC = parseCRC32c(res) } } remain := res.ContentLength body := res.Body if params.length == 0 { remain = 0 body.Close() body = emptyBody } var metaGen int64 if res.Header.Get("X-Goog-Metageneration") != "" { metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64) if err != nil { return nil, err } } var lm time.Time if res.Header.Get("Last-Modified") != "" { lm, err = http.ParseTime(res.Header.Get("Last-Modified")) if err != nil { return nil, err } } attrs := ReaderObjectAttrs{ Size: size, ContentType: res.Header.Get("Content-Type"), ContentEncoding: res.Header.Get("Content-Encoding"), CacheControl: res.Header.Get("Cache-Control"), LastModified: lm, StartOffset: startOffset, Generation: params.gen, Metageneration: metaGen, } return &Reader{ Attrs: attrs, size: size, remain: remain, wantCRC: crc, checkCRC: checkCRC, reader: &httpReader{ reopen: reopen, body: body, }, }, nil } func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) { s := callSettings(c.settings, opts...) errorf := params.setError setObj := params.setObj progress := params.progress attrs := params.attrs mediaOpts := []googleapi.MediaOption{ googleapi.ChunkSize(params.chunkSize), } if c := attrs.ContentType; c != "" { mediaOpts = append(mediaOpts, googleapi.ContentType(c)) } if params.chunkRetryDeadline != 0 { mediaOpts = append(mediaOpts, googleapi.ChunkRetryDeadline(params.chunkRetryDeadline)) } pr, pw := io.Pipe() go func() { defer close(params.donec) rawObj := attrs.toRawObject(params.bucket) if params.sendCRC32C { rawObj.Crc32c = encodeUint32(attrs.CRC32C) } if attrs.MD5 != nil { rawObj.Md5Hash = base64.StdEncoding.EncodeToString(attrs.MD5) } call := c.raw.Objects.Insert(params.bucket, rawObj). Media(pr, mediaOpts...). Projection("full"). Context(params.ctx). Name(params.attrs.Name) call.ProgressUpdater(func(n, _ int64) { progress(n) }) if attrs.KMSKeyName != "" { call.KmsKeyName(attrs.KMSKeyName) } if attrs.PredefinedACL != "" { call.PredefinedAcl(attrs.PredefinedACL) } if err := setEncryptionHeaders(call.Header(), params.encryptionKey, false); err != nil { errorf(err) pr.CloseWithError(err) return } var resp *raw.Object err := applyConds("NewWriter", params.attrs.Generation, params.conds, call) if err == nil { if s.userProject != "" { call.UserProject(s.userProject) } // TODO(tritone): Remove this code when Uploads begin to support // retry attempt header injection with "client header" injection. setClientHeader(call.Header()) // The internals that perform call.Do automatically retry both the initial // call to set up the upload as well as calls to upload individual chunks // for a resumable upload (as long as the chunk size is non-zero). Hence // there is no need to add retries here. // Retry only when the operation is idempotent or the retry policy is RetryAlways. isIdempotent := params.conds != nil && (params.conds.GenerationMatch >= 0 || params.conds.DoesNotExist == true) var useRetry bool if (s.retry == nil || s.retry.policy == RetryIdempotent) && isIdempotent { useRetry = true } else if s.retry != nil && s.retry.policy == RetryAlways { useRetry = true } if useRetry { if s.retry != nil { call.WithRetry(s.retry.backoff, s.retry.shouldRetry) } else { call.WithRetry(nil, nil) } } resp, err = call.Do() } if err != nil { errorf(err) pr.CloseWithError(err) return } setObj(newObject(resp)) }() return pw, nil } // IAM methods. func (c *httpStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) { s := callSettings(c.settings, opts...) call := c.raw.Buckets.GetIamPolicy(resource).OptionsRequestedPolicyVersion(int64(version)) setClientHeader(call.Header()) if s.userProject != "" { call.UserProject(s.userProject) } var rp *raw.Policy err := run(ctx, func() error { var err error rp, err = call.Context(ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) if err != nil { return nil, err } return iamFromStoragePolicy(rp), nil } func (c *httpStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error { s := callSettings(c.settings, opts...) rp := iamToStoragePolicy(policy) call := c.raw.Buckets.SetIamPolicy(resource, rp) setClientHeader(call.Header()) if s.userProject != "" { call.UserProject(s.userProject) } return run(ctx, func() error { _, err := call.Context(ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) } func (c *httpStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) { s := callSettings(c.settings, opts...) call := c.raw.Buckets.TestIamPermissions(resource, permissions) setClientHeader(call.Header()) if s.userProject != "" { call.UserProject(s.userProject) } var res *raw.TestIamPermissionsResponse err := run(ctx, func() error { var err error res, err = call.Context(ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) if err != nil { return nil, err } return res.Permissions, nil } // HMAC Key methods. func (c *httpStorageClient) GetHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) (*HMACKey, error) { return nil, errMethodNotSupported } func (c *httpStorageClient) ListHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) *HMACKeysIterator { return &HMACKeysIterator{} } func (c *httpStorageClient) UpdateHMACKey(ctx context.Context, desc *hmacKeyDesc, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) { return nil, errMethodNotSupported } func (c *httpStorageClient) CreateHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) (*HMACKey, error) { return nil, errMethodNotSupported } func (c *httpStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) error { return errMethodNotSupported } // Notification methods. // ListNotifications returns all the Notifications configured for this bucket, as a map indexed by notification ID. // // Note: This API does not support pagination. However, entity limits cap the number of notifications on a single bucket, // so all results will be returned in the first response. See https://cloud.google.com/storage/quotas#buckets. func (c *httpStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.ListNotifications") defer func() { trace.EndSpan(ctx, err) }() s := callSettings(c.settings, opts...) call := c.raw.Notifications.List(bucket) if s.userProject != "" { call.UserProject(s.userProject) } var res *raw.Notifications err = run(ctx, func() error { res, err = call.Context(ctx).Do() return err }, s.retry, true, setRetryHeaderHTTP(call)) if err != nil { return nil, err } return notificationsToMap(res.Items), nil } func (c *httpStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.CreateNotification") defer func() { trace.EndSpan(ctx, err) }() s := callSettings(c.settings, opts...) call := c.raw.Notifications.Insert(bucket, toRawNotification(n)) if s.userProject != "" { call.UserProject(s.userProject) } var rn *raw.Notification err = run(ctx, func() error { rn, err = call.Context(ctx).Do() return err }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) if err != nil { return nil, err } return toNotification(rn), nil } func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.DeleteNotification") defer func() { trace.EndSpan(ctx, err) }() s := callSettings(c.settings, opts...) call := c.raw.Notifications.Delete(bucket, id) if s.userProject != "" { call.UserProject(s.userProject) } return run(ctx, func() error { return call.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) } type httpReader struct { body io.ReadCloser seen int64 reopen func(seen int64) (*http.Response, error) } func (r *httpReader) Read(p []byte) (int, error) { n := 0 for len(p[n:]) > 0 { m, err := r.body.Read(p[n:]) n += m r.seen += int64(m) if err == nil || err == io.EOF { return n, err } // Read failed (likely due to connection issues), but we will try to reopen // the pipe and continue. Send a ranged read request that takes into account // the number of bytes we've already seen. res, err := r.reopen(r.seen) if err != nil { // reopen already retries return n, err } r.body.Close() r.body = res.Body } return n, nil } func (r *httpReader) Close() error { return r.body.Close() }