用户使用muduo时编写的代码如下:
#include <mymuduo/TcpServer.h>
#include <string>
#include <functional>
class EchoServer{
public:
EchoServer(EventLoop* loop, const InetAddress& addr, const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
server_.setConnectionCallback(std::bind(&EchoServer::on_connection, this, std::placeholders::_1));
server_.setMessageCallback(std::bind(&EchoServer::on_message, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
server_.setThreadNum(3);
}
void start(){
server_.start();
}
private:
void on_connection(const TcpConnectionPtr& conn){
if(conn->connected()){
LOG_INFO("conn up : %s \n", conn->peerAddress().toIpPort().c_str());
}else{
LOG_INFO("conn down : %s \n", conn->peerAddress().toIpPort().c_str());
}
}
void on_message(const TcpConnectionPtr& conn, Buffer* buff, Timestamp time){
std::string msg = buff->retrieveAllAsString();
conn->send(msg);
conn->shutdown();
}
EventLoop* loop_;
TcpServer server_;
};
int main(){
EventLoop loop;
InetAddress addr(8888, "127.0.0.1");
EchoServer server(&loop, addr, "EchoServer-01");
server.start();
loop.loop();
return 0;
}
首先构造一个EventLoop,创建封装了本地地址和端口号的InetAddress对象,然后就构造一个EchoServer对象,底层还是直接构造的TcpServer对象,然后启动TcpServer以及事件循环EventLoop
一、TcpServer对象构造时,都做了哪些事
1. 构造一个Acceptor对象
首先构造了一个Acceptor对象,工作在mainLoop Acceptor的构造函数首先创建用于监听的socket和Channel,socket绑定本地地址,给Acceptor注册读事件回调(Acceptor只需要处理读事件即可)。此外TcpServer::start()时,Acceptor才开始监听,往mainLoop的Poller上注册listenfd
2. 构造EventLoopThreadPool对象
构造EventLoopThreadPool对象,此时还没开启loop线程
3. 设置有新连接到来时执行的回调
TcpServer设置Acceptor在有新连接到来时,需要执行的回调newConnetionCallback_,有新连接到来时需要执行TcpServer::newConnection。实际上,有新连接到来时,Acceptor对象会先执行Acceptor::handleRead,Acceptor::handleRead内部再调用newConnetionCallback_,即TcpServer::newConnection
二、TcpServer.start()做了什么
首先通过atomic_int变量控制TcpServer重复start,然后启动底层线程池、执行Acceptor的listen方法
1. 启动底层线程池
threadPool_->start(threadInitCallback_);
线程池调用start方法后,根据numThreads_创建执行subloop的线程,然后调用EventLoopThread::startLoop
startLoop方法除了调用Thread对象的start方法,还会返回thread的数据成员EventLoopThread::loop_ Thread::start方法会调用我们在EventLoopThread构造函数中设置的EventLoopThread::threadFunc方法 EventLoopThread::threadFunc方法创建了EventLoop对象loop,loop.loop()然后启动事件循环,这就是启动了subloop
构造EventLoop的时候,会注册wakeupFd到Poller 那EventLoop是什么时候构造的呢?
EventLoopThreadPool::start被调用时,会构造EventLoopThread对象,然而此时该事件循环线程对象EventLoop为空,并没有构造EventLoop对象,EventLoop对象是在 调用thread->startLoop()然后调用thread_.start() 时调用EventLoopThread::threadFunc方法时构造的
2. 执行Acceptor的listen方法
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
这一行代码就相当于开启fd的listen模式,并把listenfd注册在Poller上,即使用epoll监听
至此,subloop全都启动了,listenfd也注册到Poller上了,已经全部准备好了
三、用户调用loop.loop
用户调用loop.loop,就是启动mainloop
四、整个服务器启动的流程
- 构造Acceptor对象,创建listenfd,给Acceptor注册回调
- 构造TcpServer对象,同时包含了注册回调,设置底层线程的个数,构造EventLoopThreadPool对象,设置有新连接到来时执行的回调
- TcpServer.start开启subloop,构造EventLoop对象的同时将wakeupfd注册到Poller,能够让主线程mainloop来唤醒子线程loop
- Acceptor.listen,把listenfd注册到Poller上。
- 最后启动mainLoop
五、新连接到来时,服务器处理
新连接到来时,注册在Poller上的listenfd会调用readCallBack_ Acceptor封装的设置的readCallBack_就是Acceptor::handleRead Acceptor::handleRead中又调用了newConnetionCallback_ 这个newConnetionCallback_就是TcpServer给设置的TcpServer::newConnection 即有新连接到来时,会调用TcpServer::newConnection,函数如下: 首先构造了一个TcpConnection对象,创建对象的同时也就将该对象分发给了一个subloop,即TcpConnection的数据成员loop_指向这个subloop
然后将TcpConnection对象放入TcpServer的数据成员connections_进行存储,然后设置了一系列的回调函数connectionCallback_、messageCallback_、writeCompleteCallback_、CloseCallback,这些回调函数都是TcpServer给TcpConnection设置的,而这些回调函数会在TcpConnection的以下函数中调用: 而上述TcpConnection的成员函数早就被当作回调函数注册在了Channel上 发生事件时,Channel会调用相应的回调函数,即调用到了TcpConnection的成员函数,而在TcpConnection的成员函数内部又调用了TcpServer给TcpConnection设置的回调函数,这些TcpServer的回调数据成员又都是用户编程时,调用set方法给TcpServer设置的
TcpServer提供用于设置回调函数的接口: 用户调用方式: 设置完一系列回调后,还有最后一行代码:
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
如果设置过subloop的数量,这个ioLoop所属的线程一定不是执行TcpServer::newConnection的线程,因为新连接的创建一定是在mainloop中完成,所以这里的runInLoop一定会执行queueInLoop 在queueInLoop里面,这个执行mainloop的线程先把当前待执行的回调函数TcpConnection::connectEstablished放入subloop的数据成员pendingFunctors_
callingPendingFunctors_ == true 表示subloop的线程正在执行pendingFunctors_里的回调函数,不需要唤醒,直接走人 callingPendingFunctors_ == false 则表示subloop暂时没有客户的读写事件,正在epoll_wait阻塞等待,执行mainloop的线程则会执行wakeup,往每个EventLoop都有的成员上写数据,让epoll_wait返回发生的事件,这个subloop所在的线程就会执行pendingFunctors_里的回调了
反之如果没有设置过subloop的数量,整个Reactor模型只有一个mainloop,那直接执行回调函数TcpConnection::connectEstablished connectEstablished方法设置TcpConnection的状态为kConnected、将Channel对象和一条TcpConnection绑定、往Poller上注册Channel的读事件,调用用户传入的on_connection方法
那为什么Channel和TcpConnection需要绑定呢? 因为EventLoop通过Poller调用epoll_wait返回后,会通过Channel调用相应的回调处理事件,而Channel的回调函数都是TcpConnection设置的 如果由于某些原因TcpConnection对象不存在了,一旦发生事件,Channel再执行这些回调也就没有意义了
所以Channel用一个数据成员weak_ptr来观察TcpConnection对象,每次Channel执行回调函数前都会用weak_ptr的lock方法检查TcpConnection对象是否存活,存活则调用回调函数,否则不调用
六、有消息到来时,服务器处理
有已连接用户的消息到来时,EventLoop通过Poller调用epoll_wait返回,然后调用Channel的readCallBack_ TcpServer给Channel设置readCallBack_过程如下:
首先Channel的readCallBack_是通过Channel的成员方法setReadCallBack设置的,设置Channel的回调肯定是在TcpConnection中进行的 可以看到,TcpConnection给Channel的成员readCallBack_设置的是TcpConnection::handleRead,这个函数里面调用了messageCallback_ messageCallback_也是通过调用TcpConnection::setMessageCallback设置的,给TcpConnection设置方法,那只能是在TcpServer中进行
TcpServer调用setMessageCallback那就是用户调用的了,如下:
server_.setMessageCallback(std::bind(&EchoServer::on_message, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
所以,这样一层一层设置,Channel调用readCallBack_,就是调用TcpConnection::handleRead,TcpConnection::handleRead又会调用messageCallback_,即调用用户通过TcpServer设置的自定义函数on_message
七、连接断开时,服务器处理
连接断开包括:客户端断开和服务端主动断开
连接断开后,EventLoop通过Poller调用epoll_wait返回,然后调用Channel的closeCallBack_ 显然,Channel的回调函数Channel::closeCallBack_是TcpConnection设置的,是TcpConnection::handleClose
在TcpConnection::handleClose中,首先设置TcpConnection的状态为kDisconnected,然后把Channel对应的fd以及fd感兴趣的事件从epoll内核事件表中删除
然后执行用户传入的on_connection方法,再执行TcpConnection::closeCallback_方法,这个方法是新连接到来时,在TcpServer::newConnection设置的,即TcpServer::removeConnection方法 最终调用到TcpServer::removeConnectionInLoop方法,先在TcpServer用于记录连接的connections_中删除当前已经断开连接的TcpConnection对象,然后获取该TcpConnection对象所属subloop,然后让对应线程执行TcpConnection::connectDestroyed
一般来说,if语句是不会满足条件的,我们在TcpConnection::handleClose中,就设置了TcpConnection对象的状态为kDisconnected,最后调用Channel::remove方法,Channel::remove会调用到EPollPoller::removeChannel
先从Poller的容器中移除断开连接的Channel,接下来这个if语句一般是不会成立的 因为index在前面的TcpConnection::handleClose中会调用:
channel_->disableAll();
这一句就会把index设置为kDeleted
最后就是把channel的状态设置为kNew
TcpConnection::handleClose中,用shared_ptr指向当前断开连接的TcpConnection对象,这个shared_ptr出作用域后,会自动释放对象空间
|