一、Callbacks.h
#pragma once
#include <memory>
#include <functional>
class Buffer;
class TcpConnection;
class Timestamp;
using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
using ConnectionCallback = std::function<void(const TcpConnectionPtr&)>;
using CloseCallback = std::function<void(const TcpConnectionPtr&)>;
using WriteCompleteCallback = std::function<void(const TcpConnectionPtr&)>;
using MessageCallback = std::function<void(const TcpConnectionPtr&, Buffer*, Timestamp)>;
using HighWaterMarkCallback = std::function<void (const TcpConnectionPtr&, size_t)>;
二、TcpServer.h
#pragma once
#include "Acceptor.h"
#include "Logger.h"
#include "InetAddress.h"
#include "noncopyable.h"
#include "EventLoop.h"
#include "EventLoopThreadPool.h"
#include "Callbacks.h"
#include "TcpConnection.h"
#include "Buffer.h"
#include <functional>
#include <string>
#include <memory>
#include <atomic>
#include <unordered_map>
class TcpServer : noncopyable{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;
enum Option{
kNoReusePort,
kReusePort,
};
TcpServer(EventLoop* loop, const InetAddress& listenAddr, const std::string& nameArg, Option = kNoReusePort);
~TcpServer();
void setThreadInitCallback(const ThreadInitCallback& cb){ threadInitCallback_ = cb; }
void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback& cb){ writeCompleteCallback_ = cb; }
void setThreadNum(int numThreads);
void start();
private:
void newConnection(int connfd, const InetAddress& peerAddr);
void removeConnection(const TcpConnectionPtr& conn);
void removeConnectionInLoop(const TcpConnectionPtr& conn);
using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;
const std::string ipPort_;
const std::string name_;
EventLoop* loop_;
std::unique_ptr<Acceptor> acceptor_;
std::shared_ptr<EventLoopThreadPool> threadPool_;
ThreadInitCallback threadInitCallback_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
std::atomic_int started_;
int nextConnId_;
ConnectionMap connections_;
};
三、TcpServer类
首先TcpServer对象创建一个Acceptor对象,Acceptor创建listenfd,并将listenfd封装成acceptChannel,先设置acceptChannel的读事件,然后通过loop把acceptChannel注册到当前EventLoop的Poller(Poller封装poll/epoll),Poller就监听acceptChannel上的事件。一旦acceptChannel上有事件发生,acceptChannel就会执行相应的回调函数,即Acceptor::handleRead() 这个回调函数首先将新用户的地址保存在peerAddr,并获取和客户端通信的fd,然后传入Acceptor::newConnetionCallback_ 并执行,而这个回调就是TcpServer对象的成员acceptor_提前调用Acceptor::setNewConnectionCallback 设置的 新用户到来就会执行TcpServer提前注册的回调函数,这个回调函数就会将与客户通信的connfd封装成Channel(实际上mainLoop封装成了TcpConnection,包含一个成员变量Channel),采用轮询算法选择一个subloop,通过其数据成员wakeupFd_写8字节数据来唤醒,然后分发Channel 运行在主线程的mainLoop调用TcpServer注册好的回调函数TcpServer::newConnection,选择一个ioLoop,但是每个线程的事需要等到CPU调度到这个线程的时候才能处理,所以EventLoop提供了两个方法runInLoop和queueInLoop
- runInLoop:如果当前执行线程就是创建EventLoop对象的线程,就直接执行回调cb
- queueInLoop:当前执行的线程并不是创建EventLoop对 象的线程,先把回调cb放到队列,然后往该EventLoop对象的wakeupFd_上写数据通知对应线程
为什么我们总要使用EventLoop::isInLoopThread 判断执行线程的是否就是创建EventLoop对象的线程呢?
因为muduo网络库并没有在mainLoop和subLoop之间添加同步队列,没有使用mainLoop生产Channel和subLoop消费Channel的生产者——消费者模型,而是采用了one loop per thread的方法,让每个线程监听自己的wakeupFd_,执行mainLoop的线程往wakeupFd_上写数据就是通知subLoop,需要处理新的Channel
mainLoop传递给sunLoop的TcpConnection包含两个成员:
- connectionCallback_:就是我们使用muduo编程时,给服务器注册用户连接的创建和断开回调,当有新连接到来或者断开时subLoop就会调用
- messageCallback_:使用muduo编程时,给服务器注册的用户读写事件回调,TcpConnection里的Channel有读写事件时会被调用
muduo编程模板
1. 建立连接
TcpServer对象构造的时候,就会同时构造Acceptor对象和threadPool_对象,并调用Acceptor::setNewConnectionCallback方法,把TcpServer::newConnection传给成员Acceptor::newConnetionCallback_,Acceptor::newConnetionCallback_又会在新连接到来时在Acceptor::handleRead中调用
即TcpServer::newConnection最终调用处是在Acceptor::handleRead
TcpServer::newConnection主要做:组装新连接的名字、创建TcpConnection连接对象、并设置相应的一系列回调函数
void TcpServer::newConnection(int connfd, const InetAddress& peerAddr){
EventLoop* ioLoop = threadPool_->getNextLoop();
char buff[64];
snprintf(buff, sizeof(buff), "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buff;
LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n", name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
sockaddr_in local;
memset(&local, 0, sizeof(local));
socklen_t addr_len = sizeof(local);
if(::getsockname(connfd, (sockaddr*)&local, &addr_len) < 0){
LOG_ERROR("sockets::getLocalAddr \n");
}
InetAddress localAddr(local);
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, connfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback_(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
这一系列的回调方法都是用户传递给TcpServer,TcpServer => TcpConnection => Channel,Channel会把自己封装的fd和events注册到Poller,发生事件时Poller调用Channel的handleEvent方法处理
我们来说一下,用户在使用muduo编程时通常会写一个on_message函数,这个函数是怎么被调用的
用户调用TcpServer::setMessageCallback将on_message传递给TcpServer::messageCallback_,TcpServer会调用TcpConnection::setMessageCallback,那么TcpConnection的成员messageCallback_就保存了on_message,TcpConnection会把handleRead设置到Channel的readCallBack_,而handleRead就包括了TcpConnection::messageCallback_(on_message)
2. 断开连接
断开连接时,Poller会设置Channel的revents并将发生事件的Channel对象放入activateChannels_,EventLoop会遍历activateChannels_,并通过自己的Channel成员调用Channel::handleEvent处理EPOLLHUP事件,handleEvent会调用Channel::closeCallBack_,而这个Channel::closeCallBack_就是TcpConnection::handleClose
void TcpServer::removeConnection(const TcpConnectionPtr& conn){
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn){
LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\n", name_.c_str(), conn->name().c_str());
connections_.erase(conn->name());
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
四、TcpServer.cc
#include "TcpServer.h"
#include "TcpConnection.h"
static EventLoop* CheckLoopNotNull(EventLoop* loop){
if(loop == nullptr){
LOG_FATAL("%s:%s%d mainloop is null! \n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr, const std::string& nameArg, Option option)
: loop_(CheckLoopNotNull(loop))
, ipPort_(listenAddr.toIpPort())
, name_(nameArg)
, acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
, threadPool_(new EventLoopThreadPool(loop, name_))
, connectionCallback_()
, messageCallback_()
, nextConnId_(1)
, started_(0)
{
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
}
TcpServer::~TcpServer(){
for (auto& item : connections_){
TcpConnectionPtr tmp_conn(item.second);
item.second.reset();
tmp_conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, tmp_conn));
}
}
void TcpServer::setThreadNum(int numThreads)
{
threadPool_->setThreadNum(numThreads);
}
void TcpServer::start(){
if (started_++ == 0){
threadPool_->start(threadInitCallback_);
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
}
void TcpServer::newConnection(int connfd, const InetAddress& peerAddr){
EventLoop* ioLoop = threadPool_->getNextLoop();
char buff[64];
snprintf(buff, sizeof(buff), "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buff;
LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n", name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
sockaddr_in local;
memset(&local, 0, sizeof(local));
socklen_t addr_len = sizeof(local);
if(::getsockname(connfd, (sockaddr*)&local, &addr_len) < 0){
LOG_ERROR("sockets::getLocalAddr \n");
}
InetAddress localAddr(local);
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, connfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback_(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
void TcpServer::removeConnection(const TcpConnectionPtr& conn){
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn){
LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\n", name_.c_str(), conn->name().c_str());
connections_.erase(conn->name());
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
|