001 /*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import static com.google.common.base.Preconditions.checkNotNull;
020
021 import com.google.common.annotations.Beta;
022
023 import java.util.concurrent.CancellationException;
024 import java.util.concurrent.ExecutionException;
025 import java.util.concurrent.Executor;
026 import java.util.concurrent.TimeUnit;
027 import java.util.concurrent.TimeoutException;
028 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
029
030 import javax.annotation.Nullable;
031
032 /**
033 * An abstract implementation of the {@link ListenableFuture} interface. This
034 * class is preferable to {@link java.util.concurrent.FutureTask} for two
035 * reasons: It implements {@code ListenableFuture}, and it does not implement
036 * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
037 * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your
038 * tasks to a {@link ListeningExecutorService}.)
039 *
040 * <p>This class implements all methods in {@code ListenableFuture}.
041 * Subclasses should provide a way to set the result of the computation through
042 * the protected methods {@link #set(Object)} and
043 * {@link #setException(Throwable)}. Subclasses may also override {@link
044 * #interruptTask()}, which will be invoked automatically if a call to {@link
045 * #cancel(boolean) cancel(true)} succeeds in canceling the future.
046 *
047 * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
048 * with concurrency issues and guarantee thread safety.
049 *
050 * <p>The state changing methods all return a boolean indicating success or
051 * failure in changing the future's state. Valid states are running,
052 * completed, failed, or cancelled.
053 *
054 * <p>This class uses an {@link ExecutionList} to guarantee that all registered
055 * listeners will be executed, either when the future finishes or, for listeners
056 * that are added after the future completes, immediately.
057 * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
058 * are not necessarily executed in the order in which they were added. (If a
059 * listener is added after the Future is complete, it will be executed
060 * immediately, even if earlier listeners have not been executed. Additionally,
061 * executors need not guarantee FIFO execution, or different listeners may run
062 * in different executors.)
063 *
064 * @author Sven Mawson
065 * @since 1.0
066 */
067 public abstract class AbstractFuture<V> implements ListenableFuture<V> {
068
069 /** Synchronization control for AbstractFutures. */
070 private final Sync<V> sync = new Sync<V>();
071
072 // The execution list to hold our executors.
073 private final ExecutionList executionList = new ExecutionList();
074
075 /*
076 * Improve the documentation of when InterruptedException is thrown. Our
077 * behavior matches the JDK's, but the JDK's documentation is misleading.
078 */
079 /**
080 * {@inheritDoc}
081 *
082 * <p>The default {@link AbstractFuture} implementation throws {@code
083 * InterruptedException} if the current thread is interrupted before or during
084 * the call, even if the value is already available.
085 *
086 * @throws InterruptedException if the current thread was interrupted before
087 * or during the call (optional but recommended).
088 * @throws CancellationException {@inheritDoc}
089 */
090 @Override
091 public V get(long timeout, TimeUnit unit) throws InterruptedException,
092 TimeoutException, ExecutionException {
093 return sync.get(unit.toNanos(timeout));
094 }
095
096 /*
097 * Improve the documentation of when InterruptedException is thrown. Our
098 * behavior matches the JDK's, but the JDK's documentation is misleading.
099 */
100 /**
101 * {@inheritDoc}
102 *
103 * <p>The default {@link AbstractFuture} implementation throws {@code
104 * InterruptedException} if the current thread is interrupted before or during
105 * the call, even if the value is already available.
106 *
107 * @throws InterruptedException if the current thread was interrupted before
108 * or during the call (optional but recommended).
109 * @throws CancellationException {@inheritDoc}
110 */
111 @Override
112 public V get() throws InterruptedException, ExecutionException {
113 return sync.get();
114 }
115
116 @Override
117 public boolean isDone() {
118 return sync.isDone();
119 }
120
121 @Override
122 public boolean isCancelled() {
123 return sync.isCancelled();
124 }
125
126 @Override
127 public boolean cancel(boolean mayInterruptIfRunning) {
128 if (!sync.cancel()) {
129 return false;
130 }
131 done();
132 if (mayInterruptIfRunning) {
133 interruptTask();
134 }
135 return true;
136 }
137
138 /**
139 * Subclasses can override this method to implement interruption of the
140 * future's computation. The method is invoked automatically by a successful
141 * call to {@link #cancel(boolean) cancel(true)}.
142 *
143 * <p>The default implementation does nothing.
144 *
145 * @since 10.0
146 */
147 protected void interruptTask() {
148 }
149
150 /**
151 * {@inheritDoc}
152 *
153 * @since 10.0
154 */
155 @Override
156 public void addListener(Runnable listener, Executor exec) {
157 executionList.add(listener, exec);
158 }
159
160 /**
161 * Subclasses should invoke this method to set the result of the computation
162 * to {@code value}. This will set the state of the future to
163 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
164 * state was successfully changed.
165 *
166 * @param value the value that was the result of the task.
167 * @return true if the state was successfully changed.
168 */
169 protected boolean set(@Nullable V value) {
170 boolean result = sync.set(value);
171 if (result) {
172 done();
173 }
174 return result;
175 }
176
177 /**
178 * Subclasses should invoke this method to set the result of the computation
179 * to an error, {@code throwable}. This will set the state of the future to
180 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
181 * state was successfully changed.
182 *
183 * @param throwable the exception that the task failed with.
184 * @return true if the state was successfully changed.
185 * @throws Error if the throwable was an {@link Error}.
186 */
187 protected boolean setException(Throwable throwable) {
188 boolean result = sync.setException(checkNotNull(throwable));
189 if (result) {
190 done();
191 }
192
193 // If it's an Error, we want to make sure it reaches the top of the
194 // call stack, so we rethrow it.
195 if (throwable instanceof Error) {
196 throw (Error) throwable;
197 }
198 return result;
199 }
200
201 /**
202 * <p>Subclasses can invoke this method to mark the future as cancelled.
203 * This will set the state of the future to {@link
204 * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
205 * successfully changed.
206 *
207 * @return true if the state was successfully changed.
208 * @deprecated Most implementations will be satisfied with the default
209 * implementation of {@link #cancel(boolean)} and not need to call this method
210 * at all. Those that are not can delegate to {@code
211 * super.cancel(mayInterruptIfRunning)} or, to get behavior exactly equivalent
212 * to this method, {@code super.cancel(false)}. This method will be removed
213 * from Guava in Guava release 11.0.
214 */
215 @Beta @Deprecated
216 protected final boolean cancel() {
217 boolean result = sync.cancel();
218 if (result) {
219 done();
220 }
221 return result;
222 }
223
224 /**
225 * <b>Deprecated.</b> {@linkplain #addListener Add listeners} (possible executed
226 * in {@link MoreExecutors#sameThreadExecutor}) to perform the work currently
227 * performed by your {@code done} implementation. This method will be removed
228 * from Guava in Guava release 11.0.
229 *
230 * Called by the success, failed, or cancelled methods to indicate that the
231 * value is now available and the latch can be released.
232 */
233 @Beta @Deprecated protected
234 void done() {
235 executionList.execute();
236 }
237
238 /**
239 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
240 * private subclass to hold the synchronizer. This synchronizer is used to
241 * implement the blocking and waiting calls as well as to handle state changes
242 * in a thread-safe manner. The current state of the future is held in the
243 * Sync state, and the lock is released whenever the state changes to either
244 * {@link #COMPLETED} or {@link #CANCELLED}.
245 *
246 * <p>To avoid races between threads doing release and acquire, we transition
247 * to the final state in two steps. One thread will successfully CAS from
248 * RUNNING to COMPLETING, that thread will then set the result of the
249 * computation, and only then transition to COMPLETED or CANCELLED.
250 *
251 * <p>We don't use the integer argument passed between acquire methods so we
252 * pass around a -1 everywhere.
253 */
254 static final class Sync<V> extends AbstractQueuedSynchronizer {
255
256 private static final long serialVersionUID = 0L;
257
258 /* Valid states. */
259 static final int RUNNING = 0;
260 static final int COMPLETING = 1;
261 static final int COMPLETED = 2;
262 static final int CANCELLED = 4;
263
264 private V value;
265 private Throwable exception;
266
267 /*
268 * Acquisition succeeds if the future is done, otherwise it fails.
269 */
270 @Override
271 protected int tryAcquireShared(int ignored) {
272 if (isDone()) {
273 return 1;
274 }
275 return -1;
276 }
277
278 /*
279 * We always allow a release to go through, this means the state has been
280 * successfully changed and the result is available.
281 */
282 @Override
283 protected boolean tryReleaseShared(int finalState) {
284 setState(finalState);
285 return true;
286 }
287
288 /**
289 * Blocks until the task is complete or the timeout expires. Throws a
290 * {@link TimeoutException} if the timer expires, otherwise behaves like
291 * {@link #get()}.
292 */
293 V get(long nanos) throws TimeoutException, CancellationException,
294 ExecutionException, InterruptedException {
295
296 // Attempt to acquire the shared lock with a timeout.
297 if (!tryAcquireSharedNanos(-1, nanos)) {
298 throw new TimeoutException("Timeout waiting for task.");
299 }
300
301 return getValue();
302 }
303
304 /**
305 * Blocks until {@link #complete(Object, Throwable, int)} has been
306 * successfully called. Throws a {@link CancellationException} if the task
307 * was cancelled, or a {@link ExecutionException} if the task completed with
308 * an error.
309 */
310 V get() throws CancellationException, ExecutionException,
311 InterruptedException {
312
313 // Acquire the shared lock allowing interruption.
314 acquireSharedInterruptibly(-1);
315 return getValue();
316 }
317
318 /**
319 * Implementation of the actual value retrieval. Will return the value
320 * on success, an exception on failure, a cancellation on cancellation, or
321 * an illegal state if the synchronizer is in an invalid state.
322 */
323 private V getValue() throws CancellationException, ExecutionException {
324 int state = getState();
325 switch (state) {
326 case COMPLETED:
327 if (exception != null) {
328 throw new ExecutionException(exception);
329 } else {
330 return value;
331 }
332
333 case CANCELLED:
334 throw new CancellationException("Task was cancelled.");
335
336 default:
337 throw new IllegalStateException(
338 "Error, synchronizer in invalid state: " + state);
339 }
340 }
341
342 /**
343 * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
344 */
345 boolean isDone() {
346 return (getState() & (COMPLETED | CANCELLED)) != 0;
347 }
348
349 /**
350 * Checks if the state is {@link #CANCELLED}.
351 */
352 boolean isCancelled() {
353 return getState() == CANCELLED;
354 }
355
356 /**
357 * Transition to the COMPLETED state and set the value.
358 */
359 boolean set(@Nullable V v) {
360 return complete(v, null, COMPLETED);
361 }
362
363 /**
364 * Transition to the COMPLETED state and set the exception.
365 */
366 boolean setException(Throwable t) {
367 return complete(null, t, COMPLETED);
368 }
369
370 /**
371 * Transition to the CANCELLED state.
372 */
373 boolean cancel() {
374 return complete(null, null, CANCELLED);
375 }
376
377 /**
378 * Implementation of completing a task. Either {@code v} or {@code t} will
379 * be set but not both. The {@code finalState} is the state to change to
380 * from {@link #RUNNING}. If the state is not in the RUNNING state we
381 * return {@code false}.
382 *
383 * @param v the value to set as the result of the computation.
384 * @param t the exception to set as the result of the computation.
385 * @param finalState the state to transition to.
386 */
387 private boolean complete(@Nullable V v, Throwable t, int finalState) {
388 if (compareAndSetState(RUNNING, COMPLETING)) {
389 this.value = v;
390 this.exception = t;
391 releaseShared(finalState);
392 return true;
393 }
394
395 // The state was not RUNNING, so there are no valid transitions.
396 return false;
397 }
398 }
399 }