什么是Reactor
Reactor 释义“反应堆”,是一种事件驱动机制
和普通函数调用的不同之处在于:
- 应用程序不是主动地调用某个 API 完成处理,而是恰恰相反:Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上;如果相应的事件发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函数”。
- Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O。中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上。
- 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、可写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
- Reactor架构模式允许事件驱动的应用通过多路分发的机制去处理来自不同客户端的多个请求
- Reactor可以理解为I/O管理集合,和多线程、多进程没有关系。但后续可以额外在reactor中引入多线程
转载自https://blog.csdn.net/qq_21539375/article/details/123561321
服务器实现
除了listen_sock用accepter处理,其他clientfd用recver和sender处理
以前是通过epoll对I/O进行管理,而现在通过reactor对事件进行管理,可以把reactor理解为一种设计模式
server.cc
#include "reactor.hpp"
#include "socket.hpp"
#include "appInterface.hpp"
#include "util.hpp"
#include <string>
// Writes the command-line usage hint to stderr.
// proc: the program name to show in the hint (main passes argv[0]).
static void Usage(std::string proc) {
    const std::string hint = "Usage :\n\t" + proc + " port ";
    std::cerr << hint << std::endl;
}
// Entry point of the demo server.
// Expects exactly one argument, the TCP port to listen on. Creates the
// listening socket, switches it to non-blocking mode, registers it with the
// reactor for edge-triggered read events (handled by accepter), and then
// dispatches events forever.
// Exits with status 4 on a bad command line or when the listening socket
// cannot be registered with the reactor.
int main(int argc, char* argv[]) {
    if (argc != 2) {
        Usage(argv[0]);
        exit(4);
    }

    // atoi() yields 0 for non-numeric text, which would silently bind an
    // arbitrary port; reject that and anything outside the valid port range.
    int port = atoi(argv[1]);
    if (port <= 0 || port > 65535) {
        Usage(argv[0]);
        exit(4);
    }

    int listen_sock = ns_sock::Sock::Socket();
    ns_until::SetNoBlock(listen_sock);
    ns_sock::Sock::Bind(listen_sock, port);
    ns_sock::Sock::Listen(listen_sock, BACKLOG);

    ns_epoll::Reactor reactor;
    reactor.InitReactor();

    // The listening socket only needs a read callback: readable means
    // "connections are waiting to be accepted".
    ns_epoll::EventItem item;
    item.sock = listen_sock;
    item.R = &reactor;
    item.ManagerCallBack(ns_appinterface::accepter, nullptr, nullptr);

    // Without the listening socket in the reactor the loop below would spin
    // without ever accepting a client, so treat a failed registration as fatal.
    // AddEvent has already logged the reason.
    if (!reactor.AddEvent(listen_sock, EPOLLIN | EPOLLET, item)) {
        exit(4);
    }

    int timeout = 1000;  // passed through to epoll_wait by Distatch
    while (true) {
        reactor.Distatch(timeout);
    }
}
reactor.hpp
#pragma once
#include "socket.hpp"
#include <sys/epoll.h>
#include <string>
#include <unordered_map>
#define BACKLOG 5
#define SIZE 256
#define MAXNUM 64
namespace ns_epoll {
class Reactor;
class EventItem;

// Signature shared by every event callback: it receives the item whose socket
// triggered the event and returns an int.
using callback_t = int (*)(EventItem*);

// Per-socket record kept by the reactor: the descriptor, a back-pointer to the
// owning reactor, the three event callbacks and the pending I/O buffers.
class EventItem {
public:
    int sock = 0;                          // descriptor this record describes
    Reactor* R = nullptr;                  // reactor the socket is registered with
    callback_t recv_handler = nullptr;     // invoked on EPOLLIN
    callback_t send_handler = nullptr;     // invoked on EPOLLOUT
    callback_t error_handler = nullptr;    // invoked on EPOLLERR / EPOLLHUP
    std::string inbuffer;                  // bytes received but not yet consumed
    std::string outbuffer;                 // bytes queued for sending

public:
    EventItem() = default;
    ~EventItem() = default;

