这是事件通知里处理事件的落脚点,也就是事件执行的落地
先贴下接口Subscriber
public abstract class Subscriber<T extends Event> {
public abstract void onEvent(T event);
public abstract Class<? extends Event> subscribeType();
public Executor executor() {
return null;
}
public boolean ignoreExpireEvent() {
return false;
}
}
从接口描述上,就能清晰的了解到这个接口的主流程
SmartSubscriber
Subscriber有一个SmartSubscriber的扩展子类,是用来针对多事件订阅的List<Class<? extends Event>> subscribeTypes() 该方法返回的是这个订阅者所需要订阅的事件列表
----->代码片段1
public abstract class SmartSubscriber extends Subscriber {
public abstract List<Class<? extends Event>> subscribeTypes();
}
----->代码片段2
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
@Override
public List<Class<? extends Event>> subscribeTypes() {
List<Class<? extends Event>> result = new LinkedList<>();
result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);
result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);
result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);
result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);
result.add(ClientEvent.ClientDisconnectEvent.class);
return result;
}
}
----->代码片段3
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
addSubscriber(consumer, subscribeType, factory);
}
}
return;
}
}
这里贴了三段代码 ??代码片段一,就是SmartSubscriber的抽象List<Class<? extends Event>> subscribeTypes() ??代码片段二,拿某一个该接口的实现举例子,返回了一个List,里面包含了该订阅者订阅的事件类型集合,那么我就可以这样理解,这个接口的返回值所对应的事件,就是该订阅者订阅的事件,通过代码片段三NotifyCenter#registerSubscriber 注册 ??代码片段三,又回到了NotifyCenter,订阅者注册的时候,首先是先判定了当前订阅者是监听的是否是多事件的,其事件注册的具体细节可以参考NotifyCenter的源码分析
subscribeType
这个接口和上面说的SmartSubscriber 的subscribeTypes() 接口没有多大的区别,只是这个返回的是一个事件,上面的返回的是多个事件
@Override
public Class<? extends Event> subscribeType() {
return Xxxxxxxx.class;
}
executor
executor() 上一篇的EventPublisher在选择通过异步方式还是同步方式的时候就是调用的这里来判定的,我们可以看到这个的默认实现是null
@Override
public void notifySubscriber(Subscriber subscriber, Event event) {
final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
}
}
}
?? 所以订阅者如果没有实际的实现executor这个接口,那么事件发布者就是走的同步方式,这里简单的回顾了一下上一篇的事件发布者
ignoreExpireEvent
这个接口的话,可以看下上游的调用处
@Override
public void receiveEvent(Event event) {
for (Subscriber subscriber : subscribers) {
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
continue;
}
notifySubscriber(subscriber, event);
}
}
?? 这段代码是事件发布者里的,是在准备发布广播给订阅者(notifySubscriber)之前,判断的是否需要忽略过期的事件的 ?? 订阅者们可以自己去判断是否订阅过期事件,当然这个接口给的默认实现是false
onEvent
最重要的接口最后说 其实也没啥好说的,就随便拿一个实现lou一眼就行
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
@Override
public void onEvent(Event event) {
if (event instanceof ClientEvent.ClientDisconnectEvent) {
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
} else if (event instanceof ClientOperationEvent) {
handleClientOperation((ClientOperationEvent) event);
}
}
}
● 这里拿的ClientServiceIndexesManager 这个类的onEvent方法来举例子,上面也贴出过这个类了(SmartSubscriber接口的描述) 这里接收的Event参数通过判断其实际的类型,来走这个事件所对应的不同的逻辑 ● Event是事件的抽象类,每个不同的事件都往其自身塞入了其事件所携带的信息,这个抽象类在下一篇描述
Nacos源码篇
语雀版文档
Nacos源码注释
|