IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> muduo总结 -> 正文阅读

[网络协议]muduo总结

本文重点在muduo TcpServer的启动,I/O线程池的启动,以及各种回调

用户可见的类,用户不可见的

base

一些基础库,都是用户可见的类
如何实现一个不能被继承的类:

final、使用友元、私有构造函数、虚继承等方式可以使一个类不能被继承

AsyncLogging.{h,cc}

异步日志backend

Atomic.h

原子操作与原子整数

BlockinQueue.h

无界阻塞队列(生产者消费者模型)

BoundedBlockinQueue.h

有界阻塞队列

Condition.h

条件变量,与Mutex.h一起使用

copyable.h

一个空基类,用于标识(tag)值类型

CountDownLatch.{h,cc}

“倒计时门闩”同步

Date.{h,cc}

Julian日期库(即公历)

Exception.{h,cc}

带stack trace的异常基类

Logging.{h,cc}

简单的日志,搭配AsyncLogging使用

Mutex.h

互斥器

ProcessInfo.{h,cc}

进程信息

Singleton.h

线程安全的singleton
实现懒汉单例,用pthread_once()函数保证初始化函数只会在本进程中执行一次。如果在单例中有函数no_destroy(),程序结束时就不会通过atexit()函数注册清理函数,会造成内存泄漏

StringPiece.h

从Google开源代码借用的字符串参数传递类型

Thread.{h,cc}

线程对象
在这里插入图片描述
注意latch_的变化:它会等待子线程执行完startThread
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
try后面接catch语句捕捉异常

ThreadLocal.h

线程局部数据
线程本地变量(线程特定变量,线程私有变量)
pthread_key_create
pthread_key_delete
pthread_key_getspecific
pthread_key_setspecific

ThreadLocalSingleton.h

每个线程一个singleton
线程本地懒汉单例,用的__thread。
每个线程都有自己的一份单例,利用pthread_key_create管理单例的生命周期,生成单例之后利用pthread_key_create传入的destructor析构该单例对象。

ThreadPool.{h,cc}

简单的固定大小线程池

Timestamp.{h,cc}

UTC时间戳

TimeZone.{h,cc}

时区与夏令时

Types.h

基本类型的声明,包括muduo::string

net

muduo的I/O模型采用非阻塞模式,避免阻塞在read()或write()或其他系统调用上

Socket.{h,cc}

KeepAlive:https://zhuanlan.zhihu.com/p/28894266
socket阻塞和非阻塞有哪些不同

  1. 建立连接

阻塞方式下,connect首先发送SYN请求到服务器,当客户端收到服务器返回的SYN的确认时,则connect返回,否则的话一直阻塞。

非阻塞方式,connect将启用TCP协议的三次握手,但是connect函数并不等待连接建立好才返回,而是立即返回,返回的错误码为EINPROGRESS,表示正在进行某种过程。

  1. 接收连接

阻塞模式下调用accept()函数,而且没有新连接时,进程会进入睡眠状态,直到有可用的连接,才返回。

非阻塞模式下调用accept()函数立即返回,有连接返回客户端套接字描述符,没有新连接时,将返回EWOULDBLOCK错误码,表示本来应该阻塞。

muduo使用的是非阻塞IO,和IO多路复用结合起来的一般都是非阻塞。
封装了socket套接字编程,诸Listen/bind/accept方法等等,有的是调用SocketOps.h的接口

SocketOps.{h,cc}

对Socket.h方法的补充和填充,实际调用系统接口,实现了create/bind/listen/accept/connect/read/write/close/shutdown等函数

需要关注非阻塞connect和close-on-exec的写法:

void setNonBlockAndCloseOnExec(int sockfd)
{
  // non-block
  int flags = ::fcntl(sockfd, F_GETFL, 0);
  flags |= O_NONBLOCK;
  int ret = ::fcntl(sockfd, F_SETFL, flags);
  // FIXME check

  // close-on-exec
  flags = ::fcntl(sockfd, F_GETFD, 0);
  flags |= FD_CLOEXEC;
  ret = ::fcntl(sockfd, F_SETFD, flags);
  // FIXME check

  (void)ret;
}

