一、Socket类
用于封装fd、bind、listen、accept等操作
1. Socket.h
#include "noncopyable.h"
struct tcp_info;
class InetAddress;
/// Non-copyable wrapper around a socket file descriptor.
///
/// The wrapper takes a descriptor that was created elsewhere and exposes the
/// usual server-side operations (bind / listen / accept) together with a few
/// socket-option toggles. The descriptor is closed in the destructor.
class Socket : noncopyable{
public:
    /// Wraps an already-created descriptor.
    explicit Socket(int sockfd) : sockfd_{sockfd} {}

    /// Closes the wrapped descriptor.
    ~Socket();

    /// Returns the wrapped descriptor.
    int fd() const { return sockfd_; }

    /// Binds the descriptor to `localaddr`.
    void bindAddress(const InetAddress& localaddr);

    /// Puts the descriptor into the listening state.
    void listen();

    /// Accepts one pending connection. On success the peer address is stored
    /// in `*peeraddr` and the new descriptor is returned; otherwise the
    /// negative result of ::accept is returned.
    int accept(InetAddress* peeraddr);

    /// Shuts down the writing half of the connection (SHUT_WR).
    void shutdownWrite();

    /// Turns TCP_NODELAY on or off.
    void setTcpNoDelay(bool on);

    /// Turns SO_REUSEADDR on or off.
    void setReuseAddr(bool on);

    /// Turns SO_REUSEPORT on or off.
    void setReusePort(bool on);

    /// Turns SO_KEEPALIVE on or off.
    void setKeepAlive(bool on);

private:
    const int sockfd_;  // the wrapped descriptor; fixed for the object's lifetime
};
2. Socket.cc
shutdown函数
#include "Socket.h"
#include "Logger.h"
#include "InetAddress.h"
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
// Closes the wrapped descriptor when the Socket object is destroyed.
Socket::~Socket()
{
    ::close(sockfd_);
}
// Binds the wrapped descriptor to the IPv4 address held by `localaddr`.
// A failed bind is reported through LOG_FATAL.
void Socket::bindAddress(const InetAddress& localaddr)
{
    const int rc = ::bind(sockfd_,
                          (sockaddr*)localaddr.getSockAddr(),
                          sizeof(sockaddr_in));
    if (rc < 0) {
        LOG_FATAL("bind sockfd:%d fail \n", sockfd_);
    }
}
// Starts listening on the wrapped descriptor with a backlog of 1024.
// A failed listen is reported through LOG_FATAL.
void Socket::listen()
{
    const int rc = ::listen(sockfd_, 1024);
    if (rc < 0) {
        LOG_FATAL("listen sockfd:%d fail \n", sockfd_);
    }
}
// Accepts one pending connection on the listening descriptor.
//
// @param peeraddr  receives the peer's address when a connection is accepted;
//                  left untouched on failure.
// @return the new connection descriptor on success, or the negative value
//         returned by ::accept on failure (errno is left as ::accept set it).
int Socket::accept(InetAddress* peeraddr)
{
    sockaddr_in addr;
    ::memset(&addr, 0, sizeof(addr));

    // The length argument of ::accept is value-result: on entry it must hold
    // the size of the address buffer. Leaving it uninitialized can truncate
    // the peer address or make the call fail with EINVAL.
    socklen_t len = sizeof(addr);

    const int connfd = ::accept(sockfd_, (sockaddr*)&addr, &len);
    if (connfd >= 0) {
        peeraddr->setSockAddr(addr);
    }
    return connfd;
}
// Shuts down the writing half of the connection; a non-zero result from
// ::shutdown is logged as an error.
void Socket::shutdownWrite()
{
    const int rc = ::shutdown(sockfd_, SHUT_WR);
    if (rc != 0) {
        LOG_ERROR("Socket::shutdownWrite error");
    }
}
// Sets the TCP_NODELAY option to 1 when `on` is true and to 0 otherwise.
// The result of setsockopt is not checked.
void Socket::setTcpNoDelay(bool on)
{
    int flag = 0;
    if (on) {
        flag = 1;
    }
    ::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
}
// Sets the SO_REUSEADDR option to 1 when `on` is true and to 0 otherwise.
// The result of setsockopt is not checked.
void Socket::setReuseAddr(bool on)
{
    int flag = 0;
    if (on) {
        flag = 1;
    }
    ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
}
// Sets the SO_REUSEPORT option to 1 when `on` is true and to 0 otherwise.
// The result of setsockopt is not checked.
void Socket::setReusePort(bool on)
{
    int flag = 0;
    if (on) {
        flag = 1;
    }
    ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &flag, sizeof(flag));
}
// Sets the SO_KEEPALIVE option to 1 when `on` is true and to 0 otherwise.
// The result of setsockopt is not checked.
void Socket::setKeepAlive(bool on)
{
    int flag = 0;
    if (on) {
        flag = 1;
    }
    ::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
}
二、Acceptor类
Acceptor工作在mainReactor,用于监听新用户的连接,将与客户端通信的fd打包成Channel,muduo采用轮询算法找一个subloop,将其唤醒,把打包好的Channel给subloop
1. Acceptor.h
- 成员变量acceptSocket_:封装了listenfd,用于监听新用户的连接
- 成员变量acceptChannel_:封装了listenfd,以及感兴趣的事件events和发生的事件revents
- 成员变量loop_:通过事件循环监听listenfd,也就是mainLoop
- 成员变量newConnectionCallback_:如果一个客户端连接成功,Acceptor返回一个Channel给TcpServer,TcpServer会通过轮询唤醒一个subLoop,并把新客户端的Channel给subLoop,而这些事情都是交给一个回调函数做的,即newConnectionCallback_
此处TcpServer流程待修改
#pragma once
#include "noncopyable.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"
#include <functional>
class EventLoop;
class InetAddress;
/// Owns the listening socket and its Channel, and hands every accepted
/// connection to a user-supplied callback.
class Acceptor : noncopyable{
public:
    /// Invoked for each accepted connection with the new descriptor and the
    /// peer's address.
    using NewConnetionCallback = std::function<void(int sockfd, const InetAddress&)>;

