LifoThreadPoolExecutorSQP.java

/*
 * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * Additionally licensed with:
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.spf4j.concurrent;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import gnu.trove.set.hash.THashSet;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Queue;
import java.util.List;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.spf4j.base.AbstractRunnable;
import org.spf4j.base.TimeSource;
import org.spf4j.base.Timing;
import org.spf4j.base.UncheckedExecutionException;
import static org.spf4j.concurrent.RejectedExecutionHandler.REJECT_EXCEPTION_EXEC_HANDLER;
import org.spf4j.ds.SimpleStack;
import org.spf4j.jmx.JmxExport;
import org.spf4j.jmx.Registry;
import org.spf4j.stackmonitor.StackTrace;

/**
 *
 * LIFO scheduled java thread pool, this behavior is identical  with the JDK's fork join pool, but different from the
 * older jdk ThreadPoolExecutor implementations.
 *
 * This implementation behaves differently compared with a java Thread pool in that it prefers to spawn a
 * thread if possible instead of queueing a task when nr of threads has room to grow.
 *
 * Performance of this pool implementation is similar to ThreadPoolExecutor however due to LIFO scheduling,
 * it will have lower resource usage in most use cases.
 *
 * The JDK Fork Join pool's performance is superior to this implementation, however unlike this implementation,
 * with the fork join pool you will not be able to cancel+interrupt running tasks.
 *
 * @author zoly
 */
@ParametersAreNonnullByDefault
@SuppressFBWarnings({"MDM_THREAD_PRIORITIES", "MDM_WAIT_WITHOUT_TIMEOUT"})
public final class LifoThreadPoolExecutorSQP extends AbstractExecutorService implements MutableLifoThreadPool {


  private static final Logger LOG = LoggerFactory.getLogger(LifoThreadPoolExecutorSQP.class);
  /**
   * when a thread survives due core size, this the minimum wait time that core threads will wait for. worker threads
   * have a maximum time they are idle, after which they are retired... in case a user configures a thread pool with
   * idle times less than min wait, the core threads will have to have a minimum wait time to avoid spinning and hogging
   * the CPU. this value is used only when the max idle time of the pool is smaller, and it interferes with thread
   * retirement in that case... I do not see that case as a useful pooling case to be worth trying to optimize it...
   */
  private static final long CORE_MINWAIT_NANOS = Long.getLong("spf4j.lifoTp.coreMaxWaitNanos", 1000000000);

  private static final int LL_THRESHOLD = Integer.getInteger("spf4j.lifoTp.llQueueSizeThreshold", 64000);

  private final ReentrantLock stateLock;

  private final Condition stateCondition;

  private final String poolName;

  private final RejectedExecutionHandler rejectionHandler;

  @GuardedBy("stateLock")
  private final Queue<Runnable> taskQueue;

  @GuardedBy("stateLock")
  private final SimpleStack<QueuedThread> threadQueue;

  @GuardedBy("stateLock")
  private int maxIdleTimeMillis;

  @GuardedBy("stateLock")
  private int maxThreadCount;

  @GuardedBy("stateLock")
  private final PoolState state;

  @GuardedBy("stateLock")
  private int queueSizeLimit;

  @GuardedBy("stateLock")
  private boolean daemonThreads;

  @GuardedBy("stateLock")
  private int threadPriority;

  @GuardedBy("stateLock")
  private int threadCreationCount;

  public LifoThreadPoolExecutorSQP(final int maxNrThreads, final String name) {
    this(name, 0, maxNrThreads, 5000, 0);
  }


  public LifoThreadPoolExecutorSQP(final String poolName, final int coreSize,
          final int maxSize, final int maxIdleTimeMillis,
          final int queueSize, final boolean daemonThreads) {
    this(poolName, coreSize, maxSize, maxIdleTimeMillis,
            queueSize, daemonThreads, REJECT_EXCEPTION_EXEC_HANDLER);
  }

  public LifoThreadPoolExecutorSQP(final String poolName, final int coreSize,
          final int maxSize, final int maxIdleTimeMillis,
          final int queueSizeLimit) {
    this(poolName, coreSize, maxSize, maxIdleTimeMillis,
            queueSizeLimit, false, REJECT_EXCEPTION_EXEC_HANDLER);
  }