InetAddress.{h,cc}

对sockaddr_in和sockaddr_in6的封装,方便构造,获取ip地址和port

Edian.h

网络字节序与本机字节序的转换

Poller.{h,cc}

负责监听事件是否触发的部分,在 muduo 中叫做 Poller

基类实现了hasChannel函数,判断Map是否拥有此Channel

 protected:
    typedef std::map<int, Channel*> ChannelMap;	
    ChannelMap channels_;	
 
 private:
     EventLoop* ownerLoop_;

Poller使用一个map来存放描述符fd和对应的Channel类型的指针,
这样我们就可以通过fd很方便的得到Channel了,该map是protected变量。

私有成员是一个EventLoop的指针,用来指向当前EventLoop,用来判断防止Poller被跨线程调用。

该类要被PollPoller和EPollPoller继承

PollPoller

封装了高级IO:poll
image

pollfds_存放pollfd的数组,用来传入poll模式中的第一个事件集合参数:

int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);

缺点:当事件就绪时,用户并不知道哪些事件就绪了,需要遍历pollfds_,如果pollfds_的某个元素(假设为A)的revents字段大于0,则说明该事件就绪,此时会到channels_这个map里根据fd找到对应的channel,将pollfds_中A的revents字段赋值给对应channel的revents字段,然后把该channel放到活跃事件数组activeChannel中。

事件的fd的修改技巧:
不直接修改为-1,而是改为-fd-1。方便后续还原(还原是为了去map中寻找对应的channel),还原方式仍为:-fd-1

select和poll在内核态的遍历:

将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。

epoll在醒着的时候只需要去检测就绪队列有无就绪的事件就行,不需要去遍历。

EPollPoller

封装了高级IO:epoll
在这里插入图片描述

muduo的epoll采用的是水平触发
原因:

  1. 与传统的poll兼容,在文件描述符数目较少而活跃的文件描述符数目又较多时,回调函数触发太频繁,此时的poll甚至比epoll效率更高。(epoll试用于连接较多,活动较少的情况)
  2. 水平触发编程更加简单,不会有漏掉事件的bug
  3. 读写的时候不必等候出现EAGAIN,可以节省系统调用次数,降低延迟。(ET模式下,read一个fd的时候一定要把它的buffer读光,也就是说一直读到read的返回值小于请求值,或者遇到EAGAIN错误)
    在VxWorks和Windows上,EAGAIN的名字叫做EWOULDBLOCK。在linux进行非阻塞的socket接收数据时经常出现Resource temporarily unavailable,errno代码为11(EAGAIN),该错误不会破坏socket的同步。对非阻塞socket而言,EAGAIN不是一种错误。

epoll水平触发和边缘触发的区别:

水平模式下,只要有事件就绪,可以先不处理,下次调用epoll_wait的时候回再次通知。
边缘触发:文件描述符A的可读写事件就绪,通知一次。如果没有一次性读取完毕,下一次文件描述符B的可读事件就绪,通知用户的时候不会通知A还没读完,只会通知B的,只有等到A文件描述符再次有事件就绪时,才会通知。这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符。

ET在很大程度上降低了同一个epoll事件被重复触发的次数。

epoll优点

  • select/poll一般只能处理几千的并发连接。epoll能监控的文件描述符没有上限,1G内存大概能监控10W个端口
  • 不用轮询,只有活跃事件才会调用回调函数。select和poll在内核态需要遍历,epoll只需要查看就绪队列
  • 每次调用select和poll都需要有用户态到内核态的拷贝,而且每次都需要把当前进程挂到设备等待队列中,epoll只需要一次拷贝,而且也只需要把进程挂一次等待对列(注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等待队列)
  • epoll的三个接口更加方便

epoll机制:

epoll_create方法时,Linux内核会创建一个eventpoll结构体

struct eventpoll{
    ....
    /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    struct rb_root  rbr;
    /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdlist;
    ....
};

每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的节点数)。

而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。

