前文博客【muduo学习笔记:net部分之实现TCP网络编程库TcpConnection】和【muduo学习笔记:net部分之实现TCP网络编程库Acceptor】 是TcpServer是基本组件,博文【muduo学习笔记:net部分之多线程TCP服务端的设计模式演进】介绍了TcpServer的各种设计方式以及muduo网络库的结构。本文主要介绍TcpServer代码结构、工作流程。
1、多线程 TcpServer 服务端结构
作为服务器端,首先需要封装了listening socket的Acceptor 来监听客户端的连接,每个建立的连接TcpConnection 都存放在connetion map中进行管理,已连接的TcpConnection的操作全部通过回调函数执行。Acceptor上的IO事件由在主线程的main reactor监听,建立连接的TcpConnection上的IO事件由sub reactors监听。sub reactors使用EvenetLoopThreadPool 实现,当线程池数量使用默认0时,退化为单线程模型。
1.1、接受连接流程
TcpServer使用Acceptor处理新的连接,channel处理Acceptor的listening socket事件,IO事件在EventLoop的Poller上取得。之后通过一层层的回调返回到TcpServer,创建一个TcpConnection并通过回调反馈给用户端。
1.2、接收数据回调流程
当TcpServer建立连接后创建TcpConnetion,已连接的socket上的IO事件被channel注册到Poller上。当事件循环检测到当前连接的socket上有可读事件,将通过回调通知TcpConnetion并执行数据读取,并通过回调将读取的数据返回给用户。
1.3、断开连接流程
先介绍客户端主动断开连接的情况。当TcpServer建立连接后创建TcpConnetion,已连接的socket上的IO事件被channel注册到Poller上。事件循环检测到断连IO事件,通过回调通知TcpConnettion进行相应处理。TcpConnettion首先通知用户连接已经断开,之后通过回调通知TcpServer断开连接,并在IO线程中执行回调。TcpServer首先从connetion map中移除当前的TcpConnection,并在IO线程回调TcpConntion的销毁步骤,通过channel从Poller移除。 如果服务端主动发起关闭,可直接使用TcpConnection的forceClose函数,通过调用handleClose() 主动关闭连接。用户可以通过调用TcpConnection的shutdown() 优雅的关闭socket达到断开连接的目的。
2、TcpServer 定义
class TcpServer : noncopyable
{
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
enum Option{ kNoReusePort, kReusePort,};
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option = kNoReusePort);
~TcpServer();
const string& ipPort() const { return ipPort_; }
const string& name() const { return name_; }
EventLoop* getLoop() const { return loop_; }
void setThreadNum(int numThreads);
void setThreadInitCallback(const ThreadInitCallback& cb)
{ threadInitCallback_ = cb; }
std::shared_ptr<EventLoopThreadPool> threadPool()
{ return threadPool_; }
void start();
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }
private:
void newConnection(int sockfd, const InetAddress& peerAddr);
void removeConnection(const TcpConnectionPtr& conn);
void removeConnectionInLoop(const TcpConnectionPtr& conn);
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
EventLoop* loop_;
const string ipPort_;
const string name_;
std::unique_ptr<Acceptor> acceptor_;
std::shared_ptr<EventLoopThreadPool> threadPool_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
ThreadInitCallback threadInitCallback_;
AtomicInt32 started_;
int nextConnId_;
ConnectionMap connections_;
};
3、TcpServer 实现
TcpServer实现代码逻辑简单,管理Acceptor接收连接,EventLoopThreadPoll管理建立连接的IO事件循环,设置TcpConnection的回调函数。
3.1、构造和实现
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));
}
TcpServer::~TcpServer()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
for (auto& item : connections_)
{
TcpConnectionPtr conn(item.second);
item.second.reset();
conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
}
3.2、启动服务端
函数setThreadNum() 必须在调用start() 前使用,若不设置线程数量,则默认单线程模式处理。start()函数启动Acceptor::listen() 开始监听新连接到来的事件,一旦有新连接的事件就触发回调TcpServer::newConnection 。
void TcpServer::setThreadNum(int numThreads)
{
assert(0 <= numThreads);
threadPool_->setThreadNum(numThreads);
}
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
threadPool_->start(threadInitCallback_);
assert(!acceptor_->listenning());
loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
3.3、新连接的回调
当Accettor检测到新连接到来,回调TcpServer::newConnection() 。使用使用round-robin策略从线程池中获取一个ioLoop,用于后续处理当前TcpConnettion上的IO事件。之后在ioLoop事件循环中调用TcpConnection::connectEstablished() 函数。
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
3.4、断开连接的回调
从connection map中通过连接name移除当前连接。在当前连接所在的loop中执行TcpConnection::connectDestroyed 进行收尾工作。
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name());
(void)n;
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
3.5、一些说明
在前面的个函数中,TcpConnection 的函数执行都是在其所属的ioLoop中执行,主要是因为TcpServer是无锁的,方便客户端代码的编写。TcpServer和tcpConnection的代码都只处理单线程的情况(没有Mutex成员),而借助EventLoop::runInLoop() 并引入EventLoopThreadPool 让多线程TcpServer实现简单。
muduo采用最简单的round-robin算法来选取pool中的EventLoop,不允许TcpCOnnection运行中更换EventLoop,对长连接、短连接都适用,不易造成偏载。目前设计的每个TcpServer有自己的EventLoopThreadPool,多个TcpServer之间不共享EventLoopThreadPool。若有必要,也可以在多个TcpServer之间共享EventLoopThreadPool,比如一个服务有多个等价TCP端口,每个TcpServer负责一个端口,而来自这些端口的连接共享一个EventLoopThreadPool。
4、测试
4.1、EchoServer测试
以EchoServer为例,回复后主动断开连接。
#include <muduo/net/TcpServer.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <utility>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
int numThreads = 0;
class EchoServer
{
public:
EchoServer(EventLoop* loop, const InetAddress& listenAddr)
: loop_(loop),
server_(loop, listenAddr, "EchoServer")
{
server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(std::bind(&EchoServer::onMessage, this, _1, _2, _3));
server_.setThreadNum(numThreads);
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN")
<< " - EventLoop " << conn->getLoop();
LOG_INFO << conn->getTcpInfoString();
conn->send("hello\n");
}
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
{
string msg(buf->retrieveAllAsString());
LOG_INFO << conn->name() << " recv " << msg.size() << " bytes at " << time.toString();
if (msg == "exit\n"){
conn->send("bye\n");
conn->shutdown();
}
if (msg == "quit\n"){
loop_->quit();
}
conn->send(msg);
}
EventLoop* loop_;
TcpServer server_;
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
LOG_INFO << "sizeof TcpConnection = " << sizeof(TcpConnection);
if (argc > 1){
numThreads = atoi(argv[1]);
}
bool ipv6 = argc > 2;
EventLoop loop;
InetAddress listenAddr(2000, false, ipv6);
EchoServer server(&loop, listenAddr);
server.start();
loop.loop();
}
测试时,设置pool线程池数量为2。启用三个客户端,三个客户端分别发送“123”、“456”、“789”消息。之后,第一个客户端发送“exit”,再发送"123"。最后,第二个客户端发送quit。 运行结果如下。先打开三个连接,观察到第一个和第三个连接分配的EventLoop地址相同0x7F857506F9C0,第二个连接分配的EventLoop地址为0x7F857485F9C0,round-robin算法这里其实就是轮询分配。发送消息能正常回显。
当发送"exit"消息后,回收到"bye"消息,之后服务端主动调用conn->shutdown()实际为::shutdown(sockfd, SHUT_WR),从而关闭了本端发送功能,后续发送消息服务端依然能正常接受,但由于关闭服务端写功能不再接收回显数据。发送“quit”消息,服务端退出。
20210805 06:42:25.656446Z 13043 INFO pid = 13043, tid = 13043 - EchoServer_unittest.cc:75
20210805 06:42:25.656830Z 13043 INFO sizeof TcpConnection = 400 - EchoServer_unittest.cc:76
20210805 06:42:29.294193Z 13043 INFO TcpServer::newConnection [EchoServer] - new connection [EchoServer-0.0.0.0:2000#1] from 192.168.3.100:2982 - TcpServer.cc:80
20210805 06:42:29.294250Z 13043 INFO 192.168.3.100:2982 -> 192.168.3.100:2000 is UP - EventLoop 0x7F857506F9C0 - EchoServer_unittest.cc:42
20210805 06:42:29.294261Z 13043 INFO - EchoServer_unittest.cc:46
20210805 06:42:31.662242Z 13043 INFO TcpServer::newConnection [EchoServer] - new connection [EchoServer-0.0.0.0:2000#2] from 192.168.3.100:2984 - TcpServer.cc:80
20210805 06:42:31.662302Z 13043 INFO 192.168.3.100:2984 -> 192.168.3.100:2000 is UP - EventLoop 0x7F857485F9C0 - EchoServer_unittest.cc:42
20210805 06:42:31.662313Z 13043 INFO - EchoServer_unittest.cc:46
20210805 06:42:37.501991Z 13043 INFO TcpServer::newConnection [EchoServer] - new connection [EchoServer-0.0.0.0:2000#3] from 192.168.3.100:2986 - TcpServer.cc:80
20210805 06:42:37.502071Z 13043 INFO 192.168.3.100:2986 -> 192.168.3.100:2000 is UP - EventLoop 0x7F857506F9C0 - EchoServer_unittest.cc:42
20210805 06:42:37.502085Z 13043 INFO - EchoServer_unittest.cc:46
20210805 06:42:46.500798Z 13043 INFO EchoServer-0.0.0.0:2000#1 recv 4 bytes at 1628145766.500734 - EchoServer_unittest.cc:55
20210805 06:42:49.004797Z 13043 INFO EchoServer-0.0.0.0:2000#2 recv 4 bytes at 1628145769.004770 - EchoServer_unittest.cc:55
20210805 06:42:51.556946Z 13043 INFO EchoServer-0.0.0.0:2000#3 recv 4 bytes at 1628145771.556920 - EchoServer_unittest.cc:55
20210805 06:43:03.372943Z 13043 INFO EchoServer-0.0.0.0:2000#1 recv 5 bytes at 1628145783.372917 - EchoServer_unittest.cc:55
20210805 06:43:12.972814Z 13043 INFO EchoServer-0.0.0.0:2000#1 recv 4 bytes at 1628145792.972789 - EchoServer_unittest.cc:55
20210805 06:43:32.124884Z 13043 INFO EchoServer-0.0.0.0:2000#2 recv 5 bytes at 1628145812.124855 - EchoServer_unittest.cc:55
20210805 06:43:37.445024Z 13043 INFO EchoServer-0.0.0.0:2000#2 recv 5 bytes at 1628145817.444998 - EchoServer_unittest.cc:55
echoserver_unittest: net/Channel.cc:61: void muduo::net::Channel::remove(): Assertion `isNoneEvent()' failed.
Aborted (core dumped)
发现再程序退出时,这里报错了。查验发现,不管单线程还是多线程、不论多少个客户端,目前主要发现客户端发送了"exit"后,再发送“quit”必报错。但是,客户端发送“exit”后主动断开连接则一切正常。
4.2、使用 shutdown() 的问题
客户端主动发起关闭连接,会执行TcpConnection::handleClose() 函数,内部会调用Channel::disableAll() ,之后从TcpServer和Poller移除channel。而这里服务端主动关闭写段,没有做任何的Channel状态的处理。而在EchoServer中,TcpServer接收"exit"后关闭本端发送功能,在TcpServer析构会调用TcpConnection::connectDestroyed() 再调用channel_->remove() ,内部断言assert(isNoneEvent());失败,造成运行时崩溃。
简单解决方案是,在onMessage()中使用conn->forceClose(); 代替conn->shutdown(); ,修改了channel的时间监听状态。但是区别在于客户端会响应断开,不再能继续发数据。
我们在应用层设计时,一个正常连接创建后,应该是由客户端断开连接。服务端或者客户端调用shutdown,是为了保证能够继续接受对端的数据,保证接收消息的完整性。参见【muduo学习笔记:net部分之实现TCP网络编程库-Buffer】。
|