IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> C++知识库 -> muduo库中的Reactor反应堆EventLoop类 -> 正文阅读

[C++知识库]muduo库中的Reactor反应堆EventLoop类

一、EventLoop.h

在这里插入图片描述
如果我们设置了多线程的Reactor反应堆类型,mainReactor处理新用户的连接,accept,拿到新用户通信的fd,把fd感兴趣的事件打包成Channel,然后唤醒某个工作线程subReactor,即Eventloop。每个工作线程subReactor都监听一组Channel,每一组Channel都在自己所属的Eventloop获取发生的事件以及相应的回调操作

如果没有事件发生的话,mainReactor和subReactor所处的线程都是阻塞睡眠的,如果现在mainReactor监听到一个新用户的连接,得到了表示新用户连接的fd,以及感兴趣的事件的Channel,mainReactor如何把这个Channel给subReactor呢?

统一事件源的原理

在libevent库中,采用的是socketpair,使用的是网络通信机制,效率不如eventfd
在这里插入图片描述
这就是线程间的通信机制,创建了一个eventfd,调用eventfd方法可以直接在内核空间notify用户空间的应用程序,让应用程序该线程起来做事情,效率高
在这里插入图片描述
重写EventLoop.h

#pragma once

#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"

#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>

class Channel;
class Poller;

/**
 * 事件循环类:主要包含两个大模块 Channel(包含了监听的sockfd,感兴趣的事件和发生的事件) Poller(epoll的抽象)
 */ 
class EventLoop : noncopyable{
    public:
        using Functor = std::function<void()>;

        EventLoop();
        ~EventLoop();

        void loop();                                 // 开启事件循环
        void quit();                                 // 退出事件循环

        Timestamp pollReturnTime() const { return pollReturnTime_; }

        void runInLoop(Functor cb);                  // 在当前loop中执行回调cb
        void queueInLoop(Functor cb);                // 把cb放到队列中,等到loop所在线程被唤醒后再执行cb

        void wakeup();                               // 用于唤醒loop所在线程

        void updateChannel(Channel* channel);        // Channel调用所在loop的updateChannel来epoll_ctl  EPOLL_CTL_ADD  EPOLL_CTL_MOD
        void removeChannel(Channel* channel);        // Channel调用所在loop的removeChannel来epoll_ctl  EPOLL_CTL_DEL

        bool hasChannel(Channel* channel);           // 查看当前循环是否有管理参数传入的channel

        bool isInLoopThread() const {
            // threadId_是loop所在线程缓存的tid,CurrentThread::tid()返回的是执行线程的tid
            // 如果不相等,那得等到当前loop所属线程被唤醒的时候,才能执行loop相关的回调操作
            return threadId_ == CurrentThread::tid();
        }

    private:
        void handleRead();                           // 主要用于wakeup
        void doPendingFunctors();                    // 执行回调。回调都在vector pendingFunctors_里面

        using ChannelList = std::vector<Channel*>;

        std::atomic_bool looping_;                   // 是否正常事件循环,原子操作,通过CAS实现
        std::atomic_bool quit_;                      // 退出loop循环的标志
        
        const pid_t threadId_;                       // 记录创建EventLoop对象的线程tid,会用于和执行的线程id比较,就可以判断这个Eventloop对象在不在当前线程里
        
        Timestamp pollReturnTime_;                   // 记录Poller返回发生事件的Channels的时间,有事件发生后,Poller会返回所有有事件发生的Channel
        std::unique_ptr<Poller> poller_;             // EventLoop对象管理的唯一的poller

        int wakeupFd_;                               // eventfd()创建的,作用是当mainLoop获取一个新用户的Channel,通过轮询算法选择一个subLoop,通过wakeupFd_唤醒subLoop处理事件,每一个subReactor都监听了wakeupFd_
        std::unique_ptr<Channel> wakeupChannel_;     // 用于封装wakeupFd_

        ChannelList activateChannels_;               // EventLoop管理的所有channel
        Channel* currentActiveChannel_;

        std::atomic_bool callingPendingFunctors_;    // 标识当前loop是否有需要执行的回调操作
        std::vector<Functor> pendingFunctors_;       // 存放loop具体处理事件的回调操作
        std::mutex mutex_;                           // 用于保护pendingFunctors_的线程安全
};

二、createEventfd

// 创建wakeupfd,用来notify唤醒subReactor,然后处理新的Channel
int createEventfd(){
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0){
        // eventfd失败,会导致事件发生后无法通知subReactor(Ev entLoop)处理事件
        LOG_FATAL("Failed in eventfd! errno : %d\n", errno);
    }
    return evtfd;
}

三、EventLoop::EventLoop

