前述文章围绕base、net两个模块各种组件,已经形成了初具规模的Reactor事件处理框架。从现在开始,逐步实现一个非阻塞的TCP网络编程库。不同于传统的Reactor,将timers 做成循环中单独的一步,muduo将 timers 和 IO handlers 视为等同的。 整个TCP网络库,分为服务端和客户端。服务端TcpServer 使用reactor模式进行IO事件循环,使用Acceptor接收新的连接,每个连接都是一个TcpConnetion 对象。客户端TcpClient 使用Connector 发起连接,通过TcpConection 收发数据。客户端主动发起连接比服务端被动接收连接需要额外的错误处理、考虑重连,因此先介绍服务端的实现,本文先以Acceptor 为开端。
1、Acceptor定义
Acceptor class用于accept(2)新TCP连接,并通过回调通知使用者,它是内部class,在上层应用程序中我们不直接使用,而是把它封装作为TcpServer 的成员,生命期由后者控制。
class Acceptor : noncopyable
{
public:
typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;
Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
~Acceptor();
void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; }
bool listenning() const { return listenning_; }
void listen();
private:
void handleRead();
EventLoop* loop_;
Socket acceptSocket_;
Channel acceptChannel_;
NewConnectionCallback newConnectionCallback_;
bool listenning_;
int idleFd_;
};
定义比较简单,直接看实现。
2、Acceptor实现
(1)构造和析构
构造时,需要传递当前Acceptor 所属的EventLoop ,一般是是主线程或者main reactor中。创建一个listening socket,绑定指定ip和port,并构造一个channel用于监听此的连接。为处理无可用描述符时的连接请求,使用一个预留fd处理。
析构时,关闭channel关注的IO事件,从Poller注销,关闭socket对象。
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
acceptChannel_(loop, acceptSocket_.fd()),
listenning_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr);
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
Acceptor::~Acceptor()
{
acceptChannel_.disableAll();
acceptChannel_.remove();
::close(idleFd_);
}
(2)开始监听新连接
将状态设置为监听,然后调用监听socket的listen函数,将监听channel的读事件注册到poller的管理。
void Acceptor::listen()
{
loop_->assertInLoopThread();
listenning_ = true;
acceptSocket_.listen();
acceptChannel_.enableReading();
}
(3)处理新连接
当有客户端发起连接时,监听channel触发读事件,那么调用其对应的回调函数Acceptor::handleRead() 。
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
if (newConnectionCallback_){
newConnectionCallback_(connfd, peerAddr);
}
else{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}
当前文件描述符过多,无法接收新的连接。但是由于Poller采用LT模式,如果无法接收,可读事件会一直触发。那么在这个地方的处理机制就是,关掉之前创建的空闲的idleFd_,然后去accept让这个事件不会一直触发,然后再关掉该文件描述符,重新将它设置为空文件描述符。这种机制可以让网络库在处理连接过多、文件描述符不够用时,不至于因为LT模式一直触发而产生坏的影响。
|