001    /*
002     * Copyright (C) 2007 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    
021    import com.google.common.annotations.Beta;
022    
023    import java.util.Collections;
024    import java.util.List;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ExecutorService;
027    import java.util.concurrent.Executors;
028    import java.util.concurrent.RejectedExecutionException;
029    import java.util.concurrent.ScheduledExecutorService;
030    import java.util.concurrent.ScheduledFuture;
031    import java.util.concurrent.ScheduledThreadPoolExecutor;
032    import java.util.concurrent.ThreadFactory;
033    import java.util.concurrent.ThreadPoolExecutor;
034    import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
035    import java.util.concurrent.TimeUnit;
036    import java.util.concurrent.locks.Condition;
037    import java.util.concurrent.locks.Lock;
038    import java.util.concurrent.locks.ReentrantLock;
039    
040    /**
041     * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
042     * ExecutorService}, and {@link ThreadFactory}.
043     *
044     * @author Eric Fellheimer
045     * @author Kyle Littlefield
046     * @author Justin Mahoney
047     * @since 3.0
048     */
049    public final class MoreExecutors {
050      private MoreExecutors() {}
051    
052      /**
053       * Converts the given ThreadPoolExecutor into an ExecutorService that exits
054       * when the application is complete.  It does so by using daemon threads and
055       * adding a shutdown hook to wait for their completion.
056       *
057       * <p>This is mainly for fixed thread pools.
058       * See {@link Executors#newFixedThreadPool(int)}.
059       *
060       * @param executor the executor to modify to make sure it exits when the
061       *        application is finished
062       * @param terminationTimeout how long to wait for the executor to
063       *        finish before terminating the JVM
064       * @param timeUnit unit of time for the time parameter
065       * @return an unmodifiable version of the input which will not hang the JVM
066       */
067      @Beta
068      public static ExecutorService getExitingExecutorService(
069          ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
070        executor.setThreadFactory(new ThreadFactoryBuilder()
071            .setDaemon(true)
072            .setThreadFactory(executor.getThreadFactory())
073            .build());
074    
075        ExecutorService service = Executors.unconfigurableExecutorService(executor);
076    
077        addDelayedShutdownHook(service, terminationTimeout, timeUnit);
078    
079        return service;
080      }
081    
082      /**
083       * Converts the given ScheduledThreadPoolExecutor into a
084       * ScheduledExecutorService that exits when the application is complete.  It
085       * does so by using daemon threads and adding a shutdown hook to wait for
086       * their completion.
087       *
088       * <p>This is mainly for fixed thread pools.
089       * See {@link Executors#newScheduledThreadPool(int)}.
090       *
091       * @param executor the executor to modify to make sure it exits when the
092       *        application is finished
093       * @param terminationTimeout how long to wait for the executor to
094       *        finish before terminating the JVM
095       * @param timeUnit unit of time for the time parameter
096       * @return an unmodifiable version of the input which will not hang the JVM
097       */
098      @Beta
099      public static ScheduledExecutorService getExitingScheduledExecutorService(
100          ScheduledThreadPoolExecutor executor, long terminationTimeout,
101          TimeUnit timeUnit) {
102        executor.setThreadFactory(new ThreadFactoryBuilder()
103            .setDaemon(true)
104            .setThreadFactory(executor.getThreadFactory())
105            .build());
106    
107        ScheduledExecutorService service =
108            Executors.unconfigurableScheduledExecutorService(executor);
109    
110        addDelayedShutdownHook(service, terminationTimeout, timeUnit);
111    
112        return service;
113      }
114    
115      /**
116       * Add a shutdown hook to wait for thread completion in the given
117       * {@link ExecutorService service}.  This is useful if the given service uses
118       * daemon threads, and we want to keep the JVM from exiting immediately on
119       * shutdown, instead giving these daemon threads a chance to terminate
120       * normally.
121       * @param service ExecutorService which uses daemon threads
122       * @param terminationTimeout how long to wait for the executor to finish
123       *        before terminating the JVM
124       * @param timeUnit unit of time for the time parameter
125       */
126      @Beta
127      public static void addDelayedShutdownHook(
128          final ExecutorService service, final long terminationTimeout,
129          final TimeUnit timeUnit) {
130        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
131          @Override
132          public void run() {
133            try {
134              // We'd like to log progress and failures that may arise in the
135              // following code, but unfortunately the behavior of logging
136              // is undefined in shutdown hooks.
137              // This is because the logging code installs a shutdown hook of its
138              // own. See Cleaner class inside {@link LogManager}.
139              service.shutdown();
140              service.awaitTermination(terminationTimeout, timeUnit);
141            } catch (InterruptedException ignored) {
142              // We're shutting down anyway, so just ignore.
143            }
144          }
145        }));
146      }
147    
148      /**
149       * Converts the given ThreadPoolExecutor into an ExecutorService that exits
150       * when the application is complete.  It does so by using daemon threads and
151       * adding a shutdown hook to wait for their completion.
152       *
153       * <p>This method waits 120 seconds before continuing with JVM termination,
154       * even if the executor has not finished its work.
155       *
156       * <p>This is mainly for fixed thread pools.
157       * See {@link Executors#newFixedThreadPool(int)}.
158       *
159       * @param executor the executor to modify to make sure it exits when the
160       *        application is finished
161       * @return an unmodifiable version of the input which will not hang the JVM
162       */
163      @Beta
164      public static ExecutorService getExitingExecutorService(
165          ThreadPoolExecutor executor) {
166        return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
167      }
168    
169      /**
170       * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
171       * exits when the application is complete.  It does so by using daemon threads
172       * and adding a shutdown hook to wait for their completion.
173       *
174       * <p>This method waits 120 seconds before continuing with JVM termination,
175       * even if the executor has not finished its work.
176       *
177       * <p>This is mainly for fixed thread pools.
178       * See {@link Executors#newScheduledThreadPool(int)}.
179       *
180       * @param executor the executor to modify to make sure it exits when the
181       *        application is finished
182       * @return an unmodifiable version of the input which will not hang the JVM
183       */
184      @Beta
185      public static ScheduledExecutorService getExitingScheduledExecutorService(
186          ScheduledThreadPoolExecutor executor) {
187        return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
188      }
189    
190      /**
191       * Creates an executor service that runs each task in the thread
192       * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
193       * applies both to individually submitted tasks and to collections of tasks
194       * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
195       * tasks will run serially on the calling thread.  Tasks are run to
196       * completion before a {@code Future} is returned to the caller (unless the
197       * executor has been shutdown).
198       *
199       * <p>Although all tasks are immediately executed in the thread that
200       * submitted the task, this {@code ExecutorService} imposes a small
201       * locking overhead on each task submission in order to implement shutdown
202       * and termination behavior.
203       *
204       * <p>The implementation deviates from the {@code ExecutorService}
205       * specification with regards to the {@code shutdownNow} method.  First,
206       * "best-effort" with regards to canceling running tasks is implemented
207       * as "no-effort".  No interrupts or other attempts are made to stop
208       * threads executing tasks.  Second, the returned list will always be empty,
209       * as any submitted task is considered to have started execution.
210       * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
211       * which are pending serial execution, even the subset of the tasks that
212       * have not yet started execution.  It is unclear from the
213       * {@code ExecutorService} specification if these should be included, and
214       * it's much easier to implement the interpretation that they not be.
215       * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
216       * in concurrent calls to {@code invokeAll/invokeAny} throwing
217       * RejectedExecutionException, although a subset of the tasks may already
218       * have been executed.
219       *
220       * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
221       *        >mostly source-compatible</a> since 3.0)
222       */
223      public static ListeningExecutorService sameThreadExecutor() {
224        return new SameThreadExecutorService();
225      }
226    
227      // See sameThreadExecutor javadoc for behavioral notes.
228      private static class SameThreadExecutorService
229          extends AbstractListeningExecutorService {
230        /**
231         * Lock used whenever accessing the state variables
232         * (runningTasks, shutdown, terminationCondition) of the executor
233         */
234        private final Lock lock = new ReentrantLock();
235    
236        /** Signaled after the executor is shutdown and running tasks are done */
237        private final Condition termination = lock.newCondition();
238    
239        /*
240         * Conceptually, these two variables describe the executor being in
241         * one of three states:
242         *   - Active: shutdown == false
243         *   - Shutdown: runningTasks > 0 and shutdown == true
244         *   - Terminated: runningTasks == 0 and shutdown == true
245         */
246        private int runningTasks = 0;
247        private boolean shutdown = false;
248    
249        @Override
250        public void execute(Runnable command) {
251          startTask();
252          try {
253            command.run();
254          } finally {
255            endTask();
256          }
257        }
258    
259        @Override
260        public boolean isShutdown() {
261          lock.lock();
262          try {
263            return shutdown;
264          } finally {
265            lock.unlock();
266          }
267        }
268    
269        @Override
270        public void shutdown() {
271          lock.lock();
272          try {
273            shutdown = true;
274          } finally {
275            lock.unlock();
276          }
277        }
278    
279        // See sameThreadExecutor javadoc for unusual behavior of this method.
280        @Override
281        public List<Runnable> shutdownNow() {
282          shutdown();
283          return Collections.emptyList();
284        }
285    
286        @Override
287        public boolean isTerminated() {
288          lock.lock();
289          try {
290            return shutdown && runningTasks == 0;
291          } finally {
292            lock.unlock();
293          }
294        }
295    
296        @Override
297        public boolean awaitTermination(long timeout, TimeUnit unit)
298            throws InterruptedException {
299          long nanos = unit.toNanos(timeout);
300          lock.lock();
301          try {
302            for (;;) {
303              if (isTerminated()) {
304                return true;
305              } else if (nanos <= 0) {
306                return false;
307              } else {
308                nanos = termination.awaitNanos(nanos);
309              }
310            }
311          } finally {
312            lock.unlock();
313          }
314        }
315    
316        /**
317         * Checks if the executor has been shut down and increments the running
318         * task count.
319         *
320         * @throws RejectedExecutionException if the executor has been previously
321         *         shutdown
322         */
323        private void startTask() {
324          lock.lock();
325          try {
326            if (isShutdown()) {
327              throw new RejectedExecutionException("Executor already shutdown");
328            }
329            runningTasks++;
330          } finally {
331            lock.unlock();
332          }
333        }
334    
335        /**
336         * Decrements the running task count.
337         */
338        private void endTask() {
339          lock.lock();
340          try {
341            runningTasks--;
342            if (isTerminated()) {
343              termination.signalAll();
344            }
345          } finally {
346            lock.unlock();
347          }
348        }
349      }
350    
351      /**
352       * Creates an {@link ExecutorService} whose {@code submit} and {@code
353       * invokeAll} methods submit {@link ListenableFutureTask} instances to the
354       * given delegate executor. Those methods, as well as {@code execute} and
355       * {@code invokeAny}, are implemented in terms of calls to {@code
356       * delegate.execute}. All other methods are forwarded unchanged to the
357       * delegate. This implies that the returned {@code ListeningExecutorService}
358       * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
359       * invokeAny} methods, so any special handling of tasks must be implemented in
360       * the delegate's {@code execute} method or by wrapping the returned {@code
361       * ListeningExecutorService}.
362       *
363       * <p>If the delegate executor was already an instance of {@code
364       * ListeningExecutorService}, it is returned untouched, and the rest of this
365       * documentation does not apply.
366       *
367       * @since 10.0
368       */
369      public static ListeningExecutorService listeningDecorator(
370          ExecutorService delegate) {
371        return (delegate instanceof ListeningExecutorService)
372            ? (ListeningExecutorService) delegate
373            : (delegate instanceof ScheduledExecutorService)
374            ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
375            : new ListeningDecorator(delegate);
376      }
377    
378      /**
379       * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
380       * invokeAll} methods submit {@link ListenableFutureTask} instances to the
381       * given delegate executor. Those methods, as well as {@code execute} and
382       * {@code invokeAny}, are implemented in terms of calls to {@code
383       * delegate.execute}. All other methods are forwarded unchanged to the
384       * delegate. This implies that the returned {@code
385       * SchedulingListeningExecutorService} never calls the delegate's {@code
386       * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
387       * handling of tasks must be implemented in the delegate's {@code execute}
388       * method or by wrapping the returned {@code
389       * SchedulingListeningExecutorService}.
390       *
391       * <p>If the delegate executor was already an instance of {@code
392       * ListeningScheduledExecutorService}, it is returned untouched, and the rest
393       * of this documentation does not apply.
394       *
395       * @since 10.0
396       */
397      public static ListeningScheduledExecutorService listeningDecorator(
398          ScheduledExecutorService delegate) {
399        return (delegate instanceof ListeningScheduledExecutorService)
400            ? (ListeningScheduledExecutorService) delegate
401            : new ScheduledListeningDecorator(delegate);
402      }
403    
404      private static class ListeningDecorator
405          extends AbstractListeningExecutorService {
406        final ExecutorService delegate;
407    
408        ListeningDecorator(ExecutorService delegate) {
409          this.delegate = checkNotNull(delegate);
410        }
411    
412        @Override
413        public boolean awaitTermination(long timeout, TimeUnit unit)
414            throws InterruptedException {
415          return delegate.awaitTermination(timeout, unit);
416        }
417    
418        @Override
419        public boolean isShutdown() {
420          return delegate.isShutdown();
421        }
422    
423        @Override
424        public boolean isTerminated() {
425          return delegate.isTerminated();
426        }
427    
428        @Override
429        public void shutdown() {
430          delegate.shutdown();
431        }
432    
433        @Override
434        public List<Runnable> shutdownNow() {
435          return delegate.shutdownNow();
436        }
437    
438        @Override
439        public void execute(Runnable command) {
440          delegate.execute(command);
441        }
442      }
443    
444      private static class ScheduledListeningDecorator
445          extends ListeningDecorator implements ListeningScheduledExecutorService {
446        final ScheduledExecutorService delegate;
447    
448        ScheduledListeningDecorator(ScheduledExecutorService delegate) {
449          super(delegate);
450          this.delegate = checkNotNull(delegate);
451        }
452    
453        @Override
454        public ScheduledFuture<?> schedule(
455            Runnable command, long delay, TimeUnit unit) {
456          return delegate.schedule(command, delay, unit);
457        }
458    
459        @Override
460        public <V> ScheduledFuture<V> schedule(
461            Callable<V> callable, long delay, TimeUnit unit) {
462          return delegate.schedule(callable, delay, unit);
463        }
464    
465        @Override
466        public ScheduledFuture<?> scheduleAtFixedRate(
467            Runnable command, long initialDelay, long period, TimeUnit unit) {
468          return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
469        }
470    
471        @Override
472        public ScheduledFuture<?> scheduleWithFixedDelay(
473            Runnable command, long initialDelay, long delay, TimeUnit unit) {
474          return delegate.scheduleWithFixedDelay(
475              command, initialDelay, delay, unit);
476        }
477      }
478    }