001 /* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.eventbus; 018 019 import com.google.common.annotations.Beta; 020 import com.google.common.annotations.VisibleForTesting; 021 import com.google.common.base.Supplier; 022 import com.google.common.base.Throwables; 023 import com.google.common.cache.Cache; 024 import com.google.common.cache.CacheBuilder; 025 import com.google.common.cache.CacheLoader; 026 import com.google.common.collect.Lists; 027 import com.google.common.collect.Multimap; 028 import com.google.common.collect.Multimaps; 029 import com.google.common.collect.SetMultimap; 030 import com.google.common.collect.Sets; 031 032 import java.lang.reflect.InvocationTargetException; 033 import java.util.Collection; 034 import java.util.List; 035 import java.util.Map.Entry; 036 import java.util.Set; 037 import java.util.concurrent.ConcurrentHashMap; 038 import java.util.concurrent.ConcurrentLinkedQueue; 039 import java.util.concurrent.CopyOnWriteArraySet; 040 import java.util.concurrent.ExecutionException; 041 import java.util.logging.Level; 042 import java.util.logging.Logger; 043 044 /** 045 * Dispatches events to listeners, and provides ways for listeners to register 046 * themselves. 047 * 048 * <p>The EventBus allows publish-subscribe-style communication between 049 * components without requiring the components to explicitly register with one 050 * another (and thus be aware of each other). It is designed exclusively to 051 * replace traditional Java in-process event distribution using explicit 052 * registration. It is <em>not</em> a general-purpose publish-subscribe system, 053 * nor is it intended for interprocess communication. 054 * 055 * <h2>Receiving Events</h2> 056 * To receive events, an object should:<ol> 057 * <li>Expose a public method, known as the <i>event handler</i>, which accepts 058 * a single argument of the type of event desired;</li> 059 * <li>Mark it with a {@link Subscribe} annotation;</li> 060 * <li>Pass itself to an EventBus instance's {@link #register(Object)} method. 061 * </li> 062 * </ol> 063 * 064 * <h2>Posting Events</h2> 065 * To post an event, simply provide the event object to the 066 * {@link #post(Object)} method. The EventBus instance will determine the type 067 * of event and route it to all registered listeners. 068 * 069 * <p>Events are routed based on their type — an event will be delivered 070 * to any handler for any type to which the event is <em>assignable.</em> This 071 * includes implemented interfaces, all superclasses, and all interfaces 072 * implemented by superclasses. 073 * 074 * <p>When {@code post} is called, all registered handlers for an event are run 075 * in sequence, so handlers should be reasonably quick. If an event may trigger 076 * an extended process (such as a database load), spawn a thread or queue it for 077 * later. (For a convenient way to do this, use an {@link AsyncEventBus}.) 078 * 079 * <h2>Handler Methods</h2> 080 * Event handler methods must accept only one argument: the event. 081 * 082 * <p>Handlers should not, in general, throw. If they do, the EventBus will 083 * catch and log the exception. This is rarely the right solution for error 084 * handling and should not be relied upon; it is intended solely to help find 085 * problems during development. 086 * 087 * <p>The EventBus guarantees that it will not call a handler method from 088 * multiple threads simultaneously, unless the method explicitly allows it by 089 * bearing the {@link AllowConcurrentEvents} annotation. If this annotation is 090 * not present, handler methods need not worry about being reentrant, unless 091 * also called from outside the EventBus. 092 * 093 * <h2>Dead Events</h2> 094 * If an event is posted, but no registered handlers can accept it, it is 095 * considered "dead." To give the system a second chance to handle dead events, 096 * they are wrapped in an instance of {@link DeadEvent} and reposted. 097 * 098 * <p>If a handler for a supertype of all events (such as Object) is registered, 099 * no event will ever be considered dead, and no DeadEvents will be generated. 100 * Accordingly, while DeadEvent extends {@link Object}, a handler registered to 101 * receive any Object will never receive a DeadEvent. 102 * 103 * <p>This class is safe for concurrent use. 104 * 105 * @author Cliff Biffle 106 * @since 10.0 107 */ 108 @Beta 109 public class EventBus { 110 111 /** 112 * All registered event handlers, indexed by event type. 113 */ 114 private final SetMultimap<Class<?>, EventHandler> handlersByType = 115 Multimaps.newSetMultimap(new ConcurrentHashMap<Class<?>, Collection<EventHandler>>(), 116 new Supplier<Set<EventHandler>>() { 117 @Override 118 public Set<EventHandler> get() { 119 return new CopyOnWriteArraySet<EventHandler>(); 120 } 121 }); 122 123 /** 124 * Logger for event dispatch failures. Named by the fully-qualified name of 125 * this class, followed by the identifier provided at construction. 126 */ 127 private final Logger logger; 128 129 /** 130 * Strategy for finding handler methods in registered objects. Currently, 131 * only the {@link AnnotatedHandlerFinder} is supported, but this is 132 * encapsulated for future expansion. 133 */ 134 private final HandlerFindingStrategy finder = new AnnotatedHandlerFinder(); 135 136 /** queues of events for the current thread to dispatch */ 137 private final ThreadLocal<ConcurrentLinkedQueue<EventWithHandler>> 138 eventsToDispatch = 139 new ThreadLocal<ConcurrentLinkedQueue<EventWithHandler>>() { 140 @Override protected ConcurrentLinkedQueue<EventWithHandler> initialValue() { 141 return new ConcurrentLinkedQueue<EventWithHandler>(); 142 } 143 }; 144 145 /** true if the current thread is currently dispatching an event */ 146 private final ThreadLocal<Boolean> isDispatching = 147 new ThreadLocal<Boolean>() { 148 @Override protected Boolean initialValue() { 149 return false; 150 } 151 }; 152 153 /** 154 * A thread-safe cache for flattenHierarch(). The Class class is immutable. 155 */ 156 private Cache<Class<?>, Set<Class<?>>> flattenHierarchyCache = 157 CacheBuilder.newBuilder() 158 .weakKeys() 159 .build(new CacheLoader<Class<?>, Set<Class<?>>>() { 160 @Override 161 public Set<Class<?>> load(Class<?> concreteClass) throws Exception { 162 List<Class<?>> parents = Lists.newLinkedList(); 163 Set<Class<?>> classes = Sets.newHashSet(); 164 165 parents.add(concreteClass); 166 167 while (!parents.isEmpty()) { 168 Class<?> clazz = parents.remove(0); 169 classes.add(clazz); 170 171 Class<?> parent = clazz.getSuperclass(); 172 if (parent != null) { 173 parents.add(parent); 174 } 175 176 for (Class<?> iface : clazz.getInterfaces()) { 177 parents.add(iface); 178 } 179 } 180 181 return classes; 182 } 183 }); 184 185 /** 186 * Creates a new EventBus named "default". 187 */ 188 public EventBus() { 189 this("default"); 190 } 191 192 /** 193 * Creates a new EventBus with the given {@code identifier}. 194 * 195 * @param identifier a brief name for this bus, for logging purposes. Should 196 * be a valid Java identifier. 197 */ 198 public EventBus(String identifier) { 199 logger = Logger.getLogger(EventBus.class.getName() + "." + identifier); 200 } 201 202 /** 203 * Registers all handler methods on {@code object} to receive events. 204 * Handler methods are selected and classified using this EventBus's 205 * {@link HandlerFindingStrategy}; the default strategy is the 206 * {@link AnnotatedHandlerFinder}. 207 * 208 * @param object object whose handler methods should be registered. 209 */ 210 public void register(Object object) { 211 handlersByType.putAll(finder.findAllHandlers(object)); 212 } 213 214 /** 215 * Unregisters all handler methods on a registered {@code object}. 216 * 217 * @param object object whose handler methods should be unregistered. 218 * @throws IllegalArgumentException if the object was not previously registered. 219 */ 220 public void unregister(Object object) { 221 Multimap<Class<?>, EventHandler> methodsInListener = finder.findAllHandlers(object); 222 for (Entry<Class<?>, Collection<EventHandler>> entry : methodsInListener.asMap().entrySet()) { 223 Set<EventHandler> currentHandlers = getHandlersForEventType(entry.getKey()); 224 Collection<EventHandler> eventMethodsInListener = entry.getValue(); 225 226 if (currentHandlers == null || !currentHandlers.containsAll(entry.getValue())) { 227 throw new IllegalArgumentException( 228 "missing event handler for an annotated method. Is " + object + " registered?"); 229 } 230 currentHandlers.removeAll(eventMethodsInListener); 231 } 232 } 233 234 /** 235 * Posts an event to all registered handlers. This method will return 236 * successfully after the event has been posted to all handlers, and 237 * regardless of any exceptions thrown by handlers. 238 * 239 * <p>If no handlers have been subscribed for {@code event}'s class, and 240 * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a 241 * DeadEvent and reposted. 242 * 243 * @param event event to post. 244 */ 245 public void post(Object event) { 246 Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass()); 247 248 boolean dispatched = false; 249 for (Class<?> eventType : dispatchTypes) { 250 Set<EventHandler> wrappers = getHandlersForEventType(eventType); 251 252 if (wrappers != null && !wrappers.isEmpty()) { 253 dispatched = true; 254 for (EventHandler wrapper : wrappers) { 255 enqueueEvent(event, wrapper); 256 } 257 } 258 } 259 260 if (!dispatched && !(event instanceof DeadEvent)) { 261 post(new DeadEvent(this, event)); 262 } 263 264 dispatchQueuedEvents(); 265 } 266 267 /** 268 * Queue the {@code event} for dispatch during 269 * {@link #dispatchQueuedEvents()}. Events are queued in-order of occurrence 270 * so they can be dispatched in the same order. 271 */ 272 protected void enqueueEvent(Object event, EventHandler handler) { 273 eventsToDispatch.get().offer(new EventWithHandler(event, handler)); 274 } 275 276 /** 277 * Drain the queue of events to be dispatched. As the queue is being drained, 278 * new events may be posted to the end of the queue. 279 */ 280 protected void dispatchQueuedEvents() { 281 // don't dispatch if we're already dispatching, that would allow reentrancy 282 // and out-of-order events. Instead, leave the events to be dispatched 283 // after the in-progress dispatch is complete. 284 if (isDispatching.get()) { 285 return; 286 } 287 288 isDispatching.set(true); 289 try { 290 while (true) { 291 EventWithHandler eventWithHandler = eventsToDispatch.get().poll(); 292 if (eventWithHandler == null) { 293 break; 294 } 295 296 dispatch(eventWithHandler.event, eventWithHandler.handler); 297 } 298 } finally { 299 isDispatching.set(false); 300 } 301 } 302 303 /** 304 * Dispatches {@code event} to the handler in {@code wrapper}. This method 305 * is an appropriate override point for subclasses that wish to make 306 * event delivery asynchronous. 307 * 308 * @param event event to dispatch. 309 * @param wrapper wrapper that will call the handler. 310 */ 311 protected void dispatch(Object event, EventHandler wrapper) { 312 try { 313 wrapper.handleEvent(event); 314 } catch (InvocationTargetException e) { 315 logger.log(Level.SEVERE, 316 "Could not dispatch event: " + event + " to handler " + wrapper, e); 317 } 318 } 319 320 /** 321 * Retrieves a mutable set of the currently registered handlers for 322 * {@code type}. If no handlers are currently registered for {@code type}, 323 * this method may either return {@code null} or an empty set. 324 * 325 * @param type type of handlers to retrieve. 326 * @return currently registered handlers, or {@code null}. 327 */ 328 Set<EventHandler> getHandlersForEventType(Class<?> type) { 329 return handlersByType.get(type); 330 } 331 332 /** 333 * Creates a new Set for insertion into the handler map. This is provided 334 * as an override point for subclasses. The returned set should support 335 * concurrent access. 336 * 337 * @return a new, mutable set for handlers. 338 */ 339 protected Set<EventHandler> newHandlerSet() { 340 return new CopyOnWriteArraySet<EventHandler>(); 341 } 342 343 /** 344 * Flattens a class's type hierarchy into a set of Class objects. The set 345 * will include all superclasses (transitively), and all interfaces 346 * implemented by these superclasses. 347 * 348 * @param concreteClass class whose type hierarchy will be retrieved. 349 * @return {@code clazz}'s complete type hierarchy, flattened and uniqued. 350 */ 351 @VisibleForTesting 352 Set<Class<?>> flattenHierarchy(Class<?> concreteClass) { 353 try { 354 return flattenHierarchyCache.get(concreteClass); 355 } catch (ExecutionException e) { 356 throw Throwables.propagate(e.getCause()); 357 } 358 } 359 360 /** simple struct representing an event and it's handler */ 361 static class EventWithHandler { 362 final Object event; 363 final EventHandler handler; 364 public EventWithHandler(Object event, EventHandler handler) { 365 this.event = event; 366 this.handler = handler; 367 } 368 } 369 }