1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.base;
33
34 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
35 import java.time.Instant;
36 import java.util.HashMap;
37 import java.util.Map;
38 import java.util.concurrent.Callable;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.function.Function;
42 import java.util.function.LongSupplier;
43 import java.util.function.Predicate;
44 import java.util.logging.Level;
45 import java.util.logging.Logger;
46 import javax.annotation.Nonnull;
47 import javax.annotation.Nullable;
48 import javax.annotation.ParametersAreNonnullByDefault;
49
50
51
52
53
54
55 @ParametersAreNonnullByDefault
56
57 public final class Callables {
58
59
60
61
62 @Deprecated
63 public static final SimpleRetryPredicate<?> RETRY_FOR_NULL_RESULT = new SimpleRetryPredicate<Object>() {
64 @Override
65 public SimpleAction apply(final Object input) {
66 return (input != null) ? SimpleAction.ABORT : SimpleAction.RETRY;
67 }
68 };
69
70
71
72
73
74
75
76
77 @Deprecated
78 public static final AdvancedRetryPredicate<Exception> DEFAULT_EXCEPTION_RETRY
79 = new DefaultAdvancedRetryPredicateImpl();
80
81
82
83
84 @Deprecated
85 public static final Predicate<Exception> DEFAULT_EXCEPTION_RETRY_PREDICATE
86 = new Predicate<Exception>() {
87
88 @Override
89 @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE")
90 public boolean test(final Exception t) {
91 return DEFAULT_EXCEPTION_RETRY.apply(t) != AdvancedAction.ABORT;
92 }
93
94 };
95
96 private Callables() { }
97
98
99
100
101
102
103
104 @Deprecated
105 @Nullable
106 public static <T, EX extends Exception> T executeWithRetry(final TimeoutCallable<T, EX> what,
107 final int nrImmediateRetries,
108 final int maxRetryWaitMillis, final Class<EX> exceptionClass)
109 throws InterruptedException, EX, TimeoutException {
110 return executeWithRetry(what, nrImmediateRetries, maxRetryWaitMillis,
111 (TimeoutRetryPredicate<? super T, T>) TimeoutRetryPredicate.NORETRY_FOR_RESULT,
112 DEFAULT_EXCEPTION_RETRY, exceptionClass);
113 }
114
115
116
117
118 @Deprecated
119 @Nullable
120 public static <T> T executeWithRetry(final TimeoutCallable<T, RuntimeException> what,
121 final int nrImmediateRetries, final int maxRetryWaitMillis)
122 throws InterruptedException, TimeoutException {
123 return executeWithRetry(what, nrImmediateRetries, maxRetryWaitMillis,
124 (TimeoutRetryPredicate<? super T, T>) TimeoutRetryPredicate.NORETRY_FOR_RESULT,
125 DEFAULT_EXCEPTION_RETRY, RuntimeException.class);
126 }
127
128
129
130
131
132 @Deprecated
133 @Nullable
134 public static <T, EX extends Exception> T executeWithRetry(final TimeoutCallable<T, EX> what,
135 final int nrImmediateRetries,
136 final int maxRetryWaitMillis,
137 final AdvancedRetryPredicate<Exception> retryOnException,
138 final Class<EX> exceptionClass)
139 throws InterruptedException, EX, TimeoutException {
140 return executeWithRetry(what, nrImmediateRetries, maxRetryWaitMillis,
141 (TimeoutRetryPredicate<? super T, T>) TimeoutRetryPredicate.NORETRY_FOR_RESULT,
142 retryOnException, exceptionClass);
143 }
144
145
146
147
148 @Deprecated
149 @Nullable
150 public static <T> T executeWithRetry(final TimeoutCallable<T, RuntimeException> what,
151 final int nrImmediateRetries, final int maxRetryWaitMillis,
152 final AdvancedRetryPredicate<Exception> retryOnException)
153 throws InterruptedException, TimeoutException {
154 return executeWithRetry(what, nrImmediateRetries, maxRetryWaitMillis,
155 (TimeoutRetryPredicate<? super T, T>) TimeoutRetryPredicate.NORETRY_FOR_RESULT,
156 retryOnException, RuntimeException.class);
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175 @Deprecated
176 @Nullable
177 public static <T, EX extends Exception> T executeWithRetry(final TimeoutCallable<T, EX> what,
178 final int nrImmediateRetries, final int maxWaitMillis,
179 final TimeoutRetryPredicate<? super T, T> retryOnReturnVal,
180 final AdvancedRetryPredicate<Exception> retryOnException,
181 final Class<EX> exceptionClass)
182 throws InterruptedException, EX, TimeoutException {
183 long deadline = what.getDeadline();
184 return executeWithRetry(what, new TimeoutRetryPredicate2RetryPredicate<>(deadline, retryOnReturnVal),
185 new FibonacciBackoffRetryPredicate<>(retryOnException, nrImmediateRetries,
186 maxWaitMillis / 100, maxWaitMillis, Callables::rootClass, deadline,
187 () -> System.currentTimeMillis(), TimeUnit.MILLISECONDS),
188 exceptionClass);
189 }
190
191
192
193
194 @Deprecated
195 private static final class RetryData {
196
197 private int immediateLeft;
198
199 private long p1;
200
201 private long p2;
202
203 private final long maxDelay;
204
205 RetryData(final int immediateLeft, final long p1, final long maxDelay) {
206 this.immediateLeft = immediateLeft;
207 if (p1 < 1) {
208 this.p1 = 0;
209 this.p2 = 1;
210 } else {
211 this.p1 = p1;
212 this.p2 = p1;
213 }
214 this.maxDelay = maxDelay;
215 }
216
217 private long nextDelay() {
218 if (immediateLeft > 0) {
219 immediateLeft--;
220 return 0;
221 } else if (p2 > maxDelay) {
222 return maxDelay;
223 } else {
224 long result = p2;
225 p2 = p1 + p2;
226 p1 = result;
227 return result;
228 }
229 }
230
231 }
232
233
234
235
236 @Deprecated
237 public static Class rootClass(final Exception f) {
238 return com.google.common.base.Throwables.getRootCause(f).getClass();
239 }
240
241
242
243
244 @Deprecated
245 public static final class FibonacciBackoffRetryPredicate<T, R> implements RetryPredicate<T, R> {
246
247 private final IntMath.XorShift32 random;
248
249 private final AdvancedRetryPredicate<T> arp;
250
251 private final int nrImmediateRetries;
252
253 private final long maxWaitUnits;
254
255 private final long minWaitUnits;
256
257 private Map<Object, RetryData> retryRegistry;
258
259 private final Function<T, ?> mapper;
260
261 private final long deadline;
262
263 private final LongSupplier currTimeSuplier;
264
265 private final TimeUnit tu;
266
267 public FibonacciBackoffRetryPredicate(final AdvancedRetryPredicate<T> arp,
268 final int nrImmediateRetries, final long minWaitUnits, final long maxWaitUnits,
269 final Function<T, ?> mapper, final long deadline, final LongSupplier currTimeSuplier,
270 final TimeUnit tu) {
271 this.arp = arp;
272 this.nrImmediateRetries = nrImmediateRetries;
273 this.maxWaitUnits = maxWaitUnits;
274 this.minWaitUnits = minWaitUnits;
275 retryRegistry = null;
276 this.mapper = mapper;
277 this.random = new IntMath.XorShift32();
278 this.deadline = deadline;
279 this.currTimeSuplier = currTimeSuplier;
280 this.tu = tu;
281 }
282
283 private RetryData getRetryData(final T value, final AdvancedAction action) {
284 Object rootCauseClass = mapper.apply(value);
285 RetryData data = retryRegistry.get(rootCauseClass);
286 if (data == null) {
287 data = createRetryData(action);
288 retryRegistry.put(rootCauseClass, data);
289 }
290 return data;
291 }
292
293 private RetryData createRetryData(final AdvancedAction action) {
294 if (action == AdvancedAction.RETRY_DELAYED) {
295 return new RetryData(0, minWaitUnits, maxWaitUnits);
296 } else {
297 return new RetryData(nrImmediateRetries, minWaitUnits, maxWaitUnits);
298 }
299 }
300
301 @Override
302 public RetryDecision<R> getDecision(final T value, final Callable<R> callable) {
303 long currentTime = currTimeSuplier.getAsLong();
304 if (currentTime > deadline) {
305 return RetryDecision.abort(new TimeoutException("Deadline " + Instant.ofEpochMilli(deadline)
306 + " passed, current time is " + Instant.ofEpochMilli(currentTime)));
307 }
308 if (retryRegistry == null) {
309 retryRegistry = new HashMap<>();
310 }
311 AdvancedAction action = arp.apply(value, deadline);
312 switch (action) {
313 case ABORT:
314 return RetryDecision.abort();
315 case RETRY_IMMEDIATE:
316 return RetryDecision.retry(0, callable);
317 case RETRY_DELAYED:
318 case RETRY:
319 RetryData retryData = getRetryData(value, action);
320 final long nextDelay = retryData.nextDelay();
321 long delay = Math.min(nextDelay, deadline - currentTime);
322 if (delay > 0) {
323 delay = Math.abs(random.nextInt()) % delay;
324 }
325 if (currentTime + delay > deadline) {
326 return RetryDecision.abort(new TimeoutException("No time left for retry "
327 + Instant.ofEpochMilli(deadline) + ' ' + tu
328 + " passed, current time is " + Instant.ofEpochMilli(currentTime) + ' ' + tu));
329 }
330 return RetryDecision.retry(tu.toMillis(delay), callable);
331 default:
332 throw new UnsupportedOperationException("Unsupperted Retry Action " + action);
333
334 }
335 }
336
337 }
338
339
340
341
342 @Deprecated
343 @Nullable
344 public static <T, EX extends Exception> T executeWithRetry(final TimeoutCallable<T, EX> what,
345 final TimeoutRetryPredicate<? super T, T> retryOnReturnVal,
346 final TimeoutRetryPredicate<Exception, T> retryOnException, final Class<EX> exceptionClass)
347 throws InterruptedException, EX, TimeoutException {
348 final long deadline = what.getDeadline();
349 return executeWithRetry(what,
350 new TimeoutRetryPredicate2RetryPredicate<>(deadline, retryOnReturnVal),
351 new TimeoutRetryPredicate2RetryPredicate<>(deadline, retryOnException), exceptionClass);
352 }
353
354
355
356
357 @Deprecated
358 public abstract static class TimeoutCallable<T, EX extends Exception> implements CheckedCallable<T, EX> {
359
360 private final long mdeadline;
361
362
363 public TimeoutCallable(final int timeoutMillis) {
364 mdeadline = overflowSafeAdd(System.currentTimeMillis(), timeoutMillis);
365 }
366
367 public TimeoutCallable(final long deadline) {
368 mdeadline = deadline;
369
370 }
371
372 @Override
373 public final T call() throws EX, InterruptedException, TimeoutException {
374 return call(mdeadline);
375 }
376
377
378
379
380 public abstract T call(long deadline) throws EX, InterruptedException, TimeoutException;
381
382 public final long getDeadline() {
383 return mdeadline;
384 }
385
386 }
387
388
389
390
391 @Deprecated
392 public enum AdvancedAction {
393 RETRY,
394 RETRY_IMMEDIATE,
395 RETRY_DELAYED,
396 ABORT
397 }
398
399
400
401
402 @Deprecated
403 public interface AdvancedRetryPredicate<T> {
404
405 default AdvancedAction apply(final T value, final long deadline) {
406 return apply(value);
407 }
408
409 AdvancedAction apply(T value);
410
411 AdvancedRetryPredicate<?> NO_RETRY = new AdvancedRetryPredicate<Object>() {
412 @Override
413 public AdvancedAction apply(final Object value) {
414 return AdvancedAction.ABORT;
415 }
416 };
417
418 }
419
420
421
422
423 @Deprecated
424 public static final class RetryDecision<R> {
425
426 private static final RetryDecision ABORT = new RetryDecision(Type.Abort, -1, null, null);
427
428 public enum Type {
429 Abort, Retry
430 }
431
432 private final Type decisionType;
433
434 private final long delayMillis;
435
436 private final Exception exception;
437
438 private final Callable<R> newCallable;
439
440 private RetryDecision(final Type decisionType, final long delayMillis,
441 final Exception exception, @Nullable final Callable<R> newCallable) {
442 this.decisionType = decisionType;
443 this.delayMillis = delayMillis;
444 this.exception = exception;
445 this.newCallable = newCallable;
446 }
447
448 public static RetryDecision abort(final Exception exception) {
449 return new RetryDecision(Type.Abort, -1, exception, null);
450 }
451
452 public static <R> RetryDecision<R> retry(final long retryMillis, @Nonnull final Callable<R> callable) {
453 return new RetryDecision(Type.Retry, retryMillis, null, callable);
454 }
455
456 public static RetryDecision abort() {
457 return ABORT;
458 }
459
460 public Type getDecisionType() {
461 return decisionType;
462 }
463
464 public long getDelayMillis() {
465 return delayMillis;
466 }
467
468 @SuppressFBWarnings("EI_EXPOSE_REP")
469 public Exception getException() {
470 return exception;
471 }
472
473 @Nullable
474 public Callable<R> getNewCallable() {
475 return newCallable;
476 }
477
478 }
479
480
481
482
483 @Deprecated
484 public interface RetryPredicate<T, R> {
485
486
487
488
489
490
491
492 @Nonnull
493 RetryDecision<R> getDecision(T value, @Nonnull Callable<R> callable);
494
495 RetryPredicate<Object, Object> NORETRY_DELAY_PREDICATE = new RetryPredicate<Object, Object>() {
496 @Override
497 public RetryDecision getDecision(final Object value, final Callable callable) {
498 return RetryDecision.abort();
499 }
500 };
501 }
502
503
504
505
506
507 @Deprecated
508 @Nullable
509 public static <T, EX extends Exception> T executeWithRetry(final TimeoutCallable<T, EX> what,
510 final TimeoutRetryPredicate<Exception, T> retryOnException, final Class<EX> exceptionClass)
511 throws InterruptedException, EX, TimeoutException {
512 return executeWithRetry(what, (TimeoutRetryPredicate<T, T>) TimeoutRetryPredicate.NORETRY_FOR_RESULT,
513 retryOnException, exceptionClass);
514 }
515
516
517
518
519 @Deprecated
520 public interface TimeoutRetryPredicate<T, R> {
521
522 RetryDecision<R> getDecision(T value, long deadlineMillis, Callable<R> what);
523
524 TimeoutRetryPredicate NORETRY_FOR_RESULT = new TimeoutRetryPredicate<Object, Object>() {
525
526 @Override
527 public RetryDecision<Object> getDecision(final Object value, final long deadline, final Callable<Object> what) {
528 return RetryDecision.abort();
529 }
530
531 };
532
533 }
534
535
536
537
538 @Deprecated
539 static final class TimeoutRetryPredicate2RetryPredicate<T, R>
540 implements RetryPredicate<T, R> {
541
542 private final long deadline;
543
544 private final TimeoutRetryPredicate<T, R> predicate;
545
546 TimeoutRetryPredicate2RetryPredicate(final long deadline, final TimeoutRetryPredicate<T, R> predicate) {
547 this.deadline = deadline;
548 this.predicate = predicate;
549 }
550
551
552 @Override
553 public RetryDecision<R> getDecision(final T value, final Callable<R> callable) {
554 return predicate.getDecision(value, deadline, callable);
555 }
556
557 }
558
559
560
561
562
563
564
565 public interface CheckedCallable<T, EX extends Exception> extends Callable<T> {
566
567
568
569
570
571
572
573
574
575 @Override
576 T call() throws EX, InterruptedException, TimeoutException;
577
578 }
579
580
581
582
583 @Deprecated
584 public enum SimpleAction {
585 RETRY, ABORT
586 }
587
588
589
590
591 @Deprecated
592 public interface SimpleRetryPredicate<T> {
593
594 SimpleAction apply(T value)
595 throws TimeoutException, InterruptedException;
596 }
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613 @Deprecated
614 @SuppressFBWarnings({ "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE", "MDM_THREAD_YIELD" })
615 @Nullable
616 public static <T, EX extends Exception> T executeWithRetry(
617 final CheckedCallable<T, EX> pwhat,
618 final RetryPredicate<? super T, T> retryOnReturnVal,
619 final RetryPredicate<Exception, T> retryOnException,
620 final Class<EX> exceptionClass)
621 throws InterruptedException, TimeoutException, EX {
622 Callable<T> what = pwhat;
623 T result = null;
624 Exception lastEx = null;
625 try {
626 result = what.call();
627 } catch (InterruptedException ex1) {
628 throw ex1;
629 } catch (Exception e) {
630 lastEx = e;
631 }
632 Exception lastExChain = lastEx;
633 RetryDecision decision = null;
634
635 while ((lastEx != null
636 && (decision = retryOnException.getDecision(lastEx, what)).getDecisionType()
637 == RetryDecision.Type.Retry)
638 || (lastEx == null && (decision = retryOnReturnVal.getDecision(result, what)).getDecisionType()
639 == RetryDecision.Type.Retry)) {
640 if (Thread.interrupted()) {
641 throw new InterruptedException();
642 }
643 long delayMillis = decision.getDelayMillis();
644 if (delayMillis > 0) {
645 Thread.sleep(delayMillis);
646 }
647 what = decision.getNewCallable();
648 result = null;
649 lastEx = null;
650 try {
651 result = what.call();
652 } catch (InterruptedException ex1) {
653 throw ex1;
654 } catch (Exception e) {
655 lastEx = e;
656 if (lastExChain != null) {
657 Throwables.suppressLimited(lastEx, lastExChain);
658 }
659 lastExChain = lastEx;
660 }
661 }
662 if (decision == null) {
663 throw new IllegalStateException("Decission should have ben initialized " + lastEx + ", " + result);
664 }
665 if (decision.getDecisionType() == RetryDecision.Type.Abort) {
666 Exception ex = decision.getException();
667 if (ex != null) {
668 lastEx = ex;
669 if (lastExChain != null) {
670 Throwables.suppressLimited(lastEx, lastExChain);
671 }
672 lastExChain = lastEx;
673 }
674 }
675 if (lastEx != null) {
676 if (lastExChain instanceof RuntimeException) {
677 throw (RuntimeException) lastExChain;
678 } else if (lastExChain instanceof TimeoutException) {
679 throw (TimeoutException) lastExChain;
680 } else if (lastExChain == null) {
681 return null;
682 } else if (exceptionClass.isAssignableFrom(lastExChain.getClass())) {
683 throw (EX) lastExChain;
684 } else {
685 throw new UncheckedExecutionException(lastExChain);
686 }
687 }
688 return result;
689 }
690
691 public static <T> Callable<T> synchronize(final Callable<T> callable) {
692 return new Callable<T>() {
693
694 @Override
695 public synchronized T call() throws Exception {
696 return callable.call();
697 }
698 };
699 }
700
701
702
703
704 public static <T> Callable<T> withName(final Callable<T> callable, final String name) {
705 return new Callable<T>() {
706
707 @Override
708 public T call() throws Exception {
709 Thread currentThread = Thread.currentThread();
710 String origName = currentThread.getName();
711 try {
712 currentThread.setName(origName + '[' + name + ']');
713 return callable.call();
714 } finally {
715 currentThread.setName(origName);
716 }
717 }
718
719 @Override
720 public String toString() {
721 return name;
722 }
723
724 };
725 }
726
727 static long overflowSafeAdd(final long currentTime, final long timeout) {
728 if (currentTime < 0) {
729 throw new IllegalArgumentException("Time must be positive, not " + currentTime);
730 }
731 if (timeout < 0) {
732 return currentTime;
733 }
734 long result = currentTime + timeout;
735 if ((currentTime ^ timeout) < 0 || (currentTime ^ result) >= 0) {
736 return result;
737 } else {
738 return Long.MAX_VALUE;
739 }
740 }
741
742 public static <V> MemorizedCallable<V> memorized(final Callable<V> source) {
743 return new MemorizedCallable<>(source);
744 }
745
746 public static <V> Callable<V> constant(final V value) {
747 return new ConstCallable(value);
748 }
749
750 public static Callable<Void> from(final Runnable value) {
751 return () -> {
752 value.run();
753 return null;
754 };
755 }
756
757 private static final class ConstCallable<V> implements Callable<V> {
758
759 private final V value;
760
761 ConstCallable(final V value) {
762 this.value = value;
763 }
764
765 @Override
766 public V call() {
767 return value;
768 }
769
770 @Override
771 public String toString() {
772 return "ConstCallable{" + value + '}';
773 }
774 }
775
776 @Deprecated
777 private static final class DefaultAdvancedRetryPredicateImpl implements AdvancedRetryPredicate<Exception> {
778
779 @Override
780 public AdvancedAction apply(@Nonnull final Exception input) {
781 if (Throwables.isRetryable(input)) {
782 Logger.getLogger(DefaultAdvancedRetryPredicateImpl.class.getName())
783 .log(Level.FINE, "Exception encountered, retrying...", input);
784 return AdvancedAction.RETRY;
785 }
786 return AdvancedAction.ABORT;
787 }
788 }
789
790
791
792 }