FutureTask.java

//CHECKSTYLE:OFF
//preserve original formatting.
package org.spf4j.failsafe.concurrent;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import org.spf4j.base.AlmostSafe;

/**
 * A cancellable asynchronous computation. This class provides a base implementation of {@link Future}, with methods to
 * start and cancel a computation, query to see if the computation is complete, and retrieve the result of the
 * computation. The result can only be retrieved when the computation has completed; the {@code get} methods will block
 * if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or
 * cancelled (unless the computation is invoked using {@link #runAndReset}).
 *
 * <p>
 * A {@code FutureTask} can be used to wrap a {@link Callable} or {@link Runnable} object. Because {@code FutureTask}
 * implements {@code Runnable}, a {@code FutureTask} can be submitted to an {@link Executor} for execution.
 *
 * <p>
 * In addition to serving as a standalone class, this class provides {@code protected} functionality that may be useful
 * when creating customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
@SuppressFBWarnings
public class FutureTask<V> implements RunnableFuture<V> {

  /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
   */

  /**
   * The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set,
   * setException, and cancel. During completion, state may take on transient values of COMPLETING (while outcome is
   * being set) or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these
   * intermediate to final states use cheaper ordered/lazy writes because values are unique and cannot be further
   * modified.
   *
   * Possible state transitions: NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW ->
   * INTERRUPTING -> INTERRUPTED
   */
  private volatile int state;
  private static final int NEW = 0;
  private static final int COMPLETING = 1;
  private static final int NORMAL = 2;
  private static final int EXCEPTIONAL = 3;
  private static final int CANCELLED = 4;
  private static final int INTERRUPTING = 5;
  private static final int INTERRUPTED = 6;

  /**
   * The underlying callable; nulled out after running
   */
  private volatile Callable<V> callable;
  /**
   * The result to return or exception to throw from get()
   */
  private Object outcome; // non-volatile, protected by state reads/writes
  /**
   * The thread running the callable; CASed during run()
   */
  private volatile Thread runner;
  /**
   * Treiber stack of waiting threads
   */
  private volatile WaitNode waiters;

  /**
   * Returns result or throws exception for completed task.
   *
   * @param s completed state value
   */
  @SuppressWarnings("unchecked")
  protected V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL) {
      return (V) x;
    }
    if (s >= CANCELLED) {
      throw new CancellationException();
    }
    throw new ExecutionException((Throwable) x);
  }

  /**
   * Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}.
   *
   * @param callable the callable task
   * @throws NullPointerException if the callable is null
   */
  public FutureTask(Callable<V> callable) {
    if (callable == null) {
      throw new NullPointerException();
    }
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
  }

  /**
   * Creates a {@code FutureTask} that will, upon running, execute the given {@code Runnable}, and arrange that
   * {@code get} will return the given result on successful completion.
   *
   * @param runnable the runnable task
   * @param result the result to return on successful completion. If you don't need a particular result, consider using
   * constructions of the form: {@code Future<?> f = new FutureTask<Void>(runnable, null)}
   * @throws NullPointerException if the runnable is null
   */
  public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
  }

  public void setCallable(final Callable<V> c) {
    if (callable == null) {
      throw new NullPointerException();
    }
    this.callable = c;
  }

  public void reset() {
    this.state = NEW;
  }

  public Callable<V> getCallable() {
    return callable;
  }

  public boolean isCancelled() {
    return state >= CANCELLED;
  }

  public boolean isDone() {
    return state != NEW;
  }

  public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW
            && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) {
      return false;
    }
    try {    // in case call to interrupt throws exception
      if (mayInterruptIfRunning) {
        try {
          Thread t = runner;
          if (t != null) {
            t.interrupt();
          }
        } finally { // final state
          UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
        }
      }
    } finally {
      finishCompletion();
    }
    return true;
  }

  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING) {
      s = awaitDone(false, 0L);
    }
    return report(s);
  }

  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get(long timeout, TimeUnit unit)
          throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null) {
      throw new NullPointerException();
    }
    int s = state;
    if (s <= COMPLETING
            && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) {
      throw new TimeoutException();
    }
    return report(s);
  }

  /**
   * Protected method invoked when this task transitions to state {@code isDone} (whether normally or via cancellation).
   * The default implementation does nothing. Subclasses may override this method to invoke completion callbacks or
   * perform bookkeeping. Note that you can query status inside the implementation of this method to determine whether
   * this task has been cancelled.
   */
  protected void done() {
  }

  /**
   * Sets the result of this future to the given value unless this future has already been set or has been cancelled.
   *
   * <p>
   * This method is invoked internally by the {@link #run} method upon successful completion of the computation.
   *
   * @param v the value
   */
  protected boolean set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = v;
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      finishCompletion();
    }
    return true;
  }

  /**
   * Causes this future to report an {@link ExecutionException} with the given throwable as its cause, unless this
   * future has already been set or has been cancelled.
   *
   * <p>
   * This method is invoked internally by the {@link #run} method upon failure of the computation.
   *
   * @param t the cause of failure
   */
  protected boolean setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = t;
      UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
      finishCompletion();
    }
    return true;
  }

  public void run() {
    if (state != NEW
            || !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread())) {
      return;
    }
    boolean finished = false;
    try {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          result = null;
          ran = false;
          finished = setException(ex);
        }
        if (ran) {
          finished = set(result);
        }
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING) {
        handlePossibleCancellationInterrupt(s);
      }
      if (!finished && s < CANCELLED) {
        state = NEW;
      } else {
        callable = null;
      }
    }
  }

  /**
   * Executes the computation without setting its result, and then resets this future to initial state, failing to do so
   * if the computation encounters an exception or is cancelled. This is designed for use with tasks that intrinsically
   * execute more than once.
   *
   * @return {@code true} if successfully run and reset
   */
  protected boolean runAndReset() {
    if (state != NEW
            || !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread())) {
      return false;
    }
    boolean ran = false;
    int s = state;
    try {
      Callable<V> c = callable;
      if (c != null && s == NEW) {
        try {
          c.call(); // don't set result
          ran = true;
        } catch (Throwable ex) {
          setException(ex);
        }
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      s = state;
      if (s >= INTERRUPTING) {
        handlePossibleCancellationInterrupt(s);
      }
    }
    return ran && s == NEW;
  }

  /**
   * Ensures that any interrupt from a possible cancel(true) is only delivered to a task while in run or runAndReset.
   */
  private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING) {
      while (state == INTERRUPTING) {
        Thread.yield(); // wait out pending interrupt
      }
    }
    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
  }

  /**
   * Simple linked list nodes to record waiting threads in a Treiber stack. See other classes such as Phaser and
   * SynchronousQueue for more detailed explanation.
   */
  static final class WaitNode {

    volatile Thread thread;
    volatile WaitNode next;

    WaitNode() {
      thread = Thread.currentThread();
    }
  }

  /**
   * Removes and signals all waiting threads, invokes done(), and nulls out callable.
   */
  private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
      if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
        for (;;) {
          Thread t = q.thread;
          if (t != null) {
            q.thread = null;
            LockSupport.unpark(t);
          }
          WaitNode next = q.next;
          if (next == null) {
            break;
          }
          q.next = null; // unlink to help gc
          q = next;
        }
        break;
      }
    }

    done();
  }

  /**
   * Awaits completion or aborts on interrupt or timeout.
   *
   * @param timed true if use timed waits
   * @param nanos time to wait, if timed
   * @return state upon completion
   */
  private int awaitDone(boolean timed, long nanos)
          throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
      if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
      }

      int s = state;
      if (s > COMPLETING) {
        if (q != null) {
          q.thread = null;
        }
        return s;
      } else if (s == COMPLETING) // cannot time out yet
      {
        Thread.yield();
      } else if (q == null) {
        q = new WaitNode();
      } else if (!queued) {
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                q.next = waiters, q);
      } else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
          removeWaiter(q);
          return state;
        }
        LockSupport.parkNanos(this, nanos);
      } else {
        LockSupport.park(this);
      }
    }
  }

  /**
   * Tries to unlink a timed-out or interrupted wait node to avoid accumulating garbage. Internal nodes are simply
   * unspliced without CAS since it is harmless if they are traversed anyway by releasers. To avoid effects of
   * unsplicing from already removed nodes, the list is retraversed in case of an apparent race. This is slow when there
   * are a lot of nodes, but we don't expect lists to be long enough to outweigh higher-overhead schemes.
   */
  private void removeWaiter(WaitNode node) {
    if (node != null) {
      node.thread = null;
      retry:
      for (;;) {          // restart on removeWaiter race
        for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
          s = q.next;
          if (q.thread != null) {
            pred = q;
          } else if (pred != null) {
            pred.next = s;
            if (pred.thread == null) // check for race
            {
              continue retry;
            }
          } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                  q, s)) {
            continue retry;
          }
        }
        break;
      }
    }
  }

  // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;
  private static final long stateOffset;
  private static final long runnerOffset;
  private static final long waitersOffset;

  static {
    try {
      UNSAFE =  AlmostSafe.USF; //sun.misc.Unsafe.getUnsafe();
      Class<?> k = FutureTask.class;
      stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
      runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
      waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
    } catch (Exception e) {
      throw new Error(e);
    }
  }

  @Override
  public String toString() {
    return "FutureTask{" + "state=" + state + ", callable=" + callable + ", outcome=" + outcome + '}';
  }

}