Skip to content

Commit b1499d6

Browse files
committed
chore: overhaul Retrying to provide more detail about failures
Overhaul `Retrying` to use the new `RetryContext` for more useful retrying error messages.
1 parent 1085216 commit b1499d6

File tree

5 files changed

+374
-40
lines changed

5 files changed

+374
-40
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/RetryContext.java

Lines changed: 184 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,33 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import com.google.api.client.util.Sleeper;
20+
import com.google.api.core.ApiClock;
21+
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutures;
23+
import com.google.api.core.NanoClock;
1924
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2025
import com.google.cloud.storage.Backoff.BackoffDuration;
2126
import com.google.cloud.storage.Backoff.BackoffResult;
2227
import com.google.cloud.storage.Backoff.BackoffResults;
2328
import com.google.cloud.storage.Backoff.Jitterer;
2429
import com.google.cloud.storage.Retrying.RetryingDependencies;
2530
import com.google.common.annotations.VisibleForTesting;
31+
import com.google.common.primitives.Longs;
32+
import com.google.common.util.concurrent.MoreExecutors;
2633
import java.time.Duration;
34+
import java.util.Collection;
35+
import java.util.Comparator;
2736
import java.util.LinkedList;
2837
import java.util.List;
29-
import java.util.concurrent.Executors;
38+
import java.util.concurrent.Callable;
39+
import java.util.concurrent.Delayed;
40+
import java.util.concurrent.ExecutionException;
41+
import java.util.concurrent.Future;
3042
import java.util.concurrent.ScheduledExecutorService;
3143
import java.util.concurrent.ScheduledFuture;
3244
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.TimeoutException;
3346
import java.util.concurrent.locks.ReentrantLock;
3447
import org.checkerframework.checker.nullness.qual.Nullable;
3548

