001 /* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkArgument; 020 import static com.google.common.base.Preconditions.checkNotNull; 021 import static com.google.common.base.Preconditions.checkState; 022 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; 023 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 024 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; 025 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; 026 import static java.lang.Thread.currentThread; 027 import static java.util.Arrays.asList; 028 import static java.util.concurrent.TimeUnit.NANOSECONDS; 029 030 import com.google.common.annotations.Beta; 031 import com.google.common.base.Function; 032 import com.google.common.base.Preconditions; 033 import com.google.common.collect.ImmutableList; 034 import com.google.common.collect.Lists; 035 import com.google.common.collect.Ordering; 036 037 import java.lang.reflect.Constructor; 038 import java.lang.reflect.InvocationTargetException; 039 import java.lang.reflect.UndeclaredThrowableException; 040 import java.util.Arrays; 041 import java.util.List; 042 import java.util.concurrent.BlockingQueue; 043 import 
java.util.concurrent.CancellationException; 044 import java.util.concurrent.CountDownLatch; 045 import java.util.concurrent.ExecutionException; 046 import java.util.concurrent.Executor; 047 import java.util.concurrent.Future; 048 import java.util.concurrent.LinkedBlockingQueue; 049 import java.util.concurrent.TimeUnit; 050 import java.util.concurrent.TimeoutException; 051 import java.util.concurrent.atomic.AtomicInteger; 052 053 import javax.annotation.Nullable; 054 055 /** 056 * Static utility methods pertaining to the {@link Future} interface. 057 * 058 * @author Kevin Bourrillion 059 * @author Nishant Thakkar 060 * @author Sven Mawson 061 * @since 1.0 062 */ 063 @Beta 064 public final class Futures { 065 private Futures() {} 066 067 /** 068 * Returns an uninterruptible view of a {@code Future}. If a thread is 069 * interrupted during an attempt to {@code get()} from the returned future, it 070 * continues to wait on the result until it is available or the timeout 071 * elapses, and only then re-interrupts the thread. 072 * @deprecated Use 073 * {@link Uninterruptibles#getUninterruptibly(Future) getUninterruptibly}. 
074 * <b>This method is scheduled for deletion in Guava Release 11.</b> 075 */ 076 @Deprecated @SuppressWarnings("deprecation") 077 public 078 static <V> UninterruptibleFuture<V> makeUninterruptible( 079 final Future<V> future) { 080 checkNotNull(future); 081 if (future instanceof UninterruptibleFuture<?>) { 082 return (UninterruptibleFuture<V>) future; 083 } 084 return new UninterruptibleFuture<V>() { 085 @Override 086 public boolean cancel(boolean mayInterruptIfRunning) { 087 return future.cancel(mayInterruptIfRunning); 088 } 089 @Override 090 public boolean isCancelled() { 091 return future.isCancelled(); 092 } 093 @Override 094 public boolean isDone() { 095 return future.isDone(); 096 } 097 098 @Override 099 public V get(long timeout, TimeUnit unit) 100 throws TimeoutException, ExecutionException { 101 return Uninterruptibles.getUninterruptibly(future, timeout, unit); 102 } 103 104 @Override 105 public V get() throws ExecutionException { 106 return Uninterruptibles.getUninterruptibly(future); 107 } 108 }; 109 } 110 111 /** 112 * 113 * <p>Creates a {@link ListenableFuture} out of a normal {@link Future}. The 114 * returned future will create a thread to wait for the source future to 115 * complete before executing the listeners. 116 * 117 * <p><b>Warning:</b> If the input future does not already implement {@link 118 * ListenableFuture}, the returned future will emulate {@link 119 * ListenableFuture#addListener} by taking a thread from an internal, 120 * unbounded pool at the first call to {@code addListener} and holding it 121 * until the future is {@linkplain Future#isDone() done}. 
122 * 123 * @deprecated Prefer to create {@code ListenableFuture} instances with {@link 124 * SettableFuture}, {@link MoreExecutors#listeningDecorator( 125 * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask}, 126 * {@link AbstractFuture}, and other utilities over creating plain {@code 127 * Future} instances to be upgraded to {@code ListenableFuture} after the 128 * fact. If this is not possible, the functionality of {@code 129 * makeListenable} is now available as {@link 130 * JdkFutureAdapters#listenInPoolThread}. <b>This method is scheduled 131 * for deletion from Guava in Guava release 11.0.</b> 132 */ 133 @Deprecated 134 public 135 static <V> ListenableFuture<V> makeListenable(Future<V> future) { 136 return JdkFutureAdapters.listenInPoolThread(future); 137 } 138 139 /** 140 * Creates a {@link CheckedFuture} out of a normal {@link Future} and a 141 * {@link Function} that maps from {@link Exception} instances into the 142 * appropriate checked type. 143 * 144 * <p><b>Warning:</b> If the input future does not implement {@link 145 * ListenableFuture}, the returned future will emulate {@link 146 * ListenableFuture#addListener} by taking a thread from an internal, 147 * unbounded pool at the first call to {@code addListener} and holding it 148 * until the future is {@linkplain Future#isDone() done}. 149 * 150 * <p>The given mapping function will be applied to an 151 * {@link InterruptedException}, a {@link CancellationException}, or an 152 * {@link ExecutionException} with the actual cause of the exception. 153 * See {@link Future#get()} for details on the exceptions thrown. 154 * 155 * @deprecated Obtain a {@link ListenableFuture}, following the advice in its 156 * documentation and use {@link #makeChecked(ListenableFuture, Function)}. 
157 * <b>This method is scheduled for deletion from Guava in Guava release 158 * 11.0.</b> 159 */ 160 @Deprecated 161 public 162 static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 163 Future<V> future, Function<Exception, X> mapper) { 164 return new MappingCheckedFuture<V, X>(makeListenable(future), mapper); 165 } 166 167 /** 168 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 169 * and a {@link Function} that maps from {@link Exception} instances into the 170 * appropriate checked type. 171 * 172 * <p>The given mapping function will be applied to an 173 * {@link InterruptedException}, a {@link CancellationException}, or an 174 * {@link ExecutionException} with the actual cause of the exception. 175 * See {@link Future#get()} for details on the exceptions thrown. 176 * 177 * @since 9.0 (source-compatible since 1.0) 178 */ 179 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 180 ListenableFuture<V> future, Function<Exception, X> mapper) { 181 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 182 } 183 184 /** 185 * Creates a {@code ListenableFuture} which has its value set immediately upon 186 * construction. The getters just return the value. This {@code Future} can't 187 * be canceled or timed out and its {@code isDone()} method always returns 188 * {@code true}. 189 */ 190 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 191 SettableFuture<V> future = SettableFuture.create(); 192 future.set(value); 193 return future; 194 } 195 196 /** 197 * Returns a {@code CheckedFuture} which has its value set immediately upon 198 * construction. 199 * 200 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 201 * method always returns {@code true}. Calling {@code get()} or {@code 202 * checkedGet()} will immediately return the provided value. 
203 */ 204 public static <V, X extends Exception> CheckedFuture<V, X> 205 immediateCheckedFuture(@Nullable V value) { 206 SettableFuture<V> future = SettableFuture.create(); 207 future.set(value); 208 return Futures.makeChecked(future, new Function<Exception, X>() { 209 @Override 210 public X apply(Exception e) { 211 throw new AssertionError("impossible"); 212 } 213 }); 214 } 215 216 /** 217 * Returns a {@code ListenableFuture} which has an exception set immediately 218 * upon construction. 219 * 220 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 221 * method always returns {@code true}. Calling {@code get()} will immediately 222 * throw the provided {@code Throwable} wrapped in an {@code 223 * ExecutionException}. 224 * 225 * @throws Error if the throwable is an {@link Error}. 226 */ 227 public static <V> ListenableFuture<V> immediateFailedFuture( 228 Throwable throwable) { 229 checkNotNull(throwable); 230 SettableFuture<V> future = SettableFuture.create(); 231 future.setException(throwable); 232 return future; 233 } 234 235 /** 236 * Returns a {@code CheckedFuture} which has an exception set immediately upon 237 * construction. 238 * 239 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 240 * method always returns {@code true}. Calling {@code get()} will immediately 241 * throw the provided {@code Throwable} wrapped in an {@code 242 * ExecutionException}, and calling {@code checkedGet()} will throw the 243 * provided exception itself. 244 * 245 * @throws Error if the throwable is an {@link Error}. 
246 */ 247 public static <V, X extends Exception> CheckedFuture<V, X> 248 immediateFailedCheckedFuture(final X exception) { 249 checkNotNull(exception); 250 return makeChecked(Futures.<V>immediateFailedFuture(exception), 251 new Function<Exception, X>() { 252 @Override 253 public X apply(Exception e) { 254 return exception; 255 } 256 }); 257 } 258 259 /** 260 * Returns a new {@code ListenableFuture} whose result is asynchronously 261 * derived from the result of the given {@code Future}. More precisely, the 262 * returned {@code Future} takes its result from a {@code Future} produced by 263 * applying the given {@code Function} to the result of the original {@code 264 * Future}. Example: 265 * 266 * <pre> {@code 267 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 268 * Function<RowKey, ListenableFuture<QueryResult>> queryFunction = 269 * new Function<RowKey, ListenableFuture<QueryResult>>() { 270 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 271 * return dataService.read(rowKey); 272 * } 273 * }; 274 * ListenableFuture<QueryResult> queryFuture = 275 * chain(queryFuture, queryFunction); 276 * }</pre> 277 * 278 * <p>Note: This overload of {@code chain} is designed for cases in which the 279 * work of creating the derived future is fast and lightweight, as the method 280 * does not accept an {@code Executor} to perform the the work in. For heavier 281 * derivations, this overload carries some caveats: First, the thread that the 282 * derivation runs in depends on whether the input {@code Future} is done at 283 * the time {@code chain} is called. In particular, if called late, {@code 284 * chain} will run the derivation in the thread that called {@code chain}. 285 * Second, derivations may run in an internal thread of the system responsible 286 * for the input {@code Future}, such as an RPC network thread. 
Finally, 287 * during the execution of a {@link MoreExecutors#sameThreadExecutor 288 * sameThreadExecutor} {@code chain} function, all other registered but 289 * unexecuted listeners are prevented from running, even if those listeners 290 * are to run in other executors. 291 * 292 * <p>The returned {@code Future} attempts to keep its cancellation state in 293 * sync with that of the input future and that of the future returned by the 294 * chain function. That is, if the returned {@code Future} is cancelled, it 295 * will attempt to cancel the other two, and if either of the other two is 296 * cancelled, the returned {@code Future} will receive a callback in which it 297 * will attempt to cancel itself. 298 * 299 * <p>The typical use for this method would be when a RPC call is dependent on 300 * the results of another RPC. One would call the first RPC (input), create a 301 * function that calls another RPC based on input's result, and then call 302 * chain on input and that function to get a {@code ListenableFuture} of 303 * the result. 304 * 305 * @param input The future to chain 306 * @param function A function to chain the results of the provided future 307 * to the results of the returned future. This will be run in the thread 308 * that notifies input it is complete. 309 * @return A future that holds result of the chain. 310 */ 311 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 312 Function<? super I, ? extends ListenableFuture<? extends O>> function) { 313 return chain(input, function, MoreExecutors.sameThreadExecutor()); 314 } 315 316 /** 317 * Returns a new {@code ListenableFuture} whose result is asynchronously 318 * derived from the result of the given {@code Future}. More precisely, the 319 * returned {@code Future} takes its result from a {@code Future} produced by 320 * applying the given {@code Function} to the result of the original {@code 321 * Future}. 
Example:
   *
   * <pre>   {@code
   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
   *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
   *       new Function<RowKey, ListenableFuture<QueryResult>>() {
   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
   *           return dataService.read(rowKey);
   *         }
   *       };
   *   ListenableFuture<QueryResult> queryFuture =
   *       chain(rowKeyFuture, queryFunction, executor);
   * }</pre>
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in
   * sync with that of the input future and that of the future returned by the
   * chain function. That is, if the returned {@code Future} is cancelled, it
   * will attempt to cancel the other two, and if either of the other two is
   * cancelled, the returned {@code Future} will receive a callback in which it
   * will attempt to cancel itself.
   *
   * <p>Note: For cases in which the work of creating the derived future is fast
   * and lightweight, consider {@linkplain Futures#chain(ListenableFuture,
   * Function) the other overload} or explicit use of {@link
   * MoreExecutors#sameThreadExecutor}. For heavier derivations, this choice
   * carries some caveats: First, the thread that the derivation runs in depends
   * on whether the input {@code Future} is done at the time {@code chain} is
   * called. In particular, if called late, {@code chain} will run the
   * derivation in the thread that called {@code chain}. Second, derivations may
   * run in an internal thread of the system responsible for the input {@code
   * Future}, such as an RPC network thread. Finally, during the execution of a
   * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor} {@code chain}
   * function, all other registered but unexecuted listeners are prevented from
   * running, even if those listeners are to run in other executors.
355 * 356 * @param input The future to chain 357 * @param function A function to chain the results of the provided future 358 * to the results of the returned future. 359 * @param exec Executor to run the function in. 360 * @return A future that holds result of the chain. 361 */ 362 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 363 Function<? super I, ? extends ListenableFuture<? extends O>> function, 364 Executor exec) { 365 ChainingListenableFuture<I, O> chain = 366 new ChainingListenableFuture<I, O>(function, input); 367 input.addListener(chain, exec); 368 return chain; 369 } 370 371 /** 372 * Returns a new {@code ListenableFuture} whose result is the product of 373 * applying the given {@code Function} to the result of the given {@code 374 * Future}. Example: 375 * 376 * <pre> {@code 377 * ListenableFuture<QueryResult> queryFuture = ...; 378 * Function<QueryResult, List<Row>> rowsFunction = 379 * new Function<QueryResult, List<Row>>() { 380 * public List<Row> apply(QueryResult queryResult) { 381 * return queryResult.getRows(); 382 * } 383 * }; 384 * ListenableFuture<List<Row>> rowsFuture = 385 * transform(queryFuture, rowsFunction); 386 * }</pre> 387 * 388 * <p>Note: This overload of {@code transform} is designed for cases in which 389 * the transformation is fast and lightweight, as the method does not accept 390 * an {@code Executor} to perform the the work in. For heavier 391 * transformations, this overload carries some caveats: First, the thread that 392 * the transformation runs in depends on whether the input {@code Future} is 393 * done at the time {@code transform} is called. In particular, if called 394 * late, {@code transform} will perform the transformation in the thread that 395 * called {@code transform}. Second, transformations may run in an internal 396 * thread of the system responsible for the input {@code Future}, such as an 397 * RPC network thread. 
Finally, during the execution of a {@link 398 * MoreExecutors#sameThreadExecutor sameThreadExecutor} transformation, all 399 * other registered but unexecuted listeners are prevented from running, even 400 * if those listeners are to run in other executors. 401 * 402 * <p>The returned {@code Future} attempts to keep its cancellation state in 403 * sync with that of the input future. That is, if the returned {@code Future} 404 * is cancelled, it will attempt to cancel the input, and if the input is 405 * cancelled, the returned {@code Future} will receive a callback in which it 406 * will attempt to cancel itself. 407 * 408 * <p>An example use of this method is to convert a serializable object 409 * returned from an RPC into a POJO. 410 * 411 * @param future The future to transform 412 * @param function A Function to transform the results of the provided future 413 * to the results of the returned future. This will be run in the thread 414 * that notifies input it is complete. 415 * @return A future that holds result of the transformation. 416 * @since 9.0 (in 1.0 as {@code compose}) 417 */ 418 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, 419 final Function<? super I, ? extends O> function) { 420 return transform(future, function, MoreExecutors.sameThreadExecutor()); 421 } 422 423 /** 424 * Returns a new {@code ListenableFuture} whose result is the product of 425 * applying the given {@code Function} to the result of the given {@code 426 * Future}. 
Example: 427 * 428 * <pre> {@code 429 * ListenableFuture<QueryResult> queryFuture = ...; 430 * Function<QueryResult, List<Row>> rowsFunction = 431 * new Function<QueryResult, List<Row>>() { 432 * public List<Row> apply(QueryResult queryResult) { 433 * return queryResult.getRows(); 434 * } 435 * }; 436 * ListenableFuture<List<Row>> rowsFuture = 437 * transform(queryFuture, rowsFunction, executor); 438 * }</pre> 439 * 440 * <p>The returned {@code Future} attempts to keep its cancellation state in 441 * sync with that of the input future. That is, if the returned {@code Future} 442 * is cancelled, it will attempt to cancel the input, and if the input is 443 * cancelled, the returned {@code Future} will receive a callback in which it 444 * will attempt to cancel itself. 445 * 446 * <p>An example use of this method is to convert a serializable object 447 * returned from an RPC into a POJO. 448 * 449 * <p>Note: For cases in which the transformation is fast and lightweight, 450 * consider {@linkplain Futures#transform(ListenableFuture, Function) the 451 * other overload} or explicit use of {@link 452 * MoreExecutors#sameThreadExecutor}. For heavier transformations, this choice 453 * carries some caveats: First, the thread that the transformation runs in 454 * depends on whether the input {@code Future} is done at the time {@code 455 * transform} is called. In particular, if called late, {@code transform} will 456 * perform the transformation in the thread that called {@code transform}. 457 * Second, transformations may run in an internal thread of the system 458 * responsible for the input {@code Future}, such as an RPC network thread. 459 * Finally, during the execution of a {@link MoreExecutors#sameThreadExecutor 460 * sameThreadExecutor} transformation, all other registered but unexecuted 461 * listeners are prevented from running, even if those listeners are to run 462 * in other executors. 
463 * 464 * @param future The future to transform 465 * @param function A Function to transform the results of the provided future 466 * to the results of the returned future. 467 * @param exec Executor to run the function in. 468 * @return A future that holds result of the transformation. 469 * @since 9.0 (in 2.0 as {@code compose}) 470 */ 471 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, 472 final Function<? super I, ? extends O> function, Executor exec) { 473 checkNotNull(function); 474 Function<I, ListenableFuture<O>> wrapperFunction 475 = new Function<I, ListenableFuture<O>>() { 476 @Override public ListenableFuture<O> apply(I input) { 477 O output = function.apply(input); 478 return immediateFuture(output); 479 } 480 }; 481 return chain(future, wrapperFunction, exec); 482 } 483 484 /** 485 * Like {@link #transform(ListenableFuture, Function)} except that the 486 * transformation {@code function} is invoked on each call to 487 * {@link Future#get() get()} on the returned future. 488 * 489 * <p>The returned {@code Future} reflects the input's cancellation 490 * state directly, and any attempt to cancel the returned Future is likewise 491 * passed through to the input Future. 492 * 493 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 494 * only apply the timeout to the execution of the underlying {@code Future}, 495 * <em>not</em> to the execution of the transformation function. 496 * 497 * <p>The primary audience of this method is callers of {@code transform} 498 * who don't have a {@code ListenableFuture} available and 499 * do not mind repeated, lazy function evaluation. 500 * 501 * @param future The future to transform 502 * @param function A Function to transform the results of the provided future 503 * to the results of the returned future. 504 * @return A future that returns the result of the transformation. 
505 * @since 10.0 506 */ 507 @Beta 508 public static <I, O> Future<O> lazyTransform(final Future<I> future, 509 final Function<? super I, ? extends O> function) { 510 checkNotNull(future); 511 checkNotNull(function); 512 return new Future<O>() { 513 514 @Override 515 public boolean cancel(boolean mayInterruptIfRunning) { 516 return future.cancel(mayInterruptIfRunning); 517 } 518 519 @Override 520 public boolean isCancelled() { 521 return future.isCancelled(); 522 } 523 524 @Override 525 public boolean isDone() { 526 return future.isDone(); 527 } 528 529 @Override 530 public O get() throws InterruptedException, ExecutionException { 531 return applyTransformation(future.get()); 532 } 533 534 @Override 535 public O get(long timeout, TimeUnit unit) 536 throws InterruptedException, ExecutionException, TimeoutException { 537 return applyTransformation(future.get(timeout, unit)); 538 } 539 540 private O applyTransformation(I input) throws ExecutionException { 541 try { 542 return function.apply(input); 543 } catch (Throwable t) { 544 throw new ExecutionException(t); 545 } 546 } 547 }; 548 } 549 550 /** 551 * Returns a new {@code Future} whose result is the product of applying the 552 * given {@code Function} to the result of the given {@code Future}. Example: 553 * 554 * <pre> {@code 555 * Future<QueryResult> queryFuture = ...; 556 * Function<QueryResult, List<Row>> rowsFunction = 557 * new Function<QueryResult, List<Row>>() { 558 * public List<Row> apply(QueryResult queryResult) { 559 * return queryResult.getRows(); 560 * } 561 * }; 562 * Future<List<Row>> rowsFuture = transform(queryFuture, rowsFunction); 563 * }</pre> 564 * 565 * <p>Each call to {@code Future<O>.get(*)} results in a call to 566 * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it 567 * is assumed that {@code Future<I>.get(*)} is idempotent. 
568 * 569 * <p>When calling {@link Future#get(long, TimeUnit)} on the returned 570 * future, the timeout only applies to the future passed in to this method. 571 * Any additional time taken by applying {@code function} is not considered. 572 * (Exception: If the input future is a {@link ListenableFuture}, timeouts 573 * will be strictly enforced.) 574 * 575 * @param future The future to transform 576 * @param function A Function to transform the results of the provided future 577 * to the results of the returned future. This will be run in the thread 578 * that calls one of the varieties of {@code get()}. 579 * @return A future that computes result of the transformation 580 * @since 9.0 (in 1.0 as {@code compose}) 581 * @deprecated Obtain a {@code ListenableFuture} (following the advice in its 582 * documentation) and use {@link #transform(ListenableFuture, Function)} 583 * or use {@link #lazyTransform(Future, Function)}, which will apply the 584 * transformation on each call to {@code get()}. 585 * <b>This method is scheduled for deletion from Guava in Guava release 586 * 11.0.</b> 587 */ 588 @Deprecated 589 public static <I, O> Future<O> transform(final Future<I> future, 590 final Function<? super I, ? extends O> function) { 591 if (future instanceof ListenableFuture) { 592 return transform((ListenableFuture<I>) future, function); 593 } 594 checkNotNull(future); 595 checkNotNull(function); 596 return new Future<O>() { 597 598 /* 599 * Concurrency detail: 600 * 601 * <p>To preserve the idempotency of calls to this.get(*) calls to the 602 * function are only applied once. A lock is required to prevent multiple 603 * applications of the function. The calls to future.get(*) are performed 604 * outside the lock, as is required to prevent calls to 605 * get(long, TimeUnit) to persist beyond their timeout. 606 * 607 * <p>Calls to future.get(*) on every call to this.get(*) also provide 608 * the cancellation behavior for this. 
609 * 610 * <p>(Consider: in thread A, call get(), in thread B call get(long, 611 * TimeUnit). Thread B may have to wait for Thread A to finish, which 612 * would be unacceptable.) 613 * 614 * <p>Note that each call to Future<O>.get(*) results in a call to 615 * Future<I>.get(*), but the function is only applied once, so 616 * Future<I>.get(*) is assumed to be idempotent. 617 */ 618 619 private final Object lock = new Object(); 620 private boolean set = false; 621 private O value = null; 622 private ExecutionException exception = null; 623 624 @Override 625 public O get() throws InterruptedException, ExecutionException { 626 return apply(future.get()); 627 } 628 629 @Override 630 public O get(long timeout, TimeUnit unit) throws InterruptedException, 631 ExecutionException, TimeoutException { 632 return apply(future.get(timeout, unit)); 633 } 634 635 private O apply(I raw) throws ExecutionException { 636 synchronized (lock) { 637 if (!set) { 638 try { 639 value = function.apply(raw); 640 } catch (RuntimeException e) { 641 exception = new ExecutionException(e); 642 } catch (Error e) { 643 exception = new ExecutionException(e); 644 } 645 set = true; 646 } 647 648 if (exception != null) { 649 throw exception; 650 } 651 return value; 652 } 653 } 654 655 @Override 656 public boolean cancel(boolean mayInterruptIfRunning) { 657 return future.cancel(mayInterruptIfRunning); 658 } 659 660 @Override 661 public boolean isCancelled() { 662 return future.isCancelled(); 663 } 664 665 @Override 666 public boolean isDone() { 667 return future.isDone(); 668 } 669 }; 670 } 671 672 /** 673 * An implementation of {@code ListenableFuture} that also implements 674 * {@code Runnable} so that it can be used to nest ListenableFutures. 675 * Once the passed-in {@code ListenableFuture} is complete, it calls the 676 * passed-in {@code Function} to generate the result. 
*
   * <p>If the function throws any checked exceptions, they should be wrapped
   * in an {@code UndeclaredThrowableException} so that this class can get
   * access to the cause.
   */
  private static class ChainingListenableFuture<I, O>
      extends AbstractFuture<O> implements Runnable {

    // Applied to the input's result in run(); cleared in run()'s finally block.
    private Function<? super I, ? extends ListenableFuture<? extends O>>
        function;
    // Source of the value handed to the function; cleared in run()'s finally
    // block.
    private ListenableFuture<? extends I> inputFuture;
    // Result of applying the function. Assigned in run(), and reset to null
    // once it has been consumed or cancelled.
    private volatile ListenableFuture<? extends O> outputFuture;
    // Capacity-one hand-off of the mayInterruptIfRunning flag from
    // cancel(boolean) to run().
    private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
        new LinkedBlockingQueue<Boolean>(1);
    // Released at the end of run(), whether or not an outputFuture was
    // actually produced.
    private final CountDownLatch outputCreated = new CountDownLatch(1);

    /**
     * Stores the function and the input future.
     *
     * @throws NullPointerException if either argument is null
     */
    private ChainingListenableFuture(
        Function<? super I, ? extends ListenableFuture<? extends O>> function,
        ListenableFuture<? extends I> inputFuture) {
      this.function = checkNotNull(function);
      this.inputFuture = checkNotNull(inputFuture);
    }

    /**
     * Delegate the get() to the input and output futures, in case
     * their implementations defer starting computation until their
     * own get() is invoked.
     */
    @Override
    public O get() throws InterruptedException, ExecutionException {
      if (!isDone()) {
        // Invoking get on the inputFuture will ensure our own run()
        // method below is invoked as a listener when inputFuture sets
        // its value.  Therefore when get() returns we should then see
        // the outputFuture be created.
        // (The field is copied to a local because run() nulls it out.)
        ListenableFuture<? extends I> inputFuture = this.inputFuture;
        if (inputFuture != null) {
          inputFuture.get();
        }

        // If our listener was scheduled to run on an executor we may
        // need to wait for our listener to finish running before the
        // outputFuture has been constructed by the function.
        outputCreated.await();

        // Like above with the inputFuture, we have a listener on
        // the outputFuture that will set our own value when its
        // value is set.  Invoking get will ensure the output can
        // complete and invoke our listener, so that we can later
        // get the result.
        ListenableFuture<? extends O> outputFuture = this.outputFuture;
        if (outputFuture != null) {
          outputFuture.get();
        }
      }
      return super.get();
    }

    /**
     * Delegate the get() to the input and output futures, in case
     * their implementations defer starting computation until their
     * own get() is invoked. The timeout is shared by all phases: the time
     * spent in each one is subtracted before the next one starts.
     */
    @Override
    public O get(long timeout, TimeUnit unit) throws TimeoutException,
        ExecutionException, InterruptedException {
      if (!isDone()) {
        // Use a single time unit so we can decrease remaining timeout
        // as we wait for various phases to complete.
        if (unit != NANOSECONDS) {
          timeout = NANOSECONDS.convert(timeout, unit);
          unit = NANOSECONDS;
        }

        // Invoking get on the inputFuture will ensure our own run()
        // method below is invoked as a listener when inputFuture sets
        // its value.  Therefore when get() returns we should then see
        // the outputFuture be created.
        ListenableFuture<? extends I> inputFuture = this.inputFuture;
        if (inputFuture != null) {
          long start = System.nanoTime();
          inputFuture.get(timeout, unit);
          // Math.max guards against a negative elapsed time.
          timeout -= Math.max(0, System.nanoTime() - start);
        }

        // If our listener was scheduled to run on an executor we may
        // need to wait for our listener to finish running before the
        // outputFuture has been constructed by the function.
        long start = System.nanoTime();
        if (!outputCreated.await(timeout, unit)) {
          throw new TimeoutException();
        }
        timeout -= Math.max(0, System.nanoTime() - start);

        // Like above with the inputFuture, we have a listener on
        // the outputFuture that will set our own value when its
        // value is set.  Invoking get will ensure the output can
        // complete and invoke our listener, so that we can later
        // get the result.
        ListenableFuture<? extends O> outputFuture = this.outputFuture;
        if (outputFuture != null) {
          outputFuture.get(timeout, unit);
        }
      }
      return super.get(timeout, unit);
    }

    /**
     * Cancels this future and, if that succeeds, publishes the flag for
     * {@link #run} and forwards the cancellation to whichever of the input
     * and output futures is currently set.
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      /*
       * Our additional cancellation work needs to occur even if
       * !mayInterruptIfRunning, so we can't move it into interruptTask().
       */
      if (super.cancel(mayInterruptIfRunning)) {
        // This should never block since only one thread is allowed to cancel
        // this Future.
        putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
        cancel(inputFuture, mayInterruptIfRunning);
        cancel(outputFuture, mayInterruptIfRunning);
        return true;
      }
      return false;
    }

    /** Null-tolerant helper: cancels {@code future} if there is one. */
    private void cancel(@Nullable Future<?> future,
        boolean mayInterruptIfRunning) {
      if (future != null) {
        future.cancel(mayInterruptIfRunning);
      }
    }

    /**
     * Reads the input's result, applies the function to it, and arranges for
     * the resulting future's outcome (value, failure or cancellation) to
     * become this future's outcome.
     */
    @Override
    public void run() {
      try {
        I sourceResult;
        try {
          sourceResult = getUninterruptibly(inputFuture);
        } catch (CancellationException e) {
          // Cancel this future and return.
          // At this point, inputFuture is cancelled and outputFuture doesn't
          // exist, so the value of mayInterruptIfRunning is irrelevant.
          cancel(false);
          return;
        } catch (ExecutionException e) {
          // Set the cause of the exception as this future's exception
          setException(e.getCause());
          return;
        }

        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
            function.apply(sourceResult);
        if (isCancelled()) {
          // Handles the case where cancel was called while the function was
          // being applied.
          // There is a gap in cancel(boolean) between calling super.cancel()
          // and storing the value of mayInterruptIfRunning, so this thread
          // needs to block, waiting for that value.
          outputFuture.cancel(
              takeUninterruptibly(mayInterruptIfRunningChannel));
          this.outputFuture = null;
          return;
        }
        outputFuture.addListener(new Runnable() {
            @Override
            public void run() {
              try {
                // Here it would have been nice to have had an
                // UninterruptibleListenableFuture, but we don't want to start a
                // combinatorial explosion of interfaces, so we have to make do.
                set(getUninterruptibly(outputFuture));
              } catch (CancellationException e) {
                // Cancel this future and return.
                // At this point, inputFuture and outputFuture are done, so the
                // value of mayInterruptIfRunning is irrelevant.
                cancel(false);
                return;
              } catch (ExecutionException e) {
                // Set the cause of the exception as this future's exception
                setException(e.getCause());
              } finally {
                // Don't pin inputs beyond completion
                ChainingListenableFuture.this.outputFuture = null;
              }
            }
          }, MoreExecutors.sameThreadExecutor());
      } catch (UndeclaredThrowableException e) {
        // Set the cause of the exception as this future's exception
        setException(e.getCause());
      } catch (RuntimeException e) {
        // This exception is irrelevant in this thread, but useful for the
        // client
        setException(e);
      } catch (Error e) {
        // Propagate errors up ASAP - our superclass will rethrow the error
        setException(e);
      } finally {
        // Don't pin inputs beyond completion
        function = null;
        inputFuture = null;
        // Allow our get routines to examine outputFuture now.
        outputCreated.countDown();
      }
    }
  }

  /**
   * Creates a new {@code ListenableFuture} whose value is a list containing the
   * values of all its input futures, if all succeed. If any input fails, the
   * returned future fails.
   *
   * <p>The list of results is in the same order as the input list.
888 * 889 * <p>Canceling this future does not cancel any of the component futures; 890 * however, if any of the provided futures fails or is canceled, this one is, 891 * too. 892 * 893 * @param futures futures to combine 894 * @return a future that provides a list of the results of the component 895 * futures 896 * @since 10.0 897 */ 898 @Beta 899 public static <V> ListenableFuture<List<V>> allAsList( 900 ListenableFuture<? extends V>... futures) { 901 return new ListFuture<V>(ImmutableList.copyOf(futures), true, 902 MoreExecutors.sameThreadExecutor()); 903 } 904 905 /** 906 * Creates a new {@code ListenableFuture} whose value is a list containing the 907 * values of all its input futures, if all succeed. If any input fails, the 908 * returned future fails. 909 * 910 * <p>The list of results is in the same order as the input list. 911 * 912 * <p>Canceling this future does not cancel any of the component futures; 913 * however, if any of the provided futures fails or is canceled, this one is, 914 * too. 915 * 916 * @param futures futures to combine 917 * @return a future that provides a list of the results of the component 918 * futures 919 * @since 10.0 920 */ 921 @Beta 922 public static <V> ListenableFuture<List<V>> allAsList( 923 Iterable<? extends ListenableFuture<? extends V>> futures) { 924 return new ListFuture<V>(ImmutableList.copyOf(futures), true, 925 MoreExecutors.sameThreadExecutor()); 926 } 927 928 /** 929 * Creates a new {@code ListenableFuture} whose value is a list containing the 930 * values of all its successful input futures. The list of results is in the 931 * same order as the input list, and if any of the provided futures fails or 932 * is canceled, its corresponding position will contain {@code null} (which is 933 * indistinguishable from the future having a successful value of 934 * {@code null}). 
935 * 936 * @param futures futures to combine 937 * @return a future that provides a list of the results of the component 938 * futures 939 * @since 10.0 940 */ 941 @Beta 942 public static <V> ListenableFuture<List<V>> successfulAsList( 943 ListenableFuture<? extends V>... futures) { 944 return new ListFuture<V>(ImmutableList.copyOf(futures), false, 945 MoreExecutors.sameThreadExecutor()); 946 } 947 948 /** 949 * Creates a new {@code ListenableFuture} whose value is a list containing the 950 * values of all its successful input futures. The list of results is in the 951 * same order as the input list, and if any of the provided futures fails or 952 * is canceled, its corresponding position will contain {@code null} (which is 953 * indistinguishable from the future having a successful value of 954 * {@code null}). 955 * 956 * @param futures futures to combine 957 * @return a future that provides a list of the results of the component 958 * futures 959 * @since 10.0 960 */ 961 @Beta 962 public static <V> ListenableFuture<List<V>> successfulAsList( 963 Iterable<? extends ListenableFuture<? extends V>> futures) { 964 return new ListFuture<V>(ImmutableList.copyOf(futures), false, 965 MoreExecutors.sameThreadExecutor()); 966 } 967 968 /** 969 * Registers separate success and failure callbacks to be run when the {@code 970 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 971 * complete} or, if the computation is already complete, immediately. 972 * 973 * <p>There is no guaranteed ordering of execution of callbacks, but any 974 * callback added through this method is guaranteed to be called once the 975 * computation is complete. 
976 * 977 * Example: <pre> {@code 978 * ListenableFuture<QueryResult> future = ...; 979 * addCallback(future, 980 * new FutureCallback<QueryResult> { 981 * public void onSuccess(QueryResult result) { 982 * storeInCache(result); 983 * } 984 * public void onFailure(Throwable t) { 985 * reportError(t); 986 * } 987 * });}</pre> 988 * 989 * <p>Note: This overload of {@code addCallback} is designed for cases in 990 * which the callack is fast and lightweight, as the method does not accept 991 * an {@code Executor} to perform the the work in. For heavier 992 * callbacks, this overload carries some caveats: First, the thread that 993 * the callback runs in depends on whether the input {@code Future} is 994 * done at the time {@code addCallback} is called. In particular, if called 995 * late, {@code addCallback} will execute the callback in the thread that 996 * called {@code addCallback}. Second, callbacks may run in an internal 997 * thread of the system responsible for the input {@code Future}, such as an 998 * RPC network thread. Finally, during the execution of a {@link 999 * MoreExecutors#sameThreadExecutor sameThreadExecutor} callback, all other 1000 * registered but unexecuted listeners are prevented from running, even if 1001 * those listeners are to run in other executors. 1002 * 1003 * <p>For a more general interface to attach a completion listener to a 1004 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1005 * 1006 * @param future The future attach the callback to. 1007 * @param callback The callback to invoke when {@code future} is completed. 1008 * @since 10.0 1009 */ 1010 public static <V> void addCallback(ListenableFuture<V> future, 1011 FutureCallback<? 
super V> callback) { 1012 addCallback(future, callback, MoreExecutors.sameThreadExecutor()); 1013 } 1014 1015 /** 1016 * Registers separate success and failure callbacks to be run when the {@code 1017 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1018 * complete} or, if the computation is already complete, immediately. 1019 * 1020 * <p>The callback is run in {@code executor}. 1021 * There is no guaranteed ordering of execution of callbacks, but any 1022 * callback added through this method is guaranteed to be called once the 1023 * computation is complete. 1024 * 1025 * Example: <pre> {@code 1026 * ListenableFuture<QueryResult> future = ...; 1027 * Executor e = ... 1028 * addCallback(future, e, 1029 * new FutureCallback<QueryResult> { 1030 * public void onSuccess(QueryResult result) { 1031 * storeInCache(result); 1032 * } 1033 * public void onFailure(Throwable t) { 1034 * reportError(t); 1035 * } 1036 * });}</pre> 1037 * 1038 * When the callback is fast and lightweight consider 1039 * {@linkplain Futures#addCallback(ListenableFuture, FutureCallback) 1040 * the other overload} or explicit use of 1041 * {@link MoreExecutors#sameThreadExecutor() sameThreadExecutor}. For heavier 1042 * callbacks, this choice carries some caveats: First, the thread that 1043 * the callback runs in depends on whether the input {@code Future} is 1044 * done at the time {@code addCallback} is called. In particular, if called 1045 * late, {@code addCallback} will execute the callback in the thread that 1046 * called {@code addCallback}. Second, callbacks may run in an internal 1047 * thread of the system responsible for the input {@code Future}, such as an 1048 * RPC network thread. Finally, during the execution of a {@link 1049 * MoreExecutors#sameThreadExecutor sameThreadExecutor} callback, all other 1050 * registered but unexecuted listeners are prevented from running, even if 1051 * those listeners are to run in other executors. 
1052 * 1053 * <p>For a more general interface to attach a completion listener to a 1054 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1055 * 1056 * @param future The future attach the callback to. 1057 * @param callback The callback to invoke when {@code future} is completed. 1058 * @param executor The executor to run {@code callback} when the future 1059 * completes. 1060 * @since 10.0 1061 */ 1062 public static <V> void addCallback(final ListenableFuture<V> future, 1063 final FutureCallback<? super V> callback, Executor executor) { 1064 Preconditions.checkNotNull(callback); 1065 Runnable callbackListener = new Runnable() { 1066 @Override 1067 public void run() { 1068 try { 1069 // TODO(user): (Before Guava release), validate that this 1070 // is the thing for IE. 1071 V value = getUninterruptibly(future); 1072 callback.onSuccess(value); 1073 } catch (ExecutionException e) { 1074 callback.onFailure(e.getCause()); 1075 } catch (RuntimeException e) { 1076 callback.onFailure(e); 1077 } catch (Error e) { 1078 callback.onFailure(e); 1079 } 1080 } 1081 }; 1082 future.addListener(callbackListener, executor); 1083 } 1084 1085 /** 1086 * Returns the result of {@link Future#get()}, converting most exceptions to a 1087 * new instance of the given checked exception type. This reduces boilerplate 1088 * for a common use of {@code Future} in which it is unnecessary to 1089 * programmatically distinguish between exception types or to extract other 1090 * information from the exception instance. 1091 * 1092 * <p>Exceptions from {@code Future.get} are treated as follows: 1093 * <ul> 1094 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1095 * {@code X} if the cause is a checked exception, an {@link 1096 * UncheckedExecutionException} if the cause is a {@code 1097 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1098 * {@code Error}. 
1099 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1100 * restoring the interrupt). 1101 * <li>Any {@link CancellationException} is propagated untouched, as is any 1102 * other {@link RuntimeException} (though {@code get} implementations are 1103 * discouraged from throwing such exceptions). 1104 * </ul> 1105 * 1106 * The overall principle is to continue to treat every checked exception as a 1107 * checked exception, every unchecked exception as an unchecked exception, and 1108 * every error as an error. In addition, the cause of any {@code 1109 * ExecutionException} is wrapped in order to ensure that the new stack trace 1110 * matches that of the current thread. 1111 * 1112 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1113 * public constructor that accepts zero or more arguments, all of type {@code 1114 * String} or {@code Throwable} (preferring constructors with at least one 1115 * {@code String}) and calling the constructor via reflection. If the 1116 * exception did not already have a cause, one is set by calling {@link 1117 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1118 * {@code IllegalArgumentException} is thrown. 
1119 * 1120 * @throws X if {@code get} throws any checked exception except for an {@code 1121 * ExecutionException} whose cause is not itself a checked exception 1122 * @throws UncheckedExecutionException if {@code get} throws an {@code 1123 * ExecutionException} with a {@code RuntimeException} as its cause 1124 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1125 * with an {@code Error} as its cause 1126 * @throws CancellationException if {@code get} throws a {@code 1127 * CancellationException} 1128 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1129 * RuntimeException} or does not have a suitable constructor 1130 * @since 10.0 1131 */ 1132 @Beta 1133 public static <V, X extends Exception> V get( 1134 Future<V> future, Class<X> exceptionClass) throws X { 1135 checkNotNull(future); 1136 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1137 "Futures.get exception type (%s) must not be a RuntimeException", 1138 exceptionClass); 1139 try { 1140 return future.get(); 1141 } catch (InterruptedException e) { 1142 currentThread().interrupt(); 1143 throw newWithCause(exceptionClass, e); 1144 } catch (ExecutionException e) { 1145 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1146 throw new AssertionError(); 1147 } 1148 } 1149 1150 /** 1151 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1152 * exceptions to a new instance of the given checked exception type. This 1153 * reduces boilerplate for a common use of {@code Future} in which it is 1154 * unnecessary to programmatically distinguish between exception types or to 1155 * extract other information from the exception instance. 
1156 * 1157 * <p>Exceptions from {@code Future.get} are treated as follows: 1158 * <ul> 1159 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1160 * {@code X} if the cause is a checked exception, an {@link 1161 * UncheckedExecutionException} if the cause is a {@code 1162 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1163 * {@code Error}. 1164 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1165 * restoring the interrupt). 1166 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1167 * <li>Any {@link CancellationException} is propagated untouched, as is any 1168 * other {@link RuntimeException} (though {@code get} implementations are 1169 * discouraged from throwing such exceptions). 1170 * </ul> 1171 * 1172 * The overall principle is to continue to treat every checked exception as a 1173 * checked exception, every unchecked exception as an unchecked exception, and 1174 * every error as an error. In addition, the cause of any {@code 1175 * ExecutionException} is wrapped in order to ensure that the new stack trace 1176 * matches that of the current thread. 1177 * 1178 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1179 * public constructor that accepts zero or more arguments, all of type {@code 1180 * String} or {@code Throwable} (preferring constructors with at least one 1181 * {@code String}) and calling the constructor via reflection. If the 1182 * exception did not already have a cause, one is set by calling {@link 1183 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1184 * {@code IllegalArgumentException} is thrown. 
1185 * 1186 * @throws X if {@code get} throws any checked exception except for an {@code 1187 * ExecutionException} whose cause is not itself a checked exception 1188 * @throws UncheckedExecutionException if {@code get} throws an {@code 1189 * ExecutionException} with a {@code RuntimeException} as its cause 1190 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1191 * with an {@code Error} as its cause 1192 * @throws CancellationException if {@code get} throws a {@code 1193 * CancellationException} 1194 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1195 * RuntimeException} or does not have a suitable constructor 1196 * @since 10.0 1197 */ 1198 @Beta 1199 public static <V, X extends Exception> V get( 1200 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1201 throws X { 1202 checkNotNull(future); 1203 checkNotNull(unit); 1204 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1205 "Futures.get exception type (%s) must not be a RuntimeException", 1206 exceptionClass); 1207 try { 1208 return future.get(timeout, unit); 1209 } catch (InterruptedException e) { 1210 currentThread().interrupt(); 1211 throw newWithCause(exceptionClass, e); 1212 } catch (TimeoutException e) { 1213 throw newWithCause(exceptionClass, e); 1214 } catch (ExecutionException e) { 1215 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1216 throw new AssertionError(); 1217 } 1218 } 1219 1220 private static <X extends Exception> void wrapAndThrowExceptionOrError( 1221 Throwable cause, Class<X> exceptionClass) throws X { 1222 if (cause instanceof Error) { 1223 throw new ExecutionError((Error) cause); 1224 } 1225 if (cause instanceof RuntimeException) { 1226 throw new UncheckedExecutionException(cause); 1227 } 1228 throw newWithCause(exceptionClass, cause); 1229 } 1230 1231 /** 1232 * Returns the result of calling {@link Future#get()} uninterruptibly on a 1233 * task known not to throw a checked 
exception. This makes {@code Future} more 1234 * suitable for lightweight, fast-running tasks that, barring bugs in the 1235 * code, will not fail. This gives it exception-handling behavior similar to 1236 * that of {@code ForkJoinTask.join}. 1237 * 1238 * <p>Exceptions from {@code Future.get} are treated as follows: 1239 * <ul> 1240 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1241 * {@link UncheckedExecutionException} (if the cause is an {@code 1242 * Exception}) or {@link ExecutionError} (if the cause is an {@code 1243 * Error}). 1244 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 1245 * call. The interrupt is restored before {@code getUnchecked} returns. 1246 * <li>Any {@link CancellationException} is propagated untouched. So is any 1247 * other {@link RuntimeException} ({@code get} implementations are 1248 * discouraged from throwing such exceptions). 1249 * </ul> 1250 * 1251 * The overall principle is to eliminate all checked exceptions: to loop to 1252 * avoid {@code InterruptedException}, to pass through {@code 1253 * CancellationException}, and to wrap any exception from the underlying 1254 * computation in an {@code UncheckedExecutionException} or {@code 1255 * ExecutionError}. 1256 * 1257 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 1258 * {@link Uninterruptibles#getUninterruptibly(Future)}. 
1259 * 1260 * @throws UncheckedExecutionException if {@code get} throws an {@code 1261 * ExecutionException} with an {@code Exception} as its cause 1262 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1263 * with an {@code Error} as its cause 1264 * @throws CancellationException if {@code get} throws a {@code 1265 * CancellationException} 1266 * @since 10.0 1267 */ 1268 @Beta 1269 public static <V> V getUnchecked(Future<V> future) { 1270 checkNotNull(future); 1271 try { 1272 return getUninterruptibly(future); 1273 } catch (ExecutionException e) { 1274 wrapAndThrowUnchecked(e.getCause()); 1275 throw new AssertionError(); 1276 } 1277 } 1278 1279 private static void wrapAndThrowUnchecked(Throwable cause) { 1280 if (cause instanceof Error) { 1281 throw new ExecutionError((Error) cause); 1282 } 1283 /* 1284 * It's a non-Error, non-Exception Throwable. From my survey of such 1285 * classes, I believe that most users intended to extend Exception, so we'll 1286 * treat it like an Exception. 1287 */ 1288 throw new UncheckedExecutionException(cause); 1289 } 1290 1291 /* 1292 * TODO(user): FutureChecker interface for these to be static methods on? If 1293 * so, refer to it in the (static-method) Futures.get documentation 1294 */ 1295 1296 /* 1297 * Arguably we don't need a timed getUnchecked because any operation slow 1298 * enough to require a timeout is heavyweight enough to throw a checked 1299 * exception and therefore be inappropriate to use with getUnchecked. Further, 1300 * it's not clear that converting the checked TimeoutException to a 1301 * RuntimeException -- especially to an UncheckedExecutionException, since it 1302 * wasn't thrown by the computation -- makes sense, and if we don't convert 1303 * it, the user still has to write a try-catch block. 1304 * 1305 * If you think you would use this method, let us know. 
1306 */ 1307 1308 private static <X extends Exception> X newWithCause( 1309 Class<X> exceptionClass, Throwable cause) { 1310 // getConstructors() guarantees this as long as we don't modify the array. 1311 @SuppressWarnings("unchecked") 1312 List<Constructor<X>> constructors = 1313 (List) Arrays.asList(exceptionClass.getConstructors()); 1314 for (Constructor<X> constructor : preferringStrings(constructors)) { 1315 @Nullable X instance = newFromConstructor(constructor, cause); 1316 if (instance != null) { 1317 if (instance.getCause() == null) { 1318 instance.initCause(cause); 1319 } 1320 return instance; 1321 } 1322 } 1323 throw new IllegalArgumentException( 1324 "No appropriate constructor for exception of type " + exceptionClass 1325 + " in response to chained exception", cause); 1326 } 1327 1328 private static <X extends Exception> List<Constructor<X>> 1329 preferringStrings(List<Constructor<X>> constructors) { 1330 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); 1331 } 1332 1333 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST = 1334 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() { 1335 @Override public Boolean apply(Constructor<?> input) { 1336 return asList(input.getParameterTypes()).contains(String.class); 1337 } 1338 }).reverse(); 1339 1340 @Nullable private static <X> X newFromConstructor( 1341 Constructor<X> constructor, Throwable cause) { 1342 Class<?>[] paramTypes = constructor.getParameterTypes(); 1343 Object[] params = new Object[paramTypes.length]; 1344 for (int i = 0; i < paramTypes.length; i++) { 1345 Class<?> paramType = paramTypes[i]; 1346 if (paramType.equals(String.class)) { 1347 params[i] = cause.toString(); 1348 } else if (paramType.equals(Throwable.class)) { 1349 params[i] = cause; 1350 } else { 1351 return null; 1352 } 1353 } 1354 try { 1355 return constructor.newInstance(params); 1356 } catch (IllegalArgumentException e) { 1357 return null; 1358 } catch (InstantiationException e) { 1359 
return null; 1360 } catch (IllegalAccessException e) { 1361 return null; 1362 } catch (InvocationTargetException e) { 1363 return null; 1364 } 1365 } 1366 1367 /** 1368 * Class that implements {@link #allAsList} and {@link #successfulAsList}. 1369 * The idea is to create a (null-filled) List and register a listener with 1370 * each component future to fill out the value in the List when that future 1371 * completes. 1372 */ 1373 private static class ListFuture<V> extends AbstractFuture<List<V>> { 1374 ImmutableList<? extends ListenableFuture<? extends V>> futures; 1375 final boolean allMustSucceed; 1376 final AtomicInteger remaining; 1377 List<V> values; 1378 1379 /** 1380 * Constructor. 1381 * 1382 * @param futures all the futures to build the list from 1383 * @param allMustSucceed whether a single failure or cancellation should 1384 * propagate to this future 1385 * @param listenerExecutor used to run listeners on all the passed in 1386 * futures. 1387 */ 1388 ListFuture( 1389 final ImmutableList<? extends ListenableFuture<? extends V>> futures, 1390 final boolean allMustSucceed, final Executor listenerExecutor) { 1391 this.futures = futures; 1392 this.values = Lists.newArrayListWithCapacity(futures.size()); 1393 this.allMustSucceed = allMustSucceed; 1394 this.remaining = new AtomicInteger(futures.size()); 1395 1396 init(listenerExecutor); 1397 } 1398 1399 private void init(final Executor listenerExecutor) { 1400 // First, schedule cleanup to execute when the Future is done. 1401 addListener(new Runnable() { 1402 @Override 1403 public void run() { 1404 // By now the values array has either been set as the Future's value, 1405 // or (in case of failure) is no longer useful. 1406 ListFuture.this.values = null; 1407 1408 // Let go of the memory held by other futures 1409 ListFuture.this.futures = null; 1410 } 1411 }, MoreExecutors.sameThreadExecutor()); 1412 1413 // Now begin the "real" initialization. 1414 1415 // Corner case: List is empty. 
1416 if (futures.isEmpty()) { 1417 set(Lists.newArrayList(values)); 1418 return; 1419 } 1420 1421 // Populate the results list with null initially. 1422 for (int i = 0; i < futures.size(); ++i) { 1423 values.add(null); 1424 } 1425 1426 // Register a listener on each Future in the list to update 1427 // the state of this future. 1428 // Note that if all the futures on the list are done prior to completing 1429 // this loop, the last call to addListener() will callback to 1430 // setOneValue(), transitively call our cleanup listener, and set 1431 // this.futures to null. 1432 // We store a reference to futures to avoid the NPE. 1433 ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures; 1434 for (int i = 0; i < localFutures.size(); i++) { 1435 final ListenableFuture<? extends V> listenable = localFutures.get(i); 1436 final int index = i; 1437 listenable.addListener(new Runnable() { 1438 @Override 1439 public void run() { 1440 setOneValue(index, listenable); 1441 } 1442 }, listenerExecutor); 1443 } 1444 } 1445 1446 /** 1447 * Sets the value at the given index to that of the given future. 1448 */ 1449 private void setOneValue(int index, Future<? extends V> future) { 1450 List<V> localValues = values; 1451 if (isDone() || localValues == null) { 1452 // Some other future failed or has been cancelled, causing this one to 1453 // also be cancelled or have an exception set. This should only happen 1454 // if allMustSucceed is true. 1455 checkState(allMustSucceed, 1456 "Future was done before all dependencies completed"); 1457 return; 1458 } 1459 1460 try { 1461 checkState(future.isDone(), 1462 "Tried to set value from future which is not done"); 1463 localValues.set(index, getUninterruptibly(future)); 1464 } catch (CancellationException e) { 1465 if (allMustSucceed) { 1466 // Set ourselves as cancelled. Let the input futures keep running 1467 // as some of them may be used elsewhere. 
1468 // (Currently we don't override interruptTask, so 1469 // mayInterruptIfRunning==false isn't technically necessary.) 1470 cancel(false); 1471 } 1472 } catch (ExecutionException e) { 1473 if (allMustSucceed) { 1474 // As soon as the first one fails, throw the exception up. 1475 // The result of all other inputs is then ignored. 1476 setException(e.getCause()); 1477 } 1478 } catch (RuntimeException e) { 1479 if (allMustSucceed) { 1480 setException(e); 1481 } 1482 } catch (Error e) { 1483 // Propagate errors up ASAP - our superclass will rethrow the error 1484 setException(e); 1485 } finally { 1486 int newRemaining = remaining.decrementAndGet(); 1487 checkState(newRemaining >= 0, "Less than 0 remaining futures"); 1488 if (newRemaining == 0) { 1489 localValues = values; 1490 if (localValues != null) { 1491 set(Lists.newArrayList(localValues)); 1492 } else { 1493 checkState(isDone()); 1494 } 1495 } 1496 } 1497 } 1498 1499 @Override 1500 public List<V> get() throws InterruptedException, ExecutionException { 1501 callAllGets(); 1502 1503 // This may still block in spite of the calls above, as the listeners may 1504 // be scheduled for execution in other threads. 1505 return super.get(); 1506 } 1507 1508 /** 1509 * Calls the get method of all dependency futures to work around a bug in 1510 * some ListenableFutures where the listeners aren't called until get() is 1511 * called. 1512 */ 1513 private void callAllGets() throws InterruptedException { 1514 List<? extends ListenableFuture<? extends V>> oldFutures = futures; 1515 if (oldFutures != null && !isDone()) { 1516 for (ListenableFuture<? extends V> future : oldFutures) { 1517 // We wait for a little while for the future, but if it's not done, 1518 // we check that no other futures caused a cancellation or failure. 1519 // This can introduce a delay of up to 10ms in reporting an exception. 
1520 while (!future.isDone()) { 1521 try { 1522 future.get(); 1523 } catch (Error e) { 1524 throw e; 1525 } catch (InterruptedException e) { 1526 throw e; 1527 } catch (Throwable e) { 1528 // ExecutionException / CancellationException / RuntimeException 1529 if (allMustSucceed) { 1530 return; 1531 } else { 1532 continue; 1533 } 1534 } 1535 } 1536 } 1537 } 1538 } 1539 } 1540 1541 /** 1542 * A checked future that uses a function to map from exceptions to the 1543 * appropriate checked type. 1544 */ 1545 private static class MappingCheckedFuture<V, X extends Exception> extends 1546 AbstractCheckedFuture<V, X> { 1547 1548 final Function<Exception, X> mapper; 1549 1550 MappingCheckedFuture(ListenableFuture<V> delegate, 1551 Function<Exception, X> mapper) { 1552 super(delegate); 1553 1554 this.mapper = checkNotNull(mapper); 1555 } 1556 1557 @Override 1558 protected X mapException(Exception e) { 1559 return mapper.apply(e); 1560 } 1561 } 1562 }