RetryExecutor.java
/*
* Copyright (c) 2001, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package org.spf4j.concurrent;
import com.google.common.base.Predicate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.spf4j.base.Throwables;
import org.spf4j.base.Pair;
/**
* Executor that will execute Callables with retry.
* This executor cannot be used inside a Completion service.
* as such it allow
*
* @author zoly
*/
public class RetryExecutor<T> implements ExecutorService {
private final ExecutorService executionService;
/**
* can contain: DelayedCallables for execution. (delayed retries) or
* FailedExecutionResults for results.
*
*/
private final DelayQueue<FailedExecutionResult> executionEvents = new DelayQueue<FailedExecutionResult>();
private final ConcurrentMap<Callable<? extends Object>, Pair<Integer, ExecutionException>> executionAttempts;
private final ExecutorService exec;
private final int nrImmediateRetries;
private final int nrTotalRetries;
private final long delayMillis;
private volatile RetryManager retryManager;
private final Predicate<Exception> retryException;
private final BlockingQueue<Future<T>> completionQueue;
private void startRetryManager() {
if (this.retryManager == null) {
synchronized (this) {
if (this.retryManager == null) {
this.retryManager = new RetryManager();
this.retryManager.start();
}
}
}
}
private void shutdownRetryManager() {
synchronized (this) {
if (this.retryManager != null) {
this.retryManager.interrupt();
try {
this.retryManager.join();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
/**
* this class represents either a execution failure notification or a retry
* command.
*/
private static class FailedExecutionResult implements Delayed {
private final ExecutionException exception;
private final FutureBean<Object> future;
private final Callable<Object> callable;
private final long delay;
private final boolean isExecution;
public FailedExecutionResult(final ExecutionException exception, final FutureBean future,
final Callable callable, final long delay, final boolean isExecution) {
this.exception = exception;
this.future = future;
this.callable = callable;
this.delay = delay + System.currentTimeMillis();
this.isExecution = isExecution;
}
@Override
public long getDelay(final TimeUnit unit) {
return unit.convert(delay - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(final Delayed o) {
long tDelay = getDelay(TimeUnit.MILLISECONDS);
long oDelay = o.getDelay(TimeUnit.MILLISECONDS);
if (tDelay > oDelay) {
return 1;
} else if (tDelay < oDelay) {
return -1;
} else {
return 0;
}
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
} else {
if (obj instanceof Delayed) {
return this.compareTo((Delayed) obj) == 0;
} else {
return false;
}
}
}
@Override
public int hashCode() {
int hash = 7;
hash = 53 * hash + (this.callable != null ? this.callable.hashCode() : 0);
return hash;
}
public ExecutionException getException() {
return exception;
}
public FutureBean<Object> getFuture() {
return future;
}
public Callable<Object> getCallable() {
return callable;
}
public boolean isIsExecution() {
return isExecution;
}
}
private class RetryableCallable<T> implements Callable<T>, Runnable {
private final Callable callable;
private final FutureBean<T> future;
public RetryableCallable(final Callable<T> callable, final FutureBean<T> future) {
this.callable = callable;
this.future = future;
}
public RetryableCallable(final Runnable task, final Object result, final FutureBean<T> future) {
this.callable = new Callable() {
@Override
public Object call() throws Exception {
task.run();
return result;
}
};
this.future = future;
}
@Override
public T call() {
try {
Object result = callable.call();
if (future != null) {
future.setResult(result);
}
return null;
} catch (Exception e) {
if (retryException.apply(e)) {
startRetryManager();
executionEvents.add(
new FailedExecutionResult(new ExecutionException(e), future, callable, 0, false));
} else {
future.setExceptionResult(new ExecutionException(e));
}
return null;
}
}
@Override
public void run() {
call();
}
}
private class RetryManager extends Thread {
public RetryManager() {
super("RetryManager");
}
public void shutdown() {
this.interrupt();
try {
this.join();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
@Override
public void run() {
Thread managerThread = Thread.currentThread();
while (!managerThread.isInterrupted()) {
try {
FailedExecutionResult event = executionEvents.take();
if (event.isIsExecution()) {
executionService.execute(new RetryableCallable<Object>(event.getCallable(), event.getFuture()));
} else {
Pair<Integer, ExecutionException> attemptsInfo = executionAttempts.get(event.getCallable());
if (attemptsInfo == null) {
attemptsInfo = Pair.of(1, event.getException());
} else {
attemptsInfo = Pair.of(attemptsInfo.getFirst() + 1,
Throwables.suppress(event.getException(), attemptsInfo.getSecond()));
}
int nrAttempts = attemptsInfo.getFirst();
if (nrAttempts > nrTotalRetries) {
executionAttempts.remove(event.getCallable());
event.getFuture().setExceptionResult(attemptsInfo.getSecond());
} else if (nrAttempts > nrImmediateRetries) {
executionAttempts.put(event.getCallable(), attemptsInfo);
executionEvents.put(new FailedExecutionResult(attemptsInfo.getSecond(), event.getFuture(),
event.getCallable(), delayMillis, true));
} else {
executionAttempts.put(event.getCallable(), attemptsInfo);
executionService.execute(
new RetryableCallable<Object>(event.getCallable(), event.getFuture()));
}
}
} catch (InterruptedException ex) {
managerThread.interrupt();
break;
}
}
}
}
public RetryExecutor(final ExecutorService exec, final int nrImmediateRetries,
final int nrTotalRetries, final long delayMillis, final Predicate<Exception> retryException,
@Nullable final BlockingQueue<Future<T>> completionQueue) {
executionService = exec;
executionAttempts = new ConcurrentHashMap<Callable<? extends Object>, Pair<Integer, ExecutionException>>();
this.nrImmediateRetries = nrImmediateRetries;
this.nrTotalRetries = nrTotalRetries;
this.delayMillis = delayMillis;
this.exec = exec;
this.retryException = retryException;
this.completionQueue = completionQueue;
}
@Override
public final void shutdown() {
shutdownRetryManager();
exec.shutdown();
}
@Override
public final List<Runnable> shutdownNow() {
shutdownRetryManager();
return exec.shutdownNow();
}
@Override
public final boolean isShutdown() {
return exec.isShutdown();
}
@Override
public final boolean isTerminated() {
return exec.isTerminated();
}
@Override
public final boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
return exec.awaitTermination(timeout, unit);
}
private FutureBean<T> createFutureBean() {
if (completionQueue == null) {
return new FutureBean<T>();
} else {
return new FutureBean<T>() {
@Override
public void done() {
completionQueue.add(this);
}
};
}
}
@Override
public final <A> Future<A> submit(final Callable<A> task) {
FutureBean<T> result = createFutureBean();
executionService.execute(new RetryableCallable(task, result));
return (Future<A>) result;
}
@Override
public final <A> Future<A> submit(final Runnable task, final A result) {
FutureBean<T> resultFuture = createFutureBean();
executionService.execute(new RetryableCallable<T>(task, result, resultFuture));
return (Future<A>) resultFuture;
}
@Override
public final Future<?> submit(final Runnable task) {
FutureBean<?> resultFuture = createFutureBean();
executionService.execute(new RetryableCallable(task, null, resultFuture));
return resultFuture;
}
@Override
public final <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
throws InterruptedException {
List<Future<T>> result = new ArrayList<Future<T>>();
for (Callable task : tasks) {
result.add(this.submit(task));
}
for (Future fut : result) {
try {
fut.get();
// CHECKSTYLE:OFF
} catch (ExecutionException ex) {
//CHECKSTYLE:ON
// Swallow exception for now, this sexception will be thoriwn when the client will call get again..
}
}
return result;
}
@Override
public final <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks,
final long timeout, final TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public final <T> T invokeAny(final Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public final <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public final void execute(final Runnable command) {
executionService.execute(new RetryableCallable(command, null, null));
}
}