001 /* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkNotNull; 020 021 import com.google.common.annotations.Beta; 022 import com.google.common.annotations.VisibleForTesting; 023 024 import java.util.concurrent.Executor; 025 import java.util.concurrent.Executors; 026 import java.util.concurrent.Future; 027 import java.util.concurrent.ThreadFactory; 028 import java.util.concurrent.atomic.AtomicBoolean; 029 030 /** 031 * Utilities necessary for working with libraries that supply plain {@link 032 * Future} instances. 033 * 034 * @author Sven Mawson 035 * @since 10.0 (replacing {@code Futures.makeListenable}, which 036 * existed in 1.0) 037 */ 038 @Beta 039 public 040 final class JdkFutureAdapters { 041 /** 042 * Assigns a thread to the given {@link Future} to provide {@link 043 * ListenableFuture} functionality. 044 * 045 * <p><b>Warning:</b> If the input future does not already implement {@link 046 * ListenableFuture}, the returned future will emulate {@link 047 * ListenableFuture#addListener} by taking a thread from an internal, 048 * unbounded pool at the first call to {@code addListener} and holding it 049 * until the future is {@linkplain Future#isDone() done}. 050 * 051 * <p>Prefer to create {@code ListenableFuture} instances with {@link 052 * SettableFuture}, {@link MoreExecutors#listeningDecorator( 053 * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask}, 054 * {@link AbstractFuture}, and other utilities over creating plain {@code 055 * Future} instances to be upgraded to {@code ListenableFuture} after the 056 * fact. 057 */ 058 public static <V> ListenableFuture<V> listenInPoolThread( 059 Future<V> future) { 060 if (future instanceof ListenableFuture<?>) { 061 return (ListenableFuture<V>) future; 062 } 063 return new ListenableFutureAdapter<V>(future); 064 } 065 066 @VisibleForTesting 067 static <V> ListenableFuture<V> listenInPoolThread( 068 Future<V> future, Executor executor) { 069 checkNotNull(executor); 070 if (future instanceof ListenableFuture<?>) { 071 return (ListenableFuture<V>) future; 072 } 073 return new ListenableFutureAdapter<V>(future, executor); 074 } 075 076 /** 077 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This 078 * will wait on the future to finish, and when it completes, run the 079 * listeners. This implementation will wait on the source future 080 * indefinitely, so if the source future never completes, the adapter will 081 * never complete either. 082 * 083 * <p>If the delegate future is interrupted or throws an unexpected unchecked 084 * exception, the listeners will not be invoked. 085 */ 086 private static class ListenableFutureAdapter<V> extends ForwardingFuture<V> 087 implements ListenableFuture<V> { 088 089 private static final ThreadFactory threadFactory = 090 new ThreadFactoryBuilder() 091 .setNameFormat("ListenableFutureAdapter-thread-%d") 092 .build(); 093 private static final Executor defaultAdapterExecutor = 094 Executors.newCachedThreadPool(threadFactory); 095 096 private final Executor adapterExecutor; 097 098 // The execution list to hold our listeners. 099 private final ExecutionList executionList = new ExecutionList(); 100 101 // This allows us to only start up a thread waiting on the delegate future 102 // when the first listener is added. 103 private final AtomicBoolean hasListeners = new AtomicBoolean(false); 104 105 // The delegate future. 106 private final Future<V> delegate; 107 108 ListenableFutureAdapter(Future<V> delegate) { 109 this(delegate, defaultAdapterExecutor); 110 } 111 112 ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) { 113 this.delegate = checkNotNull(delegate); 114 this.adapterExecutor = checkNotNull(adapterExecutor); 115 } 116 117 @Override 118 protected Future<V> delegate() { 119 return delegate; 120 } 121 122 @Override 123 public void addListener(Runnable listener, Executor exec) { 124 executionList.add(listener, exec); 125 126 // When a listener is first added, we run a task that will wait for 127 // the delegate to finish, and when it is done will run the listeners. 128 if (hasListeners.compareAndSet(false, true)) { 129 if (delegate.isDone()) { 130 // If the delegate is already done, run the execution list 131 // immediately on the current thread. 132 executionList.execute(); 133 return; 134 } 135 136 adapterExecutor.execute(new Runnable() { 137 @Override 138 public void run() { 139 try { 140 delegate.get(); 141 } catch (Error e) { 142 throw e; 143 } catch (InterruptedException e) { 144 Thread.currentThread().interrupt(); 145 // Threads from our private pool are never interrupted. 146 throw new AssertionError(e); 147 } catch (Throwable e) { 148 // ExecutionException / CancellationException / RuntimeException 149 // The task is done, run the listeners. 150 } 151 executionList.execute(); 152 } 153 }); 154 } 155 } 156 } 157 158 private JdkFutureAdapters() {} 159 }