  public LifoThreadPoolExecutorSQP(final String poolName, final int coreSize,
          final int maxSize, final int maxIdleTimeMillis,
          final int queueSizeLimit, final boolean daemonThreads,
          final RejectedExecutionHandler rejectionHandler) {
    this(poolName, coreSize, maxSize, maxIdleTimeMillis, queueSizeLimit, daemonThreads,
            rejectionHandler, Thread.NORM_PRIORITY);
  }

  public LifoThreadPoolExecutorSQP(final String poolName, final int coreSize,
          final int maxSize, final int maxIdleTimeMillis,
          final int queueSizeLimit, final boolean daemonThreads,
          final RejectedExecutionHandler rejectionHandler,
          final int threadPriority) {
    if (coreSize > maxSize) {
      throw new IllegalArgumentException("Core size must be smaller than max size " + coreSize
              + " < " + maxSize);
    }
    if (coreSize < 0 || maxSize < 0 || maxIdleTimeMillis < 0 || queueSizeLimit < 0) {
      throw new IllegalArgumentException("All numberic TP configs must be positive values: "
              + coreSize + ", " + maxSize + ", " + maxIdleTimeMillis
              + ", " + queueSizeLimit);
    }
    this.stateLock = new ReentrantLock();
    this.rejectionHandler = rejectionHandler;
    this.poolName = poolName;
    this.maxIdleTimeMillis = maxIdleTimeMillis;
    this.taskQueue = new ArrayDeque<Runnable>(Math.min(queueSizeLimit, LL_THRESHOLD));
    this.queueSizeLimit = queueSizeLimit;
    this.threadQueue = new SimpleStack<>(Math.min(1024, maxSize));
    this.threadPriority = threadPriority;
    state = new PoolState(coreSize, new THashSet<>(Math.min(maxSize, 2048)));
    this.stateCondition = stateLock.newCondition();
    this.daemonThreads = daemonThreads;
    for (int i = 0; i < coreSize; i++) {
      QueuedThread qt = new QueuedThread(poolName + '-' + (threadCreationCount++), threadQueue,
              taskQueue, maxIdleTimeMillis, null, state, stateLock, stateCondition);
      qt.setDaemon(daemonThreads);
      qt.setPriority(threadPriority);
      state.addThread(qt);
      qt.start();
    }
    maxThreadCount = maxSize;
  }

  @Override
  public void exportJmx() {
    Registry.export(LifoThreadPoolExecutorSQP.class.getName(), poolName, this);
  }

  @Override
  @SuppressFBWarnings(value = {"MDM_WAIT_WITHOUT_TIMEOUT", "MDM_LOCK_ISLOCKED", "UL_UNRELEASED_LOCK_EXCEPTION_PATH"},
          justification = "no blocking is done while holding the lock,"
          + " lock is released on all paths, findbugs just cannot figure it out...")
  public void execute(final Runnable command) {
    boolean reject = false;
    stateLock.lock();
    try {
      if (state.isShutdown()) {
        // if shutting down, reject
        stateLock.unlock();
        this.rejectionHandler.rejectedExecution(command, this);
        return;
      }
      QueuedThread nqt = threadQueue.pollLast();
      if (nqt != null) {
        nqt.runNext(command);
        stateLock.unlock();
        return;
      }
      int tc = state.getThreadCount();
      // was not able to submit to an existing available thread, will attempt to create a new thread.
      if (tc < maxThreadCount) {
        QueuedThread qt;
        try {
          qt = new QueuedThread(poolName  + '-' + (threadCreationCount++), threadQueue, taskQueue, maxIdleTimeMillis,
                  command, state, stateLock, stateCondition);
          qt.setDaemon(daemonThreads);
          qt.setPriority(threadPriority);
          state.addThread(qt);
        } finally {
          stateLock.unlock();
        }
        qt.start();
        return;
      }
      // was not able to submit to an existing available thread, reached the maxThread limit.
      // will attempt to queue the task, and reject if unable to
      reject = taskQueue.size() >= queueSizeLimit || !taskQueue.offer(command);
    } catch (Throwable t) {
      if (stateLock.isHeldByCurrentThread()) {
        stateLock.unlock();
      }
      throw t;
    }
    stateLock.unlock();
    if (reject) {
      rejectionHandler.rejectedExecution(command, this);
    }
  }

