001 /* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.eventbus; 018 019 import com.google.common.annotations.Beta; 020 import java.util.concurrent.ConcurrentLinkedQueue; 021 import java.util.concurrent.Executor; 022 023 /** 024 * An {@link EventBus} that takes the Executor of your choice and uses it to 025 * dispatch events, allowing dispatch to occur asynchronously. 026 * 027 * @author Cliff Biffle 028 * @since 10.0 029 */ 030 @Beta 031 public class AsyncEventBus extends EventBus { 032 private final Executor executor; 033 034 /** the queue of events is shared across all threads */ 035 private final ConcurrentLinkedQueue<EventWithHandler> eventsToDispatch = 036 new ConcurrentLinkedQueue<EventWithHandler>(); 037 038 /** 039 * Creates a new AsyncEventBus that will use {@code executor} to dispatch 040 * events. Assigns {@code identifier} as the bus's name for logging purposes. 041 * 042 * @param identifier short name for the bus, for logging purposes. 043 * @param executor Executor to use to dispatch events. It is the caller's 044 * responsibility to shut down the executor after the last event has 045 * been posted to this event bus. 046 */ 047 public AsyncEventBus(String identifier, Executor executor) { 048 super(identifier); 049 this.executor = executor; 050 } 051 052 /** 053 * Creates a new AsyncEventBus that will use {@code executor} to dispatch 054 * events. 055 * 056 * @param executor Executor to use to dispatch events. It is the caller's 057 * responsibility to shut down the executor after the last event has 058 * been posted to this event bus. 059 */ 060 public AsyncEventBus(Executor executor) { 061 this.executor = executor; 062 } 063 064 @Override 065 protected void enqueueEvent(Object event, EventHandler handler) { 066 eventsToDispatch.offer(new EventWithHandler(event, handler)); 067 } 068 069 /** 070 * Dispatch {@code events} in the order they were posted, regardless of 071 * the posting thread. 072 */ 073 @Override 074 protected void dispatchQueuedEvents() { 075 while (true) { 076 EventWithHandler eventWithHandler = eventsToDispatch.poll(); 077 if (eventWithHandler == null) { 078 break; 079 } 080 081 dispatch(eventWithHandler.event, eventWithHandler.handler); 082 } 083 } 084 085 /** 086 * Calls the {@link #executor} to dispatch {@code event} to {@code handler}. 087 */ 088 @Override 089 protected void dispatch(final Object event, final EventHandler handler) { 090 executor.execute(new Runnable() { 091 @Override 092 @SuppressWarnings("synthetic-access") 093 public void run() { 094 AsyncEventBus.super.dispatch(event, handler); 095 } 096 }); 097 } 098 099 }