1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.base;
33
34 import java.lang.reflect.InvocationTargetException;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.function.BiConsumer;
41 import java.util.function.BiFunction;
42 import java.util.function.Consumer;
43 import java.util.function.Function;
44 import java.util.function.Supplier;
45 import java.util.stream.Collectors;
46 import javax.annotation.Nonnegative;
47 import javax.annotation.Nullable;
48 import javax.annotation.ParametersAreNonnullByDefault;
49 import javax.annotation.Signed;
50 import javax.annotation.concurrent.ThreadSafe;
51 import org.spf4j.base.ExecutionContext.Relation;
52 import org.spf4j.concurrent.ScalableSequence;
53 import org.spf4j.concurrent.UIDGenerator;
54 import org.spf4j.ds.SimpleStack;
55
56
57
58
59 @ThreadSafe
60 @ParametersAreNonnullByDefault
61 public final class ExecutionContexts {
62
63 private static final UIDGenerator ID_GEN = new UIDGenerator(new ScalableSequence(0, 10), "X", 1544368928196L);
64
65 public static final long DEFAULT_TIMEOUT_NANOS
66 = Long.getLong("spf4j.execContext.defaultTimeoutNanos", TimeUnit.HOURS.toNanos(8));
67
68 private static final ThreadLocal<SimpleStack<ExecutionContext>> EXEC_CTX =
69 new ThreadLocal<SimpleStack<ExecutionContext>>() {
70 @Override
71 protected SimpleStack<ExecutionContext> initialValue() {
72 return new SimpleStack<>(4);
73 }
74
75 };
76
77 private static final ThreadLocalContextAttacher DEFAULT_TL_ATTACHER = new ThreadLocalContextAttacherImpl();
78
79 private static final ExecutionContextFactory<ExecutionContext> CTX_FACTORY = initFactory();
80
81 private static final ThreadLocalContextAttacher TL_ATTACHER = initTLAttacher();
82
/** Static utility holder; never instantiated. */
private ExecutionContexts() { }
85
86 private static ThreadLocalContextAttacher initTLAttacher() {
87 String factoryClass = System.getProperty("spf4j.execContext.tlAttacherClass");
88 ThreadLocalContextAttacher factory;
89 if (factoryClass == null) {
90 factory = DEFAULT_TL_ATTACHER;
91 } else {
92 try {
93 factory = ((Class<ThreadLocalContextAttacher>) Class.forName(factoryClass)).getConstructor().newInstance();
94 } catch (ClassNotFoundException | InstantiationException | NoSuchMethodException
95 | InvocationTargetException | IllegalAccessException ex) {
96 throw new ExceptionInInitializerError(ex);
97 }
98 }
99 return factory;
100 }
101
102 private static ExecutionContextFactory<ExecutionContext> initFactory() {
103 String factoryClass = System.getProperty("spf4j.execContext.factoryClass");
104 ExecutionContextFactory<ExecutionContext> factory;
105 if (factoryClass == null) {
106 factory = new BasicExecutionContextFactory();
107 } else {
108 try {
109 factory = ((Class<ExecutionContextFactory<ExecutionContext>>) Class.forName(factoryClass))
110 .getConstructor().newInstance();
111 } catch (ClassNotFoundException | InstantiationException | NoSuchMethodException
112 | InvocationTargetException | IllegalAccessException ex) {
113 throw new ExceptionInInitializerError(ex);
114 }
115 }
116 String factoryWrapperClass = System.getProperty("spf4j.execContext.factoryWrapperClass");
117 if (factoryWrapperClass != null) {
118 try {
119 factory = (ExecutionContextFactory<ExecutionContext>) Class.forName(factoryWrapperClass)
120 .getConstructor(ExecutionContextFactory.class).newInstance(factory);
121 } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
122 | NoSuchMethodException | InvocationTargetException ex) {
123 throw new ExceptionInInitializerError(ex);
124 }
125 }
126 return factory;
127 }
128
129 public static ThreadLocalContextAttacher defaultThreadLocalAttacher() {
130 return DEFAULT_TL_ATTACHER;
131 }
132
133 public static ThreadLocalContextAttacher threadLocalAttacher() {
134 return TL_ATTACHER;
135 }
136
137 public static CharSequence genId() {
138 return ID_GEN.next();
139 }
140
141 public static ExecutionContextFactory<ExecutionContext> getContextFactory() {
142 return CTX_FACTORY;
143 }
144
145 @Nullable
146 public static ExecutionContext current() {
147 return EXEC_CTX.get().peek();
148 }
149
150 public static boolean inCurrentThread(final ExecutionContext ctx) {
151 return EXEC_CTX.get().contains(ctx);
152 }
153
154 public static void clearCurrentThread() {
155 EXEC_CTX.get().clear();
156 }
157
158
159
160
161
162
163
164 public static ExecutionContext start(final long startTimeNanos, final long deadlineNanos) {
165 return start("anon", null, startTimeNanos, deadlineNanos);
166 }
167
168
169
170
171
172
173
174
175 public static ExecutionContext start(final long timeout, final TimeUnit tu) {
176 return start("anon", current(), timeout, tu);
177 }
178
179 public static ExecutionContext start(final String opname) {
180 return start(opname, current(), DEFAULT_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
181 }
182
183 public static ExecutionContext start(final String opname, final long timeout, final TimeUnit tu) {
184 return start(opname, current(), timeout, tu);
185 }
186
187 public static ExecutionContext start(@Nullable final ExecutionContext parent, final long timeout, final TimeUnit tu) {
188 return start("anon", parent, timeout, tu);
189 }
190
191 public static ExecutionContext start(@Nullable final ExecutionContext parent) {
192 long nanoTime = TimeSource.nanoTime();
193 return start(parent, nanoTime, parent != null ? parent.getDeadlineNanos() : nanoTime + DEFAULT_TIMEOUT_NANOS);
194 }
195
196 public static ExecutionContext start(@Nullable final ExecutionContext parent,
197 final long startTimeNanos, final long deadlineNanos) {
198 return start("anon", parent, startTimeNanos, deadlineNanos);
199 }
200
201 public static ExecutionContext start(final String name, final long startTimeNanos, final long deadlineNanos) {
202 return start(name, current(), startTimeNanos, deadlineNanos);
203 }
204
205 public static ExecutionContext start(final String name, final long deadlineNanos) {
206 return start(name, current(), TimeSource.nanoTime(), deadlineNanos);
207 }
208
209 public static ExecutionContext start(final String name,
210 @Nullable final ExecutionContext parent) {
211 long nanoTime = TimeSource.nanoTime();
212 return start(name, parent, nanoTime, parent != null ? parent.getDeadlineNanos()
213 : nanoTime + DEFAULT_TIMEOUT_NANOS);
214 }
215
216 public static ExecutionContext start(final String name,
217 @Nullable final ExecutionContext parent, final long timeout, final TimeUnit tu) {
218 return start(name, null, parent, timeout, tu);
219 }
220
221 public static ExecutionContext start(final String name, @Nullable final CharSequence id,
222 @Nullable final ExecutionContext parent, final long timeout, final TimeUnit tu) {
223 long nanoTime = TimeSource.nanoTime();
224 return start(name, id, parent, nanoTime, computeDeadline(nanoTime, parent, tu, timeout));
225 }
226
227 public static ExecutionContext createDetached(final String name,
228 @Nullable final ExecutionContext parent, final long timeout, final TimeUnit tu) {
229 long nanoTime = TimeSource.nanoTime();
230 return createDetached(name, parent, nanoTime, computeDeadline(nanoTime, parent, tu, timeout));
231 }
232
233 public static ExecutionContext start(final String name,
234 @Nullable final ExecutionContext parent, final long deadlineNanos) {
235 return start(name, parent, TimeSource.nanoTime(), deadlineNanos);
236 }
237
238 public static ExecutionContext start(final String name,
239 @Nullable final ExecutionContext parent, final long startTimeNanos, final long deadlineNanos) {
240 return start(name, null, parent, startTimeNanos, deadlineNanos);
241 }
242
243 public static ExecutionContext start(final String name, @Nullable final CharSequence id,
244 @Nullable final ExecutionContext parent, final long startTimeNanos, final long deadlineNanos) {
245 return start(name, id, parent, Relation.CHILD_OF, startTimeNanos, deadlineNanos);
246 }
247
248 public static ExecutionContext start(final String name, @Nullable final CharSequence id,
249 @Nullable final ExecutionContext parent, final Relation relation,
250 final long startTimeNanos, final long deadlineNanos) {
251 ExecutionContext nCtx = CTX_FACTORY.start(name, id, parent, relation,
252 startTimeNanos, deadlineNanos);
253 nCtx.attach();
254 return nCtx;
255 }
256
257 public static ExecutionContext createDetached(final String name,
258 @Nullable final ExecutionContext parent, final long startTimeNanos, final long deadlineNanos) {
259 return createDetached(name, parent, Relation.CHILD_OF, startTimeNanos, deadlineNanos);
260 }
261
262 public static ExecutionContext createDetached(final String name, @Nullable final ExecutionContext parent,
263 final Relation relation, final long startTimeNanos, final long deadlineNanos) {
264 return createDetached(name, null, parent, relation, startTimeNanos, deadlineNanos);
265 }
266
267 public static ExecutionContext createDetached(final String name,
268 @Nullable final CharSequence id, @Nullable final ExecutionContext parent,
269 final Relation relation, final long startTimeNanos, final long deadlineNanos) {
270 return CTX_FACTORY.start(name, id, parent, relation, startTimeNanos, deadlineNanos);
271 }
272
273 public static long getContextDeadlineNanos() {
274 ExecutionContext ec = ExecutionContexts.current();
275 if (ec == null) {
276 return TimeSource.nanoTime() + DEFAULT_TIMEOUT_NANOS;
277 } else {
278 return ec.getDeadlineNanos();
279 }
280 }
281
282 public static long getContextDeadlineNanos(final long currentTime) {
283 ExecutionContext ec = ExecutionContexts.current();
284 if (ec == null) {
285 return currentTime + DEFAULT_TIMEOUT_NANOS;
286 } else {
287 return ec.getDeadlineNanos();
288 }
289 }
290
291 @Signed
292 public static long getTimeRelativeToDeadline(final TimeUnit unit) {
293 return unit.convert(getContextDeadlineNanos() - TimeSource.nanoTime(), TimeUnit.NANOSECONDS);
294 }
295
296 @Nonnegative
297 public static long getTimeToDeadline(final TimeUnit unit) throws TimeoutException {
298 long timeRelativeToDeadline = getTimeRelativeToDeadline(unit);
299 if (timeRelativeToDeadline <= 0) {
300 throw new TimeoutException("Deadline exceeded by " + (-timeRelativeToDeadline) + ' ' + unit);
301 }
302 return timeRelativeToDeadline;
303 }
304
305 @Nonnegative
306 public static long getTimeToDeadlineUnchecked(final TimeUnit unit) {
307 long timeRelativeToDeadline = getTimeRelativeToDeadline(unit);
308 if (timeRelativeToDeadline <= 0) {
309 throw new UncheckedTimeoutException("Deadline exceeded by " + (-timeRelativeToDeadline) + ' ' + unit);
310 }
311 return timeRelativeToDeadline;
312 }
313
314 @Nonnegative
315 public static int getTimeToDeadlineInt(final TimeUnit unit) throws TimeoutException {
316 long timeRelativeToDeadline = getTimeToDeadline(unit);
317 if (timeRelativeToDeadline > Integer.MAX_VALUE) {
318 return Integer.MAX_VALUE;
319 } else {
320 return (int) timeRelativeToDeadline;
321 }
322 }
323
324 @Nonnegative
325 public static long getMillisToDeadline() throws TimeoutException {
326 return getTimeToDeadline(TimeUnit.MILLISECONDS);
327 }
328
329 @Nonnegative
330 public static int getSecondsToDeadline() throws TimeoutException {
331 long secondsToDeadline = getTimeToDeadline(TimeUnit.SECONDS);
332 if (secondsToDeadline > Integer.MAX_VALUE) {
333 return Integer.MAX_VALUE;
334 } else {
335 return (int) secondsToDeadline;
336 }
337 }
338
339 public static long computeDeadline(final long timeout, final TimeUnit unit) {
340 return computeDeadline(current(), timeout, unit);
341 }
342
343 public static long computeTimeout(final long timeout, final TimeUnit unit) throws TimeoutException {
344 return unit.convert(computeTimeoutDeadline(current(), unit, timeout).getTimeoutNanos(), TimeUnit.NANOSECONDS);
345 }
346
347
348
349
350 @Deprecated
351 public static long computeDeadline(@Nullable final ExecutionContext current,
352 final TimeUnit unit, final long timeout) {
353 return computeDeadline(current, timeout, unit);
354 }
355
356 public static long computeDeadline(@Nullable final ExecutionContext current,
357 final long timeout, final TimeUnit unit) {
358 if (current == null) {
359 return TimeSource.getDeadlineNanos(timeout, unit);
360 }
361 long nanoTime = TimeSource.nanoTime();
362 long ctxDeadlinenanos = current.getDeadlineNanos();
363 long timeoutNanos = unit.toNanos(timeout);
364 return (ctxDeadlinenanos - nanoTime < timeoutNanos) ? ctxDeadlinenanos : nanoTime + timeoutNanos;
365 }
366
367 public static long computeDeadline(final long startTimeNanos, @Nullable final ExecutionContext current,
368 final TimeUnit unit, final long timeout) {
369 if (current == null) {
370 return TimeSource.getDeadlineNanos(startTimeNanos, timeout, unit);
371 }
372 long ctxDeadlinenanos = current.getDeadlineNanos();
373 long timeoutNanos = unit.toNanos(timeout);
374 return (ctxDeadlinenanos - startTimeNanos < timeoutNanos) ? ctxDeadlinenanos : startTimeNanos + timeoutNanos;
375 }
376
377
378
379
380
381
382
383
384
385 public static TimeoutDeadline computeTimeoutDeadline(@Nullable final ExecutionContext current,
386 final TimeUnit unit, final long timeout) throws TimeoutException {
387 if (current == null) {
388 return TimeoutDeadline.of(unit.toNanos(timeout), TimeSource.getDeadlineNanos(timeout, unit));
389 }
390 long nanoTime = TimeSource.nanoTime();
391 long ctxDeadlinenanos = current.getDeadlineNanos();
392 long timeoutNanos = unit.toNanos(timeout);
393 long contextTimeoutNanos = ctxDeadlinenanos - nanoTime;
394 return (contextTimeoutNanos < timeoutNanos)
395 ? TimeoutDeadline.of(contextTimeoutNanos, ctxDeadlinenanos)
396 : TimeoutDeadline.of(timeoutNanos, nanoTime + timeoutNanos);
397 }
398
399 private static class BasicExecutionContextFactory implements ExecutionContextFactory<ExecutionContext> {
400
401 @Override
402 public ExecutionContext start(final String name, @Nullable final CharSequence id,
403 @Nullable final ExecutionContext parent, final Relation relation,
404 final long startTimeNanos, final long deadlineNanos) {
405 return new BasicExecutionContext(name, id, parent, relation, startTimeNanos, deadlineNanos);
406 }
407
408 }
409
410 public static <T> Callable<T> propagatingCallable(final Callable<T> callable) {
411 ExecutionContext current = current();
412 return current == null ? callable : propagatingCallable(callable, current);
413 }
414
415 public static <T> Callable<T> propagatingCallable(final Callable<T> callable, final ExecutionContext ctx) {
416 return new PropagatingCallable<T>(callable, ctx);
417 }
418
419 public static <T> Collection<? extends Callable<T>> propagatingCallables(
420 final Collection<? extends Callable<T>> tasks) {
421 ExecutionContext current = current();
422 return current == null ? tasks : propagatingCallables(tasks, current);
423 }
424
425 public static <T> Collection<? extends Callable<T>> propagatingCallables(
426 final Collection<? extends Callable<T>> tasks,
427 final ExecutionContext ctx) {
428 return tasks.stream().map(
429 (c) -> new PropagatingCallable<>(c, ctx))
430 .collect(Collectors.toCollection(() -> new ArrayList<>(tasks.size())));
431 }
432
433 public static <T> Collection<? extends Callable<T>> deadlinedPropagatingCallables(
434 final Collection<? extends Callable<T>> tasks,
435 final ExecutionContext ctx, final long deadlineNanos) {
436 return tasks.stream().map(
437 (c) -> new PropagatingNamedCallable<>(c, ctx, null, deadlineNanos))
438 .collect(Collectors.toCollection(() -> new ArrayList<>(tasks.size())));
439 }
440
441 public static <T> Callable<T> deadlinedPropagatingCallable(final Callable<T> callable,
442 final ExecutionContext ctx, final long deadlineNanos) {
443 return new PropagatingNamedCallable<T>(callable, ctx, null, deadlineNanos);
444 }
445
446 public static Runnable propagatingRunnable(final Runnable runnable) {
447 ExecutionContext current = current();
448 return current == null ? runnable : propagatingRunnable(runnable, current);
449 }
450
451 public static Runnable propagatingRunnable(final Runnable runnable, final ExecutionContext ctx) {
452 return new PropagatingRunnable(runnable, ctx, null, ctx.getDeadlineNanos());
453 }
454
455 public static Runnable propagatingRunnable(final Runnable runnable, final ExecutionContext ctx,
456 @Nullable final String name, final long deadlineNanos) {
457 return new PropagatingRunnable(runnable, ctx, name, deadlineNanos);
458 }
459
460 private static final class PropagatingCallable<T> implements Callable<T> {
461
462 private final Callable<T> task;
463 private final ExecutionContext current;
464
465 PropagatingCallable(final Callable<T> task, final ExecutionContext current) {
466 this.task = task;
467 this.current = current;
468 }
469
470 @Override
471 public T call() throws Exception {
472 try (ExecutionContext ctx = current.startChild(task.toString())) {
473 return task.call();
474 }
475 }
476 }
477
478 public static <T> Callable<T> propagatingCallable(final Callable<T> callable, final ExecutionContext ctx,
479 @Nullable final String name, final long deadlineNanos) {
480 return new PropagatingNamedCallable<T>(callable, ctx, name, deadlineNanos);
481 }
482
483
484 private static final class PropagatingNamedCallable<T> implements Callable<T>, Wrapper<Callable<T>> {
485
486 private final Callable<T> task;
487 private final ExecutionContext current;
488
489 private final String name;
490
491 private final long deadlineNanos;
492
493 PropagatingNamedCallable(final Callable<T> task, final ExecutionContext current,
494 @Nullable final String name, final long deadlineNanos) {
495 this.task = task;
496 this.current = current;
497 this.name = name;
498 this.deadlineNanos = deadlineNanos;
499 }
500
501 @Override
502 public T call() throws Exception {
503 try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
504 return task.call();
505 }
506 }
507
508 @Override
509 public String toString() {
510 return name == null ? task.toString() : name;
511 }
512
513 @Override
514 public Callable<T> getWrapped() {
515 return task;
516 }
517
518 }
519
520 public static <X, Y> Function<X, Y> propagatingFunction(final Function<X, Y> callable, final ExecutionContext ctx,
521 @Nullable final String name, final long deadlineNanos) {
522 return new PropagatingFunction<X, Y>(callable, ctx, name, deadlineNanos);
523 }
524
525 private static final class PropagatingFunction<X, Y> implements Function<X, Y>, Wrapper<Function<X, Y>> {
526
527 private final Function<X, Y> function;
528 private final ExecutionContext current;
529
530 private final String name;
531
532 private final long deadlineNanos;
533
534 PropagatingFunction(final Function<X, Y> task, final ExecutionContext current,
535 final String name, final long deadlineNanos) {
536 this.function = task;
537 this.current = current;
538 this.name = name;
539 this.deadlineNanos = deadlineNanos;
540 }
541
542 @Override
543 public Y apply(final X in) {
544 try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
545 return function.apply(in);
546 }
547 }
548
549 @Override
550 public String toString() {
551 return name == null ? function.toString() : name;
552 }
553
554 @Override
555 public Function<X, Y> getWrapped() {
556 return function;
557 }
558
559 }
560
561 public static <X, Y, Z> BiFunction<X, Y, Z> propagatingBiFunction(final BiFunction<X, Y, Z> callable,
562 final ExecutionContext ctx,
563 @Nullable final String name, final long deadlineNanos) {
564 return new PropagatingBiFunction<X, Y, Z>(callable, ctx, name, deadlineNanos);
565 }
566
567 private static final class PropagatingBiFunction<X, Y, Z>
568 implements BiFunction<X, Y, Z>, Wrapper<BiFunction<X, Y, Z>> {
569
570 private final BiFunction<X, Y, Z> function;
571 private final ExecutionContext current;
572
573 private final String name;
574
575 private final long deadlineNanos;
576
577 PropagatingBiFunction(final BiFunction<X, Y, Z> task, final ExecutionContext current,
578 final String name, final long deadlineNanos) {
579 this.function = task;
580 this.current = current;
581 this.name = name;
582 this.deadlineNanos = deadlineNanos;
583 }
584
585 @Override
586 public Z apply(final X x, final Y y) {
587 try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
588 return function.apply(x, y);
589 }
590 }
591
592 @Override
593 public String toString() {
594 return name == null ? function.toString() : name;
595 }
596
597 @Override
598 public BiFunction<X, Y, Z> getWrapped() {
599 return function;
600 }
601
602 }
603
604
605 public static <X> Consumer<X> propagatingConsumer(final Consumer<X> callable, final ExecutionContext ctx,
606 @Nullable final String name, final long deadlineNanos) {
607 return new PropagatingConsumer<X>(callable, ctx, name, deadlineNanos);
608 }
609
610 private static final class PropagatingConsumer<X> implements Consumer<X>, Wrapper<Consumer<X>> {
611
612 private final Consumer<X> function;
613 private final ExecutionContext current;
614
615 private final String name;
616
617 private final long deadlineNanos;
618
619 PropagatingConsumer(final Consumer<X> task, final ExecutionContext current,
620 final String name, final long deadlineNanos) {
621 this.function = task;
622 this.current = current;
623 this.name = name;
624 this.deadlineNanos = deadlineNanos;
625 }
626
627 @Override
628 public void accept(final X in) {
629 try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
630 function.accept(in);
631 }
632 }
633
634 @Override
635 public String toString() {
636 return name == null ? function.toString() : name;
637 }
638
639 @Override
640 public Consumer<X> getWrapped() {
641 return function;
642 }
643
644 }
645
646 public static <X> Supplier<X> propagatingSupplier(final Supplier<X> callable, final ExecutionContext ctx,
647 @Nullable final String name, final long deadlineNanos) {
648 return new PropagatingSupplier<X>(callable, ctx, name, deadlineNanos);
649 }
650
651 private static final class PropagatingSupplier<X> implements Supplier<X>, Wrapper<Supplier<X>> {
652
653 private final Supplier<X> function;
654 private final ExecutionContext current;
655
656 private final String name;
657
658 private final long deadlineNanos;
659
660 PropagatingSupplier(final Supplier<X> task, final ExecutionContext current,
661 final String name, final long deadlineNanos) {
662 this.function = task;
663 this.current = current;
664 this.name = name;
665 this.deadlineNanos = deadlineNanos;
666 }
667
668 @Override
669 public X get() {
670 try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
671 return function.get();
672 }
673 }
674
675 @Override
676 public String toString() {
677 return name == null ? function.toString() : name;
678 }
679
680 @Override
681 public Supplier<X> getWrapped() {
682 return function;
683 }
684
685 }
686
687
688 public static <X, Y> BiConsumer<X, Y> propagatingBiConsumer(final BiConsumer<X, Y> callable,
689 final ExecutionContext ctx,
690 @Nullable final String name, final long deadlineNanos) {
691 return new PropagatingBiConsumer<>(callable, ctx, name, deadlineNanos);
692 }
693
694 private static final class PropagatingBiConsumer<X, Y> implements BiConsumer<X, Y>, Wrapper<BiConsumer<X, Y>> {
695
696 private final BiConsumer<X, Y> function;
697 private final ExecutionContext current;
698
699 private final String name;
700
701 private final long deadlineNanos;
702
703 PropagatingBiConsumer(final BiConsumer<X, Y> task, final ExecutionContext current,
704 final String name, final long deadlineNanos) {
705 this.function = task;
706 this.current = current;
707 this.name = name;
708 this.deadlineNanos = deadlineNanos;
709 }
710
711 @Override
712 public void accept(final X x, final Y y) {
713 try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
714 function.accept(x, y);
715 }
716 }
717
718 @Override
719 public String toString() {
720 return name == null ? function.toString() : name;
721 }
722
723 @Override
724 public BiConsumer<X, Y> getWrapped() {
725 return function;
726 }
727
728 }
729
730
731
732 private static final class PropagatingRunnable implements Runnable, Wrapper<Runnable> {
733
734 private final Runnable task;
735 private final ExecutionContext current;
736 private final String name;
737 private final long deadlineNanos;
738
739 PropagatingRunnable(final Runnable task, final ExecutionContext current, final String name,
740 final long deadlineNanos) {
741 this.task = task;
742 this.current = current;
743 this.name = name;
744 this.deadlineNanos = deadlineNanos;
745 }
746
747 @Override
748 public void run() {
749 try (ExecutionContext ctx = start(toString(), current, deadlineNanos)) {
750 task.run();
751 }
752 }
753
754 @Override
755 public Runnable getWrapped() {
756 return task;
757 }
758
759 @Override
760 public String toString() {
761 return name == null ? task.toString() : name;
762 }
763 }
764
765 private static class ThreadLocalContextAttacherImpl implements ThreadLocalContextAttacher {
766
767 @Override
768 public Attached attach(final ExecutionContext ctx) {
769 final Thread currentThread = Thread.currentThread();
770 SimpleStack<ExecutionContext> contextStack = ExecutionContexts.EXEC_CTX.get();
771 int stackPtr = contextStack.pushAndGetIdx(ctx);
772 return new AttachedImpl(currentThread, contextStack, ctx, stackPtr);
773 }
774
775 private static class AttachedImpl implements Attached {
776
777 private final Thread thread;
778 private final SimpleStack<ExecutionContext> contextStack;
779 private final ExecutionContext ctx;
780 private final int stackPtr;
781
782 AttachedImpl(final Thread currentThread,
783 final SimpleStack<ExecutionContext> contextStack,
784 final ExecutionContext ctx, final int stackPtr) {
785 this.thread = currentThread;
786 this.contextStack = contextStack;
787 this.ctx = ctx;
788 this.stackPtr = stackPtr;
789 }
790
791 @Override
792 public void detach() {
793 Thread now = Thread.currentThread();
794 if (now != thread) {
795 throw new IllegalStateException("Detaching in different thread " + thread + " != " + now);
796 }
797 ExecutionContext pop = contextStack.pop();
798 if (pop != ctx) {
799 contextStack.push(pop);
800 throw new IllegalStateException("Detaching ctx that is not attached " + ctx + ", found: " + pop);
801 }
802 }
803
804 public boolean isTopOfStack() {
805 return stackPtr == 0;
806 }
807
808 public Thread attachedThread() {
809 return thread;
810 }
811
812 }
813 }
814
815 }