Skip to content

Commit 50e4589

Browse files
authored
chore: refactor RangeProjectionConfig to support more than one type of ObjectReadSession integration
This is pre-work to allow creating a SeekableByteChannel that will use the ObjectReadSession rather than reading from the stream directly.
1 parent a751971 commit 50e4589

File tree

5 files changed

+75
-9
lines changed

5 files changed

+75
-9
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
2323
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
24+
import com.google.cloud.storage.RangeProjectionConfig.ProjectionType;
2425
import com.google.cloud.storage.RetryContext.RetryContextProvider;
2526
import com.google.common.annotations.VisibleForTesting;
2627
import com.google.storage.v2.BidiReadObjectRequest;
@@ -31,6 +32,7 @@
3132
import java.util.ArrayList;
3233
import java.util.IdentityHashMap;
3334
import java.util.Iterator;
35+
import java.util.Locale;
3436
import java.util.Map.Entry;
3537
import java.util.concurrent.ExecutionException;
3638
import java.util.concurrent.ScheduledExecutorService;
@@ -83,11 +85,23 @@ public <Projection> Projection readRange(
8385
lock.lock();
8486
try {
8587
checkState(open, "stream already closed");
86-
long readId = state.newReadId();
87-
ObjectReadSessionStreamRead<Projection> read =
88-
config.cast().newRead(readId, range, retryContextProvider.create());
89-
registerReadInState(readId, read);
90-
return read.project();
88+
switch (config.getType()) {
89+
case STREAM_READ:
90+
long readId = state.newReadId();
91+
ObjectReadSessionStreamRead<Projection> read =
92+
config.cast().newRead(readId, range, retryContextProvider.create());
93+
registerReadInState(readId, read);
94+
return read.project();
95+
case SESSION_USER:
96+
return config.project(range, this, IOAutoCloseable.noOp());
97+
default:
98+
throw new IllegalStateException(
99+
String.format(
100+
Locale.US,
101+
"Broken java enum %s value=%s",
102+
ProjectionType.class.getName(),
103+
config.getType().name()));
104+
}
91105
} finally {
92106
lock.unlock();
93107
}

google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1816,21 +1816,43 @@ public void copyChunk() {
18161816

18171817
private static final class OtelRangeProjectionConfig<Projection>
18181818
extends RangeProjectionConfig<Projection> {
1819-
private final BaseConfig<Projection, ?> delegate;
1819+
private final RangeProjectionConfig<Projection> delegate;
18201820
private final Span parentSpan;
18211821

18221822
private OtelRangeProjectionConfig(RangeProjectionConfig<Projection> delegate, Span parentSpan) {
1823-
this.delegate = delegate.cast();
1823+
this.delegate = delegate;
18241824
this.parentSpan = parentSpan;
18251825
}
18261826

18271827
@Override
18281828
BaseConfig<Projection, ?> cast() {
1829-
return new OtelBaseConfigDecorator();
1829+
return new OtelBaseConfigDecorator(delegate.cast());
1830+
}
1831+
1832+
@Override
1833+
public ProjectionType getType() {
1834+
return delegate.getType();
1835+
}
1836+
1837+
@Override
1838+
Projection project(RangeSpec range, ObjectReadSession session, IOAutoCloseable closeAlongWith) {
1839+
try {
1840+
return delegate.project(range, session, closeAlongWith.andThen(parentSpan::end));
1841+
} catch (Throwable t) {
1842+
parentSpan.recordException(t);
1843+
parentSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
1844+
parentSpan.end();
1845+
throw t;
1846+
}
18301847
}
18311848

18321849
private class OtelBaseConfigDecorator
18331850
extends BaseConfig<Projection, ObjectReadSessionStreamRead<Projection>> {
1851+
private final BaseConfig<Projection, ?> delegate;
1852+
1853+
private OtelBaseConfigDecorator(BaseConfig<Projection, ?> delegate) {
1854+
this.delegate = delegate;
1855+
}
18341856

18351857
@Override
18361858
ObjectReadSessionStreamRead<Projection> newRead(

google-cloud-storage/src/main/java/com/google/cloud/storage/RangeProjectionConfig.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,30 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.cloud.storage.RangeProjectionConfigs.BaseConfig;
20+
import java.util.Locale;
2021

2122
public abstract class RangeProjectionConfig<Projection> {
2223

2324
RangeProjectionConfig() {}
2425

25-
abstract BaseConfig<Projection, ?> cast();
26+
BaseConfig<Projection, ?> cast() {
27+
throw new UnsupportedOperationException(String.format("%s#cast()", this.getClass().getName()));
28+
}
29+
30+
abstract ProjectionType getType();
31+
32+
Projection project(RangeSpec range, ObjectReadSession session, IOAutoCloseable closeAlongWith) {
33+
throw new UnsupportedOperationException(
34+
String.format(Locale.US, "%s#project()", this.getClass().getName()));
35+
}
36+
37+
enum ProjectionType {
38+
/** Those projections which translate to a direct read registered in the state of the stream */
39+
STREAM_READ,
40+
/**
41+
* Those projections which use an ObjectReadSession rather than directly registering a read in
42+
* the stream state.
43+
*/
44+
SESSION_USER
45+
}
2646
}

google-cloud-storage/src/main/java/com/google/cloud/storage/RangeProjectionConfigs.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ public final class RangeProjectionConfigs {
2727
abstract static class BaseConfig<Projection, Read extends ObjectReadSessionStreamRead<Projection>>
2828
extends RangeProjectionConfig<Projection> {
2929
abstract Read newRead(long readId, RangeSpec range, RetryContext retryContext);
30+
31+
@Override
32+
ProjectionType getType() {
33+
return ProjectionType.STREAM_READ;
34+
}
3035
}
3136

3237
/**

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageDataClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.api.gax.grpc.GrpcCallContext;
2424
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
2525
import com.google.cloud.storage.ObjectReadSessionState.OpenArguments;
26+
import com.google.cloud.storage.RangeProjectionConfig.ProjectionType;
2627
import com.google.cloud.storage.RetryContext.RetryContextProvider;
2728
import com.google.storage.v2.BidiReadObjectRequest;
2829
import com.google.storage.v2.BidiReadObjectResponse;
@@ -77,6 +78,10 @@ <Projection> ApiFuture<FastOpenObjectReadSession<Projection>> fastOpenReadSessio
7778
checkArgument(
7879
openRequest.getReadRangesList().isEmpty(),
7980
"ranges included in the initial request are not supported");
81+
checkArgument(
82+
config.getType() == ProjectionType.STREAM_READ,
83+
"unsupported RangeProjectionConfig: %s",
84+
config.getClass().getName());
8085
ObjectReadSessionState state = new ObjectReadSessionState(ctx, openRequest);
8186

8287
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable =

0 commit comments

Comments
 (0)