001    /*
002     * Copyright (C) 2011 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static java.util.concurrent.TimeUnit.NANOSECONDS;
020    
021    import com.google.common.annotations.Beta;
022    import com.google.common.base.Preconditions;
023    
024    import java.util.concurrent.BlockingQueue;
025    import java.util.concurrent.CountDownLatch;
026    import java.util.concurrent.ExecutionException;
027    import java.util.concurrent.Future;
028    import java.util.concurrent.TimeUnit;
029    import java.util.concurrent.TimeoutException;
030    
031    /**
032     * Utilities for treating interruptible operations as uninterruptible.
033     * In all cases, if a thread is interrupted during such a call, the call
034     * continues to block until the result is available or the timeout elapses,
035     * and only then re-interrupts the thread.
036     *
037     * @author Anthony Zana
038     * @since 10.0
039     */
040    @Beta
041    public final class Uninterruptibles {
042    
043      // Implementation Note: As of 3-7-11, the logic for each blocking/timeout
044      // methods is identical, save for method being invoked.
045    
046      /**
047       * Invokes {@code latch.}{@link CountDownLatch#await() await()}
048       * uninterruptibly.
049       */
050      public static void awaitUninterruptibly(CountDownLatch latch) {
051        boolean interrupted = false;
052        try {
053          while (true) {
054            try {
055              latch.await();
056              return;
057            } catch (InterruptedException e) {
058              interrupted = true;
059            }
060          }
061        } finally {
062          if (interrupted) {
063            Thread.currentThread().interrupt();
064          }
065        }
066      }
067    
068      /**
069       * Invokes
070       * {@code latch.}{@link CountDownLatch#await(long, TimeUnit)
071       * await(timeout, unit)} uninterruptibly.
072       */
073      public static boolean awaitUninterruptibly(CountDownLatch latch,
074          long timeout, TimeUnit unit) {
075        boolean interrupted = false;
076        try {
077          long remainingNanos = unit.toNanos(timeout);
078          long end = System.nanoTime() + remainingNanos;
079    
080          while (true) {
081            try {
082              // CountDownLatch treats negative timeouts just like zero.
083              return latch.await(remainingNanos, NANOSECONDS);
084            } catch (InterruptedException e) {
085              interrupted = true;
086              remainingNanos = end - System.nanoTime();
087            }
088          }
089        } finally {
090          if (interrupted) {
091            Thread.currentThread().interrupt();
092          }
093        }
094      }
095    
096      /**
097       * Invokes {@code toJoin.}{@link Thread#join() join()} uninterruptibly.
098       */
099      public static void joinUninterruptibly(Thread toJoin) {
100        boolean interrupted = false;
101        try {
102          while (true) {
103            try {
104              toJoin.join();
105              return;
106            } catch (InterruptedException e) {
107              interrupted = true;
108            }
109          }
110        } finally {
111          if (interrupted) {
112            Thread.currentThread().interrupt();
113          }
114        }
115      }
116    
117      /**
118       * Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
119       * To get uninterruptibility and remove checked exceptions, see
120       * {@link Futures#getUnchecked}.
121       *
122       * <p>If instead, you wish to treat {@link InterruptedException} uniformly
123       * with other exceptions, see {@link Futures#get(Future, Class) Futures.get}
124       * or {@link Futures#makeChecked}.
125       */
126      public static <V> V getUninterruptibly(Future<V> future)
127          throws ExecutionException {
128        boolean interrupted = false;
129        try {
130          while (true) {
131            try {
132              return future.get();
133            } catch (InterruptedException e) {
134              interrupted = true;
135            }
136          }
137        } finally {
138          if (interrupted) {
139            Thread.currentThread().interrupt();
140          }
141        }
142      }
143    
144      /**
145       * Invokes
146       * {@code future.}{@link Future#get(long, TimeUnit) get(timeout, unit)}
147       * uninterruptibly.
148       *
149       * <p>If instead, you wish to treat {@link InterruptedException} uniformly
150       * with other exceptions, see {@link Futures#get(Future, Class) Futures.get}
151       * or {@link Futures#makeChecked}.
152       */
153      public static <V> V getUninterruptibly(
154          Future<V> future, long timeout,  TimeUnit unit)
155              throws ExecutionException, TimeoutException {
156        boolean interrupted = false;
157        try {
158          long remainingNanos = unit.toNanos(timeout);
159          long end = System.nanoTime() + remainingNanos;
160    
161          while (true) {
162            try {
163              // Future treats negative timeouts just like zero.
164              return future.get(remainingNanos, NANOSECONDS);
165            } catch (InterruptedException e) {
166              interrupted = true;
167              remainingNanos = end - System.nanoTime();
168            }
169          }
170        } finally {
171          if (interrupted) {
172            Thread.currentThread().interrupt();
173          }
174        }
175      }
176    
177      /**
178       * Invokes
179       * {@code unit.}{@link TimeUnit#timedJoin(Thread, long)
180       * timedJoin(toJoin, timeout)} uninterruptibly.
181       */
182      public static void joinUninterruptibly(Thread toJoin,
183          long timeout, TimeUnit unit) {
184        Preconditions.checkNotNull(toJoin);
185        boolean interrupted = false;
186        try {
187          long remainingNanos = unit.toNanos(timeout);
188          long end = System.nanoTime() + remainingNanos;
189          while (true) {
190            try {
191              // TimeUnit.timedJoin() treats negative timeouts just like zero.
192              NANOSECONDS.timedJoin(toJoin, remainingNanos);
193              return;
194            } catch (InterruptedException e) {
195              interrupted = true;
196              remainingNanos = end - System.nanoTime();
197            }
198          }
199        } finally {
200          if (interrupted) {
201            Thread.currentThread().interrupt();
202          }
203        }
204      }
205    
206      /**
207       * Invokes {@code queue.}{@link BlockingQueue#take() take()} uninterruptibly.
208       */
209      public static <E> E takeUninterruptibly(BlockingQueue<E> queue) {
210        boolean interrupted = false;
211        try {
212          while (true) {
213            try {
214              return queue.take();
215            } catch (InterruptedException e) {
216              interrupted = true;
217            }
218          }
219        } finally {
220          if (interrupted) {
221            Thread.currentThread().interrupt();
222          }
223        }
224      }
225    
226      /**
227       * Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)}
228       * uninterruptibly.
229       */
230      public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
231        boolean interrupted = false;
232        try {
233          while (true) {
234            try {
235              queue.put(element);
236              return;
237            } catch (InterruptedException e) {
238              interrupted = true;
239            }
240          }
241        } finally {
242          if (interrupted) {
243            Thread.currentThread().interrupt();
244          }
245        }
246      }
247    
248      // TODO(user): Support Sleeper somehow (wrapper or interface method)?
249      /**
250       * Invokes {@code unit.}{@link TimeUnit#sleep(long) sleep(sleepFor)}
251       * uninterruptibly.
252       */
253      public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
254        boolean interrupted = false;
255        try {
256          long remainingNanos = unit.toNanos(sleepFor);
257          long end = System.nanoTime() + remainingNanos;
258          while (true) {
259            try {
260              // TimeUnit.sleep() treats negative timeouts just like zero.
261              NANOSECONDS.sleep(remainingNanos);
262              return;
263            } catch (InterruptedException e) {
264              interrupted = true;
265              remainingNanos = end - System.nanoTime();
266            }
267          }
268        } finally {
269          if (interrupted) {
270            Thread.currentThread().interrupt();
271          }
272        }
273      }
274    
275      // TODO(user): Add support for waitUninterruptibly.
276    
277      private Uninterruptibles() {}
278    }