EventLoop::EventLoop()
    : looping_(false)
    , quit_(false)
    , callingPendingFunctors_(false)
    , threadId_(CurrentThread::tid())
    , poller_(Poller::newDefaultPoller(this))
    , wakeupFd_(createEventfd())
    , wakeupChannel_(new Channel(this, wakeupFd_))
{
    LOG_DEBUG("EventLoop created %p in thread %d \n". this, threadId_);
    if(t_loopInThisThread != nullptr){
        LOG_FATAL("There has existed an EventLoop %p in this thread %d \n", this, threadId_);
    }else{
        // 当前线程第一次创建EventLoop对象
        t_loopInThisThread = this;
    }

    // 设置wakeupFd_的事件类型以及发生事件后的回调操作
    // EventLoop::handleRead方法只有一个参数,是EventLoop对象,由于我们传入了this,bind返回的是一个function<void()>的对象
    // 这里有一个疑问,function<void()>对象作为实参,传递给setReadCallBack形参ReadEventCallBack function<void(Timestamp)> ???
    wakeupChannel_->setReadCallBack(std::bind(&EventLoop::handleRead, this));
    // 每一个EventLoop都将监听wakeupChannel_的读事件,kReadEvent = EPOLLIN | EPOLLPRI
    wakeupChannel_->enableReading();
}

四、EventLoop::handleRead和EventLoop::wakeup

当有新的客户端连接时,mainReactor会发送8字节的数据到wakeupFd_,发送的内容不重要,重要的是subReactor会监听wakeupFd_,一旦wakeupFd_有事件发生,subReactor就会被唤醒,subReactor被唤醒后就能拿到新用户到来的Channel

void EventLoop::handleRead(){
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof(one));
    if(n != sizeof(one)){
        LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8 \n", n);
    }
}

wakeup方法是给其他loop所在线程执行的,用于唤醒loop所在线程,向该loop的wakeupFd_写数据即可

void EventLoop::wakeup(){
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof(one));
    if(n != sizeof(one)){
        // 真正写入的数据大小n并不是我们想写的sizeof(one)
        LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
    }
}

五、EventLoop::loop和EventLoop::doPendingFunctors

执行当前EventLoop事件循环需要处理的回调操作?

对于IO线程mainReactor主要是accept,主要用于等待新用户的到来,新连接建立后我们会用Channel封装fd然后分发给subReactor,而mainReactor只负责监听新用户的连接

mainReactor会事先注册回调函数,回调函数需要subReactor执行

// 开启事件循环
void EventLoop::loop(){
    looping_ = true;
    quit_ = false;

    LOG_INFO("EventLoop %p start looping \n", this);

    while(!quit_){
        activateChannels_.clear();
        // 当epoll_wait发生事件以后,poller会把发生事件的channel写入EventLoop的成员变量activateChannels_
        // 然后EventLoop就会调用发生事件Channel的回调函数
        // 监听两类回调函数:client fd和wakeupfd(用于mainReactor和subReactor通信)
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activateChannels_);
        // for循环处理所有发生事件的channel
        for(Channel* channel : activateChannels_){
            // poller监听哪些channel发生事件,然后上报给EventLoop,EventLoop通知channel调用相应的回调函数处理相应的事件(Channel执行回调)
            channel->handleEvent(pollReturnTime_);
        }
        // subReactor被唤醒后,执行需要处理的回调操作(mainReactor事先注册的),接收新用户的Channel
        doPendingFunctors();
    }

    LOG_INFO("EventLoop %p stop loop \n", this);
    looping_ = false;
}

EventLoop::doPendingFunctors源码:
在这里插入图片描述
doPendingFunctors执行回调的时候我们看到,它首先定义了一个局部的vector,然后把1个变量置为true

接着加了一把锁,执行swap,把放在pendingFunctors_里面的所有待执行的回调全部放到局部的vector里面,而pendingFunctors_就被置空了

因为doPendingFunctors这个函数要把pendingFunctors_里面的所有待执行的回调全部执行完并清空,再给其他线程继续添加回调函数。由于操作pendingFunctors_容器进行线程安全的加锁控制,如果让函数变边执行边删除容器元素,pendingFunctors_容器就会一直被锁住,导致其他线程无法读写,即可能使得mainReactor无法对pendingFunctors_容器写入channel下发给subReactor,造成服务器的时延变长

使用swap,就能解决pendingFunctors_一直被加锁的问题

重写doPendingFunctors

void EventLoop::doPendingFunctors(){
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
    	// 操作pendingFunctors_容器需要加锁控制,否则可能出现回调函数对象被覆盖等线程安全问题
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for(const Functor& functor : functors){
        functor();  // 执行当前loop需要执行的回调函数
    }

    callingPendingFunctors_ = false;
}

