001 /* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkNotNull; 020 021 import com.google.common.annotations.Beta; 022 023 import java.util.concurrent.CancellationException; 024 import java.util.concurrent.ExecutionException; 025 import java.util.concurrent.Executor; 026 import java.util.concurrent.TimeUnit; 027 import java.util.concurrent.TimeoutException; 028 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 029 030 import javax.annotation.Nullable; 031 032 /** 033 * An abstract implementation of the {@link ListenableFuture} interface. This 034 * class is preferable to {@link java.util.concurrent.FutureTask} for two 035 * reasons: It implements {@code ListenableFuture}, and it does not implement 036 * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code 037 * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your 038 * tasks to a {@link ListeningExecutorService}.) 039 * 040 * <p>This class implements all methods in {@code ListenableFuture}. 041 * Subclasses should provide a way to set the result of the computation through 042 * the protected methods {@link #set(Object)} and 043 * {@link #setException(Throwable)}. Subclasses may also override {@link 044 * #interruptTask()}, which will be invoked automatically if a call to {@link 045 * #cancel(boolean) cancel(true)} succeeds in canceling the future. 046 * 047 * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal 048 * with concurrency issues and guarantee thread safety. 049 * 050 * <p>The state changing methods all return a boolean indicating success or 051 * failure in changing the future's state. Valid states are running, 052 * completed, failed, or cancelled. 053 * 054 * <p>This class uses an {@link ExecutionList} to guarantee that all registered 055 * listeners will be executed, either when the future finishes or, for listeners 056 * that are added after the future completes, immediately. 057 * {@code Runnable}-{@code Executor} pairs are stored in the execution list but 058 * are not necessarily executed in the order in which they were added. (If a 059 * listener is added after the Future is complete, it will be executed 060 * immediately, even if earlier listeners have not been executed. Additionally, 061 * executors need not guarantee FIFO execution, or different listeners may run 062 * in different executors.) 063 * 064 * @author Sven Mawson 065 * @since 1.0 066 */ 067 public abstract class AbstractFuture<V> implements ListenableFuture<V> { 068 069 /** Synchronization control for AbstractFutures. */ 070 private final Sync<V> sync = new Sync<V>(); 071 072 // The execution list to hold our executors. 073 private final ExecutionList executionList = new ExecutionList(); 074 075 /* 076 * Improve the documentation of when InterruptedException is thrown. Our 077 * behavior matches the JDK's, but the JDK's documentation is misleading. 078 */ 079 /** 080 * {@inheritDoc} 081 * 082 * <p>The default {@link AbstractFuture} implementation throws {@code 083 * InterruptedException} if the current thread is interrupted before or during 084 * the call, even if the value is already available. 085 * 086 * @throws InterruptedException if the current thread was interrupted before 087 * or during the call (optional but recommended). 088 * @throws CancellationException {@inheritDoc} 089 */ 090 @Override 091 public V get(long timeout, TimeUnit unit) throws InterruptedException, 092 TimeoutException, ExecutionException { 093 return sync.get(unit.toNanos(timeout)); 094 } 095 096 /* 097 * Improve the documentation of when InterruptedException is thrown. Our 098 * behavior matches the JDK's, but the JDK's documentation is misleading. 099 */ 100 /** 101 * {@inheritDoc} 102 * 103 * <p>The default {@link AbstractFuture} implementation throws {@code 104 * InterruptedException} if the current thread is interrupted before or during 105 * the call, even if the value is already available. 106 * 107 * @throws InterruptedException if the current thread was interrupted before 108 * or during the call (optional but recommended). 109 * @throws CancellationException {@inheritDoc} 110 */ 111 @Override 112 public V get() throws InterruptedException, ExecutionException { 113 return sync.get(); 114 } 115 116 @Override 117 public boolean isDone() { 118 return sync.isDone(); 119 } 120 121 @Override 122 public boolean isCancelled() { 123 return sync.isCancelled(); 124 } 125 126 @Override 127 public boolean cancel(boolean mayInterruptIfRunning) { 128 if (!sync.cancel()) { 129 return false; 130 } 131 done(); 132 if (mayInterruptIfRunning) { 133 interruptTask(); 134 } 135 return true; 136 } 137 138 /** 139 * Subclasses can override this method to implement interruption of the 140 * future's computation. The method is invoked automatically by a successful 141 * call to {@link #cancel(boolean) cancel(true)}. 142 * 143 * <p>The default implementation does nothing. 144 * 145 * @since 10.0 146 */ 147 protected void interruptTask() { 148 } 149 150 /** 151 * {@inheritDoc} 152 * 153 * @since 10.0 154 */ 155 @Override 156 public void addListener(Runnable listener, Executor exec) { 157 executionList.add(listener, exec); 158 } 159 160 /** 161 * Subclasses should invoke this method to set the result of the computation 162 * to {@code value}. This will set the state of the future to 163 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the 164 * state was successfully changed. 165 * 166 * @param value the value that was the result of the task. 167 * @return true if the state was successfully changed. 168 */ 169 protected boolean set(@Nullable V value) { 170 boolean result = sync.set(value); 171 if (result) { 172 done(); 173 } 174 return result; 175 } 176 177 /** 178 * Subclasses should invoke this method to set the result of the computation 179 * to an error, {@code throwable}. This will set the state of the future to 180 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the 181 * state was successfully changed. 182 * 183 * @param throwable the exception that the task failed with. 184 * @return true if the state was successfully changed. 185 * @throws Error if the throwable was an {@link Error}. 186 */ 187 protected boolean setException(Throwable throwable) { 188 boolean result = sync.setException(checkNotNull(throwable)); 189 if (result) { 190 done(); 191 } 192 193 // If it's an Error, we want to make sure it reaches the top of the 194 // call stack, so we rethrow it. 195 if (throwable instanceof Error) { 196 throw (Error) throwable; 197 } 198 return result; 199 } 200 201 /** 202 * <p>Subclasses can invoke this method to mark the future as cancelled. 203 * This will set the state of the future to {@link 204 * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was 205 * successfully changed. 206 * 207 * @return true if the state was successfully changed. 208 * @deprecated Most implementations will be satisfied with the default 209 * implementation of {@link #cancel(boolean)} and not need to call this method 210 * at all. Those that are not can delegate to {@code 211 * super.cancel(mayInterruptIfRunning)} or, to get behavior exactly equivalent 212 * to this method, {@code super.cancel(false)}. This method will be removed 213 * from Guava in Guava release 11.0. 214 */ 215 @Beta @Deprecated 216 protected final boolean cancel() { 217 boolean result = sync.cancel(); 218 if (result) { 219 done(); 220 } 221 return result; 222 } 223 224 /** 225 * <b>Deprecated.</b> {@linkplain #addListener Add listeners} (possible executed 226 * in {@link MoreExecutors#sameThreadExecutor}) to perform the work currently 227 * performed by your {@code done} implementation. This method will be removed 228 * from Guava in Guava release 11.0. 229 * 230 * Called by the success, failed, or cancelled methods to indicate that the 231 * value is now available and the latch can be released. 232 */ 233 @Beta @Deprecated protected 234 void done() { 235 executionList.execute(); 236 } 237 238 /** 239 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a 240 * private subclass to hold the synchronizer. This synchronizer is used to 241 * implement the blocking and waiting calls as well as to handle state changes 242 * in a thread-safe manner. The current state of the future is held in the 243 * Sync state, and the lock is released whenever the state changes to either 244 * {@link #COMPLETED} or {@link #CANCELLED}. 245 * 246 * <p>To avoid races between threads doing release and acquire, we transition 247 * to the final state in two steps. One thread will successfully CAS from 248 * RUNNING to COMPLETING, that thread will then set the result of the 249 * computation, and only then transition to COMPLETED or CANCELLED. 250 * 251 * <p>We don't use the integer argument passed between acquire methods so we 252 * pass around a -1 everywhere. 253 */ 254 static final class Sync<V> extends AbstractQueuedSynchronizer { 255 256 private static final long serialVersionUID = 0L; 257 258 /* Valid states. */ 259 static final int RUNNING = 0; 260 static final int COMPLETING = 1; 261 static final int COMPLETED = 2; 262 static final int CANCELLED = 4; 263 264 private V value; 265 private Throwable exception; 266 267 /* 268 * Acquisition succeeds if the future is done, otherwise it fails. 269 */ 270 @Override 271 protected int tryAcquireShared(int ignored) { 272 if (isDone()) { 273 return 1; 274 } 275 return -1; 276 } 277 278 /* 279 * We always allow a release to go through, this means the state has been 280 * successfully changed and the result is available. 281 */ 282 @Override 283 protected boolean tryReleaseShared(int finalState) { 284 setState(finalState); 285 return true; 286 } 287 288 /** 289 * Blocks until the task is complete or the timeout expires. Throws a 290 * {@link TimeoutException} if the timer expires, otherwise behaves like 291 * {@link #get()}. 292 */ 293 V get(long nanos) throws TimeoutException, CancellationException, 294 ExecutionException, InterruptedException { 295 296 // Attempt to acquire the shared lock with a timeout. 297 if (!tryAcquireSharedNanos(-1, nanos)) { 298 throw new TimeoutException("Timeout waiting for task."); 299 } 300 301 return getValue(); 302 } 303 304 /** 305 * Blocks until {@link #complete(Object, Throwable, int)} has been 306 * successfully called. Throws a {@link CancellationException} if the task 307 * was cancelled, or a {@link ExecutionException} if the task completed with 308 * an error. 309 */ 310 V get() throws CancellationException, ExecutionException, 311 InterruptedException { 312 313 // Acquire the shared lock allowing interruption. 314 acquireSharedInterruptibly(-1); 315 return getValue(); 316 } 317 318 /** 319 * Implementation of the actual value retrieval. Will return the value 320 * on success, an exception on failure, a cancellation on cancellation, or 321 * an illegal state if the synchronizer is in an invalid state. 322 */ 323 private V getValue() throws CancellationException, ExecutionException { 324 int state = getState(); 325 switch (state) { 326 case COMPLETED: 327 if (exception != null) { 328 throw new ExecutionException(exception); 329 } else { 330 return value; 331 } 332 333 case CANCELLED: 334 throw new CancellationException("Task was cancelled."); 335 336 default: 337 throw new IllegalStateException( 338 "Error, synchronizer in invalid state: " + state); 339 } 340 } 341 342 /** 343 * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}. 344 */ 345 boolean isDone() { 346 return (getState() & (COMPLETED | CANCELLED)) != 0; 347 } 348 349 /** 350 * Checks if the state is {@link #CANCELLED}. 351 */ 352 boolean isCancelled() { 353 return getState() == CANCELLED; 354 } 355 356 /** 357 * Transition to the COMPLETED state and set the value. 358 */ 359 boolean set(@Nullable V v) { 360 return complete(v, null, COMPLETED); 361 } 362 363 /** 364 * Transition to the COMPLETED state and set the exception. 365 */ 366 boolean setException(Throwable t) { 367 return complete(null, t, COMPLETED); 368 } 369 370 /** 371 * Transition to the CANCELLED state. 372 */ 373 boolean cancel() { 374 return complete(null, null, CANCELLED); 375 } 376 377 /** 378 * Implementation of completing a task. Either {@code v} or {@code t} will 379 * be set but not both. The {@code finalState} is the state to change to 380 * from {@link #RUNNING}. If the state is not in the RUNNING state we 381 * return {@code false}. 382 * 383 * @param v the value to set as the result of the computation. 384 * @param t the exception to set as the result of the computation. 385 * @param finalState the state to transition to. 386 */ 387 private boolean complete(@Nullable V v, Throwable t, int finalState) { 388 if (compareAndSetState(RUNNING, COMPLETING)) { 389 this.value = v; 390 this.exception = t; 391 releaseShared(finalState); 392 return true; 393 } 394 395 // The state was not RUNNING, so there are no valid transitions. 396 return false; 397 } 398 } 399 }