VMExecutor.java

/*
 * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * Additionally licensed with:
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.spf4j.zel.vm;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import javax.annotation.Nullable;
import org.spf4j.base.Pair;

/**
 *
 * @author zoly
 */
@SuppressFBWarnings("NOS_NON_OWNED_SYNCHRONIZATION")
public final class VMExecutor {

  private final Executor exec;

  /**
   * Map from Future -> Suspendables suspended at this futures and their futures.
   */
  private final ConcurrentMap<VMFuture<Object>, List<Pair<Suspendable<Object>, VMFuture<Object>>>> futToSuspMap
          = new ConcurrentHashMap<>();


  public static class Lazy {

    private static final ExecutorService DEF_EXEC
            = new ForkJoinPool(Integer.getInteger("zel.pool.maxThreadNr", org.spf4j.base.Runtime.NR_PROCESSORS),
                    new DefaultForkJoinWorkerThreadFactory(), new Thread.UncaughtExceptionHandler() {

              @Override
              public void uncaughtException(final Thread t, final Throwable e) {
                org.spf4j.base.Runtime.error("Uncaucht Exception zel default executor", e);
              }
            }, true);

    static {
      org.spf4j.base.Runtime.queueHook(0, new Runnable() {
        @Override
        public void run() {
          DEF_EXEC.shutdown();
        }
      });
    }

    public static final VMExecutor DEFAULT = new VMExecutor(DEF_EXEC);

    static class ZelWorker extends ForkJoinWorkerThread {

      ZelWorker(final ForkJoinPool pool) {
        super(pool);
      }

    }

    static class DefaultForkJoinWorkerThreadFactory
            implements ForkJoinPool.ForkJoinWorkerThreadFactory {

      public ForkJoinWorkerThread newThread(final ForkJoinPool pool) {
        return new ZelWorker(pool);
      }
    }
  }

  public interface Suspendable<T> extends Callable<T> {

    @Override
    T call() throws SuspendedException, ExecutionException, InterruptedException;

    List<VMFuture<Object>> getSuspendedAt();

  }

  public static <T> Suspendable<T> synchronize(final Suspendable<T> what) {
    return new Suspendable<T>() {

      private volatile boolean isRunning = false;

      @Override
      public T call() throws SuspendedException, ExecutionException, InterruptedException {
        if (!isRunning) {
          synchronized (this) {
            if (!isRunning) {
              isRunning = true;
              try {
                return what.call();
              } catch (SuspendedException e) {
                isRunning = false;
                throw e;
              }
            }
          }
        }
        throw ExecAbortException.INSTANCE;
      }

      @Override
      public synchronized List<VMFuture<Object>> getSuspendedAt() {
        return what.getSuspendedAt();
      }
    };
  }

  public VMExecutor(final Executor exec) {
    this.exec = exec;
  }

  public <T> Future<T> submitNonSuspendable(final Callable<T> callable) {
    FutureTask task = new FutureTask(callable);
    exec.execute(task);
    return task;
  }

  public <T> Future<T> submit(final Suspendable<T> callable) {
    final VMFuture<T> resultFuture = new VMSyncFuture<>();
    submit(callable, resultFuture);
    return resultFuture;
  }

  /**
   * Returns a future that will not get notified when callable completes.
   *
   * @param <T>
   * @param callable
   * @return
   */
  public <T> Future<T> submitInternal(final Suspendable<T> callable) {
    final VMFuture<T> resultFuture = new VMASyncFuture<>();
    submit(callable, resultFuture);
    return resultFuture;
  }

  @Nullable
  public List<Pair<Suspendable<Object>, VMFuture<Object>>> resumeSuspendables(final VMFuture<Object> future) {
    List<Pair<Suspendable<Object>, VMFuture<Object>>> suspended = futToSuspMap.remove(future);
    if (suspended != null) {
      for (Pair<Suspendable<Object>, VMFuture<Object>> susp : suspended) {
        submit(susp.getFirst(), susp.getSecond());
      }
    }
    return suspended;
  }

  private void addSuspendable(final VMFuture<Object> futureSuspendedFor,
          final Suspendable<Object> suspendedCallable, final VMFuture<Object> suspendedCallableFuture) {

    List<Pair<Suspendable<Object>, VMFuture<Object>>> suspended
            = futToSuspMap.get(futureSuspendedFor);
    if (suspended == null) {
      suspended = new LinkedList<>();
      List<Pair<Suspendable<Object>, VMFuture<Object>>> old
              = futToSuspMap.putIfAbsent(futureSuspendedFor, suspended);
      if (old != null) {
        suspended = old;
      }
    }
    do {
      List<Pair<Suspendable<Object>, VMFuture<Object>>> newList
              = new LinkedList<>(suspended);
      newList.add(Pair.of(suspendedCallable, suspendedCallableFuture));
      if (futToSuspMap.replace(futureSuspendedFor, suspended, newList)) {
        break;
      } else {
        suspended = futToSuspMap.get(futureSuspendedFor);
        if (suspended == null) {
          suspended = new LinkedList<>();
          List<Pair<Suspendable<Object>, VMFuture<Object>>> old
                  = futToSuspMap.putIfAbsent(futureSuspendedFor, suspended);
          if (old != null) {
            suspended = old;
          }
        }
      }
    } while (true);
    if (futureSuspendedFor.isDone()) {
      resumeSuspendables(futureSuspendedFor);
    }
  }

  private <T> void submit(final Suspendable<T> callable, final VMFuture<T> future) {
    exec.execute(new Runnable() {

      @Override
      public void run() {
        try {
          T result = callable.call();
          future.setResult(result);
          resumeSuspendables((VMFuture<Object>) future);
        } catch (SuspendedException ex) {
          for (VMFuture<Object> fut : callable.getSuspendedAt()) {
            addSuspendable(fut,
                    (Suspendable<Object>) callable, (VMFuture<Object>) future);
          }
        } catch (ExecutionException e) {
          future.setExceptionResult(e);
          resumeSuspendables((VMFuture<Object>) future);
        } catch (RuntimeException | InterruptedException e) {
          future.setExceptionResult(new ExecutionException(e));
          resumeSuspendables((VMFuture<Object>) future);
        }
      }
    });
  }

  @Override
  public String toString() {
    return "VMExecutor{" + "exec=" + exec + ", futToSuspMap=" + futToSuspMap + '}';
  }

}