Skip to content

Commit 411615d

Browse files
committed
chore: make BlobDescriptor Closeable
1 parent 51af43e commit 411615d

File tree

3 files changed

+48
-4
lines changed

3 files changed

+48
-4
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import java.io.IOException;
2323

2424
/** Blob Descriptor is to blob, what File Descriptor is to a file */
25-
public interface BlobDescriptor {
25+
public interface BlobDescriptor extends AutoCloseable, Closeable {
2626

2727
BlobInfo getBlobInfo();
2828

2929
ApiFuture<byte[]> readRangeAsBytes(ByteRangeSpec range);
3030

31+
@Override
32+
void close() throws IOException;
33+
3134
interface ZeroCopySupport {
3235
interface DisposableByteString extends AutoCloseable, Closeable {
3336
ByteString byteString();

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ public BlobInfo getBlobInfo() {
6363
return info;
6464
}
6565

66+
@Override
67+
public void close() throws IOException {
68+
stream.close();
69+
}
70+
6671
static ApiFuture<BlobDescriptor> create(
6772
BidiReadObjectRequest openRequest,
6873
GrpcCallContext context,

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.api.gax.rpc.StateCheckingResponseObserver;
2525
import com.google.api.gax.rpc.StreamController;
2626
import com.google.cloud.storage.BlobDescriptorImpl.OutstandingReadToArray;
27+
import com.google.common.base.Preconditions;
2728
import com.google.storage.v2.BidiReadObjectRequest;
2829
import com.google.storage.v2.BidiReadObjectResponse;
2930
import com.google.storage.v2.ObjectRangeData;
@@ -35,9 +36,10 @@
3536
import java.util.concurrent.TimeoutException;
3637

3738
final class BlobDescriptorStream extends StateCheckingResponseObserver<BidiReadObjectResponse>
38-
implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void> {
39+
implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void>, AutoCloseable {
3940

4041
private final SettableApiFuture<Void> openSignal;
42+
private final SettableApiFuture<Void> closeSignal;
4143

4244
private final BlobDescriptorState state;
4345
private final ResponseContentLifecycleManager<BidiReadObjectResponse>
@@ -47,8 +49,8 @@ final class BlobDescriptorStream extends StateCheckingResponseObserver<BidiReadO
4749
private final GrpcCallContext context;
4850
private final OpenMonitorResponseObserver openMonitorResponseObserver;
4951

50-
private StreamController controller;
51-
52+
private volatile boolean open;
53+
private volatile StreamController controller;
5254
private volatile ClientStream<BidiReadObjectRequest> requestStream;
5355

5456
private BlobDescriptorStream(
@@ -64,6 +66,8 @@ private BlobDescriptorStream(
6466
this.context = context;
6567
this.openMonitorResponseObserver = new OpenMonitorResponseObserver();
6668
this.openSignal = SettableApiFuture.create();
69+
this.closeSignal = SettableApiFuture.create();
70+
this.open = true;
6771
}
6872

6973
public ClientStream<BidiReadObjectRequest> getRequestStream() {
@@ -79,23 +83,49 @@ public ClientStream<BidiReadObjectRequest> getRequestStream() {
7983
}
8084
}
8185

86+
@Override
87+
public void close() throws IOException {
88+
if (!open) {
89+
return;
90+
}
91+
92+
try {
93+
cancel(true);
94+
if (requestStream != null) {
95+
requestStream.closeSend();
96+
try {
97+
closeSignal.get();
98+
} catch (InterruptedException | ExecutionException e) {
99+
throw new RuntimeException(e);
100+
}
101+
requestStream = null;
102+
}
103+
} finally {
104+
open = false;
105+
}
106+
}
107+
82108
@Override
83109
public void send(BidiReadObjectRequest request) {
110+
checkOpen();
84111
getRequestStream().send(request);
85112
}
86113

87114
@Override
88115
public void closeSendWithError(Throwable t) {
116+
checkOpen();
89117
getRequestStream().closeSendWithError(t);
90118
}
91119

92120
@Override
93121
public void closeSend() {
122+
checkOpen();
94123
getRequestStream().closeSend();
95124
}
96125

97126
@Override
98127
public boolean isSendReady() {
128+
checkOpen();
99129
return getRequestStream().isSendReady();
100130
}
101131

@@ -192,6 +222,10 @@ protected void onErrorImpl(Throwable t) {}
192222
@Override
193223
protected void onCompleteImpl() {}
194224

225+
private void checkOpen() {
226+
Preconditions.checkState(open, "not open");
227+
}
228+
195229
private class OpenMonitorResponseObserver
196230
extends StateCheckingResponseObserver<BidiReadObjectResponse> {
197231

@@ -212,12 +246,14 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
212246
protected void onErrorImpl(Throwable t) {
213247
BlobDescriptorStream.this.onErrorImpl(t);
214248
openSignal.setException(t);
249+
closeSignal.setException(t);
215250
}
216251

217252
@Override
218253
protected void onCompleteImpl() {
219254
BlobDescriptorStream.this.onCompleteImpl();
220255
openSignal.set(null);
256+
closeSignal.set(null);
221257
}
222258
}
223259

0 commit comments

Comments
 (0)