我们在之前已经完成了对前面5个模块的剖析和重写,现在我们来看Acceptor。Acceptor就是处理accept的:监听新用户的连接,拿到跟客户端通信的clientfd,打包成channel,再根据muduo的轮询算法找一个subloop,把subloop唤醒,并把接收到的channel交给这个subloop。我们打开TCPServer,处理一下Acceptor。Acceptor运行在我们的baseloop(mainreactor)里面,内部封装了socket。
socket封装fd
Socket.h
#pragma once
#include "noncopyable.h"
class InetAddress;
/// Thin owning wrapper around a socket file descriptor.
///
/// The descriptor is supplied by the caller at construction time and is
/// closed by the destructor. Copying is disabled through the `noncopyable`
/// base.
class Socket : noncopyable
{
public:
    /// Wraps an already-created descriptor; no system call is made here.
    explicit Socket(int sockfd) : sockfd_{sockfd} {}

    /// Closes the wrapped descriptor.
    ~Socket();

    /// Returns the wrapped descriptor.
    int fd() const { return sockfd_; }

    /// Binds the socket to `localaddr`.
    void bindAddress(const InetAddress &localaddr);

    /// Puts the socket into listening state.
    void listen();

    /// Accepts one pending connection.
    /// @param peeraddr receives the peer address when the accept succeeds.
    /// @return the new connection descriptor, or a negative value on failure.
    int accept(InetAddress *peeraddr);

    /// Shuts down the write half of the connection.
    void shutdownWrite();

    /// Enables/disables TCP_NODELAY.
    void setTcpNoDelay(bool on);

    /// Enables/disables SO_REUSEADDR.
    void setReuseAddr(bool on);

    /// Enables/disables SO_REUSEPORT.
    void setReusePort(bool on);

    /// Enables/disables SO_KEEPALIVE.
    void setKeepAlive(bool on);

private:
    const int sockfd_; // descriptor owned by this object
};
Socket.cc
#include "Socket.h"
#include "Logger.h"
#include "InetAddress.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
// Releases the descriptor held by this Socket.
Socket::~Socket()
{
    ::close(sockfd_);
}
// Binds the wrapped descriptor to `localaddr`.
// A failed bind is reported through LOG_FATAL (defined in Logger.h).
void Socket::bindAddress(const InetAddress &localaddr)
{
    const auto *target = reinterpret_cast<const sockaddr *>(localaddr.getSockAddr());
    const int rc = ::bind(sockfd_, target, sizeof(sockaddr_in));
    if (rc != 0)
    {
        LOG_FATAL("bind sockfd:%d fail \n", sockfd_);
    }
}
// Starts listening on the wrapped descriptor with a backlog of 1024.
// A failed listen is reported through LOG_FATAL (defined in Logger.h).
void Socket::listen()
{
    constexpr int kBacklog = 1024;
    const int rc = ::listen(sockfd_, kBacklog);
    if (rc != 0)
    {
        LOG_FATAL("listen sockfd:%d fail \n", sockfd_);
    }
}
// Accepts one pending connection on the listening descriptor.
//
// The new descriptor is created with SOCK_NONBLOCK | SOCK_CLOEXEC.
// On success the peer's address is stored into `*peeraddr`; on failure
// `*peeraddr` is left untouched.
//
// Returns the connection descriptor, or the negative value from accept4.
int Socket::accept(InetAddress *peeraddr)
{
    sockaddr_in remote;
    socklen_t remoteLen = sizeof remote;
    bzero(&remote, sizeof remote);

    const int connfd = ::accept4(sockfd_,
                                 reinterpret_cast<sockaddr *>(&remote),
                                 &remoteLen,
                                 SOCK_NONBLOCK | SOCK_CLOEXEC);
    if (connfd < 0)
    {
        return connfd;
    }

    peeraddr->setSockAddr(remote);
    return connfd;
}
// Shuts down the write direction (SHUT_WR) of the wrapped descriptor and
// logs an error if the call fails.
void Socket::shutdownWrite()
{
    const int rc = ::shutdown(sockfd_, SHUT_WR);
    if (rc < 0)
    {
        LOG_ERROR("shutdownWrite error");
    }
}
// Sets TCP_NODELAY to 1 when `on` is true, 0 otherwise.
// The result of setsockopt is not checked.
void Socket::setTcpNoDelay(bool on)
{
    const int flag = on ? 1 : 0;
    ::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
}
// Sets SO_REUSEADDR to 1 when `on` is true, 0 otherwise.
// The result of setsockopt is not checked.
void Socket::setReuseAddr(bool on)
{
    const int flag = on ? 1 : 0;
    ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
}
// Sets SO_REUSEPORT to 1 when `on` is true, 0 otherwise.
// The result of setsockopt is not checked.
void Socket::setReusePort(bool on)
{
    const int flag = on ? 1 : 0;
    ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &flag, sizeof(flag));
}
// Sets SO_KEEPALIVE to 1 when `on` is true, 0 otherwise.
// The result of setsockopt is not checked.
void Socket::setKeepAlive(bool on)
{
    const int flag = on ? 1 : 0;
    ::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
}
Acceptor
Acceptor底层是listenfd,运行在baseloop(mainreactor)里面。
Acceptor.h
#pragma once
#include "noncopyable.h"
#include "Socket.h"
#include "Channel.h"
#include <functional>
class EventLoop;
class InetAddress;
/// Owns the listening socket and its Channel, and hands every accepted
/// connection to a user-supplied callback.
class Acceptor : noncopyable
{
public:
    /// Invoked for each accepted connection with the new descriptor and the
    /// peer's address.
    using NewConnectionCallback = std::function<void(int sockfd, const InetAddress&)>;

