FailSafeExecutorImpl.java
/*
* Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Additionally licensed with:
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.spf4j.failsafe.concurrent;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spf4j.base.AbstractRunnable;
import org.spf4j.failsafe.RetryPredicate;
import org.spf4j.base.UncheckedExecutionException;
import org.spf4j.concurrent.DefaultExecutor;
import org.spf4j.concurrent.InterruptibleCompletableFuture;
/**
* Executor that will call Callables with retry. This executor cannot be used inside a Completion service.
*
*
* @author zoly
*/
public final class FailSafeExecutorImpl implements FailSafeExecutor {
private static final Logger LOG = LoggerFactory.getLogger(FailSafeExecutorImpl.class);
private static final Future<?> SHUTDOWN = new Future() {
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}
@Override
public boolean isCancelled() {
throw new UnsupportedOperationException();
}
@Override
public boolean isDone() {
throw new UnsupportedOperationException();
}
@Override
public Object get() {
throw new UnsupportedOperationException();
}
@Override
public Object get(final long timeout, final TimeUnit unit) {
throw new UnsupportedOperationException();
}
};
private final ExecutorService executionService;
private final DelayQueue<DelayedTask<RetryFutureTask<?>>> executionEvents = new DelayQueue<>();
private volatile Future<?> retryManagerFuture;
private final Object sync = new Object();
private void startRetryManager() {
Future<?> rm = retryManagerFuture;
if (rm == null) {
synchronized (sync) {
rm = retryManagerFuture;
if (rm == null) {
rm = DefaultExecutor.INSTANCE.submit(new RetryManager());
this.retryManagerFuture = rm;
LOG.debug("Retry manager started {}", rm);
}
}
}
}
private void shutdownRetryManager() {
synchronized (sync) {
Future<?> rmf = this.retryManagerFuture;
if (rmf != null && rmf != SHUTDOWN) {
rmf.cancel(true);
retryManagerFuture = SHUTDOWN;
}
}
}
private class RetryManager extends AbstractRunnable {
RetryManager() {
super("RetryManager");
}
@Override
public void doRun() {
while (retryManagerFuture != SHUTDOWN) {
try {
DelayedTask<RetryFutureTask<?>> event = executionEvents.poll(1, TimeUnit.MINUTES);
if (event != null) {
RetryFutureTask<?> runnable = event.getRunnable();
executionService.execute(runnable);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
LOG.debug("Interrupted Retry manager, shuting down, events scheduled: {}", executionEvents, ex);
break;
}
}
}
}
public FailSafeExecutorImpl(final ExecutorService exec) {
executionService = exec;
}
@Override
public void close() throws InterruptedException {
synchronized (sync) {
Future<?> rmf = this.retryManagerFuture;
if (rmf != null && rmf != SHUTDOWN) {
this.retryManagerFuture = SHUTDOWN;
rmf.cancel(true);
try {
rmf.get();
} catch (ExecutionException ex) {
throw new UncheckedExecutionException(ex);
} catch (CancellationException ex) {
// ignore, since we are the source
}
}
}
}
public void initiateClose() {
shutdownRetryManager();
}
@Override
public <A> Future<A> submit(final Callable<? extends A> task,
final RetryPredicate<A, ? extends Callable<? extends A>> predicate) {
RetryFutureTask<A> result =
new RetryFutureTask(task, (RetryPredicate<A, Callable<? extends A>>) predicate, executionEvents,
this::startRetryManager);
executionService.execute(result);
return (Future<A>) result;
}
@Override
public <A> CompletableFuture<A> submitRx(final Callable<? extends A> task,
final RetryPredicate<A, ? extends Callable<? extends A>> predicate,
final Supplier<InterruptibleCompletableFuture<A>> cfSupplier) {
InterruptibleCompletableFuture<A> result = cfSupplier.get();
ConsumableRetryFutureTask<A> rft =
new ConsumableRetryFutureTask<>(f -> {
A r;
try {
r = f.get();
} catch (ExecutionException ex) {
return result.completeExceptionally(ex.getCause());
} catch (Throwable ex) {
return result.completeExceptionally(ex);
}
return result.complete(r);
}, (Callable<A>) task,
(RetryPredicate<A, Callable<? extends A>>) predicate, executionEvents,
this::startRetryManager);
result.setToCancel(rft);
executionService.execute(rft);
return result;
}
@Override
public <A> Future<A> submit(final Callable<? extends A> task,
final RetryPredicate<A, ? extends Callable<? extends A>> predicate,
final int nrHedges, final long hedgeDelay, final TimeUnit unit) {
if (nrHedges <= 0) {
return submit(task, predicate);
}
int nrFut = nrHedges + 1;
final Future[] futures = new Future[nrFut];
ArrayBlockingQueue<Future<A>> queue = new ArrayBlockingQueue<>(1);
FirstFuture<A> result = new FirstFuture<A>(futures, queue);
ConsumableRetryFutureTask<A> future = new ConsumableRetryFutureTask(result, task,
(RetryPredicate<A, Callable<? extends A>>) predicate, executionEvents, this::startRetryManager);
startRetryManager();
futures[0] = future;
Runnable[] submits = new Runnable[nrFut];
submits[0] = () -> executionService.execute(future);
for (int i = 1; i < nrFut; i++) {
ConsumableRetryFutureTask<A> f = new ConsumableRetryFutureTask(
result, task, (RetryPredicate) predicate, executionEvents,
this::startRetryManager);
futures[i] = f;
if (hedgeDelay > 0) {
DelayedTask<RetryFutureTask<?>> delayedExecution = new DelayedTask<RetryFutureTask<?>>(
f, unit.toNanos(hedgeDelay));
f.setExec(delayedExecution);
submits[i] = () -> executionEvents.add(delayedExecution);
} else {
submits[i] = () -> executionService.execute(f);
}
}
for (Runnable submit : submits) {
submit.run();
}
return result;
}
@Override
public <A> CompletableFuture<A> submitRx(final Callable<? extends A> task,
final RetryPredicate<A, ? extends Callable<? extends A>> predicate,
final int nrHedges, final long hedgeDelay, final TimeUnit unit,
final Supplier<InterruptibleCompletableFuture<A>> cfSupplier) {
if (nrHedges <= 0) {
return submitRx(task, predicate);
}
InterruptibleCompletableFuture<A> result = cfSupplier.get();
int nrFut = nrHedges + 1;
final Future<A>[] futures = new Future[nrFut];
ArrayBlockingQueue<Future<A>> queue = new ArrayBlockingQueue<>(1);
FirstFuture<A> resultX = new FirstFuture<A>(futures, queue) {
@Override
@SuppressFBWarnings({ "NOS_NON_OWNED_SYNCHRONIZATION", "EXS_EXCEPTION_SOFTENING_NO_CHECKED" })
public boolean accept(final Future<A> finished) {
boolean accepted = super.accept(finished);
if (accepted) {
A r;
try {
r = finished.get();
} catch (ExecutionException ex) {
if (!result.completeExceptionally(ex.getCause())) {
throw new IllegalStateException(ex);
}
return true;
} catch (Throwable ex) {
if (!result.completeExceptionally(ex)) {
throw new IllegalStateException(ex);
}
return true;
}
if (!result.complete(r)) {
throw new IllegalStateException();
}
return true;
}
return false;
}
};
result.setToCancel(resultX);
ConsumableRetryFutureTask<A> future = new ConsumableRetryFutureTask(resultX, task,
(RetryPredicate<A, Callable<? extends A>>) predicate, executionEvents, this::startRetryManager);
startRetryManager();
futures[0] = future;
Runnable[] submits = new Runnable[nrFut];
submits[0] = () -> executionService.execute(future);
for (int i = 1; i < nrFut; i++) {
ConsumableRetryFutureTask<A> f = new ConsumableRetryFutureTask(
resultX, task, (RetryPredicate) predicate, executionEvents,
this::startRetryManager);
futures[i] = f;
if (hedgeDelay > 0) {
DelayedTask<RetryFutureTask<?>> delayedExecution = new DelayedTask<RetryFutureTask<?>>(
f, unit.toNanos(hedgeDelay));
f.setExec(delayedExecution);
submits[i] = () -> executionEvents.add(delayedExecution);
} else {
submits[i] = () -> executionService.execute(f);
}
}
for (Runnable submit : submits) {
submit.run();
}
return result;
}
@Override
public <A> void execute(final Callable<? extends A> task,
final RetryPredicate<A, ? extends Callable<? extends A>> predicate) {
RetryFutureTask<A> result = new RetryFutureTask(task, predicate, executionEvents, this::startRetryManager);
executionService.execute(result);
}
@Override
public String toString() {
return "RetryExecutor{" + "executionService=" + executionService + ", executionEvents=" + executionEvents
+ ", retryManagerFuture=" + retryManagerFuture
+ ", sync=" + sync + '}';
}
@SuppressFBWarnings("NOS_NON_OWNED_SYNCHRONIZATION") // Actually I own it...
private static class FirstFuture<T> implements Future<T>, ConditionalConsumer<Future<T>> {
private final Future<T>[] futures;
private final BlockingQueue<Future<T>> queue;
private boolean first = true;
FirstFuture(final Future<T>[] futures,
final BlockingQueue<Future<T>> queue) {
this.futures = futures;
this.queue = queue;
}
@Override
public boolean accept(final Future<T> finished) {
synchronized (this) {
if (first) {
first = false;
queue.add(finished);
for (int i = 0; i < futures.length; i++) {
Future f = futures[i];
if (f != null && f != finished) {
f.cancel(true);
}
futures[i] = null;
}
return true;
}
return false;
}
}
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
synchronized (this) {
boolean result = true;
for (int i = 0, l = futures.length; i < l; i++) {
Future f = futures[i];
if (f != null && !f.cancel(mayInterruptIfRunning)) {
result = false;
}
}
return result;
}
}
@Override
public boolean isCancelled() {
synchronized (this) {
boolean result = true;
for (int i = 0, l = futures.length; i < l; i++) {
Future f = futures[i];
if (f != null && !f.isCancelled()) {
result = false;
}
}
return result;
}
}
@Override
public boolean isDone() {
boolean result = true;
for (int i = 0, l = futures.length; i < l; i++) {
Future f = futures[i];
if (f != null && !f.isDone()) {
result = false;
}
}
return result;
}
@Override
public T get() throws InterruptedException, ExecutionException {
return queue.take().get();
}
@Override
public T get(final long timeout, final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Future<T> poll = queue.poll(timeout, unit);
if (poll == null) {
throw new TimeoutException("Timed out after " + timeout + " " + unit);
} else {
return poll.get();
}
}
}
private static class ConsumableRetryFutureTask<T> extends RetryFutureTask<T> {
private final ConditionalConsumer<Future<T>> consumer;
ConsumableRetryFutureTask(final ConditionalConsumer<Future<T>> consumer, final Callable<T> callable,
final RetryPredicate<T, Callable<? extends T>> retryPredicate,
final DelayQueue<DelayedTask<RetryFutureTask<?>>> delayedTasks,
final Runnable onRetry) {
super(callable, retryPredicate, delayedTasks, onRetry);
this.consumer = consumer;
}
@Override
public void done() {
consumer.accept(this);
}
}
}