在epoll中,对于每一个事件,都会建立一个epitem结构体:

struct epitem{
    struct rb_node  rbn;//红黑树节点
    struct list_head    rdllink;//双向链表节点
    struct epoll_filefd  ffd;  //事件句柄信息
    struct eventpoll *ep;    //指向其所属的eventpoll对象
    struct epoll_event event; //期待发生的事件类型
}

在这里插入图片描述
epoll惊群问题:
一瞬间大量的连接都活跃,epoll_wait一直在活跃,一直在处理数据,最高效的是不用epoll,直接用非阻塞的读就好了。大部分时间花在epoll_wait之后的处理。

Channel.{h,cc}

用于每个Socket连接的事件分发
在这里插入图片描述
构造函数:

Channel::Channel(EventLoop* loop, int fd__)
  : loop_(loop),
    fd_(fd__),
    events_(0),
    revents_(0),
    index_(-1),                                                             //index_初始值设置为-1,表示本channel目前还没有和pollfd_绑定
    logHup_(true),
    tied_(false),
    eventHandling_(false),
    addedToLoop_(false)
{
}

不管是poll还是epoll,最终的事件都会存放到channel中
index_在poll中表示下标,在epoll中表示三个标志位(新增、删除、更新)。因为PollPoller::poller_这个数组是poll的传入传出参数,就绪的和没有就绪的都在这个数组里面,因此可以根据index快速的定位。而epoll的EPollPollPoller::events_
只用来存放就绪的事件,是一个传出参数,因此index_可以用来做其他的事情

定时器

为什么网络编程中需要定时器呢?

在开发Linux网络程序时,通常需要维护多个定时器,如维护客户端心跳时间、检查多个数据包的超时重传等。如果采用Linux的SIGALARM信号实现,则会带来较大的系统开销,且不便于管理。

timerfd是Linux为用户程序提供的一个定时器接口。这个接口基于文件描述符,通过文件描述符的可读事件进行超时通知,所以能够被用于select/poll的应用场景,采用文件描述符实现定时有利于统一事件源。

mudo的定时器由三个类实现,TimerId,Timer,TimerQueue,用户只能看到第一个类,其它两个类都是内部实现细节。

TimerId.h

TimerId被设计用来取消Timer的,它的结构很简单,只有一个Timer指针和其序列号。其中还声明了TimerQueue为其友元,可以操作其私有数据。

Timer.{h,cc}

Timer是对定时器的高层次抽象,封装了定时器的一些参数,例如超时回调函数、超时时间、超时时间间隔、定时器是否重复、定时器的序列号。其函数大都是设置这些参数,run()用来调用回调函数,restart()用来重启定时器(如果设置为重复)。

TimerQueue.{h,cc}

TimerQueue 采用了最简单的实现(链表)来管理定时器,它的效率比不上常见的 binary heap 的做法,如果程序中大量(10 个以上)使用重复触发的定时器,或许值得考虑改用更高级的实现。

TimerQueue的接口很简单,只有两个函数addTimer()和cancel()。它的内部有channel,和timerfd相关联。添加新的Timer后,在超时后,timerfd可读,会处理channel事件,之后调用Timer的回调函数;在timerfd的事件处理后,还有检查一遍超时定时器,如果其属性为重复还有再次添加到定时器集合中。
image

TimeQueue的优化:
用二叉搜索树(例如std::set/std::map),把Timer按到期时间先后排好序,其操作的复杂度是O(logN),从而快速地根据当前时间找到已经到期的Timer,也要能高效地添加和删除Timer。
但我们使用时还要处理两个Timer到期时间相同的情况(map不支持key相同的情况),做法如下:

两种类型的set,一种按时间戳排序,一种按Timer的地址排序
实际上,这两个set保存的是相同的定时器列表

定时器主要是在EventLoop中使用,EventLoop中为我们提供了四个函数,供用户使用

Buffer.{h,cc}