@@ -198,7 +211,7 @@ static RetryContext of(
198211

199212
static RetryContext neverRetry() {
200213
return new RetryContext(
201-
Executors.newSingleThreadScheduledExecutor(),
214+
directScheduledExecutorService(),
202215
RetryingDependencies.attemptOnce(),
203216
Retrying.neverRetry(),
204217
Jitterer.threadLocalRandom());
@@ -211,6 +224,10 @@ static RetryContextProvider providerFrom(
211224
return () -> RetryContext.of(scheduledExecutorService, deps, alg, Jitterer.threadLocalRandom());
212225
}
213226

227+
static ScheduledExecutorService directScheduledExecutorService() {
228+
return DirectScheduledExecutorService.INSTANCE;
229+
}
230+
214231
@FunctionalInterface
215232
interface RetryContextProvider {
216233
RetryContext create();
@@ -254,4 +271,169 @@ private static BackoffComment of(String message) {
254271
return new BackoffComment(message);
255272
}
256273
}
274+
275+
private static final class DirectScheduledExecutorService implements ScheduledExecutorService {
276+
private static final DirectScheduledExecutorService INSTANCE =
277+
new DirectScheduledExecutorService(Sleeper.DEFAULT, NanoClock.getDefaultClock());
278+
279+
private static final Comparator<Delayed> COMP =
280+
Comparator.comparingLong(delay -> delay.getDelay(TimeUnit.NANOSECONDS));
281+
private final Sleeper sleeper;
282+
private final ApiClock apiClock;
283+
284+
private DirectScheduledExecutorService(Sleeper sleeper, ApiClock apiClock) {
285+
this.sleeper = sleeper;
286+
this.apiClock = apiClock;
287+
}
288+
289+
@Override
290+
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
291+
return new DirectScheduledFuture(unit, delay, command);
292+
}
293+
294+
@Override
295+
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
296+
throw new UnsupportedOperationException();
297+
}
298+
299+
@Override
300+
public ScheduledFuture<?> scheduleAtFixedRate(
301+
Runnable command, long initialDelay, long period, TimeUnit unit) {
302+
throw new UnsupportedOperationException();
303+
}
304+
305+
@Override
306+
public ScheduledFuture<?> scheduleWithFixedDelay(
307+
Runnable command, long initialDelay, long delay, TimeUnit unit) {
308+
throw new UnsupportedOperationException();
309+
}
310+
311+
@Override
312+
public void shutdown() {
313+
throw new UnsupportedOperationException();
314+
}
315+
316+
@Override
317+
public List<Runnable> shutdownNow() {
318+
throw new UnsupportedOperationException();
319+
}
320+
321+
@Override
322+
public boolean isShutdown() {
323+
throw new UnsupportedOperationException();
324+
}
325+
326+
@Override
327+
public boolean isTerminated() {
328+
throw new UnsupportedOperationException();
329+
}
330+
331+
@Override
332+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
333+
throw new UnsupportedOperationException();
334+
}
335+
336+
@Override
337+
public <T> Future<T> submit(Callable<T> task) {
338+
throw new UnsupportedOperationException();
339+
}
340+
341+
@Override
342+
public <T> Future<T> submit(Runnable task, T result) {
343+
throw new UnsupportedOperationException();
344+
}
345+
346+
@Override
347+
public Future<?> submit(Runnable task) {
348+
throw new UnsupportedOperationException();
349+
}
350+
351+
@Override
352+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
353+
throws InterruptedException {
354+
throw new UnsupportedOperationException();
355+
}
356+
357+
@Override
358+
public <T> List<Future<T>> invokeAll(
359+
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
360+
throws InterruptedException {
361+
throw new UnsupportedOperationException();
362+
}
363+
364+
@Override
365+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
366+
throws InterruptedException, ExecutionException {
367+
throw new UnsupportedOperationException();
368+
}
369+
370+
@Override
371+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
372+
throws InterruptedException, ExecutionException, TimeoutException {
373+
throw new UnsupportedOperationException();
374+
}
375+
376+
@Override
377+
public void execute(Runnable command) {
378+
throw new UnsupportedOperationException();
379+
}
380+
381+
private final class DirectScheduledFuture implements ScheduledFuture<Object> {
382+
383+
private final long origDelayNs;
384+
private final long beginNs;
385+
private final ApiFuture<Object> delegate;
386+
387+
public DirectScheduledFuture(TimeUnit unit, long delay, Runnable command) {
388+
origDelayNs = unit.toNanos(delay);
389+
beginNs = apiClock.nanoTime();
390+
delegate =
391+
ApiFutures.transformAsync(
392+
ApiFutures.immediateFuture(null),
393+
ignore -> {
394+
sleeper.sleep(unit.toMillis(delay));
395+
command.run();
396+
return ApiFutures.immediateFuture(null);
397+
},
398+
MoreExecutors.directExecutor());
399+
}
400+
401+
@Override
402+
public long getDelay(TimeUnit unit) {
403+
long nowNs = apiClock.nanoTime();
404+
return Longs.max(0L, (nowNs - beginNs) - origDelayNs);
405+
}
406+
407+
@Override
408+
public int compareTo(Delayed o) {
409+
return COMP.compare(this, o);
410+
}
411+
412+
@Override
413+
public boolean cancel(boolean mayInterruptIfRunning) {
414+
return delegate.cancel(mayInterruptIfRunning);
415+
}
416+
417+
@Override
418+
public boolean isCancelled() {
419+
return delegate.isCancelled();
420+
}
421+
422+
@Override
423+
public boolean isDone() {
424+
return delegate.isDone();
425+
}
426+
427+
@Override
428+
public Object get() throws InterruptedException, ExecutionException {
429+
return delegate.get();
430+
}
431+
432+
@Override
433+
public Object get(long timeout, TimeUnit unit)
434+
throws InterruptedException, ExecutionException, TimeoutException {
435+
return delegate.get(timeout, unit);
436+
}
437+
}
438+
}
257439
}

google-cloud-storage/src/main/java/com/google/cloud/storage/Retrying.java

Lines changed: 43 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,22 @@
1616

1717
package com.google.cloud.storage;
1818

19-
import static com.google.cloud.RetryHelper.runWithRetries;
20-
2119
import com.google.api.core.ApiClock;
2220
import com.google.api.core.NanoClock;
2321
import com.google.api.gax.grpc.GrpcCallContext;
2422
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
2523
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2624
import com.google.api.gax.retrying.RetrySettings;
27-
import com.google.cloud.RetryHelper.RetryHelperException;
25+
import com.google.cloud.storage.Backoff.Jitterer;
2826
import com.google.cloud.storage.Conversions.Decoder;
2927
import com.google.cloud.storage.spi.v1.HttpRpcContext;
3028
import com.google.common.base.MoreObjects;
3129
import com.google.common.collect.ImmutableList;
3230
import com.google.common.collect.ImmutableMap;
3331
import java.util.UUID;
3432
import java.util.concurrent.Callable;
33+
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicReference;
3535
import java.util.function.Function;
3636
import org.checkerframework.checker.nullness.qual.NonNull;
3737

