@@ -1725,12 +1725,12 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
17251725
17261726 var o * storagepb.Object
17271727 uploadBuff := func (ctx context.Context ) error {
1728- obj , err := gw .uploadBuffer (recvd , offset , doneReading )
1728+ obj , err := gw .uploadBuffer (ctx , recvd , offset , doneReading )
17291729 o = obj
17301730 return err
17311731 }
17321732
1733- err = run (gw .ctx , uploadBuff , gw .settings .retry , s .idempotent )
1733+ err = run (bucketContext ( gw .ctx , gw . bucket ) , uploadBuff , gw .settings .retry , s .idempotent )
17341734 if err != nil {
17351735 return err
17361736 }
@@ -2666,11 +2666,10 @@ type gRPCBidiWriteBufferSender interface {
26662666 // If flush is true, implementations must not return until the data in buf is
26672667 // stable. If finishWrite is true, implementations must return the object on
26682668 // success.
2669- sendBuffer (buf []byte , offset int64 , flush , finishWrite bool ) (* storagepb.Object , error )
2669+ sendBuffer (ctx context. Context , buf []byte , offset int64 , flush , finishWrite bool ) (* storagepb.Object , error )
26702670}
26712671
26722672type gRPCOneshotBidiWriteBufferSender struct {
2673- ctx context.Context
26742673 firstMessage * storagepb.BidiWriteObjectRequest
26752674 raw * gapic.Client
26762675 stream storagepb.Storage_BidiWriteObjectClient
@@ -2691,17 +2690,16 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() (*gRPCOneshotBidiWrit
26912690 }
26922691
26932692 return & gRPCOneshotBidiWriteBufferSender {
2694- ctx : bucketContext (w .ctx , w .bucket ),
26952693 firstMessage : firstMessage ,
26962694 raw : w .c .raw ,
26972695 settings : w .settings ,
26982696 }, nil
26992697}
27002698
2701- func (s * gRPCOneshotBidiWriteBufferSender ) sendBuffer (buf []byte , offset int64 , flush , finishWrite bool ) (obj * storagepb.Object , err error ) {
2699+ func (s * gRPCOneshotBidiWriteBufferSender ) sendBuffer (ctx context. Context , buf []byte , offset int64 , flush , finishWrite bool ) (obj * storagepb.Object , err error ) {
27022700 var firstMessage * storagepb.BidiWriteObjectRequest
27032701 if s .stream == nil {
2704- s .stream , err = s .raw .BidiWriteObject (s . ctx , s .settings .gax ... )
2702+ s .stream , err = s .raw .BidiWriteObject (ctx , s .settings .gax ... )
27052703 if err != nil {
27062704 return
27072705 }
@@ -2737,7 +2735,6 @@ func (s *gRPCOneshotBidiWriteBufferSender) sendBuffer(buf []byte, offset int64,
27372735}
27382736
27392737type gRPCResumableBidiWriteBufferSender struct {
2740- ctx context.Context
27412738 queryRetry * retryConfig
27422739 upid string
27432740 progress func (int64 )
@@ -2748,7 +2745,7 @@ type gRPCResumableBidiWriteBufferSender struct {
27482745 settings * settings
27492746}
27502747
2751- func (w * gRPCWriter ) newGRPCResumableBidiWriteBufferSender () (* gRPCResumableBidiWriteBufferSender , error ) {
2748+ func (w * gRPCWriter ) newGRPCResumableBidiWriteBufferSender (ctx context. Context ) (* gRPCResumableBidiWriteBufferSender , error ) {
27522749 req := & storagepb.StartResumableWriteRequest {
27532750 WriteObjectSpec : w .spec ,
27542751 CommonObjectRequestParams : toProtoCommonObjectRequestParams (w .encryptionKey ),
@@ -2758,7 +2755,6 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() (*gRPCResumableBidi
27582755 ObjectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
27592756 }
27602757
2761- ctx := bucketContext (w .ctx , w .bucket )
27622758 var upid string
27632759 err := run (ctx , func (ctx context.Context ) error {
27642760 upres , err := w .c .raw .StartResumableWrite (ctx , req , w .settings .gax ... )
@@ -2778,7 +2774,6 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() (*gRPCResumableBidi
27782774 }
27792775
27802776 return & gRPCResumableBidiWriteBufferSender {
2781- ctx : ctx ,
27822777 queryRetry : w .settings .retry ,
27832778 upid : upid ,
27842779 progress : w .progress ,
@@ -2791,9 +2786,9 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() (*gRPCResumableBidi
27912786
27922787// queryProgress is a helper that queries the status of the resumable upload
27932788// associated with the given upload ID.
2794- func (s * gRPCResumableBidiWriteBufferSender ) queryProgress () (int64 , error ) {
2789+ func (s * gRPCResumableBidiWriteBufferSender ) queryProgress (ctx context. Context ) (int64 , error ) {
27952790 var persistedSize int64
2796- err := run (s . ctx , func (ctx context.Context ) error {
2791+ err := run (ctx , func (ctx context.Context ) error {
27972792 q , err := s .raw .QueryWriteStatus (ctx , & storagepb.QueryWriteStatusRequest {
27982793 UploadId : s .upid ,
27992794 }, s .settings .gax ... )
@@ -2805,15 +2800,15 @@ func (s *gRPCResumableBidiWriteBufferSender) queryProgress() (int64, error) {
28052800 return persistedSize , err
28062801}
28072802
2808- func (s * gRPCResumableBidiWriteBufferSender ) sendBuffer (buf []byte , offset int64 , flush , finishWrite bool ) (obj * storagepb.Object , err error ) {
2803+ func (s * gRPCResumableBidiWriteBufferSender ) sendBuffer (ctx context. Context , buf []byte , offset int64 , flush , finishWrite bool ) (obj * storagepb.Object , err error ) {
28092804 reconnected := false
28102805 if s .stream == nil {
28112806 // Determine offset and reconnect
2812- s .flushOffset , err = s .queryProgress ()
2807+ s .flushOffset , err = s .queryProgress (ctx )
28132808 if err != nil {
28142809 return
28152810 }
2816- s .stream , err = s .raw .BidiWriteObject (s . ctx , s .settings .gax ... )
2811+ s .stream , err = s .raw .BidiWriteObject (ctx , s .settings .gax ... )
28172812 if err != nil {
28182813 return
28192814 }
@@ -2885,7 +2880,7 @@ func (s *gRPCResumableBidiWriteBufferSender) sendBuffer(buf []byte, offset int64
28852880// The final Object is returned on success if doneReading is true.
28862881//
28872882// Returns object and any error that is not retriable.
2888- func (w * gRPCWriter ) uploadBuffer (recvd int , start int64 , doneReading bool ) (obj * storagepb.Object , err error ) {
2883+ func (w * gRPCWriter ) uploadBuffer (ctx context. Context , recvd int , start int64 , doneReading bool ) (obj * storagepb.Object , err error ) {
28892884 if w .streamSender == nil {
28902885 if w .append {
28912886 // Appendable object semantics
@@ -2895,7 +2890,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (obj
28952890 w .streamSender , err = w .newGRPCOneshotBidiWriteBufferSender ()
28962891 } else {
28972892 // Resumable write semantics
2898- w .streamSender , err = w .newGRPCResumableBidiWriteBufferSender ()
2893+ w .streamSender , err = w .newGRPCResumableBidiWriteBufferSender (ctx )
28992894 }
29002895 if err != nil {
29012896 return
@@ -2915,7 +2910,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (obj
29152910 l = len (data )
29162911 flush = true
29172912 }
2918- obj , err = w .streamSender .sendBuffer (data [:l ], offset , flush , flush && doneReading )
2913+ obj , err = w .streamSender .sendBuffer (ctx , data [:l ], offset , flush , flush && doneReading )
29192914 if err != nil {
29202915 return nil , err
29212916 }
0 commit comments