Non-blocking IO 的核心思想是避免阻塞在 read() 或 write() 或其他 IO 系统调用上,这样可以最大限度地复用 thread-of-control,让一个线程能服务于多个 socket 连接。IO 线程只能阻塞在 IO-multiplexing 函数上,如 select()/poll()/epoll_wait()。这样一来,应用层的缓冲是必须的,每个 TCP socket 都要有 stateful 的 input buffer 和 output buffer。详情见书P205。

性能和易用性:muduo的buffer更偏向于易用性。

  • 对外表现为一块连续的内存(char*, len),以方便客户代码的编写。
  • 其 size() 可以自动增长,以适应不同大小的消息。它不是一个 fixed size array (即 char buf[8192])。
  • 内部以 vector of char 来保存数据,并提供相应的访问函数。vector有capcity()机制减少了分配内存的次数。
  • vector重新分配内存会使原来的指针失效,因此下标使用size_t而不用const char *

在这里插入图片描述

Buffer 其实像是一个 queue,从末尾写入数据,从头部读出数据。

muduo::net::Buffer 不是线程安全的,这么做是有意的,原因见书P209:

muduo的缓冲区Buffer类不是线程安全的,因为它是每个连接私有的,不需要锁的操作

  1. 对于 input buffer,onMessage() 回调始终发生在该 TcpConnection 所属的那个 IO 线程,应用程序应该在 onMessage() 完成对 input buffer 的操作,并且不要把 input buffer 暴露给其他线程。这样所有对 input buffer 的操作都在同一个线程,Buffer class 不必是线程安全的。
  2. 对于 output buffer,应用程序不会直接操作它,而是调用 TcpConnection::send() 来发送数据,后者是线程安全的。

在这里插入图片描述

调用send,如果没有一次写完就会往outbuffer内写数据,导致writeIndex后移。

调用系统函数write,将outbuffer中的数据往socket中写,会导致readIndex后移,如果全部都写入socket中了,则会将readIndex和writeIndex重新赋值8,指向最开始的位置。

buffer的初始大小为1kB多,如果经常发送10kB的数据,几次之后buffer的size()就会自动增长到10kB。初始值小可以避免内存浪费,自适应大小可以避免反复分配内存。

readFd函数的优点:

//节省一次ioctl系统调用(获取当前有多少可读数据)
//为什么这么说?因为我们准备了足够大的extrabuf,那么我们就不需要使用ioctl去查看fd有多少可读字节数了

//保证只调用一次read,不反复调用read导致返回EAGAIN
//而且muduo采用的水平触发,保证一次读完
//高效,只需要一次系统调用
//公平,不会因为某个连接上数据量过大而影响其他连接处理消息

前方添加(prepend)

buffer前面预留了8个字节,即提供了prependable空间,可以简化客户代码,以空间换时间。比如程序以固定的4个字节表示消息长度。

Acceptor.{h,cc}

在这里插入图片描述
构造函数:

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),                         //创建监听套接字
    acceptChannel_(loop, acceptSocket_.fd()),                                                    //绑定Channel和socketfd
    listening_(false),
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))                                           //预先准备一个空闲文件描述符
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr);
  acceptChannel_.setReadCallback(
      std::bind(&Acceptor::handleRead, this));                                                   //设置读事件回调,Channel的fd的读回调函数
}

Acceptor用于accept(2)接受TCP连接。
Acceptor的数据成员包括Socket、Channel。
Acceptor的socket是listening socket(即server socket)。
Channel用于观察此socket的readable事件,并Acceptor::handleRead(),后者调用accept(2)来接受连接,并回调用户callback。

不过,Acceptor类在上层应用程序中我们不直接使用,而是把它封装作为TcpServer的成员。

在监听套接字可读事件触发时,我们会调用accept接受连接。如果此时注册过回调函数,就执行它。如果没有就直接关闭!

成员变量 idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) //预先准备一个空闲文件描述符
如果已用文件描述符过多,accept会返回-1,我们构造函数中注册的idleFd_就派上用场了。当前文件描述符过多,无法接收新的连接。但是由于我们采用LT模式,如果无法接收,可读事件会一直触发。那么在这个地方的处理机制就是,关掉之前创建的空心idleFd_,然后去accept让这个事件不会一直触发,然后再关掉该文件描述符,重新将它设置为空文件描述符。