@@ -63,10 +63,11 @@ static <T, U> U run(
6363
HttpRpcContext httpRpcContext = HttpRpcContext.getInstance();
6464
try {
6565
httpRpcContext.newInvocationId();
66-
T result = runWithRetries(c, options.getRetrySettings(), algorithm, options.getClock());
67-
return result == null ? null : f.apply(result);
68-
} catch (RetryHelperException e) {
69-
throw StorageException.coalesce(e);
66+
return run(
67+
RetryingDependencies.simple(options.getClock(), options.getRetrySettings()),
68+
algorithm,
69+
c,
70+
f::apply);
7071
} finally {
7172
httpRpcContext.clearInvocationId();
7273
}
@@ -95,34 +96,41 @@ static <T, U> U run(
9596
ResultRetryAlgorithm<?> algorithm,
9697
Callable<T> c,
9798
Decoder<T, U> f) {
98-
try {
99-
T result =
100-
runWithRetries(
101-
() -> {
102-
try {
103-
return c.call();
104-
} catch (StorageException se) {
105-
// we hope for this case
106-
throw se;
107-
} catch (IllegalArgumentException iae) {
108-
// IllegalArgumentException can happen if there is no json in the body and we try
109-
// to parse it Our retry algorithms have special case for this, so in an effort to
110-
// keep compatibility with those existing behaviors, explicitly rethrow an
111-
// IllegalArgumentException that may have happened
112-
throw iae;
113-
} catch (Exception e) {
114-
// Wire in this fall through just in case.
115-
// all of our retry algorithms are centered around StorageException so this helps
116-
// those be more effective
117-
throw StorageException.coalesce(e);
118-
}
119-
},
120-
deps.getRetrySettings(),
121-
algorithm,
122-
deps.getClock());
123-
return result == null ? null : f.decode(result);
124-
} catch (RetryHelperException e) {
125-
throw StorageException.coalesce(e.getCause());
99+
RetryContext ctx =
100+
RetryContext.of(
101+
RetryContext.directScheduledExecutorService(),
102+
deps,
103+
algorithm,
104+
Jitterer.threadLocalRandom());
105+
AtomicReference<Exception> failure = new AtomicReference<>();
106+
AtomicBoolean attemptAgain = new AtomicBoolean(false);
107+
do {
108+
attemptAgain.set(false);
109+
try {
110+
T result = c.call();
111+
return result == null ? null : f.decode(result);
112+
} catch (StorageException se) {
113+
// we hope for this case
114+
ctx.recordError(se, () -> attemptAgain.set(true), failure::set);
115+
} catch (IllegalArgumentException iae) {
116+
// IllegalArgumentException can happen if there is no json in the body and we try
117+
// to parse it Our retry algorithms have special case for this, so in an effort to
118+
// keep compatibility with those existing behaviors, explicitly rethrow an
119+
// IllegalArgumentException that may have happened
120+
ctx.recordError(iae, () -> attemptAgain.set(true), failure::set);
121+
} catch (Exception e) {
122+
// Wire in this fall through just in case.
123+
// all of our retry algorithms are centered around StorageException so this helps
124+
// those be more effective
125+
ctx.recordError(StorageException.coalesce(e), () -> attemptAgain.set(true), failure::set);
126+
}
127+
} while (attemptAgain.get());
128+
129+
Exception throwable = failure.get();
130+
if (throwable instanceof StorageException) {
131+
throw (StorageException) throwable;
132+
} else {
133+
throw StorageException.coalesce(throwable);
126134
}
127135
}
128136

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageException.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ public static StorageException translateAndThrow(RetryHelperException ex) {
104104
private static StorageException getStorageException(Throwable t) {
105105
// unwrap a RetryHelperException if that is what is being translated
106106
if (t instanceof RetryHelperException) {
107-
return new StorageException(UNKNOWN_CODE, t.getMessage(), t.getCause());
107+
Throwable cause = t.getCause();
108+
return new StorageException(UNKNOWN_CODE, cause != null ? cause.getMessage() : "", cause);
108109
}
109110
return new StorageException(UNKNOWN_CODE, t.getMessage(), t);
110111
}

0 commit comments

Comments
 (0)