View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.concurrent;
33  
34  import com.google.common.collect.Maps;
35  import java.util.Collection;
36  import java.util.Collections;
37  import java.util.HashMap;
38  import java.util.Iterator;
39  import java.util.List;
40  import java.util.Map;
41  import java.util.concurrent.ExecutionException;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.TimeoutException;
45  import java.util.function.BiConsumer;
46  import javax.annotation.CheckReturnValue;
47  import javax.annotation.Nonnull;
48  import javax.annotation.Nullable;
49  import org.spf4j.base.Pair;
50  import org.spf4j.base.Throwables;
51  import org.spf4j.base.TimeSource;
52  
53  /**
54   *
55   * @author zoly
56   */
57  public final class Futures {
58  
59    private Futures() { }
60  
61    @Nullable
62    @CheckReturnValue
63    public static RuntimeException cancelAll(final boolean mayInterrupt, final Future<?>... futures) {
64      return cancelAll(mayInterrupt, futures, 0);
65    }
66  
67    @Nullable
68    @CheckReturnValue
69    public static RuntimeException cancelAll(final boolean mayInterrupt, final Future[] futures, final int from) {
70      RuntimeException ex = null;
71      for (int i = from; i < futures.length; i++) {
72        Future future = futures[i];
73        try {
74          future.cancel(mayInterrupt);
75        } catch (RuntimeException e) {
76          if (ex == null) {
77            ex = e;
78          } else {
79            Throwables.suppressLimited(ex, e);
80          }
81        }
82      }
83      return ex;
84    }
85  
86    @Nullable
87    public static RuntimeException cancelAll(final boolean mayInterrupt, final Iterator<Future<?>> iterator) {
88      RuntimeException ex = null;
89      while (iterator.hasNext()) {
90        Future future = iterator.next();
91        try {
92          future.cancel(mayInterrupt);
93        } catch (RuntimeException e) {
94          if (ex == null) {
95            ex = e;
96          } else {
97            Throwables.suppressLimited(ex, e);
98          }
99        }
100     }
101     return ex;
102   }
103 
104   @CheckReturnValue
105   @Nonnull
106   public static Pair<Map<Future, Object>, Exception> getAll(final long timeoutMillis, final Future... futures)  {
107     long deadlineNanos = TimeSource.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
108     return getAllWithDeadlineNanos(deadlineNanos, futures);
109   }
110 
111 
112 
113   @CheckReturnValue
114   @Nonnull
115   public static  Pair<Map<Future, Object>, Exception> getAllWithDeadlineNanos(final long deadlineNanos,
116           final Future... futures) {
117     Map<Future, Object> res = Maps.newHashMapWithExpectedSize(futures.length);
118     Exception ex = getAllWithDeadlineNanos(deadlineNanos, res::put, futures);
119     return Pair.of(res, ex);
120   }
121 
122   /**
123    *
124    * @param deadlineNanos
125    * @param futures
126    * @return
127    */
128   @CheckReturnValue
129   @Nullable
130   public static <T> Exception getAllWithDeadlineNanos(final long deadlineNanos,
131           final BiConsumer<Future<T>, T> consumer,
132           final Future<T>... futures) {
133     Exception exception = null;
134     for (int i = 0; i < futures.length; i++) {
135       Future<T> future = futures[i];
136       try {
137         final long toNanos = deadlineNanos - TimeSource.nanoTime();
138         T get = future.get(Math.max(0, toNanos), TimeUnit.NANOSECONDS);
139         consumer.accept(future, get);
140       } catch (InterruptedException ex) {
141         Thread.currentThread().interrupt();
142         if (exception == null) {
143           exception = ex;
144         } else {
145           Throwables.suppressLimited(ex, exception);
146           exception = ex;
147         }
148         RuntimeException cex = cancelAll(true, futures, i + 1);
149         if (cex != null) {
150           Throwables.suppressLimited(exception, cex);
151         }
152       }  catch (TimeoutException  ex) {
153         try {
154           future.cancel(true);
155         } catch (RuntimeException ex2) {
156           ex.addSuppressed(ex2);
157         }
158         if (exception == null) {
159           exception = ex;
160         } else {
161           Throwables.suppressLimited(exception, ex);
162         }
163       } catch (ExecutionException | RuntimeException ex) {
164         if (exception == null) {
165           exception = ex;
166         } else {
167           Throwables.suppressLimited(exception, ex);
168         }
169       }
170     }
171     return exception;
172   }
173 
174  /**
175    * Gets all futures resuls for futures that return Void (no return).
176    *
177    * @param deadlineNanos
178    * @param futures
179    * @return
180    */
181   @CheckReturnValue
182   @Nullable
183   @SuppressWarnings("unchecked")
184   public static Exception getAllWithDeadlineNanosRetVoid(final long deadlineNanos,
185           final Future... futures) {
186     return getAllWithDeadlineNanos(deadlineNanos, (a, b) -> { }, futures);
187   }
188 
189   @CheckReturnValue
190   @Nonnull
191   public static Pair<Map<Future, Object>, Exception> getAll(final long timeoutMillis, final Iterable<Future> futures)  {
192     long deadlineNanos = TimeSource.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
193     return getAllWithDeadlineNanos(deadlineNanos, futures);
194   }
195 
196 
197   @CheckReturnValue
198   @Nonnull
199   public static Pair<Map<Future, Object>, Exception> getAllWithDeadlineNanos(final long deadlineNanos,
200           final Iterable<Future> futures) {
201     Map<Future, Object> results;
202     if (futures instanceof Collection) {
203       results = Maps.newHashMapWithExpectedSize(((Collection) futures).size());
204     } else {
205       results = new HashMap<>();
206     }
207     @SuppressWarnings("unchecked")
208     Exception ex = getAllWithDeadlineNanos(deadlineNanos, results::put, (Iterable) futures);
209     return Pair.of(results, ex);
210   }
211 
212   @CheckReturnValue
213   @Nullable
214   public static <T> Exception getAllWithDeadlineNanos(final long deadlineNanos,
215           final BiConsumer<Future<T>, T> consumer,
216           final Iterable<Future<T>> futures) {
217     Exception exception = null;
218     Iterator<Future<T>> iterator = futures.iterator();
219     while (iterator.hasNext()) {
220       Future<T> future = iterator.next();
221       try {
222         final long toNanos = deadlineNanos - TimeSource.nanoTime();
223         T get = future.get(Math.max(0, toNanos), TimeUnit.NANOSECONDS);
224         consumer.accept(future, get);
225       } catch (InterruptedException ex) {
226         Thread.currentThread().interrupt();
227         if (exception == null) {
228           exception = ex;
229         } else {
230           Throwables.suppressLimited(ex, exception);
231           exception = ex;
232         }
233         RuntimeException cex = cancelAll(true, (Iterator) iterator);
234         if (cex != null) {
235           Throwables.suppressLimited(exception, cex);
236         }
237         break;
238       } catch (TimeoutException ex) {
239         try {
240           future.cancel(true);
241         } catch (RuntimeException ex2) {
242           ex.addSuppressed(ex2);
243         }
244         if (exception == null) {
245           exception = ex;
246         } else {
247           Throwables.suppressLimited(exception, ex);
248         }
249       } catch (ExecutionException | RuntimeException ex) {
250         if (exception == null) {
251           exception = ex;
252         } else {
253           Throwables.suppressLimited(exception, ex);
254         }
255       }
256     }
257     return exception;
258   }
259 
260   public static <T> List<Future<T>> timedOutFutures(final int copies, final TimeoutException ex) {
261     Future<T> fut = new Future<T>() {
262       @Override
263       public boolean cancel(final boolean mayInterruptIfRunning) {
264         throw new UnsupportedOperationException();
265       }
266 
267       @Override
268       public boolean isCancelled() {
269         return false;
270       }
271 
272       @Override
273       public boolean isDone() {
274         return true;
275       }
276 
277       @Override
278       public T get() throws ExecutionException {
279         throw new ExecutionException(ex);
280       }
281 
282       @Override
283       public T get(final long timeout, final TimeUnit unit) throws TimeoutException {
284         throw ex;
285       }
286     };
287     return Collections.nCopies(copies, fut);
288   }
289 
290 
291 }