View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.base;
33  
34  import java.lang.reflect.InvocationTargetException;
35  import java.util.ArrayList;
36  import java.util.Collection;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.function.BiConsumer;
41  import java.util.function.BiFunction;
42  import java.util.function.Consumer;
43  import java.util.function.Function;
44  import java.util.function.Supplier;
45  import java.util.stream.Collectors;
46  import javax.annotation.Nonnegative;
47  import javax.annotation.Nullable;
48  import javax.annotation.ParametersAreNonnullByDefault;
49  import javax.annotation.Signed;
50  import javax.annotation.concurrent.ThreadSafe;
51  import org.spf4j.base.ExecutionContext.Relation;
52  import org.spf4j.concurrent.ScalableSequence;
53  import org.spf4j.concurrent.UIDGenerator;
54  import org.spf4j.ds.SimpleStack;
55  
56  /**
57   * @author Zoltan Farkas
58   */
59  @ThreadSafe
60  @ParametersAreNonnullByDefault
61  public final class ExecutionContexts {
62  
63    private static final UIDGenerator ID_GEN = new UIDGenerator(new ScalableSequence(0, 10), "X", 1544368928196L);
64  
65    public static final long DEFAULT_TIMEOUT_NANOS
66            = Long.getLong("spf4j.execContext.defaultTimeoutNanos", TimeUnit.HOURS.toNanos(8));
67  
68    private static final ThreadLocal<SimpleStack<ExecutionContext>> EXEC_CTX =
69            new ThreadLocal<SimpleStack<ExecutionContext>>() {
70      @Override
71      protected SimpleStack<ExecutionContext> initialValue() {
72        return new SimpleStack<>(4);
73      }
74  
75    };
76  
77    private static final ThreadLocalContextAttacher DEFAULT_TL_ATTACHER = new ThreadLocalContextAttacherImpl();
78  
79    private static final ExecutionContextFactory<ExecutionContext> CTX_FACTORY = initFactory();
80  
81    private static final ThreadLocalContextAttacher TL_ATTACHER =  initTLAttacher();
82  
83    private ExecutionContexts() {
84    }
85  
86    private static ThreadLocalContextAttacher initTLAttacher() {
87      String factoryClass = System.getProperty("spf4j.execContext.tlAttacherClass");
88      ThreadLocalContextAttacher factory;
89      if (factoryClass == null) {
90        factory = DEFAULT_TL_ATTACHER;
91      } else {
92        try {
93          factory = ((Class<ThreadLocalContextAttacher>) Class.forName(factoryClass)).getConstructor().newInstance();
94        } catch (ClassNotFoundException | InstantiationException | NoSuchMethodException
95                | InvocationTargetException | IllegalAccessException ex) {
96          throw new ExceptionInInitializerError(ex);
97        }
98      }
99      return factory;
100   }
101 
102  private static ExecutionContextFactory<ExecutionContext> initFactory() {
103     String factoryClass = System.getProperty("spf4j.execContext.factoryClass");
104     ExecutionContextFactory<ExecutionContext> factory;
105     if (factoryClass == null) {
106       factory = new BasicExecutionContextFactory();
107     } else {
108       try {
109         factory = ((Class<ExecutionContextFactory<ExecutionContext>>) Class.forName(factoryClass))
110                 .getConstructor().newInstance();
111       } catch (ClassNotFoundException | InstantiationException | NoSuchMethodException
112               | InvocationTargetException | IllegalAccessException ex) {
113         throw new ExceptionInInitializerError(ex);
114       }
115     }
116     String factoryWrapperClass = System.getProperty("spf4j.execContext.factoryWrapperClass");
117     if (factoryWrapperClass != null) {
118       try {
119         factory = (ExecutionContextFactory<ExecutionContext>) Class.forName(factoryWrapperClass)
120                 .getConstructor(ExecutionContextFactory.class).newInstance(factory);
121       } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
122               | NoSuchMethodException | InvocationTargetException ex) {
123         throw new ExceptionInInitializerError(ex);
124       }
125     }
126     return factory;
127   }
128 
129   public static ThreadLocalContextAttacher defaultThreadLocalAttacher() {
130     return DEFAULT_TL_ATTACHER;
131   }
132 
133   public static ThreadLocalContextAttacher threadLocalAttacher() {
134     return TL_ATTACHER;
135   }
136 
137   public static CharSequence genId() {
138     return ID_GEN.next();
139   }
140 
141   public static ExecutionContextFactory<ExecutionContext> getContextFactory() {
142     return CTX_FACTORY;
143   }
144 
145   @Nullable
146   public static ExecutionContext current() {
147     return EXEC_CTX.get().peek();
148   }
149 
150   public static boolean inCurrentThread(final ExecutionContext ctx) {
151     return EXEC_CTX.get().contains(ctx);
152   }
153 
154   public static void clearCurrentThread() {
155      EXEC_CTX.get().clear();
156   }
157 
158   /**
159    * start a execution context.
160    *
161    * @param deadlineNanos the deadline for this context. (System.nanotime)
162    * @return the execution context.
163    */
164   public static ExecutionContext start(final long startTimeNanos, final long deadlineNanos) {
165     return start("anon", null, startTimeNanos, deadlineNanos);
166   }
167 
168   /**
169    * start a execution context.
170    *
171    * @param timeout
172    * @param tu
173    * @return
174    */
175   public static ExecutionContext start(final long timeout, final TimeUnit tu) {
176     return start("anon", current(), timeout, tu);
177   }
178 
179   public static ExecutionContext start(final String opname) {
180     return start(opname, current(), DEFAULT_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
181   }
182 
183   public static ExecutionContext start(final String opname, final long timeout, final TimeUnit tu) {
184     return start(opname, current(), timeout, tu);
185   }
186 
187   public static ExecutionContext start(@Nullable final ExecutionContext parent, final long timeout, final TimeUnit tu) {
188     return start("anon", parent, timeout, tu);
189   }
190 
191   public static ExecutionContext start(@Nullable final ExecutionContext parent) {
192     long nanoTime = TimeSource.nanoTime();
193     return start(parent, nanoTime, parent != null ? parent.getDeadlineNanos() : nanoTime + DEFAULT_TIMEOUT_NANOS);
194   }
195 
196   public static ExecutionContext start(@Nullable final ExecutionContext parent,
197           final long startTimeNanos, final long deadlineNanos) {
198     return start("anon", parent, startTimeNanos, deadlineNanos);
199   }
200 
201   public static ExecutionContext start(final String name, final long startTimeNanos, final long deadlineNanos) {
202     return start(name, current(), startTimeNanos, deadlineNanos);
203   }
204 
205   public static ExecutionContext start(final String name, final long deadlineNanos) {
206     return start(name, current(), TimeSource.nanoTime(), deadlineNanos);
207   }
208 
209   public static ExecutionContext start(final String name,
210           @Nullable final ExecutionContext parent) {
211     long nanoTime = TimeSource.nanoTime();
212     return start(name, parent, nanoTime, parent != null ? parent.getDeadlineNanos()
213             : nanoTime + DEFAULT_TIMEOUT_NANOS);
214   }
215 
216   public static ExecutionContext start(final String name,
217           @Nullable final ExecutionContext parent, final long timeout, final TimeUnit tu) {
218     return start(name, null, parent, timeout, tu);
219   }
220 
221   public static ExecutionContext start(final String name, @Nullable final CharSequence id,
222           @Nullable final ExecutionContext parent, final long timeout, final TimeUnit tu) {
223     long nanoTime = TimeSource.nanoTime();
224     return start(name, id, parent, nanoTime, computeDeadline(nanoTime, parent, tu, timeout));
225   }
226 
227   public static ExecutionContext createDetached(final String name,
228           @Nullable final ExecutionContext parent, final long timeout, final TimeUnit tu) {
229     long nanoTime = TimeSource.nanoTime();
230     return createDetached(name, parent, nanoTime, computeDeadline(nanoTime, parent, tu, timeout));
231   }
232 
233   public static ExecutionContext start(final String name,
234           @Nullable final ExecutionContext parent, final long deadlineNanos) {
235     return start(name, parent, TimeSource.nanoTime(), deadlineNanos);
236   }
237 
238   public static ExecutionContext start(final String name,
239           @Nullable final ExecutionContext parent, final long startTimeNanos, final long deadlineNanos) {
240     return start(name, null, parent, startTimeNanos, deadlineNanos);
241   }
242 
243   public static ExecutionContext start(final String name, @Nullable final CharSequence id,
244           @Nullable final ExecutionContext parent, final long startTimeNanos, final long deadlineNanos) {
245     return start(name, id, parent, Relation.CHILD_OF, startTimeNanos, deadlineNanos);
246   }
247 
248   public static ExecutionContext start(final String name, @Nullable final CharSequence id,
249           @Nullable final ExecutionContext parent, final Relation relation,
250           final long startTimeNanos, final long deadlineNanos) {
251     ExecutionContext nCtx = CTX_FACTORY.start(name, id, parent, relation,
252               startTimeNanos, deadlineNanos);
253     nCtx.attach();
254     return nCtx;
255   }
256 
257   public static ExecutionContext createDetached(final String name,
258           @Nullable final ExecutionContext parent, final long startTimeNanos, final long deadlineNanos) {
259     return createDetached(name, parent, Relation.CHILD_OF, startTimeNanos, deadlineNanos);
260   }
261 
262   public static ExecutionContext createDetached(final String name, @Nullable final ExecutionContext parent,
263           final Relation relation, final long startTimeNanos, final long deadlineNanos) {
264     return createDetached(name, null, parent, relation, startTimeNanos, deadlineNanos);
265   }
266 
267   public static ExecutionContext createDetached(final String name,
268           @Nullable final CharSequence id, @Nullable final ExecutionContext parent,
269           final Relation relation, final long startTimeNanos, final long deadlineNanos) {
270     return CTX_FACTORY.start(name, id, parent, relation, startTimeNanos, deadlineNanos);
271   }
272 
273   public static long getContextDeadlineNanos() {
274     ExecutionContext ec = ExecutionContexts.current();
275     if (ec == null) {
276       return TimeSource.nanoTime() + DEFAULT_TIMEOUT_NANOS;
277     } else {
278       return ec.getDeadlineNanos();
279     }
280   }
281 
282   public static long getContextDeadlineNanos(final long currentTime) {
283     ExecutionContext ec = ExecutionContexts.current();
284     if (ec == null) {
285       return currentTime + DEFAULT_TIMEOUT_NANOS;
286     } else {
287       return ec.getDeadlineNanos();
288     }
289   }
290 
291   @Signed
292   public static long getTimeRelativeToDeadline(final TimeUnit unit) {
293     return unit.convert(getContextDeadlineNanos() - TimeSource.nanoTime(), TimeUnit.NANOSECONDS);
294   }
295 
296   @Nonnegative
297   public static long getTimeToDeadline(final TimeUnit unit) throws TimeoutException {
298     long timeRelativeToDeadline = getTimeRelativeToDeadline(unit);
299     if (timeRelativeToDeadline <= 0) {
300       throw new TimeoutException("Deadline exceeded by " + (-timeRelativeToDeadline) + ' ' + unit);
301     }
302     return timeRelativeToDeadline;
303   }
304 
305   @Nonnegative
306   public static long getTimeToDeadlineUnchecked(final TimeUnit unit) {
307     long timeRelativeToDeadline = getTimeRelativeToDeadline(unit);
308     if (timeRelativeToDeadline <= 0) {
309       throw new UncheckedTimeoutException("Deadline exceeded by " + (-timeRelativeToDeadline) + ' ' + unit);
310     }
311     return timeRelativeToDeadline;
312   }
313 
314   @Nonnegative
315   public static int getTimeToDeadlineInt(final TimeUnit unit) throws TimeoutException {
316     long timeRelativeToDeadline = getTimeToDeadline(unit);
317     if (timeRelativeToDeadline > Integer.MAX_VALUE) {
318       return Integer.MAX_VALUE;
319     } else {
320       return (int) timeRelativeToDeadline;
321     }
322   }
323 
324   @Nonnegative
325   public static long getMillisToDeadline() throws TimeoutException {
326     return getTimeToDeadline(TimeUnit.MILLISECONDS);
327   }
328 
329   @Nonnegative
330   public static int getSecondsToDeadline() throws TimeoutException {
331     long secondsToDeadline = getTimeToDeadline(TimeUnit.SECONDS);
332     if (secondsToDeadline > Integer.MAX_VALUE) {
333       return Integer.MAX_VALUE;
334     } else {
335       return (int) secondsToDeadline;
336     }
337   }
338 
339   public static long computeDeadline(final long timeout, final TimeUnit unit) {
340     return computeDeadline(current(), timeout, unit);
341   }
342 
343   public static long computeTimeout(final long timeout, final TimeUnit unit) throws TimeoutException {
344     return unit.convert(computeTimeoutDeadline(current(), unit, timeout).getTimeoutNanos(), TimeUnit.NANOSECONDS);
345   }
346 
347   /**
348    * @deprecated use variant where the value and unit are it the natural order.
349    */
350   @Deprecated
351   public static long computeDeadline(@Nullable final ExecutionContext current,
352            final TimeUnit unit, final long timeout) {
353     return computeDeadline(current, timeout, unit);
354   }
355 
356   public static long computeDeadline(@Nullable final ExecutionContext current,
357           final long timeout, final TimeUnit unit) {
358     if (current == null) {
359       return TimeSource.getDeadlineNanos(timeout, unit);
360     }
361     long nanoTime = TimeSource.nanoTime();
362     long ctxDeadlinenanos = current.getDeadlineNanos();
363     long timeoutNanos = unit.toNanos(timeout);
364     return (ctxDeadlinenanos - nanoTime < timeoutNanos) ? ctxDeadlinenanos : nanoTime + timeoutNanos;
365   }
366 
367   public static long computeDeadline(final long startTimeNanos, @Nullable final ExecutionContext current,
368           final TimeUnit unit, final long timeout) {
369     if (current == null) {
370       return TimeSource.getDeadlineNanos(startTimeNanos, timeout, unit);
371     }
372     long ctxDeadlinenanos = current.getDeadlineNanos();
373     long timeoutNanos = unit.toNanos(timeout);
374     return (ctxDeadlinenanos - startTimeNanos < timeoutNanos) ? ctxDeadlinenanos : startTimeNanos + timeoutNanos;
375   }
376 
377   /**
378    * Compute the actual timeout taking in consideration the context deadline.
379    * @param current the context
380    * @param unit timeout unit
381    * @param timeout timeout value
382    * @return the earliest timeout (of the provided and context one)
383    * @throws TimeoutException
384    */
385   public static TimeoutDeadline computeTimeoutDeadline(@Nullable final ExecutionContext current,
386           final TimeUnit unit, final long timeout) throws TimeoutException {
387     if (current == null) {
388       return TimeoutDeadline.of(unit.toNanos(timeout), TimeSource.getDeadlineNanos(timeout, unit));
389     }
390     long nanoTime = TimeSource.nanoTime();
391     long ctxDeadlinenanos = current.getDeadlineNanos();
392     long timeoutNanos = unit.toNanos(timeout);
393     long contextTimeoutNanos = ctxDeadlinenanos - nanoTime;
394     return (contextTimeoutNanos < timeoutNanos)
395             ? TimeoutDeadline.of(contextTimeoutNanos, ctxDeadlinenanos)
396             : TimeoutDeadline.of(timeoutNanos, nanoTime + timeoutNanos);
397   }
398 
399   private static class BasicExecutionContextFactory implements ExecutionContextFactory<ExecutionContext> {
400 
401     @Override
402     public ExecutionContext start(final String name, @Nullable final CharSequence id,
403             @Nullable final ExecutionContext parent, final Relation relation,
404             final long startTimeNanos, final long deadlineNanos) {
405       return new BasicExecutionContext(name, id, parent, relation, startTimeNanos, deadlineNanos);
406     }
407 
408   }
409 
410   public static <T> Callable<T> propagatingCallable(final Callable<T> callable) {
411     ExecutionContext current = current();
412     return current == null ? callable : propagatingCallable(callable, current);
413   }
414 
415   public static <T> Callable<T> propagatingCallable(final Callable<T> callable, final ExecutionContext ctx) {
416     return new PropagatingCallable<T>(callable, ctx);
417   }
418 
419   public static <T> Collection<? extends Callable<T>> propagatingCallables(
420           final Collection<? extends Callable<T>> tasks) {
421     ExecutionContext current = current();
422     return current == null ? tasks : propagatingCallables(tasks, current);
423   }
424 
425   public static <T> Collection<? extends Callable<T>> propagatingCallables(
426           final Collection<? extends Callable<T>> tasks,
427           final ExecutionContext ctx) {
428     return tasks.stream().map(
429             (c) -> new PropagatingCallable<>(c, ctx))
430             .collect(Collectors.toCollection(() -> new ArrayList<>(tasks.size())));
431   }
432 
433   public static <T> Collection<? extends Callable<T>> deadlinedPropagatingCallables(
434           final Collection<? extends Callable<T>> tasks,
435           final ExecutionContext ctx, final long deadlineNanos) {
436     return tasks.stream().map(
437             (c) -> new PropagatingNamedCallable<>(c, ctx, null, deadlineNanos))
438             .collect(Collectors.toCollection(() -> new ArrayList<>(tasks.size())));
439   }
440 
441   public static <T> Callable<T> deadlinedPropagatingCallable(final Callable<T> callable,
442           final ExecutionContext ctx, final long deadlineNanos) {
443     return new PropagatingNamedCallable<T>(callable, ctx, null, deadlineNanos);
444   }
445 
446   public static Runnable propagatingRunnable(final Runnable runnable) {
447     ExecutionContext current = current();
448     return current == null ? runnable : propagatingRunnable(runnable, current);
449   }
450 
451   public static Runnable propagatingRunnable(final Runnable runnable, final ExecutionContext ctx) {
452     return new PropagatingRunnable(runnable, ctx, null, ctx.getDeadlineNanos());
453   }
454 
455   public static Runnable propagatingRunnable(final Runnable runnable, final ExecutionContext ctx,
456           @Nullable final String name, final long deadlineNanos) {
457     return new PropagatingRunnable(runnable, ctx, name, deadlineNanos);
458   }
459 
460   private static final class PropagatingCallable<T> implements Callable<T> {
461 
462     private final Callable<T> task;
463     private final ExecutionContext current;
464 
465     PropagatingCallable(final Callable<T> task, final ExecutionContext current) {
466       this.task = task;
467       this.current = current;
468     }
469 
470     @Override
471     public T call() throws Exception {
472       try (ExecutionContext ctx = current.startChild(task.toString())) {
473         return task.call();
474       }
475     }
476   }
477 
478   public static <T> Callable<T> propagatingCallable(final Callable<T> callable, final ExecutionContext ctx,
479           @Nullable final String name, final long deadlineNanos) {
480     return new PropagatingNamedCallable<T>(callable, ctx, name, deadlineNanos);
481   }
482 
483 
484   private static final class PropagatingNamedCallable<T> implements Callable<T>, Wrapper<Callable<T>> {
485 
486     private final Callable<T> task;
487     private final ExecutionContext current;
488 
489     private final String name;
490 
491     private final long deadlineNanos;
492 
493     PropagatingNamedCallable(final Callable<T> task, final ExecutionContext current,
494             @Nullable final String name, final long deadlineNanos) {
495       this.task = task;
496       this.current = current;
497       this.name = name;
498       this.deadlineNanos = deadlineNanos;
499     }
500 
501     @Override
502     public T call() throws Exception {
503       try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
504         return task.call();
505       }
506     }
507 
508     @Override
509     public String toString() {
510       return  name == null ? task.toString() : name;
511     }
512 
513     @Override
514     public Callable<T> getWrapped() {
515       return task;
516     }
517 
518   }
519 
520   public static <X, Y> Function<X, Y> propagatingFunction(final Function<X, Y> callable, final ExecutionContext ctx,
521           @Nullable final String name, final long deadlineNanos) {
522     return new PropagatingFunction<X, Y>(callable, ctx, name, deadlineNanos);
523   }
524 
525   private static final class PropagatingFunction<X, Y> implements Function<X, Y>, Wrapper<Function<X, Y>> {
526 
527     private final Function<X, Y> function;
528     private final ExecutionContext current;
529 
530     private final String name;
531 
532     private final long deadlineNanos;
533 
534     PropagatingFunction(final Function<X, Y> task, final ExecutionContext current,
535             final String name, final long deadlineNanos) {
536       this.function = task;
537       this.current = current;
538       this.name = name;
539       this.deadlineNanos = deadlineNanos;
540     }
541 
542     @Override
543     public Y apply(final X in) {
544       try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
545         return function.apply(in);
546       }
547     }
548 
549     @Override
550     public String toString() {
551       return name == null ? function.toString() : name;
552     }
553 
554     @Override
555     public Function<X, Y> getWrapped() {
556       return function;
557     }
558 
559   }
560 
561   public static <X, Y, Z> BiFunction<X, Y, Z> propagatingBiFunction(final BiFunction<X, Y, Z> callable,
562           final ExecutionContext ctx,
563           @Nullable final String name, final long deadlineNanos) {
564     return new PropagatingBiFunction<X, Y, Z>(callable, ctx, name, deadlineNanos);
565   }
566 
567  private static final class PropagatingBiFunction<X, Y, Z>
568          implements BiFunction<X, Y, Z>, Wrapper<BiFunction<X, Y, Z>> {
569 
570     private final BiFunction<X, Y, Z> function;
571     private final ExecutionContext current;
572 
573     private final String name;
574 
575     private final long deadlineNanos;
576 
577     PropagatingBiFunction(final BiFunction<X, Y, Z> task, final ExecutionContext current,
578             final String name, final long deadlineNanos) {
579       this.function = task;
580       this.current = current;
581       this.name = name;
582       this.deadlineNanos = deadlineNanos;
583     }
584 
585     @Override
586     public Z apply(final X x, final Y y) {
587       try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
588         return function.apply(x, y);
589       }
590     }
591 
592     @Override
593     public String toString() {
594       return name == null ? function.toString() : name;
595     }
596 
597     @Override
598     public BiFunction<X, Y, Z> getWrapped() {
599       return function;
600     }
601 
602   }
603 
604 
605   public static <X> Consumer<X> propagatingConsumer(final Consumer<X> callable, final ExecutionContext ctx,
606           @Nullable final String name, final long deadlineNanos) {
607     return new PropagatingConsumer<X>(callable, ctx, name, deadlineNanos);
608   }
609 
610   private static final class PropagatingConsumer<X> implements Consumer<X>, Wrapper<Consumer<X>> {
611 
612     private final Consumer<X> function;
613     private final ExecutionContext current;
614 
615     private final String name;
616 
617     private final long deadlineNanos;
618 
619     PropagatingConsumer(final Consumer<X> task, final ExecutionContext current,
620             final String name, final long deadlineNanos) {
621       this.function = task;
622       this.current = current;
623       this.name = name;
624       this.deadlineNanos = deadlineNanos;
625     }
626 
627     @Override
628     public void accept(final X in) {
629       try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
630         function.accept(in);
631       }
632     }
633 
634     @Override
635     public String toString() {
636       return name == null ? function.toString() : name;
637     }
638 
639     @Override
640     public Consumer<X> getWrapped() {
641       return function;
642     }
643 
644   }
645 
646   public static <X> Supplier<X> propagatingSupplier(final Supplier<X> callable, final ExecutionContext ctx,
647           @Nullable final String name, final long deadlineNanos) {
648     return new PropagatingSupplier<X>(callable, ctx, name, deadlineNanos);
649   }
650 
651 private static final class PropagatingSupplier<X> implements Supplier<X>, Wrapper<Supplier<X>> {
652 
653     private final Supplier<X> function;
654     private final ExecutionContext current;
655 
656     private final String name;
657 
658     private final long deadlineNanos;
659 
660     PropagatingSupplier(final Supplier<X> task, final ExecutionContext current,
661             final String name, final long deadlineNanos) {
662       this.function = task;
663       this.current = current;
664       this.name = name;
665       this.deadlineNanos = deadlineNanos;
666     }
667 
668     @Override
669     public X get() {
670       try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
671         return function.get();
672       }
673     }
674 
675     @Override
676     public String toString() {
677       return name == null ? function.toString() : name;
678     }
679 
680     @Override
681     public Supplier<X> getWrapped() {
682       return function;
683     }
684 
685   }
686 
687 
688   public static <X, Y> BiConsumer<X, Y> propagatingBiConsumer(final BiConsumer<X, Y> callable,
689           final ExecutionContext ctx,
690           @Nullable final String name, final long deadlineNanos) {
691     return new PropagatingBiConsumer<>(callable, ctx, name, deadlineNanos);
692   }
693 
694   private static final class PropagatingBiConsumer<X, Y> implements BiConsumer<X, Y>, Wrapper<BiConsumer<X, Y>> {
695 
696     private final BiConsumer<X, Y> function;
697     private final ExecutionContext current;
698 
699     private final String name;
700 
701     private final long deadlineNanos;
702 
703     PropagatingBiConsumer(final BiConsumer<X, Y> task, final ExecutionContext current,
704             final String name, final long deadlineNanos) {
705       this.function = task;
706       this.current = current;
707       this.name = name;
708       this.deadlineNanos = deadlineNanos;
709     }
710 
711     @Override
712     public void accept(final X x, final Y y) {
713       try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
714         function.accept(x, y);
715       }
716     }
717 
718     @Override
719     public String toString() {
720       return name == null ? function.toString() : name;
721     }
722 
723     @Override
724     public BiConsumer<X, Y> getWrapped() {
725       return function;
726     }
727 
728   }
729 
730 
731 
732   private static final class PropagatingRunnable implements Runnable, Wrapper<Runnable> {
733 
734     private final Runnable task;
735     private final ExecutionContext current;
736     private final String name;
737     private final long deadlineNanos;
738 
739     PropagatingRunnable(final Runnable task, final ExecutionContext current, final String name,
740             final long deadlineNanos) {
741       this.task = task;
742       this.current = current;
743       this.name = name;
744       this.deadlineNanos = deadlineNanos;
745     }
746 
747     @Override
748     public void run() {
749       try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
750         task.run();
751       }
752     }
753 
754     @Override
755     public Runnable getWrapped() {
756       return task;
757     }
758 
759     @Override
760     public String toString() {
761      return name == null ? task.toString() : name;
762     }
763   }
764 
765   private static class ThreadLocalContextAttacherImpl implements ThreadLocalContextAttacher {
766 
767     @Override
768     public Attached attach(final ExecutionContext ctx) {
769       final Thread currentThread = Thread.currentThread();
770       SimpleStack<ExecutionContext> contextStack = ExecutionContexts.EXEC_CTX.get();
771       int stackPtr = contextStack.pushAndGetIdx(ctx);
772       return new AttachedImpl(currentThread, contextStack, ctx, stackPtr);
773     }
774 
775     private static class AttachedImpl implements Attached {
776 
777       private final Thread thread;
778       private final SimpleStack<ExecutionContext> contextStack;
779       private final ExecutionContext ctx;
780       private final int stackPtr;
781 
782       AttachedImpl(final Thread currentThread,
783               final SimpleStack<ExecutionContext> contextStack,
784               final ExecutionContext ctx, final int stackPtr) {
785         this.thread = currentThread;
786         this.contextStack = contextStack;
787         this.ctx = ctx;
788         this.stackPtr = stackPtr;
789       }
790 
791       @Override
792       public void detach() {
793         Thread now = Thread.currentThread();
794         if (now != thread) {
795           throw  new IllegalStateException("Detaching in different thread " + thread + " != " + now);
796         }
797         ExecutionContext pop = contextStack.pop();
798         if (pop != ctx) {
799           contextStack.push(pop);
800           throw new IllegalStateException("Detaching ctx that is not attached " + ctx + ", found: " + pop);
801         }
802       }
803 
804       public boolean isTopOfStack() {
805         return stackPtr == 0;
806       }
807 
808       public Thread attachedThread() {
809         return thread;
810       }
811 
812     }
813   }
814 
815 }