1616
1717package com .google .cloud .storage ;
1818
19+ import com .google .api .client .util .Sleeper ;
20+ import com .google .api .core .ApiClock ;
21+ import com .google .api .core .ApiFuture ;
22+ import com .google .api .core .ApiFutures ;
23+ import com .google .api .core .NanoClock ;
1924import com .google .api .gax .retrying .ResultRetryAlgorithm ;
2025import com .google .cloud .storage .Backoff .BackoffDuration ;
2126import com .google .cloud .storage .Backoff .BackoffResult ;
2227import com .google .cloud .storage .Backoff .BackoffResults ;
2328import com .google .cloud .storage .Backoff .Jitterer ;
2429import com .google .cloud .storage .Retrying .RetryingDependencies ;
2530import com .google .common .annotations .VisibleForTesting ;
31+ import com .google .common .primitives .Longs ;
32+ import com .google .common .util .concurrent .MoreExecutors ;
2633import java .time .Duration ;
34+ import java .util .Collection ;
35+ import java .util .Comparator ;
2736import java .util .LinkedList ;
2837import java .util .List ;
29- import java .util .concurrent .Executors ;
38+ import java .util .concurrent .Callable ;
39+ import java .util .concurrent .Delayed ;
40+ import java .util .concurrent .ExecutionException ;
41+ import java .util .concurrent .Future ;
3042import java .util .concurrent .ScheduledExecutorService ;
3143import java .util .concurrent .ScheduledFuture ;
3244import java .util .concurrent .TimeUnit ;
45+ import java .util .concurrent .TimeoutException ;
3346import java .util .concurrent .locks .ReentrantLock ;
3447import org .checkerframework .checker .nullness .qual .Nullable ;
3548
@@ -198,7 +211,7 @@ static RetryContext of(
198211
199212 static RetryContext neverRetry () {
200213 return new RetryContext (
201- Executors . newSingleThreadScheduledExecutor (),
214+ directScheduledExecutorService (),
202215 RetryingDependencies .attemptOnce (),
203216 Retrying .neverRetry (),
204217 Jitterer .threadLocalRandom ());
@@ -211,6 +224,10 @@ static RetryContextProvider providerFrom(
211224 return () -> RetryContext .of (scheduledExecutorService , deps , alg , Jitterer .threadLocalRandom ());
212225 }
213226
227+ static ScheduledExecutorService directScheduledExecutorService () {
228+ return DirectScheduledExecutorService .INSTANCE ;
229+ }
230+
214231 @ FunctionalInterface
215232 interface RetryContextProvider {
216233 RetryContext create ();
@@ -254,4 +271,169 @@ private static BackoffComment of(String message) {
254271 return new BackoffComment (message );
255272 }
256273 }
274+
275+ private static final class DirectScheduledExecutorService implements ScheduledExecutorService {
276+ private static final DirectScheduledExecutorService INSTANCE =
277+ new DirectScheduledExecutorService (Sleeper .DEFAULT , NanoClock .getDefaultClock ());
278+
279+ private static final Comparator <Delayed > COMP =
280+ Comparator .comparingLong (delay -> delay .getDelay (TimeUnit .NANOSECONDS ));
281+ private final Sleeper sleeper ;
282+ private final ApiClock apiClock ;
283+
284+ private DirectScheduledExecutorService (Sleeper sleeper , ApiClock apiClock ) {
285+ this .sleeper = sleeper ;
286+ this .apiClock = apiClock ;
287+ }
288+
289+ @ Override
290+ public ScheduledFuture <?> schedule (Runnable command , long delay , TimeUnit unit ) {
291+ return new DirectScheduledFuture (unit , delay , command );
292+ }
293+
294+ @ Override
295+ public <V > ScheduledFuture <V > schedule (Callable <V > callable , long delay , TimeUnit unit ) {
296+ throw new UnsupportedOperationException ();
297+ }
298+
299+ @ Override
300+ public ScheduledFuture <?> scheduleAtFixedRate (
301+ Runnable command , long initialDelay , long period , TimeUnit unit ) {
302+ throw new UnsupportedOperationException ();
303+ }
304+
305+ @ Override
306+ public ScheduledFuture <?> scheduleWithFixedDelay (
307+ Runnable command , long initialDelay , long delay , TimeUnit unit ) {
308+ throw new UnsupportedOperationException ();
309+ }
310+
311+ @ Override
312+ public void shutdown () {
313+ throw new UnsupportedOperationException ();
314+ }
315+
316+ @ Override
317+ public List <Runnable > shutdownNow () {
318+ throw new UnsupportedOperationException ();
319+ }
320+
321+ @ Override
322+ public boolean isShutdown () {
323+ throw new UnsupportedOperationException ();
324+ }
325+
326+ @ Override
327+ public boolean isTerminated () {
328+ throw new UnsupportedOperationException ();
329+ }
330+
331+ @ Override
332+ public boolean awaitTermination (long timeout , TimeUnit unit ) throws InterruptedException {
333+ throw new UnsupportedOperationException ();
334+ }
335+
336+ @ Override
337+ public <T > Future <T > submit (Callable <T > task ) {
338+ throw new UnsupportedOperationException ();
339+ }
340+
341+ @ Override
342+ public <T > Future <T > submit (Runnable task , T result ) {
343+ throw new UnsupportedOperationException ();
344+ }
345+
346+ @ Override
347+ public Future <?> submit (Runnable task ) {
348+ throw new UnsupportedOperationException ();
349+ }
350+
351+ @ Override
352+ public <T > List <Future <T >> invokeAll (Collection <? extends Callable <T >> tasks )
353+ throws InterruptedException {
354+ throw new UnsupportedOperationException ();
355+ }
356+
357+ @ Override
358+ public <T > List <Future <T >> invokeAll (
359+ Collection <? extends Callable <T >> tasks , long timeout , TimeUnit unit )
360+ throws InterruptedException {
361+ throw new UnsupportedOperationException ();
362+ }
363+
364+ @ Override
365+ public <T > T invokeAny (Collection <? extends Callable <T >> tasks )
366+ throws InterruptedException , ExecutionException {
367+ throw new UnsupportedOperationException ();
368+ }
369+
370+ @ Override
371+ public <T > T invokeAny (Collection <? extends Callable <T >> tasks , long timeout , TimeUnit unit )
372+ throws InterruptedException , ExecutionException , TimeoutException {
373+ throw new UnsupportedOperationException ();
374+ }
375+
376+ @ Override
377+ public void execute (Runnable command ) {
378+ throw new UnsupportedOperationException ();
379+ }
380+
381+ private final class DirectScheduledFuture implements ScheduledFuture <Object > {
382+
383+ private final long origDelayNs ;
384+ private final long beginNs ;
385+ private final ApiFuture <Object > delegate ;
386+
387+ public DirectScheduledFuture (TimeUnit unit , long delay , Runnable command ) {
388+ origDelayNs = unit .toNanos (delay );
389+ beginNs = apiClock .nanoTime ();
390+ delegate =
391+ ApiFutures .transformAsync (
392+ ApiFutures .immediateFuture (null ),
393+ ignore -> {
394+ sleeper .sleep (unit .toMillis (delay ));
395+ command .run ();
396+ return ApiFutures .immediateFuture (null );
397+ },
398+ MoreExecutors .directExecutor ());
399+ }
400+
401+ @ Override
402+ public long getDelay (TimeUnit unit ) {
403+ long nowNs = apiClock .nanoTime ();
404+ return Longs .max (0L , (nowNs - beginNs ) - origDelayNs );
405+ }
406+
407+ @ Override
408+ public int compareTo (Delayed o ) {
409+ return COMP .compare (this , o );
410+ }
411+
412+ @ Override
413+ public boolean cancel (boolean mayInterruptIfRunning ) {
414+ return delegate .cancel (mayInterruptIfRunning );
415+ }
416+
417+ @ Override
418+ public boolean isCancelled () {
419+ return delegate .isCancelled ();
420+ }
421+
422+ @ Override
423+ public boolean isDone () {
424+ return delegate .isDone ();
425+ }
426+
427+ @ Override
428+ public Object get () throws InterruptedException , ExecutionException {
429+ return delegate .get ();
430+ }
431+
432+ @ Override
433+ public Object get (long timeout , TimeUnit unit )
434+ throws InterruptedException , ExecutionException , TimeoutException {
435+ return delegate .get (timeout , unit );
436+ }
437+ }
438+ }
257439}
0 commit comments