    // Installs all three callbacks at once; pass nullptr for events that
    // should be ignored.
    void ManagerCallBack(callback_t _recv, callback_t _send, callback_t _err) {
        recv_handler = _recv;
        send_handler = _send;
        error_handler = _err;
    }
};
class Reactor {
private:
int epfd;
std::unordered_map<int, EventItem> event_item;
public:
Reactor() :epfd(-1) {}
public:
void InitReactor() {
if ((epfd = epoll_create(SIZE)) < 0) {
std::cerr << "epoll_creat error!!" << std::endl;
exit(4);
}
std::cout << "server start" << std::endl;
}
void Distatch(int timeout) {
struct epoll_event revs[MAXNUM];
int num = epoll_wait(epfd, revs, MAXNUM, timeout);
for (int i = 0; i < num; ++i) {
int sock = revs[i].data.fd;
uint32_t mask = revs[i].events;
if (revs[i].events & EPOLLERR || revs[i].events & EPOLLHUP) {
if (event_item[sock].error_handler) event_item[sock].error_handler(&event_item[sock]);
}
if (revs[i].events & EPOLLIN) {
if (event_item[sock].recv_handler) event_item[sock].recv_handler(&event_item[sock]);
}
if (revs[i].events & EPOLLOUT) {
if (event_item[sock].send_handler) event_item[sock].send_handler(&event_item[sock]);
}
}
}
bool AddEvent(int sock, int event, const EventItem& item) {
struct epoll_event ev;
ev.events = 0;
ev.events |= event;
ev.data.fd = sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev) < 0) {
std::cerr << "epoll_ctr error fd:" << sock << std::endl;
return false;
}
event_item.insert({ sock,item });
std::cout << "debug 添加 :" << sock << " 到epoller中,成功" << std::endl;
return true;
}
bool DelEvent(int sock) {
if (epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr) < 0) {
std::cerr << "epoll_ctl error fd:" << sock << std::endl;
return false;
}
event_item.erase(sock);
return true;
}
void EnableReadWrite(int sock,bool read,bool write) {
struct epoll_event evt;
evt.data.fd = sock;
evt.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
if (epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &evt)) {
std::cerr << "epoll_ctl_mod error , fd: " << sock << std::endl;
}
}
~Reactor() {
if (epfd >= 0) close(epfd);
}
};
}
appInterface.hpp
#pragma once
#include "reactor.hpp"
#include "util.hpp"
#include<iostream>
#include <vector>
namespace ns_appinterface {
int recver(ns_epoll::EventItem* item);
int sender(ns_epoll::EventItem * item);
int errorer(ns_epoll::EventItem * item);
// Read callback for the listening socket.
// Accepts every pending connection (the loop runs until accept reports that
// nothing is left, as edge-triggered notification requires), makes each new
// socket non-blocking and registers it with the reactor for edge-triggered
// reads, using recver / sender / errorer as its callbacks.
// Returns 0 once the backlog is drained, -1 on an unrecoverable accept error.
int accepter(ns_epoll::EventItem* item) {
    std::cout << "get a new link [fd:" << item->sock <<"]" << std::endl;
    while (true) {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int sock = accept(item->sock, reinterpret_cast<struct sockaddr*>(&peer), &len);
        if (sock < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                return 0;  // backlog fully drained
            }
            // An interrupted call, or a connection that was aborted before we
            // accepted it, says nothing about the remaining backlog: keep
            // draining so no pending connection is left without a new edge.
            if (errno == EINTR || errno == ECONNABORTED) {
                continue;
            }
            return -1;
        }

        ns_until::SetNoBlock(sock);
        ns_epoll::EventItem tmp;
        tmp.sock = sock;
        tmp.R = item->R;
        tmp.ManagerCallBack(recver, sender, errorer);

        // If the reactor refuses the socket nobody would ever service or
        // close it, so release the descriptor here instead of leaking it.
        if (!item->R->AddEvent(sock, EPOLLIN | EPOLLET, tmp)) {
            close(sock);
        }
    }
}
// Drains everything currently readable from the non-blocking socket `sock`
// and appends it to *out.
// Returns 0 when the socket has no more data for now (EAGAIN/EWOULDBLOCK),
// and -1 when the peer closed the connection or recv failed. Bytes received
// before a close or failure stay appended to *out.
int recver_helper(int sock, std::string* out) {
    char buffer[128];
    while (true) {
        ssize_t size = recv(sock, buffer, sizeof(buffer), 0);
        if (size > 0) {
            // Append by length so payload bytes equal to '\0' are preserved.
            out->append(buffer, static_cast<std::string::size_type>(size));
            continue;
        }
        if (size == 0) {
            // Orderly shutdown by the peer. recv keeps returning 0 from now
            // on, so looping again would never terminate.
            return -1;
        }
        if (errno == EINTR) {
            continue;
        }
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;
        }
        return -1;
    }
}
// Read callback for client sockets.
// Pulls all available bytes into the item's inbuffer, cuts it into complete
// "X"-terminated requests of the form "a+b", and queues one "a+b=cX" reply per
// request in the outbuffer. When there is something to send, write interest is
// switched on for the socket.
// Returns -1 if receiving failed, 0 otherwise.
int recver(ns_epoll::EventItem* item) {
    std::cout << "recv event ready: " << item->sock << std::endl;

    const int rc = recver_helper(item->sock, &item->inbuffer);
    if (rc < 0) {
        return -1;
    }
    std::cout << "client#" << item->inbuffer << std::endl;

    // Split consumes the complete requests; an unfinished tail stays in
    // inbuffer for the next read event.
    std::vector<std::string> requests;
    ns_until::StringUtil::Split(item->inbuffer, &requests, "X");

    for (std::string& request : requests) {
        int lhs = 0;
        int rhs = 0;
        ns_until::StringUtil::Deserialize(request, &lhs, &rhs);
        std::cout << lhs << " : " << rhs << std::endl;

        const int sum = lhs + rhs;
        std::string reply = std::to_string(lhs);
        reply += "+";
        reply += std::to_string(rhs);
        reply += "=";
        reply += std::to_string(sum);
        reply += "X";
        item->outbuffer += reply;
    }

    if (item->outbuffer.empty()) {
        return 0;
    }
    item->R->EnableReadWrite(item->sock, true, true);
    return 0;
}
// Sends as much of `in` as the non-blocking socket `sock` accepts right now.
// Bytes that were handed to the kernel are removed from `in`, so on return
// `in` always holds exactly the data that still has to be sent.
// Returns:
//    0  everything was sent (`in` is empty; an empty `in` returns 0 at once)
//    1  the socket cannot take more for now; retry when it becomes writable
//   -1  send failed with an unrecoverable error
int sender_helper(int sock, std::string& in) {
    std::string::size_type total = 0;
    while (total < in.size()) {
        ssize_t s = send(sock, in.data() + total, in.size() - total, 0);
        if (s > 0) {
            total += static_cast<std::string::size_type>(s);
            continue;
        }

        const int err = errno;  // saved before touching the buffer
        if (s < 0 && err == EINTR) {
            continue;
        }

        // Drop the prefix that was already sent and keep the unsent tail.
        in.erase(0, total);

        // A zero-byte result makes no progress; treat it like "would block"
        // instead of retrying in a tight loop.
        if (s == 0 || err == EAGAIN || err == EWOULDBLOCK) {
            return 1;
        }
        return -1;
    }

    // Fully flushed: clear the buffer so the same bytes are never sent again.
    in.clear();
    return 0;
}
// Write callback for client sockets.
// Flushes the item's outbuffer through sender_helper and then adjusts the
// socket's event interest according to the helper's result:
//   0 -> read only;  1 -> read and write;  anything else -> left unchanged.
// Always returns 0.
int sender(ns_epoll::EventItem* item) {
    switch (sender_helper(item->sock, item->outbuffer)) {
    case 0:
        item->R->EnableReadWrite(item->sock, true, false);
        break;
    case 1:
        item->R->EnableReadWrite(item->sock, true, true);
        break;
    default:
        // Send error: no handling is wired in yet.
        break;
    }
    return 0;
}
// Error callback placeholder for client sockets: it does nothing with the
// item and always returns 0.
int errorer(ns_epoll::EventItem* item) {
    static_cast<void>(item);
    return 0;
}
}
util.hpp
#pragma once
#include <fcntl.h>
#include <unistd.h>

