本文重点在muduo TcpServer的启动,I/O线程池的启动,以及各种回调
文章目录
- base
- AsyncLogging.{h,cc}
- Atomic.h
- BlockinQueue.h
- BoundedBlockinQueue.h
- Condition.h
- copyable.h
- CountDownLatch.{h,cc}
- Date.{h,cc}
- Exception.{h,cc}
- Logging.{h,cc}
- Mutex.h
- ProcessInfo.{h,cc}
- Singleton.h
- StringPiece.h
- Thread.{h,cc}
- ThreadLocal.h
- ThreadLocalSingleton.h
- ThreadPool.{h,cc}
- Timestamp.{h,cc}
- TimeZone.{h,cc}
- Types.h
- net
- ~~Socket.{h,cc}~~
- ~~SocketOps.{h,cc}~~
- InetAddress.{h,cc}
- Edian.h
- ~~Poller.{h,cc}~~
- PollPoller
- select和poll在内核态的遍历:
- EPollPoller
-
- Channel.{h,cc}
- 定时器
-
- Buffer.{h,cc}
-
- ~~Acceptor.{h,cc}~~
- EventLoop.{h,cc}
- EventLoopThread.{h,cc}
- EventLoopThreadPool.{h,cc}
- muduo TcpServer线程池的启动
-
- TcpConnection.{h,cc}
- TcpServer.{h,cc}
-
- ~~Connector.{h,cc}~~
- TcpClient.{h,cc}
- http
-
- muduo优化:
用户可见的类,用户不可见的类
base
一些基础库,都是用户可见的类 如何实现一个不能被继承的类:
final、使用友元、私有构造函数、虚继承等方式可以使一个类不能被继承
AsyncLogging.{h,cc}
异步日志backend
Atomic.h
原子操作与原子整数
BlockinQueue.h
无界阻塞队列(生产者消费者模型)
BoundedBlockinQueue.h
有界阻塞队列
Condition.h
条件变量,与Mutex.h一起使用
copyable.h
一个空基类,用于标识(tag)值类型
CountDownLatch.{h,cc}
“倒计时门闩”同步
Date.{h,cc}
Julian日期库(即公历)
Exception.{h,cc}
带stack trace的异常基类
Logging.{h,cc}
简单的日志,搭配AsyncLogging使用
Mutex.h
互斥器
ProcessInfo.{h,cc}
进程信息
Singleton.h
线程安全的singleton 实现懒汉单例,用pthread_once()函数保证初始化函数只会在本进程中执行一次。如果在单例中有函数no_destroy(),程序结束时就不会通过atexit()函数注册清理函数,会造成内存泄漏
StringPiece.h
从Google开源代码借用的字符串参数传递类型
Thread.{h,cc}
线程对象 注意latch_的变化:它会等待子线程执行完startThread try后面接catch语句捕捉异常
ThreadLocal.h
线程局部数据 线程本地变量(线程特定变量,线程私有变量) pthread_key_create pthread_key_delete pthread_key_getspecific pthread_key_setspecific
ThreadLocalSingleton.h
每个线程一个singleton 线程本地懒汉单例,用的__thread。 每个线程都有自己的一份单例,利用pthread_key_create管理单例的生命周期,生成单例之后利用pthread_key_create传入的destructor析构该单例对象。
ThreadPool.{h,cc}
简单的固定大小线程池
Timestamp.{h,cc}
UTC时间戳
TimeZone.{h,cc}
时区与夏令时
Types.h
基本类型的声明,包括muduo::string
net
muduo的I/O模型采用非阻塞模式,避免阻塞在read()或write()或其他系统调用上
Socket.{h,cc}
KeepAlive:https://zhuanlan.zhihu.com/p/28894266 socket阻塞和非阻塞有哪些不同
- 建立连接
阻塞方式下,connect首先发送SYN请求到服务器,当客户端收到服务器返回的SYN的确认时,则connect返回,否则的话一直阻塞。
非阻塞方式,connect将启用TCP协议的三次握手,但是connect函数并不等待连接建立好才返回,而是立即返回,返回的错误码为EINPROGRESS,表示正在进行某种过程。
- 接收连接
阻塞模式下调用accept()函数,而且没有新连接时,进程会进入睡眠状态,直到有可用的连接,才返回。
非阻塞模式下调用accept()函数立即返回,有连接返回客户端套接字描述符,没有新连接时,将返回EWOULDBLOCK错误码,表示本来应该阻塞。
muduo使用的是非阻塞IO,和IO多路复用结合起来的一般都是非阻塞。 封装了socket套接字编程,诸Listen/bind/accept方法等等,有的是调用SocketOps.h的接口
SocketOps.{h,cc}
对Socket.h方法的补充和填充,实际调用系统接口,实现了create/bind/listen/accept/connect/read/write/close/shutdown等函数
需要关注非阻塞connect和close-on-exec的写法:
void setNonBlockAndCloseOnExec(int sockfd)
{
// non-block
int flags = ::fcntl(sockfd, F_GETFL, 0);
flags |= O_NONBLOCK;
int ret = ::fcntl(sockfd, F_SETFL, flags);
// FIXME check
// close-on-exec
flags = ::fcntl(sockfd, F_GETFD, 0);
flags |= FD_CLOEXEC;
ret = ::fcntl(sockfd, F_SETFD, flags);
// FIXME check
(void)ret;
}
InetAddress.{h,cc}
对sockaddr_in和sockaddr_in6的封装,方便构造,获取ip地址和port
Edian.h
网络字节序与本机字节序的转换
Poller.{h,cc}
负责监听事件是否触发的部分,在 muduo 中叫做 Poller
基类实现了hasChannel函数,判断Map是否拥有此Channel
protected:
typedef std::map<int, Channel*> ChannelMap;
ChannelMap channels_;
private:
EventLoop* ownerLoop_;
Poller使用一个map来存放描述符fd和对应的Channel类型的指针, 这样我们就可以通过fd很方便的得到Channel了,该map是protected变量。
私有成员是一个EventLoop的指针,用来指向当前EventLoop,用来判断防止Poller被跨线程调用。
该类要被PollPoller和EPollPoller继承
PollPoller
封装了高级IO:poll
pollfds_存放pollfd的数组,用来传入poll模式中的第一个事件集合参数:
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
缺点:当事件就绪时,用户并不知道哪些事件就绪了,需要遍历pollfds_,如果pollfds_的某个元素(假设为A)的revents字段大于0,则说明该事件就绪,此时会到channels_这个map里根据fd找到对应的channel,将pollfds_中A的revents字段赋值给对应channel的revents字段,然后把该channel放到活跃事件数组activeChannel中。
事件的fd的修改技巧: 不直接修改为-1,而是改为-fd-1。方便后续还原(还原是为了去map中寻找对应的channel),还原方式仍为:-fd-1
select和poll在内核态的遍历:
将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。
epoll在醒着的时候只需要去检测就绪队列有无就绪的事件就行,不需要去遍历。
EPollPoller
封装了高级IO:epoll
muduo的epoll采用的是水平触发 原因:
- 与传统的poll兼容,在文件描述符数目较少而活跃的文件描述符数目又较多时,回调函数触发太频繁,此时的poll甚至比epoll效率更高。(epoll试用于连接较多,活动较少的情况)
- 水平触发编程更加简单,不会有漏掉事件的bug
- 读写的时候不必等候出现EAGAIN,可以节省系统调用次数,降低延迟。(ET模式下,read一个fd的时候一定要把它的buffer读光,也就是说一直读到read的返回值小于请求值,或者遇到EAGAIN错误)
在VxWorks和Windows上,EAGAIN的名字叫做EWOULDBLOCK。在linux进行非阻塞的socket接收数据时经常出现Resource temporarily unavailable,errno代码为11(EAGAIN),该错误不会破坏socket的同步。对非阻塞socket而言,EAGAIN不是一种错误。
epoll水平触发和边缘触发的区别:
水平模式下,只要有事件就绪,可以先不处理,下次调用epoll_wait的时候回再次通知。 边缘触发:文件描述符A的可读写事件就绪,通知一次。如果没有一次性读取完毕,下一次文件描述符B的可读事件就绪,通知用户的时候不会通知A还没读完,只会通知B的,只有等到A文件描述符再次有事件就绪时,才会通知。这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符。
ET在很大程度上降低了同一个epoll事件被重复触发的次数。
epoll优点
- select/poll一般只能处理几千的并发连接。epoll能监控的文件描述符没有上限,1G内存大概能监控10W个端口
- 不用轮询,只有活跃事件才会调用回调函数。select和poll在内核态需要遍历,epoll只需要查看就绪队列
- 每次调用select和poll都需要有用户态到内核态的拷贝,而且每次都需要把当前进程挂到设备等待队列中,epoll只需要一次拷贝,而且也只需要把进程挂一次等待对列(注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等待队列)
- epoll的三个接口更加方便
epoll机制:
epoll_create方法时,Linux内核会创建一个eventpoll结构体
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的节点数)。
而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
在epoll中,对于每一个事件,都会建立一个epitem结构体:
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
epoll惊群问题: 一瞬间大量的连接都活跃,epoll_wait一直在活跃,一直在处理数据,最高效的是不用epoll,直接用非阻塞的读就好了。大部分时间花在epoll_wait之后的处理。
Channel.{h,cc}
用于每个Socket连接的事件分发 构造函数:
Channel::Channel(EventLoop* loop, int fd__)
: loop_(loop),
fd_(fd__),
events_(0),
revents_(0),
index_(-1),
logHup_(true),
tied_(false),
eventHandling_(false),
addedToLoop_(false)
{
}
不管是poll还是epoll,最终的事件都会存放到channel中 index_在poll中表示下标,在epoll中表示三个标志位(新增、删除、更新)。因为PollPoller::poller_这个数组是poll的传入传出参数,就绪的和没有就绪的都在这个数组里面,因此可以根据index快速的定位。而epoll的EPollPollPoller::events_ 只用来存放就绪的事件,是一个传出参数,因此index_可以用来做其他的事情
定时器
为什么网络编程中需要定时器呢?
在开发Linux网络程序时,通常需要维护多个定时器,如维护客户端心跳时间、检查多个数据包的超时重传等。如果采用Linux的SIGALARM信号实现,则会带来较大的系统开销,且不便于管理。
timerfd是Linux为用户程序提供的一个定时器接口。这个接口基于文件描述符,通过文件描述符的可读事件进行超时通知,所以能够被用于select/poll的应用场景,采用文件描述符实现定时有利于统一事件源。
mudo的定时器由三个类实现,TimerId,Timer,TimerQueue,用户只能看到第一个类,其它两个类都是内部实现细节。
TimerId.h
TimerId被设计用来取消Timer的,它的结构很简单,只有一个Timer指针和其序列号。其中还声明了TimerQueue为其友元,可以操作其私有数据。
Timer.{h,cc}
Timer是对定时器的高层次抽象,封装了定时器的一些参数,例如超时回调函数、超时时间、超时时间间隔、定时器是否重复、定时器的序列号。其函数大都是设置这些参数,run()用来调用回调函数,restart()用来重启定时器(如果设置为重复)。
TimerQueue.{h,cc}
TimerQueue 采用了最简单的实现(链表)来管理定时器,它的效率比不上常见的 binary heap 的做法,如果程序中大量(10 个以上)使用重复触发的定时器,或许值得考虑改用更高级的实现。
TimerQueue的接口很简单,只有两个函数addTimer()和cancel()。它的内部有channel,和timerfd相关联。添加新的Timer后,在超时后,timerfd可读,会处理channel事件,之后调用Timer的回调函数;在timerfd的事件处理后,还有检查一遍超时定时器,如果其属性为重复还有再次添加到定时器集合中。
TimeQueue的优化: 用二叉搜索树(例如std::set/std::map),把Timer按到期时间先后排好序,其操作的复杂度是O(logN),从而快速地根据当前时间找到已经到期的Timer,也要能高效地添加和删除Timer。 但我们使用时还要处理两个Timer到期时间相同的情况(map不支持key相同的情况),做法如下:
两种类型的set,一种按时间戳排序,一种按Timer的地址排序 实际上,这两个set保存的是相同的定时器列表
定时器主要是在EventLoop中使用,EventLoop中为我们提供了四个函数,供用户使用
Buffer.{h,cc}
Non-blocking IO 的核心思想是避免阻塞在 read() 或 write() 或其他 IO 系统调用上,这样可以最大限度地复用 thread-of-control,让一个线程能服务于多个 socket 连接。IO 线程只能阻塞在 IO-multiplexing 函数上,如 select()/poll()/epoll_wait()。这样一来,应用层的缓冲是必须的,每个 TCP socket 都要有 stateful 的 input buffer 和 output buffer。详情见书P205。
性能和易用性:muduo的buffer更偏向于易用性。
- 对外表现为一块连续的内存(char*, len),以方便客户代码的编写。
- 其 size() 可以自动增长,以适应不同大小的消息。它不是一个 fixed size array (即 char buf[8192])。
- 内部以 vector of char 来保存数据,并提供相应的访问函数。vector有capcity()机制减少了分配内存的次数。
- vector重新分配内存会使原来的指针失效,因此下标使用size_t而不用const char *
Buffer 其实像是一个 queue,从末尾写入数据,从头部读出数据。
muduo::net::Buffer 不是线程安全的,这么做是有意的,原因见书P209:
muduo的缓冲区Buffer类不是线程安全的,因为它是每个连接私有的,不需要锁的操作
- 对于 input buffer,onMessage() 回调始终发生在该 TcpConnection 所属的那个 IO 线程,应用程序应该在 onMessage() 完成对 input buffer 的操作,并且不要把 input buffer 暴露给其他线程。这样所有对 input buffer 的操作都在同一个线程,Buffer class 不必是线程安全的。
- 对于 output buffer,应用程序不会直接操作它,而是调用 TcpConnection::send() 来发送数据,后者是线程安全的。
调用send,如果没有一次写完就会往outbuffer内写数据,导致writeIndex后移。
调用系统函数write,将outbuffer中的数据往socket中写,会导致readIndex后移,如果全部都写入socket中了,则会将readIndex和writeIndex重新赋值8,指向最开始的位置。
buffer的初始大小为1kB多,如果经常发送10kB的数据,几次之后buffer的size()就会自动增长到10kB。初始值小可以避免内存浪费,自适应大小可以避免反复分配内存。
readFd函数的优点:
//节省一次ioctl系统调用(获取当前有多少可读数据) //为什么这么说?因为我们准备了足够大的extrabuf,那么我们就不需要使用ioctl去查看fd有多少可读字节数了
//保证只调用一次read,不反复调用read导致返回EAGAIN //而且muduo采用的水平触发,保证一次读完 //高效,只需要一次系统调用 //公平,不会因为某个连接上数据量过大而影响其他连接处理消息
前方添加(prepend)
buffer前面预留了8个字节,即提供了prependable空间,可以简化客户代码,以空间换时间。比如程序以固定的4个字节表示消息长度。
Acceptor.{h,cc}
构造函数:
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
acceptChannel_(loop, acceptSocket_.fd()),
listening_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr);
acceptChannel_.setReadCallback(
std::bind(&Acceptor::handleRead, this));
}
Acceptor用于accept(2)接受TCP连接。 Acceptor的数据成员包括Socket、Channel。 Acceptor的socket是listening socket(即server socket)。 Channel用于观察此socket的readable事件,并Acceptor::handleRead(),后者调用accept(2)来接受连接,并回调用户callback。
不过,Acceptor类在上层应用程序中我们不直接使用,而是把它封装作为TcpServer的成员。
在监听套接字可读事件触发时,我们会调用accept接受连接。如果此时注册过回调函数,就执行它。如果没有就直接关闭!
成员变量 idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) //预先准备一个空闲文件描述符 如果已用文件描述符过多,accept会返回-1,我们构造函数中注册的idleFd_就派上用场了。当前文件描述符过多,无法接收新的连接。但是由于我们采用LT模式,如果无法接收,可读事件会一直触发。那么在这个地方的处理机制就是,关掉之前创建的空心idleFd_,然后去accept让这个事件不会一直触发,然后再关掉该文件描述符,重新将它设置为空文件描述符。
这种机制可以让网络库在处理连接过多,文件描述符不够用时,不至于因为LT模式一直触发而产生坏的影响。
if (errno == EMFILE) //太多的文件描述符
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
EventLoop.{h,cc}
EventLoop是初始分发器,其实就是一个reactor角色,负责事件循环的部分在 muduo 被命名为 EventLoop
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this));
wakeupChannel_->enableReading();
}
该类中有一个loop函数,执行了该函数的线程就是I/O线程,负责调用poll来监控文件描述符,当有事件发生时,会去调用对应的回调函数,有四种回调函数:读、写、出错和关闭。这些函数在Channel中。
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
I/O线程比较灵活。当I/O线程不忙的时候,也就是poll监控的文件描述符的事件不怎么发生,I/O线程还能去执行计算任务:doPendingFunctors();
queueInLoop和runInloop函数的区别:
- 前者仅是加入等待队列pendingFunctors_,等待执行
- 后者,如果本线程是I/O线程,就直接执行,如果不是I/O线程就放入等待队列pendingFunctors_
EventLoopThread.{h,cc}
构造函数:
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
const string& name)
: loop_(NULL),
exiting_(false),
thread_(std::bind(&EventLoopThread::threadFunc, this), name),
mutex_(),
cond_(mutex_),
callback_(cb)
{
}
关于EventLoopThread有以下几点:
-
任何一个线程,只要创建并运行了EventLoop,都称之为I/O线程。 -
I/O线程不一定是主线程。I/O线程中可能有I/O线程池和计算线程池。(主Reactor(TcpServer)的I/O线程有I/O线程池) -
muduo并发模型one loop per thread + threadpool -
为了方便使用,就直接定义了一个I/O线程的类,就是EventLoopThread类,该类实际上就是对I/O线程的封装。
(1)EventLoopThread创建了一个线程。 (2)在该类线程函数中创建了一个EventLoop对象并调用EventLoop::loop,调用EventLoopThread::startLoop(),就启动了线程也调用了loop
EventLoopThread::startLoop()和EventLoopThread::threadFunc()的讲解放在后面 muduo TcpServer线程池的启动。
EventLoopThreadPool.{h,cc}
构造函数:
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
: baseLoop_(baseLoop),
name_(nameArg),
started_(false),
numThreads_(0),
next_(0)
{
}
EventLoopThreadPool 是一个线程池,只不过该线程池有一点特殊,该线程池中的每一个线程都要执行EventLoop进行文件描述符的监听。
此时一个线程用于管理分配线程池中的EventLoop,如果线程池为空,主线程的EventLoop用与监听所有的文件描述符baseLoop_
每个muduo网络库有一个事件驱动循环线程池EventLoopThreadPool 每个线程池中有多个事件驱动线程EventLoopThread 每个线程运行一个EventLoop事件循环 每个EventLoop事件循环包含一个io复用Poller,一个计时器队列TimerQueue 每个Poller监听多个Channel,TimerQueue其实也是一个Channel 每个Channel对应一个fd,在Channel被激活后调用回调函数 每个回调函数是在EventLoop所在线程执行 所有激活的Channel回调结束后EventLoop继续让Poller监听
muduo TcpServer线程池的启动
TcpServer::start
首先是TcpServer::start()函数内部会启动线程池,调用EventLoopThreadPool::start(threadInitCallback_),同时传入一个回调函数,这个函数由用户通过调用TcpServer::setThreadInitCallback()显示设置,TcpServer并没有设置,这个函数也就是在EventLoopThreadPool启动的时候会执行一下。
EventLoopThreadPool::start
此处传入的cb就是threadInitCallback_。可见如果numThreads_大于1时才会创建EventLoopThread。numThreads_被初始化为0,需要用户调用TcpServer::setThreadNum()手动设置。 第46行:把每个子线程启动后返回的EventLoop都存储起来
EventLoopThread::startLoop
42行thread_.start(),线程启动,(后面的事情就是新创建的线程去做了,主线程等待一下loop_不为空,就返回了,不需要等待threadFunc函数执行完毕,见base::thread::runInThread())去执行的函数是EventLoopThread::threadFunc(),在EventLoopThread的构造函数中就设置了:
thread_(std::bind(&EventLoopThread::threadFunc, this), name), //绑定线程运行函数
TcpConnection.{h,cc}
构造函数:
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}
该类起到一个承上启下的作用,维持着TcpServer, Channel, Socket等等之间的联系。
TcpConnection里面的五个回调函数都会在TcpClient和TcpServer里面设置。
该类对象,客户端和服务器都会用到 。这是一个接口类。
TcpConnection在构造函数中开启了保活机制。
TcpServer.{h,cc}
Tcp服务端,就是一个服务器。 该类即支持单线程,也支持多线程。
TcpServer的构造函数最少要传三个参数:EventLoop,InetAddress,string。还有第四个参数,这个参数给了默认值,默认为0,还可以给1,默认值是在构造Acceptor对象的时候,不开启ReusePort,为1的时候则去开启。
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}
注意这个loop,可以看到该loop还参与了Acceptor和EventLoopThreadPool的初始化:
Acceptor的初始化中,又用该loop初始化了Channel:
EventLoopThreadPool的初始化需要传一个loop来标明baseloop是谁: 线程池初始化成功后可以调用一个线程池初始化回调函数,这个函数要由用户手动设置,muduo的测试用例没有设置。
由此可见baseloop(主Reactor)的TcpServer,Acceptor,EventLoopThreadPool和Channel的loop都是同一个。 start: 启动线程池 Acceptor::listen----->acceptChannel_.enableReading()------>loop_->updateChannel(this)------>poller_->updateChannel(channel)
建立连接:
目前在TcpServer和TcpClient都没有设置TcpConnection::highWaterMarkCallback_和TcpServer::threadInitCallback_。后者有暴露接口给用户设置,前者没有暴露接口。
关闭连接:
muduo断开连接的方式:
- 被动关闭:即对方先关闭连接,本地read()返回0,触发关闭逻辑,调用TcpConnection::handleClose()
- 主动关闭:本地调用forceClose(),调用TcpConnection::handleClose()
- Channel监听到POLLHUP事件,并且没有POLLIN事件,调用channel的closeCallback_
- Channel的close回调就是TcpConnection中的handleClose(在TcpConnection构造函数中设置)
- HandleRead调用readFd的返回值是0,会调用handleClose
- forceCloseInLoop会调用handleClose
handleClose–>clsoeCallback_(在TcpServer的newConnection中设置)–>removeConnection–>removeConnectionInLoop–>connectDestroyed
- TcpServer的析构函数,会直接调用connectDestroyed函数
TcpConnection的removeConnection,erase将这个连接对象从列表中移除。按照正常的思路,我们还应该将这个对象销毁掉,但是在这里我们不能立即销毁这个连接对象,如果销毁了这个对象,TcpConnection所包含的Channel对象也就跟着销毁了。而当前正在调用这个Channel对象的handleEvent函数,而这个Channel对象又销毁了,就会出现coredump。因而这个不能销毁TcpConnection对象,也就是说TcpConnection对象的生存期应该长于HandleEvent函数,如何做到这一点,可以利用shared_ptr来管理TcpConnection对象。
当连接到来,创建一个TcpConnection对象,立刻用shared_ptr来管理,这时候引用计数为1。
在Channel中维护一个weak_ptr(tie_),将这个shared_ptr对象赋值给tie_,因为是弱引用,所以引用计数不会加1。
当连接关闭,调用了Channel的handleEvent函数,在这个函数中,将tie_提升,得到一个shared_ptr对象,此时引用计数为2。
Connector.{h,cc}
连接器,用于客户端发起连接 EchoClient中的TcpClient直接调用的Connector::connect()。没有调用start()。
TcpClient.{h,cc}
TCP客户端
TcpClient::TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& nameArg)
: loop_(CHECK_NOTNULL(loop)),
connector_(new Connector(loop, serverAddr)),
name_(nameArg),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
retry_(false),
connect_(true),
nextConnId_(1)
{
connector_->setNewConnectionCallback(
std::bind(&TcpClient::newConnection, this, _1));
LOG_INFO << "TcpClient::TcpClient[" << name_
<< "] - connector " << get_pointer(connector_);
}
TCPClient使用Conneccor发起连接, 连接建立成功后, 用socket创建TcpConnection来管理连接。 每个TcpClient class只管理一个TcpConnecction,而TcpServer管理多个TcpConnection。
不管是TcpServer还是TcpClient,都只关注可读事件,如果Socket出错,会变为可读,如果要写入数据,先直接调用write,如果没有写完,再去关注可写事件。
后续的事件,数据等都在TcpConnection这个结构中维护。
建立连接:
关闭连接和TcpServer类似,都和TcpConnection有关,有区别的是Connector
http
HttpContext
解析buffer,生成HttpRequest
http请求储存在buffer里面,每次处理一行都要往后挪动读指针
const char* crlf = buf->findCRLF()
buf->retrieveUntil(crlf + 2)//+2是把\r\n字符算进去
std::equal(start, end, const char*)比较函数
HttpRequest
- 第一行:请求行(首行),以空格为单位,请求方法 请求URL HTTP协议的版本
- 第二行到空行之前:请求报头(Header),以行为单位陈列,key:value
- 空行
- 请求正文(Body)
HttpResponse
对于HTTP响应的封装,封装的状态码只有5个:0,200,301,400,404
- 第一行:状态行(首行)【版本号】+【状态码】+【状态码解释】
- 第二行到空行之前:响应报头(Header),遇到空行表示响应报头结束
- 空行
- 响应正文(网页)Body:空行之后,允许为空字符串,如果有数据,那么在响应报头会有一个Content-Length属性来标识正文的长度。如果服务器返回一个html页面,那么html页面内容就是在正文中。html告诉你是什么,根据css文件来决定图片咋放,动态效果是gs产生的
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3LMJLFQ0-1630570044565)(https://user-images.githubusercontent.com/40709975/131464722-fcbfa80d-81c9-4743-bd52-28f8ab7b8f39.png)]
HttpServer
封装了一下TcpServer,实现的比较简单
HTTP请求头:http://tools.jb51.net/table/http_header HTTP管道化,队头阻塞,管道化/非管道化:https://blog.csdn.net/fesfsefgs/article/details/108294050
muduo优化:
- 就绪事件是按时间放在就绪队列里的,并没有做优先级区分
- muduo多线程模型优化:为每个连接的每个请求也新建线程去处理,而不是在同一个线程中
- buffer类扩容的时候prependable没有修改,还维持在原来的位置。
- buffer扩容使用的resize,会对新增的空间进行memset()为0,有点浪费。
- 分段连续的zero copy buffer再配合gather scatter IO,buffer的性能会更优。
|