这种机制可以让网络库在处理连接过多,文件描述符不够用时,不至于因为LT模式一直触发而产生坏的影响。

    if (errno == EMFILE)                                      //太多的文件描述符
    {
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }

EventLoop.{h,cc}

EventLoop是初始分发器,其实就是一个reactor角色,负责事件循环的部分在 muduo 被命名为 EventLoop
在这里插入图片描述

EventLoop::EventLoop()
  : looping_(false),                                                                //表示还未循环
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    iteration_(0),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),                                        //设置了环境变量MUDUO_USE_POLL,就构造一个实际的PollPoller对象。否则构造一个EPollPoller对象。创建一个epollfd_=3
                                                                                    //基类指针,指向派生类,基类的指针、引用可以指向子类对象
                                                                                    //poller_成员在eventlooper中只会调用基类有的四个函数:poll、updateChannel、removeChannel、hasChannel。派生类重写了前三个函数

    timerQueue_(new TimerQueue(this)),                                              //构造一个timerQueue指针,使用scope_ptr管理,创建一个timerfd_ = 4
    wakeupFd_(createEventfd()),                                                     //创建eventfd作为线程间等待/通知机制,创建一个wakeupFD_ = 5
    wakeupChannel_(new Channel(this, wakeupFd_)),                                   //创建wakeupChannel通道
    currentActiveChannel_(NULL)
{
  LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
  if (t_loopInThisThread)                                                           //保证每个线程最多一个EventLoop对象,如果已创建,终止程序(LOG_FATAL)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
  // 合成一个eventfd的通道Channel
  // 设置读事件回调函数,设定wakeupChannel的回调函数,即EventLoop自己的的handleRead函数
  wakeupChannel_->setReadCallback(
      std::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd

  wakeupChannel_->enableReading();                                                 // 使能wakeupFD_监听读事件,此处调用Channel的enableReading函数
}

该类中有一个loop函数,执行了该函数的线程就是I/O线程,负责调用poll来监控文件描述符,当有事件发生时,会去调用对应的回调函数,有四种回调函数:读、写、出错和关闭。这些函数在Channel中。

void EventLoop::loop()
{
  assert(!looping_);                                                              // 判断是否重复开始事件循环
  assertInLoopThread();                                                           //断言处于创建该对象的线程中
  looping_ = true;
  quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
  LOG_TRACE << "EventLoop " << this << " start looping";

  while (!quit_)
  {
    activeChannels_.clear();

    // 1.等待事件
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);              //调用poll返回活动的通道,有可能是唤醒返回的

    // 记录循环次数
    ++iteration_;

    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority(按照优先级对通道排序)

    // 2.处理事件
    eventHandling_ = true;
    for (Channel* channel : activeChannels_)                                    //遍历通道来进行处理
    {
      currentActiveChannel_ = channel;                                          //当前正在处理的活动通道
      currentActiveChannel_->handleEvent(pollReturnTime_);                      // 执行事件对应的处理函数,在Channel.cc中
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;

    // 3.处理未执行的函数 todo 暂时不知道哪种业务场景使用这个比较合适
    //I/O线程设计比较灵活,通过下面这个设计也能够进行计算任务,否则当I/O不是很繁忙的时候,这个I/O线程就一直处于阻塞状态。
    //我们需要让它也能执行一些计算任务
    doPendingFunctors();//处理用户回调任务
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

I/O线程比较灵活。当I/O线程不忙的时候,也就是poll监控的文件描述符的事件不怎么发生,I/O线程还能去执行计算任务:doPendingFunctors();

queueInLoop和runInloop函数的区别:

  • 前者仅是加入等待队列pendingFunctors_,等待执行
  • 后者,如果本线程是I/O线程,就直接执行,如果不是I/O线程就放入等待队列pendingFunctors_

EventLoopThread.{h,cc}

在这里插入图片描述
构造函数:

EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
                                 const string& name)
  : loop_(NULL),                                                                   //loop未启动为NULL
    exiting_(false),
    thread_(std::bind(&EventLoopThread::threadFunc, this), name),                  //绑定线程运行函数
    mutex_(),
    cond_(mutex_),
    callback_(cb)                                                                  //初始化回调函数
{
}

关于EventLoopThread有以下几点:

  1. 任何一个线程,只要创建并运行了EventLoop,都称之为I/O线程。

  2. I/O线程不一定是主线程。I/O线程中可能有I/O线程池和计算线程池。(主Reactor(TcpServer)的I/O线程有I/O线程池)

  3. muduo并发模型one loop per thread + threadpool

  4. 为了方便使用,就直接定义了一个I/O线程的类,就是EventLoopThread类,该类实际上就是对I/O线程的封装。

(1)EventLoopThread创建了一个线程。
(2)在该类线程函数中创建了一个EventLoop对象并调用EventLoop::loop,调用EventLoopThread::startLoop(),就启动了线程也调用了loop

EventLoopThread::startLoop()和EventLoopThread::threadFunc()的讲解放在后面 muduo TcpServer线程池的启动。

EventLoopThreadPool.{h,cc}

在这里插入图片描述
构造函数:

EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
  : baseLoop_(baseLoop),
    name_(nameArg),
    started_(false),
    numThreads_(0),
    next_(0)
{
}

EventLoopThreadPool 是一个线程池,只不过该线程池有一点特殊,该线程池中的每一个线程都要执行EventLoop进行文件描述符的监听。

此时一个线程用于管理分配线程池中的EventLoop,如果线程池为空,主线程的EventLoop用与监听所有的文件描述符baseLoop_

每个muduo网络库有一个事件驱动循环线程池EventLoopThreadPool
每个线程池中有多个事件驱动线程EventLoopThread
每个线程运行一个EventLoop事件循环
每个EventLoop事件循环包含一个io复用Poller,一个计时器队列TimerQueue
每个Poller监听多个Channel,TimerQueue其实也是一个Channel
每个Channel对应一个fd,在Channel被激活后调用回调函数
每个回调函数是在EventLoop所在线程执行
所有激活的Channel回调结束后EventLoop继续让Poller监听

muduo TcpServer线程池的启动

TcpServer::start

首先是TcpServer::start()函数内部会启动线程池,调用EventLoopThreadPool::start(threadInitCallback_),同时传入一个回调函数,这个函数由用户通过调用TcpServer::setThreadInitCallback()显示设置,TcpServer并没有设置,这个函数也就是在EventLoopThreadPool启动的时候会执行一下。
在这里插入图片描述

EventLoopThreadPool::start

此处传入的cb就是threadInitCallback_。可见如果numThreads_大于1时才会创建EventLoopThread。numThreads_被初始化为0,需要用户调用TcpServer::setThreadNum()手动设置。
第46行:把每个子线程启动后返回的EventLoop都存储起来
在这里插入图片描述

EventLoopThread::startLoop

42行thread_.start(),线程启动,(后面的事情就是新创建的线程去做了,主线程等待一下loop_不为空,就返回了,不需要等待threadFunc函数执行完毕,见base::thread::runInThread())去执行的函数是EventLoopThread::threadFunc(),在EventLoopThread的构造函数中就设置了:

thread_(std::bind(&EventLoopThread::threadFunc, this), name),                  //绑定线程运行函数

在这里插入图片描述
在这里插入图片描述

TcpConnection.{h,cc}

在这里插入图片描述
构造函数:

TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),                                                              //检查loop不为空
    name_(nameArg),                                                                          //连接名字
    state_(kConnecting),                                                                     //连接的状态
    reading_(true),                                                                          //监听读事件
    socket_(new Socket(sockfd)),                                                             //将建立连接成功返回的sockfd进行封装,生成socket_对象
    channel_(new Channel(loop, sockfd)),                                                     //生成一个channel对象
    localAddr_(localAddr),                                                                   //本端地址
    peerAddr_(peerAddr),                                                                     //对端地址
    highWaterMark_(64*1024*1024)                                                             //高水位标记
{
                                                                                             //channel设置读回调,写回调,关闭回调,错误回调
  channel_->setReadCallback(
      std::bind(&TcpConnection::handleRead, this, _1));
  channel_->setWriteCallback(
      std::bind(&TcpConnection::handleWrite, this));
  channel_->setCloseCallback(
      std::bind(&TcpConnection::handleClose, this));
  channel_->setErrorCallback(
      std::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);                                                               //开启保活机制
}

该类起到一个承上启下的作用,维持着TcpServer, Channel, Socket等等之间的联系。

TcpConnection里面的五个回调函数都会在TcpClient和TcpServer里面设置。

该类对象,客户端和服务器都会用到 。这是一个接口类。

TcpConnection在构造函数中开启了保活机制。

TcpServer.{h,cc}

Tcp服务端,就是一个服务器。
该类即支持单线程,也支持多线程。
image

TcpServer的构造函数最少要传三个参数:EventLoop,InetAddress,string。还有第四个参数,这个参数给了默认值,默认为0,还可以给1,默认值是在构造Acceptor对象的时候,不开启ReusePort,为1的时候则去开启。

TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),                                                            //外部传入的一个EventLoop,检查不为空
    ipPort_(listenAddr.toIpPort()),                                                        //绑定的地址
    name_(nameArg),                                                                        //服务名称
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),                       //构造一个Acceptor对象
    threadPool_(new EventLoopThreadPool(loop, name_)),                                     //构造一个I/O线程池对象
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),                                              //将buffer中的读索引和写索引重置
    nextConnId_(1)                                                                         //下一个建立的连接ID为1
{
  acceptor_->setNewConnectionCallback(                                                     //监听套接字获取到新链接后就会执行该回调函数
      std::bind(&TcpServer::newConnection, this, _1, _2));
}