    /// Creates the listening socket, binds it to `listenAddr` and registers
    /// handleRead as the read callback of the channel. Listening itself does
    /// not start until listen() is called.
    Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);

    /// Disables and removes the channel.
    ~Acceptor();

    /// Stores a copy of the callback to run for every accepted connection.
    void setNewConnectionCallback(const NewConnetionCallback& cb){
        // `cb` is a const reference, so std::move could never move from it;
        // a plain copy states what actually happens.
        newConnetionCallback_ = cb;
    }

    /// True once listen() has been called.
    bool listening() const { return listening_; }

    /// Starts listening and enables read events on the channel.
    void listen();

private:
    /// Read-event handler of the listening descriptor: accepts a connection
    /// and dispatches it to the stored callback.
    void handleRead();

    EventLoop* loop_;                             // loop passed to the constructor
    Socket acceptSocket_;                         // the listening socket
    Channel acceptChannel_;                       // channel built on acceptSocket_'s descriptor
    NewConnetionCallback newConnetionCallback_;   // may be empty
    bool listening_;                              // set to true by listen()
};
- 成员函数handleRead:服务器的listenfd发生读事件调用,即新用户连接
2. Acceptor.cc
socket函数第二个参数type:SOCK_NONBLOCK:非阻塞形式,没有事件发生时立刻返回;SOCK_CLOEXEC:为该fd设置close-on-exec标志,fork出的子进程虽然会继承父进程的文件描述符,但当子进程调用exec系列函数时,带有该标志的文件描述符会被自动关闭
#include "Acceptor.h"
#include "Logger.h"
#include "InetAddress.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <errno.h>
// Creates a non-blocking, close-on-exec IPv4 TCP socket.
//
// @return the new descriptor; a failed socket() call is reported through
//         LOG_FATAL together with errno.
static int createNonblocking()
{
    const int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
    // socket() signals failure with -1 only. Descriptor 0 is a valid result
    // (for example when stdin has been closed) and must not be treated as an
    // error.
    if (sockfd < 0) {
        LOG_FATAL("%s:%s:%d listen socket crete err:%d \n", __FILE__, __FUNCTION__, __LINE__, errno);
    }
    return sockfd;
}
// Builds the listening side of the server.
//
// @param loop        event loop the accept channel is attached to.
// @param listenAddr  local address the listening socket is bound to.
// @param reuseport   whether SO_REUSEPORT is enabled on the listening socket.
//
// The socket is created and bound here; listening starts in listen().
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
    : loop_(loop)
    , acceptSocket_(createNonblocking())
    , acceptChannel_(loop, acceptSocket_.fd())
    , listening_(false)
{
    acceptSocket_.setReuseAddr(true);
    // Honor the caller's choice instead of unconditionally enabling the option.
    acceptSocket_.setReusePort(reuseport);
    acceptSocket_.bindAddress(listenAddr);
    // A readable listening descriptor means a connection is pending.
    acceptChannel_.setReadCallBack(std::bind(&Acceptor::handleRead, this));
}
// Tears down the accept channel: first disables all of its events, then
// removes it.
Acceptor::~Acceptor()
{
    acceptChannel_.disableAll();
    acceptChannel_.remove();
}
// Marks the acceptor as listening, starts listening on the socket and then
// enables read events on the channel so handleRead runs for new connections.
void Acceptor::listen()
{
    listening_ = true;
    acceptSocket_.listen();
    acceptChannel_.enableReading();
}
// Read-event handler of the listening descriptor.
//
// Accepts one connection. On success the new descriptor and peer address are
// passed to the stored callback; if no callback is set the descriptor is
// closed immediately. On failure the error is logged, with an extra message
// when the descriptor limit (EMFILE) has been reached.
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    const int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd < 0) {
        // Capture errno right away: the logging call below may run library
        // code that overwrites it before the EMFILE check.
        const int savedErrno = errno;
        LOG_ERROR("%s:%s:%d accept err:%d \n", __FILE__, __FUNCTION__, __LINE__, savedErrno);
        if (savedErrno == EMFILE) {
            LOG_ERROR("%s:%s:%d the open fd exceeding the resource limit\n", __FILE__, __FUNCTION__, __LINE__);
        }
        return;
    }

    if (newConnetionCallback_) {
        newConnetionCallback_(connfd, peerAddr);
    } else {
        // Nobody will take ownership of the connection, so release it here.
        ::close(connfd);
    }
}
|