- epfd 表示epoll句柄
- op 表示fd操作类型,有如下3种
- EPOLL_CTL_ADD 注册新的fd到epfd中
- EPOLL_CTL_MOD 修改已注册的fd的监听事件
- EPOLL_CTL_DEL 从epfd中删除一个fd
- fd 是要监听的描述符
- event 表示要监听的事件,事件可以使如下如下几种宏的集合
-
EPOLLIN :表示对应的文件描述符可以读 -
EPOLLOUT:表示对应的文件描述符可以写 -
EPOLLERR:表示对应的文件描述符发生错误; -
epoll_wait
- epfd 表示epoll句柄
- events 表示从内核得到的就绪事件集合,将该数组传入wait,wait返回时,这个数组就塞满了就绪事件
- 告诉内核events的大小,必须小于等于epoll_create时的maxevents
- timeout 表示等待的超时时间,-1则一直阻塞,直到有数据返回
我个人理解,wait返回只返回了就绪的文件描述符数量,而非文件描述符,也就是说我还是要遍历寻找,例如遍历events 看看都有哪些就绪事件,是有读的?还是写的?然后接着去真正调用IO,这个路子也和Java NIO的使用方式是相同的,即:阻塞返回了,说明有就绪事件,然后遍历events,找到自己关注的事件处理。
handler在native层使用到了epoll函数,我们在讲解native层时会用到上述知识。
handler工作原理
概述
其实这部分内容,但凡看过几篇相关文档的人都能背出来:以下描述基于对handler工作方式有一定理解:Lopper不断轮询MessageQueue中的消息,有消息就拿出来分给消息的target,也就是分给handler,handler调用dispatchMessage()处理;也是通过handler调用sendMessage()塞消息给messageQueue,说到这里,可能会有个疑问:为什么要通过Handler塞消息给MessageQueue(),同时handler重写一个handleMessage()方法,在这个方法中处理Looper轮询到的消息,折腾这一圈是干嘛呢?其实就是【基于消息驱动】:
-
在一个线程中,可以创建多个handler,都往跑在这个线程的Looper中塞消息,Looper按照顺序去轮询到消息,进行处理。 -
在线程A中,可以使用线程B中handler,发送消息到线程B,线程B的handler意思是其Looper是在线程B中轮询的。这个handler就像是线程B放出去的口子,谁向往线程B发消息,就通过我来塞。
个人认为这是handler的重要使命:线程间通信
延伸:线程间通信的其他方法
工作原理
架构
Tips:以上论述比较口头话,是基于对handler工作方式有一定理解来论述的,下面我们讲一下工作方式
从架构图中看出重要角色间的持有关系
- Handler持有队列mQueue和Looper对象
- Looper中持有队列mQueue
- message中持有handler和next message
- MessageQueue中持有当前message
流程
这里我们讲一下handler工作的流程,首先的角色是Looper,Looper的作用是不断轮询消息队列MessageQueue。Looper是和线程强关联的,Loop先在某个线程中跑起来。
我们以这样一个例子,来讲解handler代码
class LooperThread extends Thread { public Handler mHandler;
public void run() { Looper.prepare(); mHandler = new Handler() { public void handleMessage(Message msg) { //TODO 定义消息处理 } }; Looper.loop(); } }
解释
重点:一个线程有且只能有一个Looper
Looper
创建Looper
//门面方法创建 Looper.prepare();
//创建逻辑核心 private static void prepare(boolean quitAllowed) { //每个线程只允许执行一次该方法,第二次执行时线程的TLS已有数据,则会抛出异常。 if (sThreadLocal.get() != null) { throw new RuntimeException(“Only one Looper may be created per thread”); } //创建Looper对象,并保存到当前线程的TLS区域 sThreadLocal.set(new Looper(quitAllowed)); }
//Looper构造方法 private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); //创建MessageQueue对象. 【见4.1】 mThread = Thread.currentThread(); //记录当前线程. }
//关键方法Loop public static void loop() { final Looper me = myLooper(); //获取TLS存储的Looper对象 【见2.4】 final MessageQueue queue = me.mQueue; //获取Looper对象中的消息队列
Binder.clearCallingIdentity(); //确保在权限检查时基于本地进程,而不是调用进程。 final long ident = Binder.clearCallingIdentity();
for (;😉 { //进入loop的主循环方法 Message msg = queue.next(); //可能会阻塞 if (msg == null) { //没有消息,则退出循环 return; }
//默认为null,可通过setMessageLogging()方法来指定输出,用于debug功能 Printer logging = me.mLogging; if (logging != null) { logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + “: " + msg.what); } msg.target.dispatchMessage(msg); //用于分发Message,将分发的逻辑转嫁到handler.dispatchMessage if (logging != null) { logging.println(”<<<<< Finished to " + msg.target + " " + msg.callback); }
//恢复调用者信息 final long newIdent = Binder.clearCallingIdentity(); msg.recycleUnchecked(); //将Message放入消息池回收,减少消息的创建 } }
关于ThreadLocal不再赘述:保存当前线程的信息
loop方法中,是个死循环,不断从MessageQueue()中读取next()消息,可能会阻塞,阻塞的原理,在【MessqgeQueue.next()】中,下面讲。总体来看loop()的逻辑还是还简单:
- next()取消息,取不到阻塞
- 取到了将分发逻辑转嫁到message.target(handler上)
- 取出来是空,结束轮询
handler
构造
创建handler,一定要有looper,并且该Looper已经执行过prepare()了,这样才可以从looper中获取MessageQueue。没有指定Looper会直接那当前线程的Looper。
public Handler() { this(null, false); }
public Handler(Callback callback, boolean async) { … //必须先执行Looper.prepare(),才能获取Looper对象,否则为null. mLooper = Looper.myLooper(); //从当前线程的TLS中获取Looper对象【见2.1】 if (mLooper == null) { throw new RuntimeException(""); } mQueue = mLooper.mQueue; //消息队列,来自Looper对象 mCallback = callback; //回调方法 mAsynchronous = async; //设置消息是否为异步处理方式 }
//有参构造 public Handler(Looper looper) { this(looper, null, false); }
public Handler(Looper looper, Callback callback, boolean async) { mLooper = looper; mQueue = looper.mQueue; mCallback = callback; mAsynchronous = async; }
消息分发
在讲解looper时说道,当looper.loop()循环到消息时,会调用message.target.dispatchMessage(),也就是handler的dispatchMessage()方法
public void dispatchMessage(Message msg) { if (msg.callback != null) { //当Message存在回调方法,回调msg.callback.run()方法; //其实message.callback是个runnable handleCallback(msg); } else { if (mCallback != null) { //当Handler存在Callback成员变量时,回调方法handleMessage(); if (mCallback.handleMessage(msg)) { return; } } //Handler自身的回调方法handleMessage() handleMessage(msg); } }
一般我们都会在创建handler时重写handleMessage方法。
发送消息
在使用handler时,我们可以通过调用handler的发送消息方法,向队列中抛message。
具体方法有:
sendMessage(Message e) sendMessageDelayed(Message e) sendEmptyMessage(int what) obtainMessage() obtainMessage(int what) post(Runnable r)
推荐使用obtainMessage()方法,因为不需要外部去new Message()传进来,而是使用handler内部消息池中的消息。减少创建Message所带来的开销
无论哪个方法最终都会走到这个方法
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
我们重点讲解下这个方法 参数:
-
queue:消息队列,就是handler中持有的消息队列,也就是Looper初始化时创建的queue -
message:消息体 -
uptimeMillis:消息触发的时间,我们可以通过sendMessageDelay()来延迟消息的触发。当前时间13:10:10,调用sendMessageDelay(message,10000),表示10秒钟后触发消息,那么uptimeMillis的值就是13:10:20所对应的毫秒值,具体的转换实现如下:系统时间+延迟时间
public final boolean sendMessageDelayed(Message msg, long delayMillis) { if (delayMillis < 0) { delayMillis = 0; } return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); }
由此可见,如何实现的延迟发送,并没有在handler中实现,而是带着延迟时间一起传递了messageQueue,由它去实现
另外还有一个逻辑,即若Handler是同步的,则设置消息是同步的,这个先不管。
移除消息
public final void removeMessages(int what) { mQueue.removeMessages(this, what, null); 【见 4.5】 }
同样丢给了MessageQueue去处理
MessageQueue
MessageQueue不仅是逻辑的中心,并且是与n ative交互的类,一些比较重要的逻辑,例如阻塞与唤醒,是由native实现的
创建MessageQueue
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; //通过native方法初始化消息队列,其中mPtr是供native代码使用 mPtr = nativeInit(); }
在创建Looper()时,会创建MessageQueue,这里直接调用了native方法,返回的mPtr参数是调用其他native方法时需要的参数,这里我们先不过多讨论。在讲解native部分时讨论
循环消息
整个handler工作原理的逻辑中心就是MessagwQueue的next()方法以及enqueueMessage()方法。其实MessageQueue中维护了一份链表,元素是Message,我们从Message类的定义中可以看出定义了【Message next】;enqueueMessage()方法就是将Message插入到链表合适的位置(按,delay的顺序排序),next()方法就是从链表中取出合适的Message处理。下边我们具体看
enqueueMessage
boolean enqueueMessage(Message msg, long when) { // 每一个普通Message必须有一个target,除了屏障消息 if (msg.target == null) { throw new IllegalArgumentException(“Message must have a target.”); } if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use."); } synchronized (this) { if (mQuitting) { //正在退出时,回收msg,加入到消息池 msg.recycle(); return false; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; /**
- 1.p为null 代表链表为空,新进来的msg作为链表第一个元素
- 2.msg的触发时间是队列中最早的,则插入到p的前边,并作为mMessages
*/ if (p == null || when == 0 || when < p.when) { //p为null(代表MessageQueue没有消息) msg.next = p; mMessages = msg; //如果当前是阻塞的,那么有消息插入进来了,要唤醒线程处理这个消息 needWake = mBlocked; //当阻塞时需要唤醒 } else { //将消息按时间顺序插入到MessageQueue。一般不需要唤醒事件队列因为插入的并不是需要立马执行的消息,除非这个消息是同步屏障消息 needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; //这个for找到比msg.when大的元素, for (;😉 { prev = p; p = p.next; if (p == null || when < p.when) { break; } //如果是同步屏障消息,则不唤醒,等着,等异步消息插进来 if (needWake && p.isAsynchronous()) { needWake = false; } } //将msg插入到p的前边 msg.next = p; prev.next = msg; } //消息没有退出,我们认为此时mPtr != 0 //如果需要唤醒则唤醒 if (needWake) { nativeWake(mPtr); } } return true; }
总结:
- 逻辑是将Message插入到链表中合适的位置,并判断是否需要唤起。熟悉链表操作
就可以很容易明白插入的逻辑 - nativeWake(mPtr)从名字上也可以看出是通过native方法将该线程唤醒
Message next() { final long ptr = mPtr; if (ptr == 0) { //ptr是调用native方法进行阻塞或唤起的关键参数,退出时会清空。当消息循环已经退出,则直接返回, return null; } int pendingIdleHandlerCount = -1; // 循环迭代的首次为-1 int nextPollTimeoutMillis = 0; for (;😉 { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } //阻塞操作,当等待nextPollTimeoutMillis时长,或者消息队列被唤醒,都会返【见解释1】 nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this) { final long now = SystemClock.uptimeMillis(); Message prevMsg = null; //取当前消息 Message msg = mMessages; //当前消息是同步屏障消息,则查询异步消息【见解释2】,我们平时塞入的全部是同步消息(有delay的也是同步消息!),一定有target的 if (msg != null && msg.target == null) { //当查询到异步消息,则立刻退出循环,查询链表 do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } //msg是正常消息或者经过上边的while循环赋值成异步消息了 //则进行执行逻辑 if (msg != null) { if (now < msg.when) { //当异步消息触发时间大于当前时间,说明还没到,继续计算下一次需要阻塞的时间 nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { //链表操作:将该消息从链表中删除 mBlocked = false; if (prevMsg != null) { //从链表中删除 prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; //设置消息的使用状态,即flags |= FLAG_IN_USE msg.markInUse(); return msg; //成功地获取MessageQueue中的下一条即将要执行的消息,交给Looper处理 } } else { /**
- 没有消息,有两种情况
- 1.mMessage本来就是null
- 2.mMessage是同步屏障消息,但是遍历整个链表没有找到异步消息,则无限期阻塞,直到有地方唤醒
*/ nextPollTimeoutMillis = -1; } //正在退出,返回null if (mQuitting) { dispose(); return null; } //当消息队列为空,或者是消息队列的第一个消息时 if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { //没有idle handlers 需要运行,则循环并等待。 mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } //只有第一次循环时,会运行idle handlers,执行完成后,重置pendingIdleHandlerCount为0. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; //去掉handler的引用 boolean keep = false; try { keep = idler.queueIdle(); //idle时执行的方法 } catch (Throwable t) { Log.wtf(TAG, “IdleHandler threw exception”, t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } //重置idle handler个数为0,以保证不会再次重复运行 pendingIdleHandlerCount = 0; //当调用一个空闲handler时,一个新message能够被分发,因此无需等待可以直接查询pending message. nextPollTimeoutMillis = 0; } }
总结: 这里需要有三个地方需要解释
Android handler同步屏障机制
阅读至此,应该理解了handler的工作原理,有些细节不懂,多看几次就明白了,无非就是when计算、链表操作、同步屏障的逻辑。还有一个巨大的疑问,那就是nativePollOnce()阻塞线程,原理是什么?nativeWake()唤醒线程,原理是什么?
一句话:利用Linux的多路复用机制epoll实现阻塞与唤醒,在前置知识中我们介绍了epoll相关知识,那么我们下面就看看native层时如何使用的
native层的handler
我们接着MessageQueue创建时调用native层方法来接着看,就可以看到epoll函数的影子
首先说明,native层也有一套Handler工作方式,和Java层是类似的,都有messagequeue/looper/handler,但是我们先重点介绍和Java层有交互的部分,也就是阻塞/唤起相关,再直白一点,寻找调用epoll相关函数的影子
关于native层handler原理,可以参考这篇文章: native handler
初始化
我们从Java层MessageQueue的初始化作为起点开始看
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); //mPtr记录native消息队列的信息 }
nativeInit对应native层方法
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { //初始化native层的消息队列 NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); nativeMessageQueue->incStrong(env); //增加引用计数 return reinterpret_cast(nativeMessageQueue); }
接下来NativeMessageQueue的构造方法
NativeMessageQueue::NativeMessageQueue()
-
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread(); //获取TLS中的Looper对象 if (mLooper == NULL) { mLooper = new Looper(false); //创建native层的Looper 【4】 Looper::setForThread(mLooper); //保存native层的Looper到TLS } }
可以看到里边创建了looper,并保存在线程数据中,setForThread类比Java中的ThreadLocal,这一点是比较相似的,还有一点是和Java层相反的,Java层时创建Looper时创建MessageQueue,而native层时先创建MessageQueue中创建Looper
Looper的构造方法
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { mWakeEventFd = eventfd(0, EFD_NONBLOCK); //构造唤醒事件的fd AutoMutex _l(mLock); rebuildEpollLocked(); //重建Epoll事件【5】 }
重点来了,rebuildEpollLocker()方法中会初始化epoll,这里就用到了前置知识中的epoll
void Looper::rebuildEpollLocked() { if (mEpollFd >= 0) { close(mEpollFd); //关闭旧的epoll实例 } mEpollFd = epoll_create(EPOLL_SIZE_HINT); //创建新的epoll实例,并注册wake管道 struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); //把未使用的数据区域进行置0操作 eventItem.events = EPOLLIN; //可读事件 eventItem.data.fd = mWakeEventFd; //将唤醒事件(mWakeEventFd)添加到epoll实例(mEpollFd) //并且关注的是可读事件,那么调用epoll_wait阻塞后,一旦向该文件操作符所代表的文件中写入数据,那么epoll_wait便会返回 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
//native层message的逻辑 for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); //将request队列的事件,分别添加到epoll实例 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); } }
Looper对象中的mWakeEventFd添加到epoll监控,以及mRequests也添加到epoll的监控范围内。
阻塞
在handler原理中我们知道,调用nativePollOnce()后便会阻塞当前线程,那么我们来看看nativePollOnce()做了什么。
t阻塞后,一旦向该文件操作符所代表的文件中写入数据,那么epoll_wait便会返回 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
//native层message的逻辑 for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); //将request队列的事件,分别添加到epoll实例 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); } }
Looper对象中的mWakeEventFd添加到epoll监控,以及mRequests也添加到epoll的监控范围内。
阻塞
在handler原理中我们知道,调用nativePollOnce()后便会阻塞当前线程,那么我们来看看nativePollOnce()做了什么。
|