注意这个loop,可以看到该loop还参与了Acceptor和EventLoopThreadPool的初始化:
image

Acceptor的初始化中,又用该loop初始化了Channel:
image

EventLoopThreadPool的初始化需要传一个loop来标明baseloop是谁:
image
线程池初始化成功后可以调用一个线程池初始化回调函数,这个函数要由用户手动设置,muduo的测试用例没有设置。

由此可见baseloop(主Reactor)的TcpServer,Acceptor,EventLoopThreadPool和Channel的loop都是同一个。
start:
启动线程池
Acceptor::listen----->acceptChannel_.enableReading()------>loop_->updateChannel(this)------>poller_->updateChannel(channel)

建立连接:

image
TcpServer

image
目前在TcpServer和TcpClient都没有设置TcpConnection::highWaterMarkCallback_和TcpServer::threadInitCallback_。后者有暴露接口给用户设置,前者没有暴露接口。

关闭连接:

muduo断开连接的方式:

  1. 被动关闭:即对方先关闭连接,本地read()返回0,触发关闭逻辑,调用TcpConnection::handleClose()
  2. 主动关闭:本地调用forceClose(),调用TcpConnection::handleClose()
  3. Channel监听到POLLHUP事件,并且没有POLLIN事件,调用channel的closeCallback_
  • Channel的close回调就是TcpConnection中的handleClose(在TcpConnection构造函数中设置)
  • HandleRead调用readFd的返回值是0,会调用handleClose
  • forceCloseInLoop会调用handleClose

