1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.concurrent;
33
34 import com.google.common.collect.Maps;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.Iterator;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.TimeoutException;
45 import java.util.function.BiConsumer;
46 import javax.annotation.CheckReturnValue;
47 import javax.annotation.Nonnull;
48 import javax.annotation.Nullable;
49 import org.spf4j.base.Pair;
50 import org.spf4j.base.Throwables;
51 import org.spf4j.base.TimeSource;
52
53
54
55
56
57 public final class Futures {
58
59 private Futures() { }
60
61 @Nullable
62 @CheckReturnValue
63 public static RuntimeException cancelAll(final boolean mayInterrupt, final Future<?>... futures) {
64 return cancelAll(mayInterrupt, futures, 0);
65 }
66
67 @Nullable
68 @CheckReturnValue
69 public static RuntimeException cancelAll(final boolean mayInterrupt, final Future[] futures, final int from) {
70 RuntimeException ex = null;
71 for (int i = from; i < futures.length; i++) {
72 Future future = futures[i];
73 try {
74 future.cancel(mayInterrupt);
75 } catch (RuntimeException e) {
76 if (ex == null) {
77 ex = e;
78 } else {
79 Throwables.suppressLimited(ex, e);
80 }
81 }
82 }
83 return ex;
84 }
85
86 @Nullable
87 public static RuntimeException cancelAll(final boolean mayInterrupt, final Iterator<Future<?>> iterator) {
88 RuntimeException ex = null;
89 while (iterator.hasNext()) {
90 Future future = iterator.next();
91 try {
92 future.cancel(mayInterrupt);
93 } catch (RuntimeException e) {
94 if (ex == null) {
95 ex = e;
96 } else {
97 Throwables.suppressLimited(ex, e);
98 }
99 }
100 }
101 return ex;
102 }
103
104 @CheckReturnValue
105 @Nonnull
106 public static Pair<Map<Future, Object>, Exception> getAll(final long timeoutMillis, final Future... futures) {
107 long deadlineNanos = TimeSource.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
108 return getAllWithDeadlineNanos(deadlineNanos, futures);
109 }
110
111
112
113 @CheckReturnValue
114 @Nonnull
115 public static Pair<Map<Future, Object>, Exception> getAllWithDeadlineNanos(final long deadlineNanos,
116 final Future... futures) {
117 Map<Future, Object> res = Maps.newHashMapWithExpectedSize(futures.length);
118 Exception ex = getAllWithDeadlineNanos(deadlineNanos, res::put, futures);
119 return Pair.of(res, ex);
120 }
121
122
123
124
125
126
127
128 @CheckReturnValue
129 @Nullable
130 public static <T> Exception getAllWithDeadlineNanos(final long deadlineNanos,
131 final BiConsumer<Future<T>, T> consumer,
132 final Future<T>... futures) {
133 Exception exception = null;
134 for (int i = 0; i < futures.length; i++) {
135 Future<T> future = futures[i];
136 try {
137 final long toNanos = deadlineNanos - TimeSource.nanoTime();
138 T get = future.get(Math.max(0, toNanos), TimeUnit.NANOSECONDS);
139 consumer.accept(future, get);
140 } catch (InterruptedException ex) {
141 Thread.currentThread().interrupt();
142 if (exception == null) {
143 exception = ex;
144 } else {
145 Throwables.suppressLimited(ex, exception);
146 exception = ex;
147 }
148 RuntimeException cex = cancelAll(true, futures, i + 1);
149 if (cex != null) {
150 Throwables.suppressLimited(exception, cex);
151 }
152 } catch (TimeoutException ex) {
153 try {
154 future.cancel(true);
155 } catch (RuntimeException ex2) {
156 ex.addSuppressed(ex2);
157 }
158 if (exception == null) {
159 exception = ex;
160 } else {
161 Throwables.suppressLimited(exception, ex);
162 }
163 } catch (ExecutionException | RuntimeException ex) {
164 if (exception == null) {
165 exception = ex;
166 } else {
167 Throwables.suppressLimited(exception, ex);
168 }
169 }
170 }
171 return exception;
172 }
173
174
175
176
177
178
179
180
181 @CheckReturnValue
182 @Nullable
183 @SuppressWarnings("unchecked")
184 public static Exception getAllWithDeadlineNanosRetVoid(final long deadlineNanos,
185 final Future... futures) {
186 return getAllWithDeadlineNanos(deadlineNanos, (a, b) -> { }, futures);
187 }
188
189 @CheckReturnValue
190 @Nonnull
191 public static Pair<Map<Future, Object>, Exception> getAll(final long timeoutMillis, final Iterable<Future> futures) {
192 long deadlineNanos = TimeSource.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
193 return getAllWithDeadlineNanos(deadlineNanos, futures);
194 }
195
196
197 @CheckReturnValue
198 @Nonnull
199 public static Pair<Map<Future, Object>, Exception> getAllWithDeadlineNanos(final long deadlineNanos,
200 final Iterable<Future> futures) {
201 Map<Future, Object> results;
202 if (futures instanceof Collection) {
203 results = Maps.newHashMapWithExpectedSize(((Collection) futures).size());
204 } else {
205 results = new HashMap<>();
206 }
207 @SuppressWarnings("unchecked")
208 Exception ex = getAllWithDeadlineNanos(deadlineNanos, results::put, (Iterable) futures);
209 return Pair.of(results, ex);
210 }
211
212 @CheckReturnValue
213 @Nullable
214 public static <T> Exception getAllWithDeadlineNanos(final long deadlineNanos,
215 final BiConsumer<Future<T>, T> consumer,
216 final Iterable<Future<T>> futures) {
217 Exception exception = null;
218 Iterator<Future<T>> iterator = futures.iterator();
219 while (iterator.hasNext()) {
220 Future<T> future = iterator.next();
221 try {
222 final long toNanos = deadlineNanos - TimeSource.nanoTime();
223 T get = future.get(Math.max(0, toNanos), TimeUnit.NANOSECONDS);
224 consumer.accept(future, get);
225 } catch (InterruptedException ex) {
226 Thread.currentThread().interrupt();
227 if (exception == null) {
228 exception = ex;
229 } else {
230 Throwables.suppressLimited(ex, exception);
231 exception = ex;
232 }
233 RuntimeException cex = cancelAll(true, (Iterator) iterator);
234 if (cex != null) {
235 Throwables.suppressLimited(exception, cex);
236 }
237 break;
238 } catch (TimeoutException ex) {
239 try {
240 future.cancel(true);
241 } catch (RuntimeException ex2) {
242 ex.addSuppressed(ex2);
243 }
244 if (exception == null) {
245 exception = ex;
246 } else {
247 Throwables.suppressLimited(exception, ex);
248 }
249 } catch (ExecutionException | RuntimeException ex) {
250 if (exception == null) {
251 exception = ex;
252 } else {
253 Throwables.suppressLimited(exception, ex);
254 }
255 }
256 }
257 return exception;
258 }
259
260 public static <T> List<Future<T>> timedOutFutures(final int copies, final TimeoutException ex) {
261 Future<T> fut = new Future<T>() {
262 @Override
263 public boolean cancel(final boolean mayInterruptIfRunning) {
264 throw new UnsupportedOperationException();
265 }
266
267 @Override
268 public boolean isCancelled() {
269 return false;
270 }
271
272 @Override
273 public boolean isDone() {
274 return true;
275 }
276
277 @Override
278 public T get() throws ExecutionException {
279 throw new ExecutionException(ex);
280 }
281
282 @Override
283 public T get(final long timeout, final TimeUnit unit) throws TimeoutException {
284 throw ex;
285 }
286 };
287 return Collections.nCopies(copies, fut);
288 }
289
290
291 }