    /// Creates the listening socket and binds it to `listenAddr`.
    /// Listening itself only starts when listen() is called.
    Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport);

    /// Disables and removes the accept channel.
    ~Acceptor();

    /// Stores a copy of `cb`; it is called from handleRead() for every
    /// accepted connection.
    void setNewConnectionCallback(const NewConnectionCallback &cb) { newConnectionCallback_ = cb; }

    /// True once listen() has been called.
    bool listenning() const { return listenning_; }

    /// Starts listening and enables read events on the accept channel.
    void listen();

private:
    /// Read-event handler of the accept channel: accepts one connection.
    void handleRead();

    // NOTE: keep this declaration order. The constructor's initializer list
    // builds acceptChannel_ from acceptSocket_.fd(), so the socket must be
    // constructed first.
    EventLoop *loop_;                             // loop passed to the constructor
    Socket acceptSocket_;                         // listening socket
    Channel acceptChannel_;                       // channel for acceptSocket_'s fd
    NewConnectionCallback newConnectionCallback_; // may be empty
    bool listenning_;                             // set by listen()
};
Acceptor.cc
#include "Acceptor.h"
#include "Logger.h"
#include "InetAddress.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <unistd.h>
// Creates a non-blocking, close-on-exec IPv4 TCP socket.
//
// Returns the new descriptor. If socket() fails the error is reported through
// LOG_FATAL (defined in Logger.h) together with the errno value.
//
// Fix: the original never returned `sockfd`. Flowing off the end of a
// non-void function is undefined behaviour, so the Acceptor's Socket could be
// built around an arbitrary descriptor value.
static int createNonblocking()
{
    const int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if (sockfd < 0)
    {
        LOG_FATAL("%s:%s:%d listen socket create err:%d \n", __FILE__, __FUNCTION__, __LINE__, errno);
    }
    return sockfd;
}
// Builds the listening side of the server.
//
// @param loop        event loop the accept channel is registered with.
// @param listenAddr  local address the listening socket is bound to.
// @param reuseport   whether SO_REUSEPORT is enabled on the listening socket.
//
// The socket is created, configured and bound here; listening only starts
// when listen() is called.
//
// Fix: `reuseport` used to be ignored and SO_REUSEPORT was always switched on.
// The caller's choice is now honoured.
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop)
    , acceptSocket_(createNonblocking())
    , acceptChannel_(loop, acceptSocket_.fd())
    , listenning_(false)
{
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(reuseport);
    acceptSocket_.bindAddress(listenAddr);

    // Read events on the listening descriptor are routed to handleRead().
    // NOTE(review): std::bind is kept on purpose — it silently drops any
    // arguments the channel passes to its read callback; confirm against
    // Channel's callback signature before swapping in a lambda.
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
// Stops all event interest on the accept channel, then removes the channel.
Acceptor::~Acceptor()
{
    acceptChannel_.disableAll(); // first drop every registered event
    acceptChannel_.remove();     // then remove the channel itself
}
// Marks the acceptor as listening, puts the socket into listening state and
// enables read events on the accept channel so that handleRead() gets called.
void Acceptor::listen()
{
    listenning_ = true;

    acceptSocket_.listen();
    acceptChannel_.enableReading();
}
// Read-event handler for the listening descriptor: accepts one connection.
//
// On success the new descriptor and peer address are handed to
// newConnectionCallback_; if no callback is installed the connection is
// closed immediately so the descriptor does not leak.
// On failure the error is logged, with an extra message for EMFILE.
//
// Fix: errno is captured right after the failed accept. The original read
// errno again after LOG_ERROR had run, and logging may overwrite errno,
// which made the EMFILE check unreliable.
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    const int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd < 0)
    {
        const int savedErrno = errno; // snapshot before any other call can clobber it
        LOG_ERROR("%s:%s:%d accept err:%d \n", __FILE__, __FUNCTION__, __LINE__, savedErrno);
        if (savedErrno == EMFILE)
        {
            LOG_ERROR("%s:%s:%d sockfd reached limit! \n", __FILE__, __FUNCTION__, __LINE__);
        }
        return;
    }

    if (newConnectionCallback_)
    {
        newConnectionCallback_(connfd, peerAddr);
    }
    else
    {
        // Nobody will take ownership of the connection: close it.
        ::close(connfd);
    }
}
|