handleClose–>clsoeCallback_(在TcpServer的newConnection中设置)–>removeConnection–>removeConnectionInLoop–>connectDestroyed

  • TcpServer的析构函数,会直接调用connectDestroyed函数

image

TcpConnection的removeConnection,erase将这个连接对象从列表中移除。按照正常的思路,我们还应该将这个对象销毁掉,但是在这里我们不能立即销毁这个连接对象,如果销毁了这个对象,TcpConnection所包含的Channel对象也就跟着销毁了。而当前正在调用这个Channel对象的handleEvent函数,而这个Channel对象又销毁了,就会出现coredump。因而这个不能销毁TcpConnection对象,也就是说TcpConnection对象的生存期应该长于HandleEvent函数,如何做到这一点,可以利用shared_ptr来管理TcpConnection对象。

当连接到来,创建一个TcpConnection对象,立刻用shared_ptr来管理,这时候引用计数为1。

在Channel中维护一个weak_ptr(tie_),将这个shared_ptr对象赋值给tie_,因为是弱引用,所以引用计数不会加1。

当连接关闭,调用了Channel的handleEvent函数,在这个函数中,将tie_提升,得到一个shared_ptr对象,此时引用计数为2。

Connector.{h,cc}

连接器,用于客户端发起连接
在这里插入图片描述
start()--->startInLoop()--->connect()
EchoClient中的TcpClient直接调用的Connector::connect()。没有调用start()。