六、EventLoop::quit

退出事件循环分两种情况:EventLoop所属线程调用quit、非EventLoop所属线程调用quit

这两种情况都是允许发生的

void EventLoop::quit(){
    // 把EventLoop的quit_置为true,表示要停止当前loop,EventLoop所属线程在loop函数中退出while循环
    quit_ = true;
    if(!isInLoopThread()){
        // 当前执行线程并不是EventLoop所属线程,则通过eventFd唤醒EventLoop所属线程,让其停止阻塞继续执行,接着就退出while循环
        // 如果EventLoop所属线程阻塞了,不进行唤醒的话,就算把quit_置为true,EventLoop所属线程也不会立刻退出,因为无法立即执行while的判断语句
        wakeup();
    }
}

情况一则是正常停止,若是情况二,当前执行线程并不是EventLoop所属线程,则通过eventFd唤醒EventLoop所属线程,让其停止阻塞继续执行,接着就退出while循环

如果EventLoop所属线程阻塞了,不进行唤醒的话,就算把quit_置为true,EventLoop所属线程也不会立刻退出,因为无法立即执行while的判断语句

七、EventLoop::runInLoop和EventLoop::queueInLoop

// 在当前loop中执行回调cb
void EventLoop::runInLoop(Functor cb){
    if(isInLoopThread()){
        cb();  // 在当前loop线程中
    }else{
        queueInLoop(cb);
    }
}

// queueInLoop是给非EventLoop所在线程执行的,把cb放到队列中,等到loop所在线程被唤醒后再执行cb
void EventLoop::queueInLoop(Functor cb){
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);  // emplace_back是直接构造,push_back是拷贝构造
    }
    // 执行线程并非loop所在线程 || loop所在线程正在执行回调函数,但是loop又有了新的回调
    // loop执行完回调函数后会重新阻塞,所以需要唤醒,继续执行新来的回调
    if(!isInLoopThread() || callingPendingFunctors_){
        wakeup(); // 唤醒相应需要执行pendingFunctors_内回调操作的线程
    }
}

八、完整代码

#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"

#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <functional>
#include <memory>

/**
 * t_loopInThisThread是全局的EventLoop类型的指针变量,指向当前线程中的EventLoop对象,
 * t_loopInThisThread为nullptr时才会创建EventLoop对象,防止一个线程创建多个EventLoop
 * __thread就是thread_local,每个线程会拷贝一份该全局变量在自己的线程空间,每个线程一份
 */
 
__thread EventLoop* t_loopInThisThread = nullptr;


// 定义默认Poller IO复用接口的超时事件
const int kPollTimeMs = 10000;


// 创建wakefd,用来notify唤醒subReactor,然后处理新的Channel
int createEventfd(){
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0){
        // eventfd失败,会导致事件发生后无法通知subReactor(Ev entLoop)处理事件
        LOG_FATAL("Failed in eventfd! errno : %d\n", errno);
    }
    return evtfd;
}


EventLoop::EventLoop()
    : looping_(false)
    , quit_(false)
    , callingPendingFunctors_(false)
    , threadId_(CurrentThread::tid())
    , poller_(Poller::newDefaultPoller(this))
    , wakeupFd_(createEventfd())
    , wakeupChannel_(new Channel(this, wakeupFd_))
{
    LOG_DEBUG("EventLoop created %p in thread %d \n". this, threadId_);
    if(t_loopInThisThread != nullptr){
        LOG_FATAL("There has existed an EventLoop %p in this thread %d \n", this, threadId_);
    }else{
        // 当前线程第一次创建EventLoop对象
        t_loopInThisThread = this;
    }

    // 设置wakeupFd_的事件类型以及发生事件后的回调操作
    // EventLoop::handleRead方法只有一个参数,是EventLoop对象,由于我们传入了this,bind返回的是一个function<void()>的对象
    // 这里有一个疑问,function<void()>对象作为实参,传递给setReadCallBack形参ReadEventCallBack function<void(Timestamp)> ???
    wakeupChannel_->setReadCallBack(std::bind(&EventLoop::handleRead, this));
    // 每一个EventLoop都将监听wakeupChannel_的读事件,kReadEvent = EPOLLIN | EPOLLPRI
    wakeupChannel_->enableReading();
}


EventLoop::~EventLoop(){
    wakeupChannel_->disableAll();    // 所有事件都不感兴趣 
    wakeupChannel_->remove();        // 
    ::close(wakeupChannel_->fd());   // 关闭fd
    t_loopInThisThread = nullptr;    // 指向线程中EventLoop对象的指针置空
}


