ContextPropagatingCompletableFuture.java

package org.spf4j.concurrent;

import com.google.common.annotations.Beta;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.spf4j.base.ExecutionContext;
import org.spf4j.base.ExecutionContexts;
import org.spf4j.base.TimeSource;
import org.spf4j.base.Timing;
import org.spf4j.base.UncheckedTimeoutException;

/**
 * A Completable future that will wrap all Functions to properly propagate ExecutionContext.
 * see CompletableFuture javadoc for more detail.
 *
 * This works properly only in JDK 11.
 *
 * @author Zoltan Farkas
 */
@SuppressWarnings("checkstyle:DesignForExtension")
@Beta
@ParametersAreNonnullByDefault
public class ContextPropagatingCompletableFuture<T>
        extends InterruptibleCompletableFuture<T> {

  private final ExecutionContext parentContext;

  private final long deadlinenanos;

  @SuppressFBWarnings("EI_EXPOSE_REP2")
  public ContextPropagatingCompletableFuture(final ExecutionContext parentContext, final long deadlinenanos) {
    this.parentContext = parentContext;
    this.deadlinenanos = deadlinenanos;
  }



  @Override
  public <U> CompletableFuture<U> thenApply(final Function<? super T, ? extends U> fn) {
    return super.thenApply(ExecutionContexts.propagatingFunction(fn, parentContext,
            null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<U> thenApplyAsync(final Function<? super T, ? extends U> fn) {
    return super.thenApplyAsync(ExecutionContexts.propagatingFunction(fn, parentContext,
            null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<U> thenApplyAsync(final Function<? super T, ? extends U> fn, final Executor executor) {
    return super.thenApplyAsync(ExecutionContexts.propagatingFunction(fn, parentContext,
            null, deadlinenanos), executor);
  }

  @Override
  public CompletableFuture<Void> thenAccept(final Consumer<? super T> action) {
    return super.thenAccept(ExecutionContexts.propagatingConsumer(action,
            parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> thenAcceptAsync(final Consumer<? super T> action) {
    return super.thenAcceptAsync(ExecutionContexts.propagatingConsumer(action,
            parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> thenAcceptAsync(final Consumer<? super T> action, final Executor executor) {
    return super.thenAcceptAsync(ExecutionContexts.propagatingConsumer(action,
            parentContext, null, deadlinenanos), executor);
  }

  @Override
  public CompletableFuture<Void> thenRun(final Runnable action) {
    return super.thenRun(ExecutionContexts.propagatingRunnable(action, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> thenRunAsync(final Runnable action) {
    return super.thenRunAsync(
            ExecutionContexts.propagatingRunnable(action, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> thenRunAsync(final Runnable action, final Executor executor) {
    return super.thenRunAsync(
                    ExecutionContexts.propagatingRunnable(action, parentContext, null, deadlinenanos), executor);
  }

  @Override
  public <U, V> CompletableFuture<V> thenCombine(final CompletionStage<? extends U> other,
          final BiFunction<? super T, ? super U, ? extends V> fn) {
    return super.thenCombine(other,
            ExecutionContexts.propagatingBiFunction(fn, parentContext, null, deadlinenanos));
  }

  @Override
  public <U, V> CompletableFuture<V> thenCombineAsync(final CompletionStage<? extends U> other,
          final BiFunction<? super T, ? super U, ? extends V> fn) {
    return super.thenCombineAsync(other,
            ExecutionContexts.propagatingBiFunction(fn, parentContext, null, deadlinenanos));
  }

  @Override
  public <U, V> CompletableFuture<V> thenCombineAsync(final CompletionStage<? extends U> other,
          final BiFunction<? super T, ? super U, ? extends V> fn, final Executor executor) {
    return super.thenCombineAsync(other,
                    ExecutionContexts.propagatingBiFunction(fn, parentContext, null, deadlinenanos),
                    executor);
  }

  @Override
  public <U> CompletableFuture<Void> thenAcceptBoth(final CompletionStage<? extends U> other,
          final BiConsumer<? super T, ? super U> action) {
    return super.thenAcceptBoth(other, ExecutionContexts.propagatingBiConsumer(action, parentContext,
                    null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<Void> thenAcceptBothAsync(final CompletionStage<? extends U> other,
          final BiConsumer<? super T, ? super U> action) {
    return super.thenAcceptBothAsync(other,
            ExecutionContexts.propagatingBiConsumer(action, parentContext,
                    null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<Void> thenAcceptBothAsync(final CompletionStage<? extends U> other,
          final BiConsumer<? super T, ? super U> action, final Executor executor) {
    return super.thenAcceptBothAsync(other,
            ExecutionContexts.propagatingBiConsumer(action, parentContext,
                    null, deadlinenanos), executor);
  }

  @Override
  public CompletableFuture<Void> runAfterBoth(final CompletionStage<?> other, final Runnable action) {
    return super.runAfterBoth(other,
            ExecutionContexts.propagatingRunnable(action, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> runAfterBothAsync(final CompletionStage<?> other, final Runnable action) {
    return super.runAfterBothAsync(other, ExecutionContexts.propagatingRunnable(action, parentContext,
            null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> runAfterBothAsync(final CompletionStage<?> other,
          final Runnable action, final Executor executor) {
    return super.runAfterBothAsync(other, ExecutionContexts.propagatingRunnable(action, parentContext,
            null, deadlinenanos), executor);
  }

  @Override
  public <U> CompletableFuture<U> applyToEither(final CompletionStage<? extends T> other,
          final Function<? super T, U> fn) {
    return super.applyToEither(other,
                    ExecutionContexts.propagatingFunction(fn, parentContext, null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<U> applyToEitherAsync(final CompletionStage<? extends T> other,
          final Function<? super T, U> fn) {
    return super.applyToEitherAsync(other,
            ExecutionContexts.propagatingFunction(fn, parentContext, null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<U> applyToEitherAsync(final CompletionStage<? extends T> other,
          final Function<? super T, U> fn, final Executor executor) {
    return super.applyToEitherAsync(other,
            ExecutionContexts.propagatingFunction(fn, parentContext, null, deadlinenanos), executor);
  }

  @Override
  public CompletableFuture<Void> acceptEither(final CompletionStage<? extends T> other,
          final Consumer<? super T> action) {
    return super.acceptEither(other,
            ExecutionContexts.propagatingConsumer(action, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> acceptEitherAsync(final CompletionStage<? extends T> other,
          final Consumer<? super T> action) {
    return super.acceptEitherAsync(other,
            ExecutionContexts.propagatingConsumer(action, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> acceptEitherAsync(final CompletionStage<? extends T> other,
          final Consumer<? super T> action, final Executor executor) {
    return super.acceptEitherAsync(other,
            ExecutionContexts.propagatingConsumer(action, parentContext, null, deadlinenanos), executor);
  }

  @Override
  public CompletableFuture<Void> runAfterEither(final CompletionStage<?> other, final Runnable action) {
    return super.runAfterEither(other,
            ExecutionContexts.propagatingRunnable(action, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> runAfterEitherAsync(final CompletionStage<?> other, final Runnable action) {
    return super.runAfterEitherAsync(other,
            ExecutionContexts.propagatingRunnable(action, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<Void> runAfterEitherAsync(final CompletionStage<?> other,
          final Runnable action, final Executor executor) {
    return super.runAfterEitherAsync(other,
            ExecutionContexts.propagatingRunnable(action, parentContext, null, deadlinenanos), executor);
  }

  @Override
  public <U> CompletableFuture<U> thenCompose(final Function<? super T, ? extends CompletionStage<U>> fn) {
    return super.thenCompose(ExecutionContexts.propagatingFunction(fn, parentContext, null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<U> thenComposeAsync(final Function<? super T, ? extends CompletionStage<U>> fn) {
    return super.thenComposeAsync(ExecutionContexts.propagatingFunction(fn, parentContext, null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<U> thenComposeAsync(final Function<? super T, ? extends CompletionStage<U>> fn,
          final Executor executor) {
    return super.thenComposeAsync(
            ExecutionContexts.propagatingFunction(fn, parentContext, null, deadlinenanos), executor);
  }

  @Override
  public CompletableFuture<T> exceptionally(final Function<Throwable, ? extends T> fn) {
    return super.exceptionally(ExecutionContexts.propagatingFunction(fn, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<T> whenComplete(final BiConsumer<? super T, ? super Throwable> action) {
    return super.whenComplete(ExecutionContexts.propagatingBiConsumer(action, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<T> whenCompleteAsync(final BiConsumer<? super T, ? super Throwable> action) {
    return super.whenCompleteAsync(
            ExecutionContexts.propagatingBiConsumer(action, parentContext, null, deadlinenanos));
  }

  @Override
  public CompletableFuture<T> whenCompleteAsync(final BiConsumer<? super T, ? super Throwable> action,
          final Executor executor) {
    return super.whenCompleteAsync(
            ExecutionContexts.propagatingBiConsumer(action, parentContext, null, deadlinenanos),
            executor);
  }

  @Override
  public <U> CompletableFuture<U> handle(final BiFunction<? super T, Throwable, ? extends U> fn) {
    return super.handle(ExecutionContexts.propagatingBiFunction(fn, parentContext, null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<U> handleAsync(final BiFunction<? super T, Throwable, ? extends U> fn) {
    return super.handleAsync(
            ExecutionContexts.propagatingBiFunction(fn, parentContext, null, deadlinenanos));
  }

  @Override
  public <U> CompletableFuture<U> handleAsync(final BiFunction<? super T, Throwable, ? extends U> fn,
          final Executor executor) {
    return super.handleAsync(
            ExecutionContexts.propagatingBiFunction(fn, parentContext, null, deadlinenanos), executor);
  }

  @Override
  public CompletableFuture<T> toCompletableFuture() {
    return this;
  }

  @Override
  public String toString() {
    return "ContextPropagatingCompletableFuture{" + "parentContext=" + parentContext
            + ", deadlinenanos=" + deadlinenanos + ", super=" + super.toString() +  '}';
  }

  @Override
  @Nullable
  public T get() throws InterruptedException, ExecutionException {
    try {
      long timeout = deadlinenanos - TimeSource.nanoTime();
      if (timeout < 0) {
        throw new UncheckedTimeoutException("deadline exceeded " + Timing.getCurrentTiming()
                .fromNanoTimeToInstant(deadlinenanos));
      }
      return super.get(timeout, TimeUnit.NANOSECONDS);
    } catch (TimeoutException ex) {
      throw new UncheckedTimeoutException(ex);
    }
  }


  /**
   * JDK 11.
   */
  public CompletableFuture<T> completeAsync(final Supplier<? extends T> supplier) {
    try {
      return (CompletableFuture)
              this.getClass().getMethod("completeAsync", new Class[] {Supplier.class})
                .invoke(this, ExecutionContexts.propagatingSupplier(supplier, parentContext, null, deadlinenanos));
    } catch (NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException ex) {
      throw new UnsupportedOperationException("Supported only on JDK 11", ex);
    }
  }

  /**
   * JDK 11.
   */
  public CompletableFuture<T> completeAsync(final Supplier<? extends T> supplier, final Executor executor) {
    try {
      return (CompletableFuture)
              this.getClass().getMethod("completeAsync", new Class[] {Supplier.class, Executor.class})
                      .invoke(this,
                              ExecutionContexts.propagatingSupplier(supplier, parentContext, null, deadlinenanos),
                              executor);
    } catch (NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException ex) {
      throw new UnsupportedOperationException("Supported only on JDK 11", ex);
    }
  }

  /**
   * JDK 11!
   */
  public <U> CompletableFuture<U> newIncompleteFuture() {
    return new ContextPropagatingCompletableFuture<>(parentContext, deadlinenanos);
  }

  public static <U> CompletableFuture<U> supplyAsync(final Supplier<U> f) {
    return supplyAsync(f, DefaultExecutor.INSTANCE);
  }

  public static <U> CompletableFuture<U> supplyAsync(final Supplier<U> f, final Executor e) {
    ExecutionContext current = ExecutionContexts.current();
    if (current == null) {
      return CompletableFuture.supplyAsync(f, e);
    }
    return supplyAsync(f, current, e, current.getDeadlineNanos());
  }

  public static <U> CompletableFuture<U> supplyAsync(
          final Supplier<U> f, final Executor e, final long deadlineNanos) {
    ExecutionContext current = ExecutionContexts.current();
    if (current == null) {
      return CompletableFuture.supplyAsync(f, e);
    }
    return supplyAsync(f, current, e, deadlineNanos);
  }

  public static <U> CompletableFuture<U> supplyAsync(final Supplier<U> f,
          final ExecutionContext current,
          final Executor e, final long deadlineNanos) {
    long ctxDeadlineNanos = current.getDeadlineNanos();
    long currentTime = TimeSource.nanoTime();
    long effectiveDeadlineNanos;
    if ((ctxDeadlineNanos - currentTime) > (deadlineNanos - currentTime)) {
      effectiveDeadlineNanos = deadlineNanos;
    } else {
      effectiveDeadlineNanos = ctxDeadlineNanos;
    }
    ContextPropagatingCompletableFuture<U> d
            = new ContextPropagatingCompletableFuture<U>(current, effectiveDeadlineNanos);
    e.execute(() -> {
      try {
        U r;
        try (ExecutionContext ec = ExecutionContexts.start(f.toString(), current,
                currentTime, effectiveDeadlineNanos)) {
          r = f.get();
        }
        d.complete(r);
      } catch (Throwable t) {
        d.completeExceptionally(t);
      }
    });
    return d;
  }

  public static <U> CompletableFuture<U> completedFuture(final U value) {
    ExecutionContext current = ExecutionContexts.current();
    if (current == null) {
      return CompletableFuture.completedFuture(value);
    } else {
      return completedFuture(current, current.getDeadlineNanos(), value);
    }
  }

  public static <U> CompletableFuture<U> completedFuture(
          final ExecutionContext parentContext, final long deadlinenanos, final U value) {
    ContextPropagatingCompletableFuture<U> r = new ContextPropagatingCompletableFuture<U>(parentContext, deadlinenanos);
    if (!r.complete(value)) {
      throw new IllegalStateException("Cannot be already completed " + r);
    }
    return r;
  }


}