  @Override
  @SuppressFBWarnings("MDM_WAIT_WITHOUT_TIMEOUT")
  public void shutdown() {
    stateLock.lock();
    try {
      if (!state.isShutdown()) {
        state.setShutdown(true); // set the shutdown flag, to reject new submissions.
        QueuedThread th;
        while ((th = threadQueue.pollLast()) != null) {
          th.signal(); // signal all waiting threads, so thay can start going down.
        }
      }
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  public boolean awaitTermination(final long time, final TimeUnit unit) throws InterruptedException {
    long deadlinenanos = TimeSource.nanoTime() + unit.toNanos(time);
    int threadCount;
    stateLock.lock();
    try {
      if (!state.isShutdown()) {
        throw new IllegalStateException("Threadpool is not is shutdown mode " + this);
      }
      threadCount = state.getThreadCount();
      long timeoutNs = deadlinenanos - TimeSource.nanoTime();
      while (threadCount > 0) {
        if (timeoutNs > 0) {
          timeoutNs = stateCondition.awaitNanos(timeoutNs);
        } else {
          break;
        }
        threadCount = state.getThreadCount();
      }
    } finally {
      stateLock.unlock();
    }
    return threadCount == 0;
  }

  @Override
  public List<Runnable> shutdownNow() {
    shutdown(); // shutdown
    stateLock.lock();
    try {
      state.interruptAll(); // interrupt all running threads.
      return new ArrayList<>(taskQueue);
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  @JmxExport
  public boolean isShutdown() {
    stateLock.lock();
    try {
      return state.isShutdown();
    } finally {
      stateLock.unlock();
    }
  }

  @JmxExport
  @Override
  public boolean isDaemonThreads() {
    stateLock.lock();
    try {
      return daemonThreads;
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  @JmxExport
  public boolean isTerminated() {
    stateLock.lock();
    try {
      return state.isShutdown() && state.getThreadCount() == 0;
    } finally {
      stateLock.unlock();
    }
  }

  @JmxExport
  @Override
  public int getThreadCount() {
    stateLock.lock();
    try {
      return state.getThreadCount();
    } finally {
      stateLock.unlock();
    }
  }

  @JmxExport
  @Override
  public int getMaxThreadCount() {
    stateLock.lock();
    try {
      return maxThreadCount;
    } finally {
      stateLock.unlock();
    }
  }

  @JmxExport
  @Override
  public int getCoreThreadCount() {
    stateLock.lock();
    try {
      return state.getCoreThreads();
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  public ReentrantLock getStateLock() {
    return stateLock;
  }

  @JmxExport
  @SuppressFBWarnings(value = "MDM_WAIT_WITHOUT_TIMEOUT",
          justification = "Holders of this lock will not block")
  @Override
  public int getNrQueuedTasks() {
    stateLock.lock();
    try {
      return taskQueue.size();
    } finally {
      stateLock.unlock();
    }
  }

  @JmxExport
  @Override
  public int getQueueSizeLimit() {
    stateLock.lock();
    try {
      return queueSizeLimit;
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  public void unregisterJmx() {
    Registry.unregister(LifoThreadPoolExecutorSQP.class.getName(), poolName);
  }

  @Override
  @JmxExport
  public void setDaemonThreads(final boolean daemonThreads) {
    stateLock.lock();
    try {
      this.daemonThreads = daemonThreads;
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  @JmxExport
  public void setMaxIdleTimeMillis(final int maxIdleTimeMillis) {
    stateLock.lock();
    try {
      this.maxIdleTimeMillis = maxIdleTimeMillis;
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  @JmxExport
  public void setMaxThreadCount(final int maxThreadCount) {
    stateLock.lock();
    try {
      this.maxThreadCount = maxThreadCount;
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  @JmxExport
  public void setCoreThreadCount(final int coreThreadCount) {
    stateLock.lock();
    try {
      this.state.setCoreThreads(coreThreadCount);
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  @JmxExport
  public void setQueueSizeLimit(final int queueSizeLimit) {
    stateLock.lock();
    try {
      this.queueSizeLimit = queueSizeLimit;
    } finally {
      stateLock.unlock();
    }
  }

  @Override
  @JmxExport
  public void setThreadPriority(final int threadPriority) {
    stateLock.lock();
    try {
      this.threadPriority = threadPriority;
    } finally {
      stateLock.unlock();
    }
  }

  @SuppressFBWarnings("NO_NOTIFY_NOT_NOTIFYALL")
  private static final class QueuedThread extends Thread {

    private final SimpleStack<QueuedThread> threadQueue;

    private final Queue<Runnable> taskQueue;

    private final int maxIdleTimeMillis;

    @GuardedBy("poolStateLock")
    private final PoolState state;

    private long lastRunNanos;

    private final ReentrantLock poolStateLock;

    private final Condition poolStateCondition;

    private final Condition submitCondition;

    @Nullable
    private Runnable toRun;

    QueuedThread(final String name, final SimpleStack<QueuedThread> threadQueue,
            final Queue<Runnable> taskQueue, final int maxIdleTimeMillis,
            @Nullable final Runnable runFirst, final PoolState state,
            final ReentrantLock submitMonitor, final Condition submitCondition) {
      super(name);
      this.threadQueue = threadQueue;
      this.taskQueue = taskQueue;
      this.maxIdleTimeMillis = maxIdleTimeMillis;
      this.state = state;
      this.lastRunNanos = TimeSource.nanoTime();
      this.poolStateLock = submitMonitor;
      this.submitCondition = submitMonitor.newCondition();
      this.poolStateCondition = submitCondition;
      this.toRun = runFirst;
    }

    /**
     * will return false when this thread is not running anymore...
     *
     * @param runnable
     * @return
     */
    @CheckReturnValue
    @SuppressFBWarnings("MDM_SIGNAL_NOT_SIGNALALL") // Only one thread will away on this condition
    private void runNext(final Runnable runnable) {
        toRun = runnable;
        submitCondition.signal();
    }

    @SuppressFBWarnings
    private void signal() {
      runNext(AbstractRunnable.NOP);
    }

    @Override
    public void run() {
      Runnable r = toRun;
      if (r != null) {
        try {
          execute(r);
        } finally {
          toRun = null;
        }
      }
      doRun(TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis));
    }

    @SuppressFBWarnings({"MDM_LOCK_ISLOCKED", "UL_UNRELEASED_LOCK"})
    private void doRun(final long maxIdleNanos) {
      try {
        while (true) {
          poolStateLock.lock();
          Runnable poll = taskQueue.poll();
          if (poll != null) {
            poolStateLock.unlock();
            execute(poll);
          } else { // nothing in the queue, will put the thread to thread queue.
            if (state.isShutdown()) {
              removeThread();
              break;
            }
            long timeoutNanos = lastRunNanos + maxIdleNanos - TimeSource.nanoTime();
            if (timeoutNanos <= 0) { // Thread was idle more than it should
              final int tc = state.getThreadCount();
              if (tc > state.getCoreThreads()) { // can we terminate.
                removeThread();
                break;
              } else { // this is a core thread for now.
                timeoutNanos = CORE_MINWAIT_NANOS;
              }
            }
            int ptr = threadQueue.pushAndGetIdx(this);
            try {
              timeoutNanos = submitCondition.awaitNanos(timeoutNanos);
            } catch (InterruptedException ex) {
              if (state.isShutdown()) {
                removeThread();
                break;
              }
            }
            Runnable r = toRun;
            if (r != null) {
                poolStateLock.unlock();
                try {
                  execute(r);
                } finally {
                  toRun = null;
                }
            } else {
              QueuedThread qt = threadQueue.get(ptr);
              if (qt == this) {
                threadQueue.remove(ptr);
              } else {
                if (!threadQueue.remove(this))  {
                  throw new IllegalStateException("Thread "  + this + " not present in " +  threadQueue);
                }
              }
              if (timeoutNanos <= 0) {
                  final int tc = state.getThreadCount();
                  if (state.isShutdown() || tc > state.getCoreThreads()) {
                    removeThread();
                    break;
                  }
                }
                poolStateLock.unlock();
            }
          }
        }
      } catch (Throwable t) {
        LOG.error("Unexpected exception", t);
        if (poolStateLock.isHeldByCurrentThread()) {
          poolStateLock.unlock();
        }
        throw t;
      }

    }

    private void removeThread() {
      state.removeThread(this);
      poolStateCondition.signalAll();
      poolStateLock.unlock();
    }

    private void execute(final Runnable runnable) {
      try {
        runnable.run();
      }  catch (Throwable e) {
          // Will run the thread uncaught handlers
          // but will continue the thread running unless a uncaught handler throws an exception
          final Thread.UncaughtExceptionHandler uexh = this.getUncaughtExceptionHandler();
          try {
            uexh.uncaughtException(this, e);
          } catch (RuntimeException ex) {
            ex.addSuppressed(e);
            throw new UncheckedExecutionException("Uncaught exception handler blew up: " + uexh, ex);
          }
      } finally {
        lastRunNanos = TimeSource.nanoTime();
      }
    }

    @Override
    public String toString() {
      StackTraceElement[] stackTrace;
      try {
        stackTrace = this.getStackTrace();
      } catch (RuntimeException ex) {
        stackTrace = StackTrace.EMPTY_STACK_TRACE;
      }
      return "QueuedThread{name = " + getName() + ", lastRunNanos="
              + Timing.getCurrentTiming().fromNanoTimeToInstant(lastRunNanos)
              + ", stack =" + Arrays.toString(stackTrace)
              + ", toRun = " + toRun + '}';
    }

  }

  private static final class PoolState {

    private boolean shutdown;

    private int coreThreads;

    private final Set<QueuedThread> allThreads;

    PoolState(final int thnr, final Set<QueuedThread> allThreads) {
      this.shutdown = false;
      this.coreThreads = thnr;
      this.allThreads = allThreads;
    }

    public void addThread(final QueuedThread thread) {
      if (!allThreads.add(thread)) {
        throw new IllegalStateException("Attempting to add a thread twice: " + thread);
      }
      LOG.debug("Started thread {}", thread.getName());
    }

    public void removeThread(final QueuedThread thread) {
      if (!allThreads.remove(thread)) {
        throw new IllegalStateException("Removing thread failed: " + thread);
      }
      LOG.debug("Terminating thread {}", thread.getName());
    }

    public void interruptAll() {
      for (Thread thread : allThreads) {
        thread.interrupt();
      }
    }

    public int getCoreThreads() {
      return coreThreads;
    }

    public void setCoreThreads(final int setCoreThreads) {
       coreThreads = setCoreThreads;
    }

    public boolean isShutdown() {
      return shutdown;
    }

    public void setShutdown(final boolean shutdown) {
      this.shutdown = shutdown;
    }

    public int getThreadCount() {
      return allThreads.size();
    }

    @Override
    public String toString() {
      return "ExecState{" + "shutdown=" + shutdown + ", threadCount="
              + allThreads.size() + '}';
    }

  }

  @Override
  public String toString() {
    return "LifoThreadPoolExecutorSQP{" + "threadQueue=" + threadQueue + ", maxIdleTimeMillis="
            + maxIdleTimeMillis + ", maxThreadCount=" + maxThreadCount + ", state=" + state
            + ", submitMonitor=" + stateLock + ", queueCapacity=" + queueSizeLimit
            + ", poolName=" + poolName + '}';
  }

  @JmxExport
  @Override
  public int getMaxIdleTimeMillis() {
    stateLock.lock();
    try {
      return maxIdleTimeMillis;
    } finally {
      stateLock.unlock();
    }
  }

  @JmxExport
  @Override
  public String getPoolName() {
    return poolName;
  }

  @JmxExport
  @Override
  public int getThreadPriority() {
    stateLock.lock();
    try {
     return threadPriority;
    } finally {
      stateLock.unlock();
    }
  }

}