一、EventLoop.h
如果我们设置了多线程的Reactor反应堆类型,mainReactor处理新用户的连接,accept,拿到新用户通信的fd,把fd感兴趣的事件打包成Channel,然后唤醒某个工作线程subReactor,即Eventloop。每个工作线程subReactor都监听一组Channel,每一组Channel都在自己所属的Eventloop获取发生的事件以及相应的回调操作
如果没有事件发生的话,mainReactor和subReactor所处的线程都是阻塞睡眠的,如果现在mainReactor监听到一个新用户的连接,得到了表示新用户连接的fd,以及感兴趣的事件的Channel,mainReactor如何把这个Channel给subReactor呢?
统一事件源的原理
在libevent库中,采用的是socketpair,使用的是网络通信机制,效率不如eventfd 这就是线程间的通信机制,创建了一个eventfd,调用eventfd方法可以直接在内核空间notify用户空间的应用程序,让应用程序该线程起来做事情,效率高 重写EventLoop.h
#pragma once
#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"
#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>
class Channel;
class Poller;
class EventLoop : noncopyable{
public:
using Functor = std::function<void()>;
EventLoop();
~EventLoop();
void loop();
void quit();
Timestamp pollReturnTime() const { return pollReturnTime_; }
void runInLoop(Functor cb);
void queueInLoop(Functor cb);
void wakeup();
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
bool isInLoopThread() const {
return threadId_ == CurrentThread::tid();
}
private:
void handleRead();
void doPendingFunctors();
using ChannelList = std::vector<Channel*>;
std::atomic_bool looping_;
std::atomic_bool quit_;
const pid_t threadId_;
Timestamp pollReturnTime_;
std::unique_ptr<Poller> poller_;
int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activateChannels_;
Channel* currentActiveChannel_;
std::atomic_bool callingPendingFunctors_;
std::vector<Functor> pendingFunctors_;
std::mutex mutex_;
};
二、createEventfd
int createEventfd(){
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0){
LOG_FATAL("Failed in eventfd! errno : %d\n", errno);
}
return evtfd;
}
三、EventLoop::EventLoop
EventLoop::EventLoop()
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, threadId_(CurrentThread::tid())
, poller_(Poller::newDefaultPoller(this))
, wakeupFd_(createEventfd())
, wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n". this, threadId_);
if(t_loopInThisThread != nullptr){
LOG_FATAL("There has existed an EventLoop %p in this thread %d \n", this, threadId_);
}else{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallBack(std::bind(&EventLoop::handleRead, this));
wakeupChannel_->enableReading();
}
四、EventLoop::handleRead和EventLoop::wakeup
当有新的客户端连接时,mainReactor会发送8字节的数据到wakeupFd_,发送的内容不重要,重要的是subReactor会监听wakeupFd_,一旦wakeupFd_有事件发生,subReactor就会被唤醒,subReactor被唤醒后就能拿到新用户到来的Channel
void EventLoop::handleRead(){
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof(one));
if(n != sizeof(one)){
LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8 \n", n);
}
}
wakeup方法是给其他loop所在线程执行的,用于唤醒loop所在线程,向该loop的wakeupFd_写数据即可
void EventLoop::wakeup(){
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if(n != sizeof(one)){
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
}
}
五、EventLoop::loop和EventLoop::doPendingFunctors
执行当前EventLoop事件循环需要处理的回调操作?
对于IO线程mainReactor主要是accept,主要用于等待新用户的到来,新连接建立后我们会用Channel封装fd然后分发给subReactor,而mainReactor只负责监听新用户的连接
mainReactor会事先注册回调函数,回调函数需要subReactor执行
void EventLoop::loop(){
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping \n", this);
while(!quit_){
activateChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activateChannels_);
for(Channel* channel : activateChannels_){
channel->handleEvent(pollReturnTime_);
}
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop loop \n", this);
looping_ = false;
}
EventLoop::doPendingFunctors源码: doPendingFunctors执行回调的时候我们看到,它首先定义了一个局部的vector,然后把1个变量置为true
接着加了一把锁,执行swap,把放在pendingFunctors_里面的所有待执行的回调全部放到局部的vector里面,而pendingFunctors_就被置空了
因为doPendingFunctors这个函数要把pendingFunctors_里面的所有待执行的回调全部执行完并清空,再给其他线程继续添加回调函数。由于操作pendingFunctors_容器进行线程安全的加锁控制,如果让函数变边执行边删除容器元素,pendingFunctors_容器就会一直被锁住,导致其他线程无法读写,即可能使得mainReactor无法对pendingFunctors_容器写入channel下发给subReactor,造成服务器的时延变长
使用swap,就能解决pendingFunctors_一直被加锁的问题
重写doPendingFunctors
void EventLoop::doPendingFunctors(){
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor& functor : functors){
functor();
}
callingPendingFunctors_ = false;
}
六、EventLoop::quit
退出事件循环分两种情况:EventLoop所属线程调用quit、非EventLoop所属线程调用quit
这两种情况都是允许发生的
void EventLoop::quit(){
quit_ = true;
if(!isInLoopThread()){
wakeup();
}
}
情况一则是正常停止,若是情况二,当前执行线程并不是EventLoop所属线程,则通过eventFd唤醒EventLoop所属线程,让其停止阻塞继续执行,接着就退出while循环
如果EventLoop所属线程阻塞了,不进行唤醒的话,就算把quit_置为true,EventLoop所属线程也不会立刻退出,因为无法立即执行while的判断语句
七、EventLoop::runInLoop和EventLoop::queueInLoop
void EventLoop::runInLoop(Functor cb){
if(isInLoopThread()){
cb();
}else{
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(Functor cb){
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
if(!isInLoopThread() || callingPendingFunctors_){
wakeup();
}
}
八、完整代码
#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"
#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <functional>
#include <memory>
__thread EventLoop* t_loopInThisThread = nullptr;
const int kPollTimeMs = 10000;
int createEventfd(){
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0){
LOG_FATAL("Failed in eventfd! errno : %d\n", errno);
}
return evtfd;
}
EventLoop::EventLoop()
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, threadId_(CurrentThread::tid())
, poller_(Poller::newDefaultPoller(this))
, wakeupFd_(createEventfd())
, wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n". this, threadId_);
if(t_loopInThisThread != nullptr){
LOG_FATAL("There has existed an EventLoop %p in this thread %d \n", this, threadId_);
}else{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallBack(std::bind(&EventLoop::handleRead, this));
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop(){
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupChannel_->fd());
t_loopInThisThread = nullptr;
}
void EventLoop::loop(){
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping \n", this);
while(!quit_){
activateChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activateChannels_);
for(Channel* channel : activateChannels_){
channel->handleEvent(pollReturnTime_);
}
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop loop \n", this);
looping_ = false;
}
void EventLoop::quit(){
quit_ = true;
if(!isInLoopThread()){
wakeup();
}
}
void EventLoop::runInLoop(Functor cb){
if(isInLoopThread()){
cb();
}else{
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(Functor cb){
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
if(!isInLoopThread() || callingPendingFunctors_){
wakeup();
}
}
void EventLoop::wakeup(){
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if(n != sizeof(one)){
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
}
}
void EventLoop::updateChannel(Channel* channel){
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel* channel){
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel* channel){
poller_->hasChannel(channel);
}
void EventLoop::handleRead(){
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof(one));
if(n != sizeof(one)){
LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8 \n", n);
}
}
void EventLoop::doPendingFunctors(){
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor& functor : functors){
functor();
}
callingPendingFunctors_ = false;
}
|