001    /*
002     * Copyright (C) 2007 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    
021    import com.google.common.annotations.Beta;
022    
023    import java.util.concurrent.CancellationException;
024    import java.util.concurrent.ExecutionException;
025    import java.util.concurrent.Executor;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.TimeoutException;
028    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
029    
030    import javax.annotation.Nullable;
031    
032    /**
033     * An abstract implementation of the {@link ListenableFuture} interface. This
034     * class is preferable to {@link java.util.concurrent.FutureTask} for two
035     * reasons: It implements {@code ListenableFuture}, and it does not implement
036     * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
037     * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your
038     * tasks to a {@link ListeningExecutorService}.)
039     *
040     * <p>This class implements all methods in {@code ListenableFuture}.
041     * Subclasses should provide a way to set the result of the computation through
042     * the protected methods {@link #set(Object)} and
043     * {@link #setException(Throwable)}. Subclasses may also override {@link
044     * #interruptTask()}, which will be invoked automatically if a call to {@link
045     * #cancel(boolean) cancel(true)} succeeds in canceling the future.
046     *
047     * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
048     * with concurrency issues and guarantee thread safety.
049     *
050     * <p>The state changing methods all return a boolean indicating success or
051     * failure in changing the future's state.  Valid states are running,
052     * completed, failed, or cancelled.
053     *
054     * <p>This class uses an {@link ExecutionList} to guarantee that all registered
055     * listeners will be executed, either when the future finishes or, for listeners
056     * that are added after the future completes, immediately.
057     * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
058     * are not necessarily executed in the order in which they were added.  (If a
059     * listener is added after the Future is complete, it will be executed
060     * immediately, even if earlier listeners have not been executed. Additionally,
061     * executors need not guarantee FIFO execution, or different listeners may run
062     * in different executors.)
063     *
064     * @author Sven Mawson
065     * @since 1.0
066     */
067    public abstract class AbstractFuture<V> implements ListenableFuture<V> {
068    
069      /** Synchronization control for AbstractFutures. */
070      private final Sync<V> sync = new Sync<V>();
071    
072      // The execution list to hold our executors.
073      private final ExecutionList executionList = new ExecutionList();
074    
075      /*
076       * Improve the documentation of when InterruptedException is thrown. Our
077       * behavior matches the JDK's, but the JDK's documentation is misleading.
078       */
079      /**
080       * {@inheritDoc}
081       *
082       * <p>The default {@link AbstractFuture} implementation throws {@code
083       * InterruptedException} if the current thread is interrupted before or during
084       * the call, even if the value is already available.
085       *
086       * @throws InterruptedException if the current thread was interrupted before
087       *     or during the call (optional but recommended).
088       * @throws CancellationException {@inheritDoc}
089       */
090      @Override
091      public V get(long timeout, TimeUnit unit) throws InterruptedException,
092          TimeoutException, ExecutionException {
093        return sync.get(unit.toNanos(timeout));
094      }
095    
096      /*
097       * Improve the documentation of when InterruptedException is thrown. Our
098       * behavior matches the JDK's, but the JDK's documentation is misleading.
099       */
100      /**
101       * {@inheritDoc}
102       *
103       * <p>The default {@link AbstractFuture} implementation throws {@code
104       * InterruptedException} if the current thread is interrupted before or during
105       * the call, even if the value is already available.
106       *
107       * @throws InterruptedException if the current thread was interrupted before
108       *     or during the call (optional but recommended).
109       * @throws CancellationException {@inheritDoc}
110       */
111      @Override
112      public V get() throws InterruptedException, ExecutionException {
113        return sync.get();
114      }
115    
116      @Override
117      public boolean isDone() {
118        return sync.isDone();
119      }
120    
121      @Override
122      public boolean isCancelled() {
123        return sync.isCancelled();
124      }
125    
126      @Override
127      public boolean cancel(boolean mayInterruptIfRunning) {
128        if (!sync.cancel()) {
129          return false;
130        }
131        done();
132        if (mayInterruptIfRunning) {
133          interruptTask();
134        }
135        return true;
136      }
137    
138      /**
139       * Subclasses can override this method to implement interruption of the
140       * future's computation. The method is invoked automatically by a successful
141       * call to {@link #cancel(boolean) cancel(true)}.
142       *
143       * <p>The default implementation does nothing.
144       *
145       * @since 10.0
146       */
147      protected void interruptTask() {
148      }
149    
150      /**
151       * {@inheritDoc}
152       *
153       * @since 10.0
154       */
155      @Override
156      public void addListener(Runnable listener, Executor exec) {
157        executionList.add(listener, exec);
158      }
159    
160      /**
161       * Subclasses should invoke this method to set the result of the computation
162       * to {@code value}.  This will set the state of the future to
163       * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
164       * state was successfully changed.
165       *
166       * @param value the value that was the result of the task.
167       * @return true if the state was successfully changed.
168       */
169      protected boolean set(@Nullable V value) {
170        boolean result = sync.set(value);
171        if (result) {
172          done();
173        }
174        return result;
175      }
176    
177      /**
178       * Subclasses should invoke this method to set the result of the computation
179       * to an error, {@code throwable}.  This will set the state of the future to
180       * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
181       * state was successfully changed.
182       *
183       * @param throwable the exception that the task failed with.
184       * @return true if the state was successfully changed.
185       * @throws Error if the throwable was an {@link Error}.
186       */
187      protected boolean setException(Throwable throwable) {
188        boolean result = sync.setException(checkNotNull(throwable));
189        if (result) {
190          done();
191        }
192    
193        // If it's an Error, we want to make sure it reaches the top of the
194        // call stack, so we rethrow it.
195        if (throwable instanceof Error) {
196          throw (Error) throwable;
197        }
198        return result;
199      }
200    
201      /**
202       * <p>Subclasses can invoke this method to mark the future as cancelled.
203       * This will set the state of the future to {@link
204       * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
205       * successfully changed.
206       *
207       * @return true if the state was successfully changed.
208       * @deprecated Most implementations will be satisfied with the default
209       * implementation of {@link #cancel(boolean)} and not need to call this method
210       * at all. Those that are not can delegate to {@code
211       * super.cancel(mayInterruptIfRunning)} or, to get behavior exactly equivalent
212       * to this method, {@code super.cancel(false)}. This method will be removed
213       * from Guava in Guava release 11.0.
214       */
215      @Beta @Deprecated
216      protected final boolean cancel() {
217        boolean result = sync.cancel();
218        if (result) {
219          done();
220        }
221        return result;
222      }
223    
224      /**
225       * <b>Deprecated.</b> {@linkplain #addListener Add listeners} (possible executed
226       * in {@link MoreExecutors#sameThreadExecutor}) to perform the work currently
227       * performed by your {@code done} implementation. This method will be removed
228       * from Guava in Guava release 11.0.
229       *
230       * Called by the success, failed, or cancelled methods to indicate that the
231       * value is now available and the latch can be released.
232       */
233      @Beta @Deprecated protected
234      void done() {
235        executionList.execute();
236      }
237    
238      /**
239       * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
240       * private subclass to hold the synchronizer.  This synchronizer is used to
241       * implement the blocking and waiting calls as well as to handle state changes
242       * in a thread-safe manner.  The current state of the future is held in the
243       * Sync state, and the lock is released whenever the state changes to either
244       * {@link #COMPLETED} or {@link #CANCELLED}.
245       *
246       * <p>To avoid races between threads doing release and acquire, we transition
247       * to the final state in two steps.  One thread will successfully CAS from
248       * RUNNING to COMPLETING, that thread will then set the result of the
249       * computation, and only then transition to COMPLETED or CANCELLED.
250       *
251       * <p>We don't use the integer argument passed between acquire methods so we
252       * pass around a -1 everywhere.
253       */
254      static final class Sync<V> extends AbstractQueuedSynchronizer {
255    
256        private static final long serialVersionUID = 0L;
257    
258        /* Valid states. */
259        static final int RUNNING = 0;
260        static final int COMPLETING = 1;
261        static final int COMPLETED = 2;
262        static final int CANCELLED = 4;
263    
264        private V value;
265        private Throwable exception;
266    
267        /*
268         * Acquisition succeeds if the future is done, otherwise it fails.
269         */
270        @Override
271        protected int tryAcquireShared(int ignored) {
272          if (isDone()) {
273            return 1;
274          }
275          return -1;
276        }
277    
278        /*
279         * We always allow a release to go through, this means the state has been
280         * successfully changed and the result is available.
281         */
282        @Override
283        protected boolean tryReleaseShared(int finalState) {
284          setState(finalState);
285          return true;
286        }
287    
288        /**
289         * Blocks until the task is complete or the timeout expires.  Throws a
290         * {@link TimeoutException} if the timer expires, otherwise behaves like
291         * {@link #get()}.
292         */
293        V get(long nanos) throws TimeoutException, CancellationException,
294            ExecutionException, InterruptedException {
295    
296          // Attempt to acquire the shared lock with a timeout.
297          if (!tryAcquireSharedNanos(-1, nanos)) {
298            throw new TimeoutException("Timeout waiting for task.");
299          }
300    
301          return getValue();
302        }
303    
304        /**
305         * Blocks until {@link #complete(Object, Throwable, int)} has been
306         * successfully called.  Throws a {@link CancellationException} if the task
307         * was cancelled, or a {@link ExecutionException} if the task completed with
308         * an error.
309         */
310        V get() throws CancellationException, ExecutionException,
311            InterruptedException {
312    
313          // Acquire the shared lock allowing interruption.
314          acquireSharedInterruptibly(-1);
315          return getValue();
316        }
317    
318        /**
319         * Implementation of the actual value retrieval.  Will return the value
320         * on success, an exception on failure, a cancellation on cancellation, or
321         * an illegal state if the synchronizer is in an invalid state.
322         */
323        private V getValue() throws CancellationException, ExecutionException {
324          int state = getState();
325          switch (state) {
326            case COMPLETED:
327              if (exception != null) {
328                throw new ExecutionException(exception);
329              } else {
330                return value;
331              }
332    
333            case CANCELLED:
334              throw new CancellationException("Task was cancelled.");
335    
336            default:
337              throw new IllegalStateException(
338                  "Error, synchronizer in invalid state: " + state);
339          }
340        }
341    
342        /**
343         * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
344         */
345        boolean isDone() {
346          return (getState() & (COMPLETED | CANCELLED)) != 0;
347        }
348    
349        /**
350         * Checks if the state is {@link #CANCELLED}.
351         */
352        boolean isCancelled() {
353          return getState() == CANCELLED;
354        }
355    
356        /**
357         * Transition to the COMPLETED state and set the value.
358         */
359        boolean set(@Nullable V v) {
360          return complete(v, null, COMPLETED);
361        }
362    
363        /**
364         * Transition to the COMPLETED state and set the exception.
365         */
366        boolean setException(Throwable t) {
367          return complete(null, t, COMPLETED);
368        }
369    
370        /**
371         * Transition to the CANCELLED state.
372         */
373        boolean cancel() {
374          return complete(null, null, CANCELLED);
375        }
376    
377        /**
378         * Implementation of completing a task.  Either {@code v} or {@code t} will
379         * be set but not both.  The {@code finalState} is the state to change to
380         * from {@link #RUNNING}.  If the state is not in the RUNNING state we
381         * return {@code false}.
382         *
383         * @param v the value to set as the result of the computation.
384         * @param t the exception to set as the result of the computation.
385         * @param finalState the state to transition to.
386         */
387        private boolean complete(@Nullable V v, Throwable t, int finalState) {
388          if (compareAndSetState(RUNNING, COMPLETING)) {
389            this.value = v;
390            this.exception = t;
391            releaseShared(finalState);
392            return true;
393          }
394    
395          // The state was not RUNNING, so there are no valid transitions.
396          return false;
397        }
398      }
399    }