EventBus配置
android {
defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
// 生成的Index类的名称
arguments = [ eventBusIndex : 'com.example.myapp.MyEventBusIndex' ]
}
}
}
}
dependencies {
def eventbus_version = '3.2.0'
implementation "org.greenrobot:eventbus:$eventbus_version"
annotationProcessor "org.greenrobot:eventbus-annotation-processor:$eventbus_version"
}
EventBus使用流程
- 定义Event事件。类型对应下文的
eventType 。 - 使用
@Subscribe 注解方法。方法的入参是上一步中定义的Event。方法只允许有一个Event入参。 register 注册。post 发送事件消息,入参是Event对象。@Subscribe 注解过的方法就会接收到事件了。unregister 解除注册。
EventBus中的数据结构
Map变量:subscriptionsByEventType
- Map 的key值是class类型,value是一个list同步写列表。
- 一个key代表一个class类型,是Subscribe注解的函数的参数class类型,是event类型,即eventType。
- value是一个list,包含了该class中的所有Subscribe注解的方法。这个list是按照优先级
priority 顺序排列的,priority 越大,优先级高,在列表的前面。 - 一个key-value键值对 表示监听一个特定event类型事件的所有method方法。
post 方法传入的也是event类型。
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
Map变量:typesBySubscriber
- Map的key值是subscriber对象实例,value值是一个普通list列表,存储class对象实例。list列表中存储的是eventType的类型。
- key-value键值对表示的是一个subscriber对象对应的所有的eventType类型。
private final Map<Object, List<Class<?>>> typesBySubscriber;
Subscription.java
- Subscription 单词是订阅,观察的意思,是一个观察者。
- 这个类代表一个观察者。该类持有一个对象实例
subscriber 和 实例所在类的一个消息处理的方法(用Subscribe注解的方法)。
final class Subscription {
final Object subscriber;
final SubscriberMethod subscriberMethod;
}
SubscriberMethod.java
- 是一个bean类型的方法。
- 注解方法解析后的封装对象。封装了一个使用 Subscribe注解的方法。可以用来处理消息。
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
PendingPost.java
PendingPostQueue.java
注解相关
@Subscribe注解
- Subscribe注解 表示一个方法注册监听消息事件
ThreadMode
- POSTING 默认模式。直接在本线程调用执行其他对象注册的函数方法。
- MAIN APP主函数调用,应该避免阻塞操作。如果当前线程是主函数,那么会直接调用。如果不是主线程,那么会先通过handler抛到主函数的事件队列中处理。
- MAIN_ORDERED 无论当前是否是主线程,直接将事件抛到主函数事件队列,通过队列去执行。
- BACKGROUND 如果当前是主线程,直接抛到一个后台线程队列去处理;如果当前不是主线程,那么直接在当前线程,调用注册的函数方法。 注意,同一时间,只会有一个后台线程出来
- ASYNC 所有消息都会抛到一个独立的线程处理。适合耗时操作。执行线程从线程池中获取。EventBus会采用线程池,动态的扩展线程。默认采用的线程池是
newCachedThreadPool . 避免同时提交过多的耗时操作,导致线程池创建过多的线程。 Avoid triggering a large number of long running asynchronous subscriber methods at the same time to limit the number of concurrent threads - BACKGROUND 和 ASYNC的区别:BACKGROUND 的事件,同一时刻,所有事件会发送到一个线程队列中,串行处理。ASYNC则不同,每个事件都会单独占用一个线程处理。
org.greenrobot.eventbus.EventBusBuilder
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public enum ThreadMode {
POSTING,
MAIN,
MAIN_ORDERED,
BACKGROUND,
ASYNC
}
初始化注册流程
register 注册。 传入一个subscriber 对象实例。findSubscriberMethods 解析注解,寻找所有 subscriber 对象所有带有@Subscribe 的方法。并封装成为SubscriberMethod 对象,返回method列表。subscribe 注册监听。把解析出的方法添加到Map数据结构里面,方便后续post消息时,调用处理方法。
register 方法
- 相当于观察者模式的注册监听。主要是用来触发对Subscribe注解的函数的解析。
- 入参
subscriber 表示的class类需要含有@Subscribe 注解方法。表示该对象监听和处理post消息。
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
findSubscriberMethods,解析@Subscribe解方法
- 入参是
register 注册函数传入的subscriber 的class对象类型。 - 如果找不到注解函数,会抛出异常。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
private static final int BRIDGE = 0x40;
private static final int SYNTHETIC = 0x1000;
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
post 发送消息
ThreadLocal对象: currentPostingThreadState
- 每个线程内都会有一个ThreadLocal对象,类型PostingThreadState。每个线程内部都有一个事件队列。
private final ThreadLocal<PostingThreadState> currentPostingThreadState =
new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
post函数
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
}
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
几种poster
MainThreadSupport、HandlerPoster 主线程poster,传入主线程的looper作为参数。BackgroundPoster 后台poster,所有事件顺序进入队列执行。AsyncPoster 异步poster,每个事件在单独的线程中执行。
EventBusAnnotationProcessor 注解解析器
EventBus eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
EventBus eventBus = EventBus.getDefault();
参考资料
|