1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.base;
33
34 import com.fasterxml.jackson.core.JsonGenerator;
35 import com.google.common.annotations.Beta;
36 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
37 import java.io.IOException;
38 import java.util.ArrayDeque;
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.function.BiFunction;
46 import java.util.function.Consumer;
47 import java.util.logging.Logger;
48 import javax.annotation.Nonnull;
49 import javax.annotation.Nullable;
50 import javax.annotation.ParametersAreNonnullByDefault;
51 import javax.annotation.concurrent.ThreadSafe;
52 import org.spf4j.io.AppendableWriter;
53 import org.spf4j.log.Level;
54 import org.spf4j.log.Slf4jLogRecord;
55 import org.spf4j.base.ThreadLocalContextAttacher.Attached;
56 import org.spf4j.log.LogUtils;
57
58
59
60
61
62
63 @ThreadSafe
64 @ParametersAreNonnullByDefault
65 public class BasicExecutionContext implements ExecutionContext {
66
67 private static final int MX_NR_LOGS_PER_CTXT = Integer.getInteger("spf4j.execContext.maxNrLogsPerContext", 100);
68
69 private static final Level MIN_LOG_LEVEL
70 = Level.valueOf(System.getProperty("spf4j.execContext.minLogLevel", "TRACE"));
71
72 private final String name;
73
74 private final CharSequence id;
75
76 private final ExecutionContext source;
77
78 private final Relation relation;
79
80 private final long startTimeNanos;
81
82 private final long deadlineNanos;
83
84 private ArrayDeque<Slf4jLogRecord> logs;
85
86 private List<AutoCloseable> closeables;
87
88 private Map<Tag, Object> baggage;
89
90 private long childCount;
91
92 private boolean isClosed = false;
93
94 private Level minBackendLogLevel;
95
96 private Attached attached;
97
98 @SuppressWarnings("unchecked")
99 @SuppressFBWarnings({"EI_EXPOSE_REP2", "STT_TOSTRING_STORED_IN_FIELD"})
100 public BasicExecutionContext(final String name, @Nullable final CharSequence id,
101 @Nullable final ExecutionContext source, final Relation relation,
102 final long startTimeNanos, final long deadlineNanos) {
103 this.isClosed = false;
104 this.relation = relation;
105 this.name = name;
106 this.startTimeNanos = startTimeNanos;
107 if (source != null) {
108 long parentDeadline = source.getDeadlineNanos();
109 if (parentDeadline < deadlineNanos) {
110 this.deadlineNanos = parentDeadline;
111 } else {
112 this.deadlineNanos = deadlineNanos;
113 }
114 if (id == null) {
115 CharSequence pId = source.getId();
116 StringBuilder sb = new StringBuilder(pId.length() + 2).append(pId).append('/');
117 AppendableUtils.appendUnsignedString(sb, source.nextChildId(), 5);
118 this.id = sb;
119 } else {
120 this.id = id;
121 }
122 this.minBackendLogLevel = source.getBackendMinLogLevel();
123 } else {
124 this.deadlineNanos = deadlineNanos;
125 this.id = id == null ? ExecutionContexts.genId() : id;
126 this.minBackendLogLevel = null;
127 }
128 this.source = source;
129 this.baggage = Collections.EMPTY_MAP;
130 this.logs = null;
131 this.closeables = Collections.EMPTY_LIST;
132 }
133
134 @Override
135 public final String getName() {
136 return name;
137 }
138
139 @Override
140 public final long getDeadlineNanos() {
141 return deadlineNanos;
142 }
143
144 public final long getStartTimeNanos() {
145 return startTimeNanos;
146 }
147
148 @Nullable
149 @Beta
150 @Override
151 public final synchronized <T> T put(@Nonnull final Tag<T, ?> key, @Nonnull final T data) {
152 if (baggage == Collections.EMPTY_MAP) {
153 baggage = new HashMap<>(4);
154 }
155 return (T) baggage.put(key, data);
156 }
157
158 @Nullable
159 @Beta
160 @Override
161 public final synchronized <T> T get(@Nonnull final Tag<T, ?> key) {
162 Object res = baggage.get(key);
163 if (res == null && source != null && key.isInherited(relation)) {
164 ExecutionContext src = source;
165 Relation rel;
166 do {
167 res = src.getLocal(key);
168 rel = src.getRelationToSource();
169 src = src.getSource();
170 } while (res == null && src != null && key.isInherited(rel));
171 }
172 return (T) res;
173 }
174
175 @Override
176 @SuppressWarnings("unchecked")
177 public final synchronized <T> ContextValue<T> getContextAndValue(final Tag<T, ?> key) {
178 Object res = baggage.get(key);
179 ExecutionContext ctx = this;
180 if (res == null && source != null && key.isInherited(relation)) {
181 ExecutionContext src = source;
182 Relation rel;
183 do {
184 res = src.getLocal(key);
185 rel = src.getRelationToSource();
186 ctx = src;
187 src = src.getSource();
188 } while (res == null && src != null && key.isInherited(rel));
189 }
190 return res == null ? null : new ContextValue<T>(ctx, (T) res);
191 }
192
193
194 @Nullable
195 @Beta
196 @Override
197 public final synchronized <T> T getLocal(@Nonnull final Tag<T, ?> key) {
198 return (T) baggage.get(key);
199 }
200
201 @Override
202 @Nullable
203 public final synchronized <V, A> V compute(@Nonnull final Tag<V, A> key, final BiFunction<Tag<V, A>, V, V> compute) {
204 if (baggage == Collections.EMPTY_MAP) {
205 baggage = new HashMap(4);
206 }
207 return (V) baggage.compute(key, (BiFunction) compute);
208 }
209
210 @Override
211 @SuppressFBWarnings(value = "EI_EXPOSE_REP",
212 justification = "THis is intentional,"
213 + " and as such these objects need to be handled with care since they are mutable, but thread safe")
214 public final ExecutionContext getSource() {
215 return source;
216 }
217
218
219
220
221 @Override
222 public synchronized void close() {
223 if (!isClosed) {
224 if (attached != null) {
225 detach();
226 }
227 Exception ex = null;
228 for (int i = closeables.size() - 1; i >= 0; i--) {
229 try {
230 closeables.get(i).close();
231 } catch (Exception e) {
232 if (ex != null) {
233 Throwables.suppressLimited(e, ex);
234 }
235 ex = e;
236 }
237 }
238 ExecutionContext parent = getNotClosedParent();
239 if (parent != null) {
240 if (logs != null) {
241 parent.addLogs(logs);
242 }
243 for (Map.Entry<Tag, Object> be : baggage.entrySet()) {
244 Tag key = be.getKey();
245 if (key.pushOnClose()) {
246 parent.accumulate(key, be.getValue());
247 }
248 }
249 } else if (this.source != null && this.relation == relation.CHILD_OF) {
250
251
252 StackTraceElement[] stackTrace = null;
253 Logger orphaned = Logger.getLogger("ORPHAN_CTX_ENTITIES");
254 if (logs != null) {
255 for (Slf4jLogRecord lr : logs) {
256 if (stackTrace == null) {
257 stackTrace = Thread.currentThread().getStackTrace();
258 }
259 LogUtils.logUpgrade(orphaned, Level.INFO, "Orphaned log", lr.toLogRecord("", ""), stackTrace);
260 }
261 }
262 for (Map.Entry<Tag, Object> be : baggage.entrySet()) {
263 Tag key = be.getKey();
264 if (key.pushOnClose()) {
265 if (stackTrace == null) {
266 stackTrace = Thread.currentThread().getStackTrace();
267 }
268 LogUtils.logUpgrade(orphaned, Level.INFO, "Orphaned baggage", be.getKey().toString(),
269 be.getValue(), stackTrace);
270 }
271 }
272 }
273 isClosed = true;
274 if (ex != null) {
275 if (ex instanceof RuntimeException) {
276 throw (RuntimeException) ex;
277 } else {
278 throw new RuntimeException(ex);
279 }
280 }
281 }
282 }
283
284 @Override
285 public final synchronized void detach() {
286 attached.detach();
287 attached = null;
288 }
289
290 public final synchronized boolean isAttached() {
291 return attached != null;
292 }
293
294 @Override
295 public final synchronized void attach() {
296 if (attached != null) {
297 throw new IllegalStateException("Context already attached, can only be attached to one thread at a time: "
298 + attached);
299 }
300 attached = ExecutionContexts.threadLocalAttacher().attach(this);
301 }
302
303
304
305
306 @Override
307 public String toString() {
308 StringBuilder sb = new StringBuilder(64);
309 writeTo(sb);
310 return sb.toString();
311 }
312
313
314
315
316
317 @Override
318 public synchronized void writeJsonTo(final Appendable appendable) throws IOException {
319 JsonGenerator gen = Json.FACTORY.createGenerator(new AppendableWriter(appendable));
320 gen.setCodec(Json.MAPPER);
321 gen.writeStartObject();
322 gen.writeFieldName("name");
323 gen.writeString(name);
324 gen.writeFieldName("startTs");
325 Timing currentTiming = Timing.getCurrentTiming();
326 gen.writeString(currentTiming.fromNanoTimeToInstant(startTimeNanos).toString());
327 gen.writeFieldName("deadlineTs");
328 gen.writeString(currentTiming.fromNanoTimeToInstant(deadlineNanos).toString());
329 gen.writeEndObject();
330 gen.flush();
331 }
332
333 @Override
334 public final synchronized void addLog(final Slf4jLogRecord log) {
335 if (isClosed) {
336 if (source == null) {
337 return;
338 } else {
339 source.addLog(log);
340 return;
341 }
342 }
343 if (logs == null) {
344 logs = new ArrayDeque<>(4);
345 }
346 if (logs.size() >= MX_NR_LOGS_PER_CTXT) {
347 logs.removeFirst();
348 }
349 logs.addLast(log);
350 }
351
352 @Beta
353 @Override
354 public final synchronized void addCloseable(final AutoCloseable closeable) {
355 if (this.closeables.isEmpty()) {
356 this.closeables = new ArrayList<>(4);
357 }
358 this.closeables.add(closeable);
359 }
360
361 @Override
362 public final synchronized void addLogs(final Collection<Slf4jLogRecord> pLogs) {
363 if (isClosed) {
364 if (source == null) {
365 return;
366 } else {
367 source.addLogs(pLogs);
368 return;
369 }
370 }
371 if (logs == null) {
372 logs = new ArrayDeque<>(pLogs);
373 return;
374 }
375 int xNrLogs = logs.size();
376 int toRemove = xNrLogs + pLogs.size() - MX_NR_LOGS_PER_CTXT;
377 if (toRemove >= xNrLogs) {
378 logs.clear();
379 } else {
380 for (int i = 0; i < toRemove; i++) {
381 logs.removeFirst();
382 }
383 }
384 logs.addAll(pLogs);
385 }
386
387 @Override
388 public final synchronized void streamLogs(final Consumer<Slf4jLogRecord> to) {
389 if (logs != null) {
390 for (Slf4jLogRecord log : logs) {
391 to.accept(log);
392 }
393 }
394 }
395
396 @Override
397 public final synchronized void streamLogs(final Consumer<Slf4jLogRecord> to, final int maxNr) {
398 if (logs != null) {
399 if (maxNr >= logs.size()) {
400 for (Slf4jLogRecord log : logs) {
401 to.accept(log);
402 }
403 } else {
404 int i = 0;
405 int toSkip = logs.size() - maxNr;
406 for (Slf4jLogRecord log : logs) {
407 if (i >= toSkip) {
408 to.accept(log);
409 } else {
410 i++;
411 }
412 }
413 }
414 }
415 }
416
417
418
419
420
421
422 @Override
423 public Level getContextMinLogLevel(final String loggerName) {
424 return MIN_LOG_LEVEL;
425 }
426
427
428
429
430
431
432 @Override
433 public synchronized Level getBackendMinLogLevel(final String loggerName) {
434 return minBackendLogLevel;
435 }
436
437
438
439
440
441
442 @Override
443 public synchronized Level setBackendMinLogLevel(final String loggerName, final Level level) {
444 Level result = minBackendLogLevel;
445 minBackendLogLevel = level;
446 return result;
447 }
448
449 @Override
450 public final CharSequence getId() {
451 return id;
452 }
453
454 @Override
455 public final synchronized long nextChildId() {
456 return childCount++;
457 }
458
459
460
461
462
463
464 @Override
465 public void add(final StackTraceElement[] sample) {
466
467 }
468
469
470
471
472
473
474
475 @Override
476 @Nullable
477 public StackSamples getAndClearStackSamples() {
478 return null;
479 }
480
481
482
483
484
485 @Override
486 @Nullable
487 public StackSamples getStackSamples() {
488 return null;
489 }
490
491 @Override
492 public final synchronized boolean isClosed() {
493 return isClosed;
494 }
495
496 @Override
497 public final Relation getRelationToSource() {
498 return relation;
499 }
500
501
502
503
504
505 @Override
506 public void add(final StackSamples samples) {
507
508 }
509
510 }