2424import com .google .api .gax .rpc .StateCheckingResponseObserver ;
2525import com .google .api .gax .rpc .StreamController ;
2626import com .google .cloud .storage .BlobDescriptorImpl .OutstandingReadToArray ;
27+ import com .google .common .base .Preconditions ;
2728import com .google .storage .v2 .BidiReadObjectRequest ;
2829import com .google .storage .v2 .BidiReadObjectResponse ;
2930import com .google .storage .v2 .ObjectRangeData ;
3536import java .util .concurrent .TimeoutException ;
3637
3738final class BlobDescriptorStream extends StateCheckingResponseObserver <BidiReadObjectResponse >
38- implements ClientStream <BidiReadObjectRequest >, ApiFuture <Void > {
39+ implements ClientStream <BidiReadObjectRequest >, ApiFuture <Void >, AutoCloseable {
3940
4041 private final SettableApiFuture <Void > openSignal ;
42+ private final SettableApiFuture <Void > closeSignal ;
4143
4244 private final BlobDescriptorState state ;
4345 private final ResponseContentLifecycleManager <BidiReadObjectResponse >
@@ -47,8 +49,8 @@ final class BlobDescriptorStream extends StateCheckingResponseObserver<BidiReadO
4749 private final GrpcCallContext context ;
4850 private final OpenMonitorResponseObserver openMonitorResponseObserver ;
4951
50- private StreamController controller ;
51-
52+ private volatile boolean open ;
53+ private volatile StreamController controller ;
5254 private volatile ClientStream <BidiReadObjectRequest > requestStream ;
5355
5456 private BlobDescriptorStream (
@@ -64,6 +66,8 @@ private BlobDescriptorStream(
6466 this .context = context ;
6567 this .openMonitorResponseObserver = new OpenMonitorResponseObserver ();
6668 this .openSignal = SettableApiFuture .create ();
69+ this .closeSignal = SettableApiFuture .create ();
70+ this .open = true ;
6771 }
6872
6973 public ClientStream <BidiReadObjectRequest > getRequestStream () {
@@ -79,23 +83,49 @@ public ClientStream<BidiReadObjectRequest> getRequestStream() {
7983 }
8084 }
8185
86+ @ Override
87+ public void close () throws IOException {
88+ if (!open ) {
89+ return ;
90+ }
91+
92+ try {
93+ cancel (true );
94+ if (requestStream != null ) {
95+ requestStream .closeSend ();
96+ try {
97+ closeSignal .get ();
98+ } catch (InterruptedException | ExecutionException e ) {
99+ throw new RuntimeException (e );
100+ }
101+ requestStream = null ;
102+ }
103+ } finally {
104+ open = false ;
105+ }
106+ }
107+
82108 @ Override
83109 public void send (BidiReadObjectRequest request ) {
110+ checkOpen ();
84111 getRequestStream ().send (request );
85112 }
86113
87114 @ Override
88115 public void closeSendWithError (Throwable t ) {
116+ checkOpen ();
89117 getRequestStream ().closeSendWithError (t );
90118 }
91119
92120 @ Override
93121 public void closeSend () {
122+ checkOpen ();
94123 getRequestStream ().closeSend ();
95124 }
96125
97126 @ Override
98127 public boolean isSendReady () {
128+ checkOpen ();
99129 return getRequestStream ().isSendReady ();
100130 }
101131
@@ -192,6 +222,10 @@ protected void onErrorImpl(Throwable t) {}
192222 @ Override
193223 protected void onCompleteImpl () {}
194224
225+ private void checkOpen () {
226+ Preconditions .checkState (open , "not open" );
227+ }
228+
195229 private class OpenMonitorResponseObserver
196230 extends StateCheckingResponseObserver <BidiReadObjectResponse > {
197231
@@ -212,12 +246,14 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
212246 protected void onErrorImpl (Throwable t ) {
213247 BlobDescriptorStream .this .onErrorImpl (t );
214248 openSignal .setException (t );
249+ closeSignal .setException (t );
215250 }
216251
217252 @ Override
218253 protected void onCompleteImpl () {
219254 BlobDescriptorStream .this .onCompleteImpl ();
220255 openSignal .set (null );
256+ closeSignal .set (null );
221257 }
222258 }
223259
0 commit comments