TcpClient.{h,cc}

TCP客户端

TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(nameArg),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  //将Connector的连接回调函数设置为TcpClient::newConnection
  //该函数在Connector::handleWrite中被调用,可写就表示连接已经建立成功了(但是还需判断是否出错)

  connector_->setNewConnectionCallback(
      std::bind(&TcpClient::newConnection, this, _1));
  // FIXME setConnectFailedCallback
  LOG_INFO << "TcpClient::TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
}

TCPClient使用Conneccor发起连接, 连接建立成功后, 用socket创建TcpConnection来管理连接。 每个TcpClient class只管理一个TcpConnecction,而TcpServer管理多个TcpConnection。

不管是TcpServer还是TcpClient,都只关注可读事件,如果Socket出错,会变为可读,如果要写入数据,先直接调用write,如果没有写完,再去关注可写事件。

后续的事件,数据等都在TcpConnection这个结构中维护。

建立连接:
TcpClient

关闭连接和TcpServer类似,都和TcpConnection有关,有区别的是Connector

http

HttpContext

解析buffer,生成HttpRequest

http请求储存在buffer里面,每次处理一行都要往后挪动读指针

const char* crlf = buf->findCRLF()
buf->retrieveUntil(crlf + 2)//+2是把\r\n字符算进去

std::equal(start, end, const char*)比较函数

HttpRequest

  1. 第一行:请求行(首行),以空格为单位,请求方法 请求URL HTTP协议的版本
  2. 第二行到空行之前:请求报头(Header),以行为单位陈列,key:value
  3. 空行
  4. 请求正文(Body)

image

HttpResponse

对于HTTP响应的封装,封装的状态码只有5个:0,200,301,400,404

  1. 第一行:状态行(首行)【版本号】+【状态码】+【状态码解释】
  2. 第二行到空行之前:响应报头(Header),遇到空行表示响应报头结束
  3. 空行
  4. 响应正文(网页)Body:空行之后,允许为空字符串,如果有数据,那么在响应报头会有一个Content-Length属性来标识正文的长度。如果服务器返回一个html页面,那么html页面内容就是在正文中。html告诉你是什么,根据css文件来决定图片咋放,动态效果是gs产生的

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3LMJLFQ0-1630570044565)(https://user-images.githubusercontent.com/40709975/131464722-fcbfa80d-81c9-4743-bd52-28f8ab7b8f39.png)]

HttpServer

封装了一下TcpServer,实现的比较简单
image

HTTP请求头:http://tools.jb51.net/table/http_header
HTTP管道化,队头阻塞,管道化/非管道化:https://blog.csdn.net/fesfsefgs/article/details/108294050

muduo优化:

  1. 就绪事件是按时间放在就绪队列里的,并没有做优先级区分
  2. muduo多线程模型优化:为每个连接的每个请求也新建线程去处理,而不是在同一个线程中
  3. buffer类扩容的时候prependable没有修改,还维持在原来的位置。
  4. buffer扩容使用的resize,会对新增的空间进行memset()为0,有点浪费。
  5. 分段连续的zero copy buffer再配合gather scatter IO,buffer的性能会更优。
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-09-04 17:54:21  更:2021-09-04 17:56:49 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/16 22:24:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码