#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>
namespace ns_until {
// Adds O_NONBLOCK to the file status flags of `sock`, keeping the flags that
// are already set. Failures of either fcntl call are logged to stderr and the
// descriptor is left as it was.
// Declared inline because the definition lives in a header.
inline void SetNoBlock(int sock) {
    int flags = fcntl(sock, F_GETFL);
    if (flags < 0) {
        // Do not feed the -1 error value into F_SETFL as if it were a flag set.
        std::cerr << "fcntl F_GETFL error fd:" << sock << std::endl;
        return;
    }
    if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
        std::cerr << "fcntl F_SETFL error fd:" << sock << std::endl;
    }
}
// String helpers for the demo protocol: requests look like "a+b" and are
// terminated by a separator.
class StringUtil {
public:
    // Moves every complete, separator-terminated piece of `in` into *out (in
    // order, without the separator) and removes those pieces from `in`.
    // Whatever follows the last separator stays in `in`.
    // An empty separator matches nothing: `in` and *out are left untouched.
    static void Split(std::string& in, std::vector<std::string>* out,std::string sep) {
        if (sep.empty()) {
            // find("") matches at every position without consuming input, so
            // splitting on it could never finish.
            return;
        }
        std::string::size_type consumed = 0;
        while (true) {
            std::string::size_type pos = in.find(sep, consumed);
            if (pos == std::string::npos) {
                break;
            }
            out->push_back(in.substr(consumed, pos - consumed));
            consumed = pos + sep.size();
        }
        // Erase the consumed prefix once instead of shifting the buffer after
        // every piece.
        in.erase(0, consumed);
    }

    // Parses "a+b" into *x and *y using atoi (so unparsable text yields 0).
    // The string is split at the first '+'. When there is no '+', the whole
    // string is taken as the left operand and *y is set to 0.
    static void Deserialize(std::string& in, int* x, int* y) {
        const std::string::size_type pos = in.find('+');
        if (pos == std::string::npos) {
            *x = atoi(in.c_str());
            *y = 0;
            return;
        }
        std::string left = in.substr(0, pos);
        std::string right = in.substr(pos + 1);
        *x = atoi(left.c_str());
        *y = atoi(right.c_str());
    }
};
}
简易demo版,还有差错处理模块和线程池未接入,功能简易,还有很多bug,主要是练习epoll ET模式和Reactor基本思路。
|