001 /*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import static com.google.common.base.Preconditions.checkNotNull;
020
021 import com.google.common.annotations.Beta;
022 import com.google.common.annotations.VisibleForTesting;
023
024 import java.util.concurrent.Executor;
025 import java.util.concurrent.Executors;
026 import java.util.concurrent.Future;
027 import java.util.concurrent.ThreadFactory;
028 import java.util.concurrent.atomic.AtomicBoolean;
029
030 /**
031 * Utilities necessary for working with libraries that supply plain {@link
032 * Future} instances.
033 *
034 * @author Sven Mawson
035 * @since 10.0 (replacing {@code Futures.makeListenable}, which
036 * existed in 1.0)
037 */
038 @Beta
039 public
040 final class JdkFutureAdapters {
041 /**
042 * Assigns a thread to the given {@link Future} to provide {@link
043 * ListenableFuture} functionality.
044 *
045 * <p><b>Warning:</b> If the input future does not already implement {@link
046 * ListenableFuture}, the returned future will emulate {@link
047 * ListenableFuture#addListener} by taking a thread from an internal,
048 * unbounded pool at the first call to {@code addListener} and holding it
049 * until the future is {@linkplain Future#isDone() done}.
050 *
051 * <p>Prefer to create {@code ListenableFuture} instances with {@link
052 * SettableFuture}, {@link MoreExecutors#listeningDecorator(
053 * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
054 * {@link AbstractFuture}, and other utilities over creating plain {@code
055 * Future} instances to be upgraded to {@code ListenableFuture} after the
056 * fact.
057 */
058 public static <V> ListenableFuture<V> listenInPoolThread(
059 Future<V> future) {
060 if (future instanceof ListenableFuture<?>) {
061 return (ListenableFuture<V>) future;
062 }
063 return new ListenableFutureAdapter<V>(future);
064 }
065
066 @VisibleForTesting
067 static <V> ListenableFuture<V> listenInPoolThread(
068 Future<V> future, Executor executor) {
069 checkNotNull(executor);
070 if (future instanceof ListenableFuture<?>) {
071 return (ListenableFuture<V>) future;
072 }
073 return new ListenableFutureAdapter<V>(future, executor);
074 }
075
076 /**
077 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This
078 * will wait on the future to finish, and when it completes, run the
079 * listeners. This implementation will wait on the source future
080 * indefinitely, so if the source future never completes, the adapter will
081 * never complete either.
082 *
083 * <p>If the delegate future is interrupted or throws an unexpected unchecked
084 * exception, the listeners will not be invoked.
085 */
086 private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
087 implements ListenableFuture<V> {
088
089 private static final ThreadFactory threadFactory =
090 new ThreadFactoryBuilder()
091 .setNameFormat("ListenableFutureAdapter-thread-%d")
092 .build();
093 private static final Executor defaultAdapterExecutor =
094 Executors.newCachedThreadPool(threadFactory);
095
096 private final Executor adapterExecutor;
097
098 // The execution list to hold our listeners.
099 private final ExecutionList executionList = new ExecutionList();
100
101 // This allows us to only start up a thread waiting on the delegate future
102 // when the first listener is added.
103 private final AtomicBoolean hasListeners = new AtomicBoolean(false);
104
105 // The delegate future.
106 private final Future<V> delegate;
107
108 ListenableFutureAdapter(Future<V> delegate) {
109 this(delegate, defaultAdapterExecutor);
110 }
111
112 ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
113 this.delegate = checkNotNull(delegate);
114 this.adapterExecutor = checkNotNull(adapterExecutor);
115 }
116
117 @Override
118 protected Future<V> delegate() {
119 return delegate;
120 }
121
122 @Override
123 public void addListener(Runnable listener, Executor exec) {
124 executionList.add(listener, exec);
125
126 // When a listener is first added, we run a task that will wait for
127 // the delegate to finish, and when it is done will run the listeners.
128 if (hasListeners.compareAndSet(false, true)) {
129 if (delegate.isDone()) {
130 // If the delegate is already done, run the execution list
131 // immediately on the current thread.
132 executionList.execute();
133 return;
134 }
135
136 adapterExecutor.execute(new Runnable() {
137 @Override
138 public void run() {
139 try {
140 delegate.get();
141 } catch (Error e) {
142 throw e;
143 } catch (InterruptedException e) {
144 Thread.currentThread().interrupt();
145 // Threads from our private pool are never interrupted.
146 throw new AssertionError(e);
147 } catch (Throwable e) {
148 // ExecutionException / CancellationException / RuntimeException
149 // The task is done, run the listeners.
150 }
151 executionList.execute();
152 }
153 });
154 }
155 }
156 }
157
158 private JdkFutureAdapters() {}
159 }