Skip to content

Commit 28f2759

Browse files
JesseLovelaceBenWhitehead
authored andcommitted
chore: add appendable takeover
1 parent 775ad24 commit 28f2759

12 files changed

+360
-72
lines changed

google-cloud-storage/clirr-ignored-differences.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@
130130
<difference>
131131
<differenceType>7012</differenceType>
132132
<className>com/google/cloud/storage/Storage</className>
133-
<method>com.google.cloud.storage.AppendableBlobUpload createAppendableBlobUpload(com.google.cloud.storage.BlobInfo, int, com.google.cloud.storage.Storage$BlobWriteOption[])</method>
133+
<method>com.google.cloud.storage.AppendableBlobUpload appendableBlobUpload(com.google.cloud.storage.BlobInfo, int, com.google.cloud.storage.Storage$BlobWriteOption[])</method>
134134
</difference>
135135
<difference>
136136
<differenceType>7005</differenceType>

google-cloud-storage/src/main/java/com/google/cloud/storage/AppendableBlobUpload.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,35 @@
2020
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;
23+
import java.nio.channels.WritableByteChannel;
2324
import java.util.concurrent.ExecutionException;
2425
import java.util.concurrent.locks.ReentrantLock;
2526

26-
public final class AppendableBlobUpload implements AutoCloseable {
27+
public final class AppendableBlobUpload implements AutoCloseable, WritableByteChannel {
2728
private final AppendableObjectBufferedWritableByteChannel channel;
2829
private final ApiFuture<BlobInfo> result;
2930

30-
private AppendableBlobUpload(BlobInfo blob, BlobWriteSession session) throws IOException {
31+
private AppendableBlobUpload(BlobInfo blob, BlobWriteSession session, boolean takeover)
32+
throws IOException {
3133
channel = (AppendableObjectBufferedWritableByteChannel) (session.open());
3234
result = session.getResult();
35+
if (takeover) {
36+
channel.startTakeoverStream();
37+
}
3338
}
3439

3540
static AppendableBlobUpload createNewAppendableBlob(BlobInfo blob, BlobWriteSession session)
3641
throws IOException {
37-
return new AppendableBlobUpload(blob, session);
42+
return new AppendableBlobUpload(blob, session, false);
43+
}
44+
45+
static AppendableBlobUpload resumeAppendableUpload(BlobInfo blob, BlobWriteSession session)
46+
throws IOException {
47+
return new AppendableBlobUpload(blob, session, true);
48+
}
49+
50+
void startTakeoverStream() {
51+
channel.startTakeoverStream();
3852
}
3953

4054
public BlobInfo finalizeUpload() throws IOException, ExecutionException, InterruptedException {
@@ -43,8 +57,14 @@ public BlobInfo finalizeUpload() throws IOException, ExecutionException, Interru
4357
return result.get();
4458
}
4559

46-
public void write(ByteBuffer buffer) throws IOException {
47-
channel.write(buffer);
60+
@Override
61+
public int write(ByteBuffer buffer) throws IOException {
62+
return channel.write(buffer);
63+
}
64+
65+
@Override
66+
public boolean isOpen() {
67+
return channel.isOpen();
4868
}
4969

5070
@Override
@@ -126,5 +146,9 @@ public void finalizeWrite() throws IOException {
126146
lock.unlock();
127147
}
128148
}
149+
150+
void startTakeoverStream() {
151+
unbuffered.startAppendableTakeoverStream();
152+
}
129153
}
130154
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,19 @@ final class BidiAppendableWrite implements BidiWriteObjectRequestBuilderFactory
100100
private final BidiWriteObjectRequest req;
101101

102102
public BidiAppendableWrite(BidiWriteObjectRequest req) {
103-
req =
104-
req.toBuilder()
105-
.setWriteObjectSpec(req.getWriteObjectSpec().toBuilder().setAppendable(true).build())
106-
.build();
107-
this.req = req;
103+
this(req, false);
104+
}
105+
106+
public BidiAppendableWrite(BidiWriteObjectRequest req, boolean takeOver) {
107+
if (takeOver) {
108+
this.req = req;
109+
} else {
110+
req =
111+
req.toBuilder()
112+
.setWriteObjectSpec(req.getWriteObjectSpec().toBuilder().setAppendable(true).build())
113+
.build();
114+
this.req = req;
115+
}
108116
}
109117

110118
public BidiWriteObjectRequest getReq() {
@@ -120,6 +128,8 @@ public BidiWriteObjectRequest.Builder newBuilder() {
120128
public @Nullable String bucketName() {
121129
if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) {
122130
return req.getWriteObjectSpec().getResource().getBucket();
131+
} else if (req.hasAppendObjectSpec()) {
132+
return req.getAppendObjectSpec().getBucket();
123133
}
124134
return null;
125135
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedAppendableWriteableByteChannel.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,14 @@ void restart() {
170170
}
171171
}
172172

173+
public void startAppendableTakeoverStream() {
174+
BidiWriteObjectRequest req =
175+
writeCtx.newRequestBuilder().setFlush(true).setStateLookup(true).build();
176+
this.messages = Collections.singletonList(req);
177+
flush();
178+
first = false;
179+
}
180+
173181
@VisibleForTesting
174182
BidiWriteCtx<BidiAppendableWrite> getWriteCtx() {
175183
return writeCtx;
@@ -209,6 +217,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength)
209217
builder.clearUploadId();
210218
builder.clearObjectChecksums();
211219
builder.clearWriteObjectSpec();
220+
builder.clearAppendObjectSpec();
212221
} else {
213222
first = false;
214223
}
@@ -251,7 +260,7 @@ private BidiWriteObjectRequest finishMessage() {
251260

252261
BidiWriteObjectRequest.Builder b = writeCtx.newRequestBuilder();
253262
if (!first) {
254-
b.clearUploadId().clearObjectChecksums().clearWriteObjectSpec();
263+
b.clearUploadId().clearObjectChecksums().clearWriteObjectSpec().clearAppendObjectSpec();
255264
}
256265
b.setFinishWrite(true).setWriteOffset(offset);
257266
if (crc32cValue != null) {
@@ -282,6 +291,10 @@ private void flush() {
282291
retry = false;
283292
restart();
284293
processRetryingMessages();
294+
if (this.messages.isEmpty()) {
295+
// This can happen if proccessRetryingMessages ends up dropping every message
296+
return null;
297+
}
285298
}
286299
try {
287300
ApiStreamObserver<BidiWriteObjectRequest> opened = openedStream(context);
@@ -314,6 +327,13 @@ private void processRetryingMessages() {
314327
long bytesSeen = begin;
315328
boolean caughtUp = false;
316329
for (BidiWriteObjectRequest message : this.messages) {
330+
if (message.hasAppendObjectSpec() && first) {
331+
// If this is the first message of a takeover, then running the restart() method will
332+
// actually get us to the state we want to be in (i.e. the persisted_size has been
333+
// captured), so we don't actually need to try to write the original message again--we just
334+
// drop it entirely
335+
continue;
336+
}
317337
if (message.hasWriteObjectSpec()
318338
&& redirecting) { // This is a first message and we got a Redirect
319339
message = message.toBuilder().clearWriteObjectSpec().clearObjectChecksums().build();
@@ -366,6 +386,14 @@ public void onNext(BidiWriteObjectResponse value) {
366386
if (value.hasWriteHandle()) {
367387
bidiWriteHandle.set(value.getWriteHandle());
368388
}
389+
if (lastWrittenRequest.hasAppendObjectSpec() && first) {
390+
long persistedSize =
391+
value.hasPersistedSize() ? value.getPersistedSize() : value.getResource().getSize();
392+
writeCtx.getConfirmedBytes().set(persistedSize);
393+
writeCtx.getTotalSentBytes().set(persistedSize);
394+
ok(value);
395+
return;
396+
}
369397
boolean finalizing = lastWrittenRequest.getFinishWrite();
370398
boolean firstResponse = !finalizing && value.hasResource();
371399
if (firstResponse) {

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ final class BufferedAppendableUploadBuilder {
202202
}
203203

204204
/**
205-
* Set the Future which will contain the ResumableWrite information necessary to open the
205+
* Set the Future which will contain the AppendableWrite information necessary to open the
206206
* Write stream.
207207
*/
208208
BufferedAppendableUploadBuilder setStartAsync(ApiFuture<BidiAppendableWrite> start) {

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,4 @@ ApiFuture<BidiResumableWrite> bidiResumableWrite(
104104
(resp) -> new BidiResumableWrite(req, resp, f),
105105
MoreExecutors.directExecutor());
106106
}
107-
108-
ApiFuture<BidiAppendableWrite> bidiAppendableWrite(BidiWriteObjectRequest req) {
109-
BidiAppendableWrite baw = new BidiAppendableWrite(req);
110-
return ApiFutures.immediateFuture(baw);
111-
}
112107
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import com.google.iam.v1.GetIamPolicyRequest;
8484
import com.google.iam.v1.SetIamPolicyRequest;
8585
import com.google.iam.v1.TestIamPermissionsRequest;
86+
import com.google.storage.v2.AppendObjectSpec;
8687
import com.google.storage.v2.BidiReadObjectRequest;
8788
import com.google.storage.v2.BidiReadObjectSpec;
8889
import com.google.storage.v2.BidiWriteObjectRequest;
@@ -1422,11 +1423,21 @@ public BlobWriteSession blobWriteSession(BlobInfo info, BlobWriteOption... optio
14221423

14231424
@BetaApi
14241425
@Override
1425-
public AppendableBlobUpload createAppendableBlobUpload(
1426+
public AppendableBlobUpload appendableBlobUpload(
14261427
BlobInfo blob, int bufferSize, BlobWriteOption... options) throws IOException {
1428+
boolean takeOver = blob.getGeneration() != null;
1429+
return getAppendableBlobUpload(blob, bufferSize, takeOver, options);
1430+
}
1431+
1432+
private AppendableBlobUpload getAppendableBlobUpload(
1433+
BlobInfo blob, int bufferSize, boolean takeOver, BlobWriteOption... options)
1434+
throws IOException {
14271435
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blob);
1428-
BidiWriteObjectRequest req = getBidiWriteObjectRequest(blob, opts);
1429-
BidiAppendableWrite baw = new BidiAppendableWrite(req);
1436+
BidiWriteObjectRequest req =
1437+
takeOver
1438+
? getBidiWriteObjectRequestForTakeover(blob, opts)
1439+
: getBidiWriteObjectRequest(blob, opts);
1440+
BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver);
14301441
ApiFuture<BidiAppendableWrite> startAppendableWrite = ApiFutures.immediateFuture(baw);
14311442
WritableByteChannelSession<BufferedWritableByteChannel, BidiWriteObjectResponse> build =
14321443
ResumableMedia.gapic()
@@ -1468,7 +1479,9 @@ public boolean shouldRetry(
14681479
new DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession<>(
14691480
build, BidiBlobWriteSessionConfig.Factory.WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER);
14701481
BlobWriteSession session = BlobWriteSessions.of(dec);
1471-
return AppendableBlobUpload.createNewAppendableBlob(blob, session);
1482+
return takeOver
1483+
? AppendableBlobUpload.resumeAppendableUpload(blob, session)
1484+
: AppendableBlobUpload.createNewAppendableBlob(blob, session);
14721485
}
14731486

14741487
@Override
@@ -1807,6 +1820,21 @@ BidiWriteObjectRequest getBidiWriteObjectRequest(BlobInfo info, Opts<ObjectTarge
18071820
return opts.bidiWriteObjectRequest().apply(requestBuilder).build();
18081821
}
18091822

1823+
BidiWriteObjectRequest getBidiWriteObjectRequestForTakeover(
1824+
BlobInfo info, Opts<ObjectTargetOpt> opts) {
1825+
Object object = codecs.blobInfo().encode(info);
1826+
AppendObjectSpec.Builder specBuilder =
1827+
AppendObjectSpec.newBuilder()
1828+
.setObject(object.getName())
1829+
.setBucket(object.getBucket())
1830+
.setGeneration(object.getGeneration());
1831+
1832+
BidiWriteObjectRequest.Builder requestBuilder =
1833+
BidiWriteObjectRequest.newBuilder().setAppendObjectSpec(specBuilder.build());
1834+
1835+
return opts.bidiWriteObjectRequest().apply(requestBuilder).build();
1836+
}
1837+
18101838
private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
18111839
BlobId blob, BlobSourceOption[] options) {
18121840

google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5844,7 +5844,7 @@ default ApiFuture<BlobReadSession> blobReadSession(BlobId id, BlobSourceOption..
58445844

58455845
@BetaApi
58465846
@TransportCompatibility({Transport.GRPC})
5847-
default AppendableBlobUpload createAppendableBlobUpload(
5847+
default AppendableBlobUpload appendableBlobUpload(
58485848
BlobInfo blob, int bufferSize, BlobWriteOption... options) throws IOException {
58495849
return throwGrpcOnly(
58505850
fmtMethodName("createAppendableBlobUpload", BlobId.class, BlobWriteOption.class));

0 commit comments

Comments
 (0)