前言
之前使用了一下Spring Event,虽然知道大概又是观察者模式,但还是比较好奇它的实现。源码也比较好理解。 其中
基本使用
使用非常简单。
定义事件
public class CustomSpringEvent extends ApplicationEvent {
private String message;
public CustomSpringEvent(Object source, String message) {
super(source);
this.message = message;
}
public String getMessage() {
return message;
}
}
监听事件
@Component
public class CustomSpringEventListener implements ApplicationListener<CustomSpringEvent> {
@Override
public void onApplicationEvent(CustomSpringEvent event) {
System.out.println("Received spring custom event - " + event.getMessage());
}
}
也可以直接使用Spring注解形式声明。
@EventListener
public void handleContextStart(ContextStartedEvent cse) {
System.out.println("Handling context started event.");
}
发布事件
@Component
public class CustomSpringEventPublisher {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void publishCustomEvent(final String message) {
System.out.println("Publishing custom event. ");
CustomSpringEvent customSpringEvent = new CustomSpringEvent(this, message);
applicationEventPublisher.publishEvent(customSpringEvent);
}
}
开启异步
以上是同步的——事件发布后会阻塞掉,等待监听器执行完成。异步事件需要开启异步:
@Configuration
public class AsynchronousSpringEventsConfig {
@Bean(name = "applicationEventMulticaster")
public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster =
new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return eventMulticaster;
}
}
什么?配置自定义的ApplicationEventMulticaster,简单配置一个Executor就能支持异步了?为啥?这个答案需要在源码解析中才能回答。 还有一种更简单的方式,通过Spring注解开启异步。
@Async
@EventListener(MsgEvent.class)
public void func(MsgEvent event) {
}
@EnableAsync
@SpringBootApplication
public class SpringbootEventApplication {
}
观察者模式
理解Spring Event可以来复习一下观察者模式,或者说Spring Event的实现是一个理解观察者模式的案例。 
public class Subject {
private List<Observer> observers
= new ArrayList<Observer>();
private int state;
public void setState(int state) {
this.state = state;
notifyAllObservers();
}
public void notifyAllObservers(){
for (Observer observer : observers) {
observer.update();
}
}
}
观察者模式一个Subject注册许多Observer,在Subject的事件(state)变更时,将会执行所有注册的Observer。 一句话说明本质:Event发生时,会遍历执行所有的监听器(Observer)。
源码解析
发布
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
}
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
这里就体现了观察者模式:Event发生时,会遍历执行所有的监听器。 executor.execute(() -> invokeListener(listener, event)) 这里也可以回答上边的问题,为啥配置Excutor就能实现异步事件。
事件注册
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
try {
initApplicationEventMulticaster();
onRefresh();
registerListeners();
}
}
}
protected void registerListeners() {
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
}
public void addApplicationListener(ApplicationListener<?> listener) {
synchronized (this.retrievalMutex) {
Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
if (singletonTarget instanceof ApplicationListener) {
this.defaultRetriever.applicationListeners.remove(singletonTarget);
}
this.defaultRetriever.applicationListeners.add(listener);
this.retrieverCache.clear();
}
}
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
applicationListeners是一个set,默认所有Listener都会存放在这。这时候就有些问题了——Listener不是应该和Event绑定在一起吗?就像观察者模式一样,Observer都是注册到Event对应的Subject里的。至少也应该有个map关系,将Event和一系列Listener绑定吧? 我们从发布中找线索,getApplicationListeners。
protected Collection<ApplicationListener<?>> getApplicationListeners(ApplicationEvent event, ResolvableType eventType) {
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
synchronized (this.retrievalMutex) {
retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
retriever = new ListenerRetriever(true);
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
this.retrieverCache.put(cacheKey, retriever);
return listeners;
}
}
else {
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) {
List<ApplicationListener<?>> allListeners = new ArrayList<>();
Set<ApplicationListener<?>> listeners;
Set<String> listenerBeans;
synchronized (this.retrievalMutex) {
listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
}
for (ApplicationListener<?> listener : listeners) {
if (supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
retriever.applicationListeners.add(listener);
}
allListeners.add(listener);
}
}
return allListeners;
}
可以发现,它就是在getApplicationListeners顺便建立映射。
|