Skip to content

Commit 67482f7

Browse files
committed
chore: create ZeroCopyBidiStreamingCallable and ZeroCopyServerStreamingCallable
Introduce ZeroCopyBidiStreamingCallable and ZeroCopyServerStreamingCallable to encapsulate a callable and its corresponding ResponseContentLifecycleManager. This helps reduce the number of fields needed for zero copy integration, and reduces some repetition.
1 parent d540a83 commit 67482f7

File tree

9 files changed

+266
-114
lines changed

9 files changed

+266
-114
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import com.google.api.core.ApiFutures;
2121
import com.google.api.core.SettableApiFuture;
2222
import com.google.api.gax.grpc.GrpcCallContext;
23-
import com.google.api.gax.rpc.BidiStreamingCallable;
2423
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
2524
import com.google.cloud.storage.BlobDescriptorStreamRead.AccumulatingRead;
25+
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
2626
import com.google.storage.v2.BidiReadObjectRequest;
2727
import com.google.storage.v2.BidiReadObjectResponse;
2828
import java.io.IOException;
@@ -78,14 +78,11 @@ public void close() throws IOException {
7878
static ApiFuture<BlobDescriptor> create(
7979
BidiReadObjectRequest openRequest,
8080
GrpcCallContext context,
81-
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
82-
ResponseContentLifecycleManager<BidiReadObjectResponse> bidiResponseContentLifecycleManager,
81+
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
8382
Executor executor) {
8483
BlobDescriptorState state = new BlobDescriptorState(openRequest);
8584

86-
BlobDescriptorStream stream =
87-
BlobDescriptorStream.create(
88-
executor, bidiResponseContentLifecycleManager, callable, context, state);
85+
BlobDescriptorStream stream = BlobDescriptorStream.create(executor, callable, context, state);
8986

9087
ApiFuture<BlobDescriptor> blobDescriptorFuture =
9188
ApiFutures.transform(stream, nowOpen -> new BlobDescriptorImpl(stream, state), executor);

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.SettableApiFuture;
2121
import com.google.api.gax.grpc.GrpcCallContext;
22-
import com.google.api.gax.rpc.BidiStreamingCallable;
2322
import com.google.api.gax.rpc.ClientStream;
2423
import com.google.api.gax.rpc.ResponseObserver;
2524
import com.google.api.gax.rpc.StreamController;
25+
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
2626
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
2727
import com.google.common.base.Preconditions;
2828
import com.google.protobuf.ByteString;
@@ -52,10 +52,9 @@ final class BlobDescriptorStream
5252
private final SettableApiFuture<Void> blobDescriptorResolveFuture;
5353

5454
private final BlobDescriptorState state;
55-
private final ResponseContentLifecycleManager<BidiReadObjectResponse>
56-
bidiResponseContentLifecycleManager;
5755
private final Executor executor;
58-
private final BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable;
56+
private final ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse>
57+
callable;
5958
private final GrpcCallContext context;
6059
private final int maxRedirectsAllowed;
6160

@@ -69,13 +68,11 @@ final class BlobDescriptorStream
6968
private BlobDescriptorStream(
7069
BlobDescriptorState state,
7170
Executor executor,
72-
ResponseContentLifecycleManager<BidiReadObjectResponse> bidiResponseContentLifecycleManager,
73-
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
71+
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
7472
GrpcCallContext context,
7573
int maxRedirectsAllowed) {
7674
this.state = state;
7775
this.executor = executor;
78-
this.bidiResponseContentLifecycleManager = bidiResponseContentLifecycleManager;
7976
this.callable = callable;
8077
this.context = context;
8178
this.blobDescriptorResolveFuture = SettableApiFuture.create();
@@ -233,7 +230,7 @@ public void onStart(StreamController controller) {
233230
public void onResponse(BidiReadObjectResponse response) {
234231
controller.request(1);
235232
try (ResponseContentLifecycleHandle<BidiReadObjectResponse> handle =
236-
bidiResponseContentLifecycleManager.get(response)) {
233+
callable.getResponseContentLifecycleManager().get(response)) {
237234
if (response.hasMetadata()) {
238235
state.setMetadata(response.getMetadata());
239236
}
@@ -456,19 +453,12 @@ public void onComplete() {
456453

457454
static BlobDescriptorStream create(
458455
Executor executor,
459-
ResponseContentLifecycleManager<BidiReadObjectResponse> bidiResponseContentLifecycleManager,
460-
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
456+
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
461457
GrpcCallContext context,
462458
BlobDescriptorState state) {
463459

464460
int maxRedirectsAllowed = 3; // TODO: make this configurable in the ultimate public surface
465-
return new BlobDescriptorStream(
466-
state,
467-
executor,
468-
bidiResponseContentLifecycleManager,
469-
callable,
470-
context,
471-
maxRedirectsAllowed);
461+
return new BlobDescriptorStream(state, executor, callable, context, maxRedirectsAllowed);
472462
}
473463

474464
static final class MaxRedirectsExceededException extends RuntimeException {

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import com.google.api.core.ApiFutures;
2222
import com.google.api.core.SettableApiFuture;
2323
import com.google.api.gax.retrying.ResultRetryAlgorithm;
24-
import com.google.api.gax.rpc.ServerStreamingCallable;
2524
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
25+
import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable;
2626
import com.google.cloud.storage.Retrying.RetryingDependencies;
2727
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
2828
import com.google.common.util.concurrent.MoreExecutors;
@@ -46,32 +46,27 @@ public static GapicDownloadSessionBuilder create() {
4646
}
4747

4848
public ReadableByteChannelSessionBuilder byteChannel(
49-
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
49+
ZeroCopyServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
5050
RetryingDependencies retryingDependencies,
51-
ResultRetryAlgorithm<?> resultRetryAlgorithm,
52-
ResponseContentLifecycleManager responseContentLifecycleManager) {
53-
return new ReadableByteChannelSessionBuilder(
54-
read, retryingDependencies, resultRetryAlgorithm, responseContentLifecycleManager);
51+
ResultRetryAlgorithm<?> resultRetryAlgorithm) {
52+
return new ReadableByteChannelSessionBuilder(read, retryingDependencies, resultRetryAlgorithm);
5553
}
5654

5755
public static final class ReadableByteChannelSessionBuilder {
5856

59-
private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
57+
private final ZeroCopyServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
6058
private final RetryingDependencies retryingDependencies;
6159
private final ResultRetryAlgorithm<?> resultRetryAlgorithm;
62-
private final ResponseContentLifecycleManager responseContentLifecycleManager;
6360
private boolean autoGzipDecompression;
6461
private Hasher hasher;
6562

6663
private ReadableByteChannelSessionBuilder(
67-
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
64+
ZeroCopyServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
6865
RetryingDependencies retryingDependencies,
69-
ResultRetryAlgorithm<?> resultRetryAlgorithm,
70-
ResponseContentLifecycleManager responseContentLifecycleManager) {
66+
ResultRetryAlgorithm<?> resultRetryAlgorithm) {
7167
this.read = read;
7268
this.retryingDependencies = retryingDependencies;
7369
this.resultRetryAlgorithm = resultRetryAlgorithm;
74-
this.responseContentLifecycleManager = responseContentLifecycleManager;
7570
this.hasher = Hasher.noop();
7671
this.autoGzipDecompression = false;
7772
}
@@ -112,24 +107,12 @@ public UnbufferedReadableByteChannelSessionBuilder unbuffered() {
112107
if (autoGzipDecompression) {
113108
return new GzipReadableByteChannel(
114109
new GapicUnbufferedReadableByteChannel(
115-
resultFuture,
116-
read,
117-
object,
118-
hasher,
119-
retryingDependencies,
120-
resultRetryAlgorithm,
121-
responseContentLifecycleManager),
110+
resultFuture, read, object, hasher, retryingDependencies, resultRetryAlgorithm),
122111
ApiFutures.transform(
123112
resultFuture, Object::getContentEncoding, MoreExecutors.directExecutor()));
124113
} else {
125114
return new GapicUnbufferedReadableByteChannel(
126-
resultFuture,
127-
read,
128-
object,
129-
hasher,
130-
retryingDependencies,
131-
resultRetryAlgorithm,
132-
responseContentLifecycleManager);
115+
resultFuture, read, object, hasher, retryingDependencies, resultRetryAlgorithm);
133116
}
134117
};
135118
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
2323
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2424
import com.google.api.gax.rpc.ApiExceptions;
25-
import com.google.api.gax.rpc.ServerStreamingCallable;
2625
import com.google.api.gax.rpc.StateCheckingResponseObserver;
2726
import com.google.api.gax.rpc.StreamController;
2827
import com.google.api.gax.rpc.WatchdogTimeoutException;
2928
import com.google.cloud.BaseServiceException;
3029
import com.google.cloud.storage.Conversions.Decoder;
3130
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
31+
import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable;
3232
import com.google.cloud.storage.Retrying.RetryingDependencies;
3333
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
3434
import com.google.protobuf.ByteString;
@@ -57,10 +57,9 @@ final class GapicUnbufferedReadableByteChannel
5757
private static final java.lang.Object EOF_MARKER = new java.lang.Object();
5858

5959
private final SettableApiFuture<Object> result;
60-
private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
60+
private final ZeroCopyServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
6161
private final ReadObjectRequest req;
6262
private final Hasher hasher;
63-
private final ResponseContentLifecycleManager rclm;
6463
private final RetryingDependencies retryingDeps;
6564
private final ResultRetryAlgorithm<?> alg;
6665
private final SimpleBlockingQueue<java.lang.Object> queue;
@@ -76,19 +75,17 @@ final class GapicUnbufferedReadableByteChannel
7675

7776
GapicUnbufferedReadableByteChannel(
7877
SettableApiFuture<Object> result,
79-
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
78+
ZeroCopyServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
8079
ReadObjectRequest req,
8180
Hasher hasher,
8281
RetryingDependencies retryingDependencies,
83-
ResultRetryAlgorithm<?> alg,
84-
ResponseContentLifecycleManager rclm) {
82+
ResultRetryAlgorithm<?> alg) {
8583
this.result = result;
8684
this.read = read;
8785
this.req = req;
8886
this.hasher = hasher;
8987
this.fetchOffset = new AtomicLong(req.getReadOffset());
9088
this.blobOffset = req.getReadOffset();
91-
this.rclm = rclm;
9289
this.retryingDeps = retryingDependencies;
9390
this.alg =
9491
new BasicResultRetryAlgorithm<java.lang.Object>() {
@@ -159,7 +156,8 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
159156
readObjectObserver.request();
160157

161158
ReadObjectResponse resp = (ReadObjectResponse) take;
162-
ResponseContentLifecycleHandle handle = rclm.get(resp);
159+
ResponseContentLifecycleHandle<ReadObjectResponse> handle =
160+
read.getResponseContentLifecycleManager().get(resp);
163161
if (resp.hasMetadata()) {
164162
Object respMetadata = resp.getMetadata();
165163
if (metadata == null) {
@@ -251,7 +249,8 @@ private void drainQueue() throws IOException {
251249
java.lang.Object queueValue = queue.poll();
252250
if (queueValue instanceof ReadObjectResponse) {
253251
ReadObjectResponse resp = (ReadObjectResponse) queueValue;
254-
ResponseContentLifecycleHandle handle = rclm.get(resp);
252+
ResponseContentLifecycleHandle<ReadObjectResponse> handle =
253+
read.getResponseContentLifecycleManager().get(resp);
255254
handle.close();
256255
} else if (queueValue == EOF_MARKER || queueValue instanceof Throwable) {
257256
break;

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.gax.retrying.ResultRetryAlgorithm;
20-
import com.google.api.gax.rpc.ServerStreamingCallable;
2120
import com.google.cloud.ReadChannel;
2221
import com.google.cloud.RestorableState;
2322
import com.google.cloud.storage.GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder;
23+
import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable;
2424
import com.google.cloud.storage.Retrying.RetryingDependencies;
2525
import com.google.storage.v2.Object;
2626
import com.google.storage.v2.ReadObjectRequest;
@@ -29,25 +29,22 @@
2929

3030
final class GrpcBlobReadChannel extends BaseStorageReadChannel<Object> {
3131

32-
private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
32+
private final ZeroCopyServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
3333
private final RetryingDependencies retryingDependencies;
3434
private final ResultRetryAlgorithm<?> resultRetryAlgorithm;
35-
private final ResponseContentLifecycleManager responseContentLifecycleManager;
3635
private final ReadObjectRequest request;
3736
private final boolean autoGzipDecompression;
3837

3938
GrpcBlobReadChannel(
40-
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
39+
ZeroCopyServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
4140
RetryingDependencies retryingDependencies,
4241
ResultRetryAlgorithm<?> resultRetryAlgorithm,
43-
ResponseContentLifecycleManager responseContentLifecycleManager,
4442
ReadObjectRequest request,
4543
boolean autoGzipDecompression) {
4644
super(Conversions.grpc().blobInfo());
4745
this.read = read;
4846
this.retryingDependencies = retryingDependencies;
4947
this.resultRetryAlgorithm = resultRetryAlgorithm;
50-
this.responseContentLifecycleManager = responseContentLifecycleManager;
5148
this.request = request;
5249
this.autoGzipDecompression = autoGzipDecompression;
5350
}
@@ -64,11 +61,7 @@ protected LazyReadChannel<?, Object> newLazyReadChannel() {
6461
ReadableByteChannelSessionBuilder b =
6562
ResumableMedia.gapic()
6663
.read()
67-
.byteChannel(
68-
read,
69-
retryingDependencies,
70-
resultRetryAlgorithm,
71-
responseContentLifecycleManager)
64+
.byteChannel(read, retryingDependencies, resultRetryAlgorithm)
7265
.setHasher(Hasher.noop())
7366
.setAutoGzipDecompression(autoGzipDecompression);
7467
BufferHandle bufferHandle = getBufferHandle();

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import com.google.api.gax.retrying.ResultRetryAlgorithm;
3939
import com.google.api.gax.rpc.ApiException;
4040
import com.google.api.gax.rpc.ApiExceptions;
41-
import com.google.api.gax.rpc.BidiStreamingCallable;
4241
import com.google.api.gax.rpc.ClientStreamingCallable;
4342
import com.google.api.gax.rpc.NotFoundException;
4443
import com.google.api.gax.rpc.StatusCode;
@@ -50,6 +49,8 @@
5049
import com.google.cloud.storage.BlobWriteSessionConfig.WriterFactory;
5150
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
5251
import com.google.cloud.storage.Conversions.Decoder;
52+
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
53+
import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable;
5354
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
5455
import com.google.cloud.storage.HmacKey.HmacKeyState;
5556
import com.google.cloud.storage.PostPolicyV4.PostConditionsV4;
@@ -749,10 +750,9 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
749750
GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(Retrying.newCallContext());
750751

751752
return new GrpcBlobReadChannel(
752-
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
753+
readObjectCallable(grpcCallContext),
753754
getOptions(),
754755
retryAlgorithmManager.getFor(request),
755-
responseContentLifecycleManager,
756756
request,
757757
!opts.autoGzipDecompression());
758758
}
@@ -1475,14 +1475,14 @@ public ApiFuture<BlobDescriptor> getBlobDescriptor(BlobId id, BlobSourceOption..
14751475
b.setReadObjectSpec(spec);
14761476
BidiReadObjectRequest req = b.build();
14771477

1478-
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable =
1479-
storageClient.bidiReadObjectCallable();
1478+
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable =
1479+
new ZeroCopyBidiStreamingCallable<>(
1480+
storageClient.bidiReadObjectCallable(), bidiResponseContentLifecycleManager);
14801481

14811482
GrpcCallContext context =
14821483
GrpcUtils.contextWithBucketName(object.getBucket(), GrpcCallContext.createDefault());
14831484

1484-
return BlobDescriptorImpl.create(
1485-
req, context, callable, bidiResponseContentLifecycleManager, executor);
1485+
return BlobDescriptorImpl.create(req, context, callable, executor);
14861486
}
14871487

14881488
@Override
@@ -1784,10 +1784,9 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
17841784
return ResumableMedia.gapic()
17851785
.read()
17861786
.byteChannel(
1787-
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
1787+
readObjectCallable(grpcCallContext),
17881788
getOptions(),
1789-
retryAlgorithmManager.getFor(readObjectRequest),
1790-
responseContentLifecycleManager)
1789+
retryAlgorithmManager.getFor(readObjectRequest))
17911790
.setAutoGzipDecompression(!opts.autoGzipDecompression())
17921791
.unbuffered()
17931792
.setReadObjectRequest(readObjectRequest)
@@ -2020,4 +2019,11 @@ private Bucket internalBucketGet(String bucket, Opts<BucketSourceOpt> unwrap) {
20202019
() -> storageClient.getBucketCallable().call(req, merge),
20212020
syntaxDecoders.bucket.andThen(opts.clearBucketFields()));
20222021
}
2022+
2023+
private ZeroCopyServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> readObjectCallable(
2024+
GrpcCallContext grpcCallContext) {
2025+
return new ZeroCopyServerStreamingCallable<>(
2026+
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
2027+
responseContentLifecycleManager);
2028+
}
20232029
}

0 commit comments

Comments
 (0)