001 /* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkNotNull; 020 021 import com.google.common.annotations.Beta; 022 023 import java.util.Collections; 024 import java.util.List; 025 import java.util.concurrent.Callable; 026 import java.util.concurrent.ExecutorService; 027 import java.util.concurrent.Executors; 028 import java.util.concurrent.RejectedExecutionException; 029 import java.util.concurrent.ScheduledExecutorService; 030 import java.util.concurrent.ScheduledFuture; 031 import java.util.concurrent.ScheduledThreadPoolExecutor; 032 import java.util.concurrent.ThreadFactory; 033 import java.util.concurrent.ThreadPoolExecutor; 034 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; 035 import java.util.concurrent.TimeUnit; 036 import java.util.concurrent.locks.Condition; 037 import java.util.concurrent.locks.Lock; 038 import java.util.concurrent.locks.ReentrantLock; 039 040 /** 041 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link 042 * ExecutorService}, and {@link ThreadFactory}. 043 * 044 * @author Eric Fellheimer 045 * @author Kyle Littlefield 046 * @author Justin Mahoney 047 * @since 3.0 048 */ 049 public final class MoreExecutors { 050 private MoreExecutors() {} 051 052 /** 053 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 054 * when the application is complete. It does so by using daemon threads and 055 * adding a shutdown hook to wait for their completion. 056 * 057 * <p>This is mainly for fixed thread pools. 058 * See {@link Executors#newFixedThreadPool(int)}. 059 * 060 * @param executor the executor to modify to make sure it exits when the 061 * application is finished 062 * @param terminationTimeout how long to wait for the executor to 063 * finish before terminating the JVM 064 * @param timeUnit unit of time for the time parameter 065 * @return an unmodifiable version of the input which will not hang the JVM 066 */ 067 @Beta 068 public static ExecutorService getExitingExecutorService( 069 ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { 070 executor.setThreadFactory(new ThreadFactoryBuilder() 071 .setDaemon(true) 072 .setThreadFactory(executor.getThreadFactory()) 073 .build()); 074 075 ExecutorService service = Executors.unconfigurableExecutorService(executor); 076 077 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 078 079 return service; 080 } 081 082 /** 083 * Converts the given ScheduledThreadPoolExecutor into a 084 * ScheduledExecutorService that exits when the application is complete. It 085 * does so by using daemon threads and adding a shutdown hook to wait for 086 * their completion. 087 * 088 * <p>This is mainly for fixed thread pools. 089 * See {@link Executors#newScheduledThreadPool(int)}. 090 * 091 * @param executor the executor to modify to make sure it exits when the 092 * application is finished 093 * @param terminationTimeout how long to wait for the executor to 094 * finish before terminating the JVM 095 * @param timeUnit unit of time for the time parameter 096 * @return an unmodifiable version of the input which will not hang the JVM 097 */ 098 @Beta 099 public static ScheduledExecutorService getExitingScheduledExecutorService( 100 ScheduledThreadPoolExecutor executor, long terminationTimeout, 101 TimeUnit timeUnit) { 102 executor.setThreadFactory(new ThreadFactoryBuilder() 103 .setDaemon(true) 104 .setThreadFactory(executor.getThreadFactory()) 105 .build()); 106 107 ScheduledExecutorService service = 108 Executors.unconfigurableScheduledExecutorService(executor); 109 110 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 111 112 return service; 113 } 114 115 /** 116 * Add a shutdown hook to wait for thread completion in the given 117 * {@link ExecutorService service}. This is useful if the given service uses 118 * daemon threads, and we want to keep the JVM from exiting immediately on 119 * shutdown, instead giving these daemon threads a chance to terminate 120 * normally. 121 * @param service ExecutorService which uses daemon threads 122 * @param terminationTimeout how long to wait for the executor to finish 123 * before terminating the JVM 124 * @param timeUnit unit of time for the time parameter 125 */ 126 @Beta 127 public static void addDelayedShutdownHook( 128 final ExecutorService service, final long terminationTimeout, 129 final TimeUnit timeUnit) { 130 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 131 @Override 132 public void run() { 133 try { 134 // We'd like to log progress and failures that may arise in the 135 // following code, but unfortunately the behavior of logging 136 // is undefined in shutdown hooks. 137 // This is because the logging code installs a shutdown hook of its 138 // own. See Cleaner class inside {@link LogManager}. 139 service.shutdown(); 140 service.awaitTermination(terminationTimeout, timeUnit); 141 } catch (InterruptedException ignored) { 142 // We're shutting down anyway, so just ignore. 143 } 144 } 145 })); 146 } 147 148 /** 149 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 150 * when the application is complete. It does so by using daemon threads and 151 * adding a shutdown hook to wait for their completion. 152 * 153 * <p>This method waits 120 seconds before continuing with JVM termination, 154 * even if the executor has not finished its work. 155 * 156 * <p>This is mainly for fixed thread pools. 157 * See {@link Executors#newFixedThreadPool(int)}. 158 * 159 * @param executor the executor to modify to make sure it exits when the 160 * application is finished 161 * @return an unmodifiable version of the input which will not hang the JVM 162 */ 163 @Beta 164 public static ExecutorService getExitingExecutorService( 165 ThreadPoolExecutor executor) { 166 return getExitingExecutorService(executor, 120, TimeUnit.SECONDS); 167 } 168 169 /** 170 * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that 171 * exits when the application is complete. It does so by using daemon threads 172 * and adding a shutdown hook to wait for their completion. 173 * 174 * <p>This method waits 120 seconds before continuing with JVM termination, 175 * even if the executor has not finished its work. 176 * 177 * <p>This is mainly for fixed thread pools. 178 * See {@link Executors#newScheduledThreadPool(int)}. 179 * 180 * @param executor the executor to modify to make sure it exits when the 181 * application is finished 182 * @return an unmodifiable version of the input which will not hang the JVM 183 */ 184 @Beta 185 public static ScheduledExecutorService getExitingScheduledExecutorService( 186 ScheduledThreadPoolExecutor executor) { 187 return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS); 188 } 189 190 /** 191 * Creates an executor service that runs each task in the thread 192 * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy} This 193 * applies both to individually submitted tasks and to collections of tasks 194 * submitted via {@code invokeAll} or {@code invokeAny}. In the latter case, 195 * tasks will run serially on the calling thread. Tasks are run to 196 * completion before a {@code Future} is returned to the caller (unless the 197 * executor has been shutdown). 198 * 199 * <p>Although all tasks are immediately executed in the thread that 200 * submitted the task, this {@code ExecutorService} imposes a small 201 * locking overhead on each task submission in order to implement shutdown 202 * and termination behavior. 203 * 204 * <p>The implementation deviates from the {@code ExecutorService} 205 * specification with regards to the {@code shutdownNow} method. First, 206 * "best-effort" with regards to canceling running tasks is implemented 207 * as "no-effort". No interrupts or other attempts are made to stop 208 * threads executing tasks. Second, the returned list will always be empty, 209 * as any submitted task is considered to have started execution. 210 * This applies also to tasks given to {@code invokeAll} or {@code invokeAny} 211 * which are pending serial execution, even the subset of the tasks that 212 * have not yet started execution. It is unclear from the 213 * {@code ExecutorService} specification if these should be included, and 214 * it's much easier to implement the interpretation that they not be. 215 * Finally, a call to {@code shutdown} or {@code shutdownNow} may result 216 * in concurrent calls to {@code invokeAll/invokeAny} throwing 217 * RejectedExecutionException, although a subset of the tasks may already 218 * have been executed. 219 * 220 * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility" 221 * >mostly source-compatible</a> since 3.0) 222 */ 223 public static ListeningExecutorService sameThreadExecutor() { 224 return new SameThreadExecutorService(); 225 } 226 227 // See sameThreadExecutor javadoc for behavioral notes. 228 private static class SameThreadExecutorService 229 extends AbstractListeningExecutorService { 230 /** 231 * Lock used whenever accessing the state variables 232 * (runningTasks, shutdown, terminationCondition) of the executor 233 */ 234 private final Lock lock = new ReentrantLock(); 235 236 /** Signaled after the executor is shutdown and running tasks are done */ 237 private final Condition termination = lock.newCondition(); 238 239 /* 240 * Conceptually, these two variables describe the executor being in 241 * one of three states: 242 * - Active: shutdown == false 243 * - Shutdown: runningTasks > 0 and shutdown == true 244 * - Terminated: runningTasks == 0 and shutdown == true 245 */ 246 private int runningTasks = 0; 247 private boolean shutdown = false; 248 249 @Override 250 public void execute(Runnable command) { 251 startTask(); 252 try { 253 command.run(); 254 } finally { 255 endTask(); 256 } 257 } 258 259 @Override 260 public boolean isShutdown() { 261 lock.lock(); 262 try { 263 return shutdown; 264 } finally { 265 lock.unlock(); 266 } 267 } 268 269 @Override 270 public void shutdown() { 271 lock.lock(); 272 try { 273 shutdown = true; 274 } finally { 275 lock.unlock(); 276 } 277 } 278 279 // See sameThreadExecutor javadoc for unusual behavior of this method. 280 @Override 281 public List<Runnable> shutdownNow() { 282 shutdown(); 283 return Collections.emptyList(); 284 } 285 286 @Override 287 public boolean isTerminated() { 288 lock.lock(); 289 try { 290 return shutdown && runningTasks == 0; 291 } finally { 292 lock.unlock(); 293 } 294 } 295 296 @Override 297 public boolean awaitTermination(long timeout, TimeUnit unit) 298 throws InterruptedException { 299 long nanos = unit.toNanos(timeout); 300 lock.lock(); 301 try { 302 for (;;) { 303 if (isTerminated()) { 304 return true; 305 } else if (nanos <= 0) { 306 return false; 307 } else { 308 nanos = termination.awaitNanos(nanos); 309 } 310 } 311 } finally { 312 lock.unlock(); 313 } 314 } 315 316 /** 317 * Checks if the executor has been shut down and increments the running 318 * task count. 319 * 320 * @throws RejectedExecutionException if the executor has been previously 321 * shutdown 322 */ 323 private void startTask() { 324 lock.lock(); 325 try { 326 if (isShutdown()) { 327 throw new RejectedExecutionException("Executor already shutdown"); 328 } 329 runningTasks++; 330 } finally { 331 lock.unlock(); 332 } 333 } 334 335 /** 336 * Decrements the running task count. 337 */ 338 private void endTask() { 339 lock.lock(); 340 try { 341 runningTasks--; 342 if (isTerminated()) { 343 termination.signalAll(); 344 } 345 } finally { 346 lock.unlock(); 347 } 348 } 349 } 350 351 /** 352 * Creates an {@link ExecutorService} whose {@code submit} and {@code 353 * invokeAll} methods submit {@link ListenableFutureTask} instances to the 354 * given delegate executor. Those methods, as well as {@code execute} and 355 * {@code invokeAny}, are implemented in terms of calls to {@code 356 * delegate.execute}. All other methods are forwarded unchanged to the 357 * delegate. This implies that the returned {@code ListeningExecutorService} 358 * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code 359 * invokeAny} methods, so any special handling of tasks must be implemented in 360 * the delegate's {@code execute} method or by wrapping the returned {@code 361 * ListeningExecutorService}. 362 * 363 * <p>If the delegate executor was already an instance of {@code 364 * ListeningExecutorService}, it is returned untouched, and the rest of this 365 * documentation does not apply. 366 * 367 * @since 10.0 368 */ 369 public static ListeningExecutorService listeningDecorator( 370 ExecutorService delegate) { 371 return (delegate instanceof ListeningExecutorService) 372 ? (ListeningExecutorService) delegate 373 : (delegate instanceof ScheduledExecutorService) 374 ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate) 375 : new ListeningDecorator(delegate); 376 } 377 378 /** 379 * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code 380 * invokeAll} methods submit {@link ListenableFutureTask} instances to the 381 * given delegate executor. Those methods, as well as {@code execute} and 382 * {@code invokeAny}, are implemented in terms of calls to {@code 383 * delegate.execute}. All other methods are forwarded unchanged to the 384 * delegate. This implies that the returned {@code 385 * SchedulingListeningExecutorService} never calls the delegate's {@code 386 * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special 387 * handling of tasks must be implemented in the delegate's {@code execute} 388 * method or by wrapping the returned {@code 389 * SchedulingListeningExecutorService}. 390 * 391 * <p>If the delegate executor was already an instance of {@code 392 * ListeningScheduledExecutorService}, it is returned untouched, and the rest 393 * of this documentation does not apply. 394 * 395 * @since 10.0 396 */ 397 public static ListeningScheduledExecutorService listeningDecorator( 398 ScheduledExecutorService delegate) { 399 return (delegate instanceof ListeningScheduledExecutorService) 400 ? (ListeningScheduledExecutorService) delegate 401 : new ScheduledListeningDecorator(delegate); 402 } 403 404 private static class ListeningDecorator 405 extends AbstractListeningExecutorService { 406 final ExecutorService delegate; 407 408 ListeningDecorator(ExecutorService delegate) { 409 this.delegate = checkNotNull(delegate); 410 } 411 412 @Override 413 public boolean awaitTermination(long timeout, TimeUnit unit) 414 throws InterruptedException { 415 return delegate.awaitTermination(timeout, unit); 416 } 417 418 @Override 419 public boolean isShutdown() { 420 return delegate.isShutdown(); 421 } 422 423 @Override 424 public boolean isTerminated() { 425 return delegate.isTerminated(); 426 } 427 428 @Override 429 public void shutdown() { 430 delegate.shutdown(); 431 } 432 433 @Override 434 public List<Runnable> shutdownNow() { 435 return delegate.shutdownNow(); 436 } 437 438 @Override 439 public void execute(Runnable command) { 440 delegate.execute(command); 441 } 442 } 443 444 private static class ScheduledListeningDecorator 445 extends ListeningDecorator implements ListeningScheduledExecutorService { 446 final ScheduledExecutorService delegate; 447 448 ScheduledListeningDecorator(ScheduledExecutorService delegate) { 449 super(delegate); 450 this.delegate = checkNotNull(delegate); 451 } 452 453 @Override 454 public ScheduledFuture<?> schedule( 455 Runnable command, long delay, TimeUnit unit) { 456 return delegate.schedule(command, delay, unit); 457 } 458 459 @Override 460 public <V> ScheduledFuture<V> schedule( 461 Callable<V> callable, long delay, TimeUnit unit) { 462 return delegate.schedule(callable, delay, unit); 463 } 464 465 @Override 466 public ScheduledFuture<?> scheduleAtFixedRate( 467 Runnable command, long initialDelay, long period, TimeUnit unit) { 468 return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); 469 } 470 471 @Override 472 public ScheduledFuture<?> scheduleWithFixedDelay( 473 Runnable command, long initialDelay, long delay, TimeUnit unit) { 474 return delegate.scheduleWithFixedDelay( 475 command, initialDelay, delay, unit); 476 } 477 } 478 }