// 开启事件循环
void EventLoop::loop(){
    looping_ = true;
    quit_ = false;

    LOG_INFO("EventLoop %p start looping \n", this);

    while(!quit_){
        activateChannels_.clear();
        // 当epoll_wait发生事件以后,poller会把发生事件的channel写入EventLoop的成员变量activateChannels_
        // 然后EventLoop就会调用发生事件Channel的回调函数
        // 监听两类回调函数:client fd和wakeupfd(用于mainReactor和subReactor通信)
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activateChannels_);
        // for循环处理所有发生事件的channel
        for(Channel* channel : activateChannels_){
            // poller监听哪些channel发生事件,然后上报给EventLoop,EventLoop通知channel调用相应的回调函数处理相应的事件(Channel执行回调)
            channel->handleEvent(pollReturnTime_);
        }
        // subReactor给唤醒后,执行需要处理的回调操作(mainReactor事先注册的),接收新用户的Channel
        doPendingFunctors();
    }

    LOG_INFO("EventLoop %p stop loop \n", this);
    looping_ = false;
}


// 退出事件循环
// 分两种情况:1.EventLoop所属线程调用quit    2.非EventLoop所属线程调用quit,这两种情况都是允许发生的
void EventLoop::quit(){
    // 把EventLoop的quit_置为true,表示要停止当前loop,EventLoop所属线程在loop函数中退出while循环
    quit_ = true;
    if(!isInLoopThread()){
        // 当前执行线程并不是EventLoop所属线程,则通过eventFd唤醒EventLoop所属线程,让其停止阻塞继续执行,接着就退出while循环
        // 如果EventLoop所属线程阻塞了,不进行唤醒的话,就算把quit_置为true,EventLoop所属线程也不会立刻退出,因为无法立即执行while的判断语句
        wakeup();
    }
}


// 在当前loop中执行回调cb
void EventLoop::runInLoop(Functor cb){
    if(isInLoopThread()){
        cb();  // 在当前loop线程中
    }else{
        queueInLoop(cb);
    }
}


// 把cb放到队列中,等到loop所在线程被唤醒后再执行cb
void EventLoop::queueInLoop(Functor cb){
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);  // emplace_back是直接构造,push_back是拷贝构造
    }
    // 执行线程并非loop所在线程 || loop所在线程正在执行回调函数,但是loop又有了新的回调
    // loop执行完回调函数后会重新阻塞,所以需要唤醒,继续执行新来的回调
    if(!isInLoopThread() || callingPendingFunctors_){
        wakeup(); // 唤醒loop所在线程
    }
}


// wakeup方法是给其他loop所在线程执行的,用于唤醒loop所在线程,向该loop的wakeupFd_写数据即可,
void EventLoop::wakeup(){
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof(one));
    if(n != sizeof(one)){
        // 真正写入的数据大小n并不是我们想写的sizeof(one)
        LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
    }
}


// Channel调用所在loop的updateChannel来epoll_ctl  EPOLL_CTL_ADD  EPOLL_CTL_MOD,修改Poller
void EventLoop::updateChannel(Channel* channel){
    poller_->updateChannel(channel);
}


// Channel调用所在loop的removeChannel来epoll_ctl  EPOLL_CTL_DEL
void EventLoop::removeChannel(Channel* channel){
    poller_->removeChannel(channel);
}


// 查看当前循环是否有管理参数传入的channel
bool EventLoop::hasChannel(Channel* channel){
    poller_->hasChannel(channel);
}


/**
 * 当有新的客户端连接时,mainReactor会发送8字节的数据到wakeupFd_,
 * 发送的内容不重要,重要的是subReactor会监听wakeupFd_,一旦wakeupFd_有事件发生,subReactor就会被唤醒,
 * subReactor被唤醒后就能拿到新用户到来的Channel
 */
void EventLoop::handleRead(){
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof(one));
    if(n != sizeof(one)){
        LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8 \n", n);
    }
}

// 执行回调。回调都在vector pendingFunctors_里面
void EventLoop::doPendingFunctors(){
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for(const Functor& functor : functors){
        functor();  // 执行当前loop需要执行的回调函数
    }

    callingPendingFunctors_ = false;
}
  C++知识库 最新文章
【C++】友元、嵌套类、异常、RTTI、类型转换
通讯录的思路与实现(C语言)
C++PrimerPlus 第七章 函数-C++的编程模块(
Problem C: 算法9-9~9-12:平衡二叉树的基本
MSVC C++ UTF-8编程
C++进阶 多态原理
简单string类c++实现
我的年度总结
【C语言】以深厚地基筑伟岸高楼-基础篇(六
c语言常见错误合集
上一篇文章      下一篇文章      查看所有文章
加:2022-04-27 11:09:20  更:2022-04-27 11:10:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/10 23:48:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码