IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> 【Linux】基于Reactor模式的epoll ET简易服务器 -> 正文阅读

[系统运维]【Linux】基于Reactor模式的epoll ET简易服务器

什么是Reactor

Reactor 释义“反应堆”,是一种事件驱动机制

普通函数调用的不同之处在于:

  • 应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor上, 如果相应的事件发生Reactor主动调用应用程序注册的接口,这些接口又称为“回调函数”
  • Reactor 模式处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上
  • 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中
  • Reactor架构模式允许事件驱动的应用通过多路分发的机制去处理来自不同客户端的多个请求
  • Reactor可以理解为I/O管理集合,和多线程、多进程没有关系。但后续可以额外在reactor中引入多线程

转载自https://blog.csdn.net/qq_21539375/article/details/123561321

服务器实现

在这里插入图片描述

除了listen_sock用accepter处理,其他clientfd用recver和senerd处理

以前是通过epoll对I/O进行管理,而现在通过reactor对事件进行管理,可以把reactor理解为一种设计模式

server.cc

#include "reactor.hpp"
#include "socket.hpp"
#include "appInterface.hpp"
#include "util.hpp"
#include <string>
static void Usage(std::string proc) {
    std::cerr << "Usage :" << "\n\t" << proc << " port " << std::endl;
}

int main(int argc, char* argv[]) {
    if (argc != 2) {
        Usage(argv[0]);
        exit(4);
    }

    //套接字相关
    int port = atoi(argv[1]);
    int listen_sock = ns_sock::Sock::Socket();

    //设置listen_sock 为非阻塞状态
    ns_until::SetNoBlock(listen_sock);

    ns_sock::Sock::Bind(listen_sock, port);
    ns_sock::Sock::Listen(listen_sock, BACKLOG);

    //事件管理器相关
    ns_epoll::Reactor reactor;
    reactor.InitReactor();

    ns_epoll::EventItem item;
    item.sock = listen_sock;
    item.R = &reactor;



    item.ManagerCallBack(ns_appinterface::accepter, nullptr, nullptr);

    //将listen_sock托管给reactor//使用ET模式
    reactor.AddEvent(listen_sock, EPOLLIN|EPOLLET, item);

    int timeout = 1000;
    while (true) {
        reactor.Distatch(timeout);
    }
}

reactor.hpp

#pragma once
#include "socket.hpp"

#include <sys/epoll.h>
#include <string>
#include <unordered_map>


#define BACKLOG 5
#define SIZE 256
#define MAXNUM 64

namespace ns_epoll {

    class Reactor; 
    class EventItem;

    typedef int(*callback_t)(EventItem*);

    //当成结构体使用
    class EventItem {
    public:
        //与通信相关
        int sock;

        //回指Reactor
        Reactor* R;

        //有关数据处理的回调函数,用来进行逻辑解耦!  应用数据就绪等通信细节和数据的处理模块使用该方法进行解耦!
        callback_t recv_handler;
        callback_t send_handler;
        callback_t error_handler; 

        std::string inbuffer; //读取到的数据缓冲区
        std::string outbuffer;//待发送的数据缓冲区
    public:
        EventItem() :sock(0), R(nullptr), recv_handler(nullptr), send_handler(nullptr), error_handler(nullptr) {}
        
        //管理回调
        void ManagerCallBack(callback_t _recv,callback_t _send,callback_t _err) {
            recv_handler = _recv;
            send_handler = _send;
            error_handler = _err;
        }

        ~EventItem() {
            //TODO
        }

    };

    class Reactor {
    private:
        int epfd;

        std::unordered_map<int, EventItem> event_item; //sock:EventItem

    public:
        Reactor() :epfd(-1) {}

    public:
        void InitReactor() {
            
            //创建epoll模型
            if ((epfd = epoll_create(SIZE)) < 0) {
                std::cerr << "epoll_creat error!!" << std::endl;
                exit(4);
            }
            std::cout << "server start" << std::endl;
        }


        //事件分派器
        void Distatch(int timeout) {
            //如果底层事件就绪,就把对应的事件分派给指定的回调函数进行统一处理
            
            struct epoll_event revs[MAXNUM];

            //返回值num表示就绪的事件数,内核会将就绪事件依次放入revs中
            int num = epoll_wait(epfd, revs, MAXNUM, timeout);

            for (int i = 0; i < num; ++i) {
                int sock = revs[i].data.fd;
                uint32_t mask = revs[i].events;

                
                //把所有的异常事件统一交给read,write处理
                //if ((revs[i].events & EPOLLERR || revs[i].events & EPOLLHUP)) mask | (EPOLLIN | EPOLLOUT);
                //异常事件发生 或者对端关闭
                if (revs[i].events & EPOLLERR || revs[i].events & EPOLLHUP) {
                    //如果error_handler被设置,就直接调用
                    if (event_item[sock].error_handler) event_item[sock].error_handler(&event_item[sock]);
                }
                //读事件发生
                if (revs[i].events & EPOLLIN) {
                    //如果recv_handler被设置,就直接调用 
                    if (event_item[sock].recv_handler) event_item[sock].recv_handler(&event_item[sock]);
                }
                //写事件发生
                if (revs[i].events & EPOLLOUT) {
                    //如果send_handler被设置,就直接调用
                    if (event_item[sock].send_handler) event_item[sock].send_handler(&event_item[sock]);
                }   

            }
        }

        bool AddEvent(int sock, int event, const EventItem& item) {
            struct epoll_event ev;
            ev.events = 0;
            ev.events |= event;
            ev.data.fd = sock;
            if (epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev) < 0) {
                std::cerr << "epoll_ctr error  fd:" << sock << std::endl;
                return false;
            }

            event_item.insert({ sock,item });

            std::cout << "debug  添加 :" << sock << " 到epoller中,成功" << std::endl;
            return true;
        }
        bool DelEvent(int sock) {
            if (epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr) < 0) {
                std::cerr << "epoll_ctl error  fd:" << sock << std::endl;
                return false;
            }

            event_item.erase(sock);
            return true;
        }

        //
        void EnableReadWrite(int sock,bool read,bool write) {
            struct epoll_event evt;
            evt.data.fd = sock;
            evt.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
            if (epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &evt)) {
                std::cerr << "epoll_ctl_mod error , fd: " << sock << std::endl;
            }
        }

        ~Reactor() {
            if (epfd >= 0) close(epfd);
        }
    };
}

appInterface.hpp

#pragma once
#include "reactor.hpp"
#include "util.hpp"
#include<iostream>
#include <vector>


namespace ns_appinterface {

	int recver(ns_epoll::EventItem* item);
	int sender(ns_epoll::EventItem * item);
	int errorer(ns_epoll::EventItem * item);

	int accepter(ns_epoll::EventItem* item) {
		std::cout << "get a new link [fd:" << item->sock <<"]" << std::endl;
		while (true) {
			struct sockaddr_in peer;
			socklen_t len = sizeof(peer);
			int sock = accept(item->sock, (struct sockaddr*)&peer, &len);

			if (sock < 0) {
				if (errno == EAGAIN || errno == EWOULDBLOCK) {
					//并没有读取出错,只是底层没有连接了
					return 0;
				}
				if (errno == EINTR) {
					//读取过程被信号打断了
					continue;
				}
				else {
					//出错了
					return -1; 
				}
			}
			//成功读取完毕
			else {

				ns_until::SetNoBlock(sock);


				ns_epoll::EventItem tmp;
				tmp.sock = sock;
				tmp.R = item->R;
				tmp.ManagerCallBack(recver, sender, errorer);

				ns_epoll::Reactor* reactor = item->R;
			
 
				//epoll使用时,经常会设置读事件就绪,写事件就绪按需打开 
				reactor->AddEvent(sock, EPOLLIN | EPOLLET, tmp);
			}
		}
		return 0;
	}


	//return 0 success
	//return -1 error
	int recver_helper(int sock, std::string* out) {
		while (true) {
			char buffer[128];
			ssize_t size = recv(sock, buffer, sizeof(buffer) - 1, 0);
			if (size < 0) {
				if (errno == EAGAIN || errno == EWOULDBLOCK) {
					//循环读取,读取完毕了
					return  0;
				}
				else if (errno == EINTR) {
					//被信号中断,继续读
					continue;
				}
				else {
					//读取出错 ,//TODO
					return -1;
				}
			}
			else {
				buffer[size] = 0;
				*out += buffer; //将每次读取到的数据追加到inbuffer中
			}
		}
	}

	int recver(ns_epoll::EventItem* item) {
		std::cout << "recv event ready: " << item->sock << std::endl;
		//负责数据读取
		//1.需要整体读,非阻塞
		if (recver_helper(item->sock, &item->inbuffer) < 0) {
			//item->error_handler;
			return -1;
		}
		std::cout << "client#" << item->inbuffer << std::endl;
		
		//2.根据发来的数据流,进行包和包之间的分离,防止粘包
		std::vector<std::string> messages;
		ns_until::StringUtil::Split(item->inbuffer, &messages, "X");

		//3.针对一个一个的报文,协议反序列化decode
		//1+1X2*2X5%5X
		struct data {
			int x;
			int y;
		};

		//已经拿到了所有的数据报文
		for (auto s : messages) {
			struct data d;
			ns_until::StringUtil::Deserialize(s, &d.x, &d.y);
			std::cout << d.x << " : " << d.y << std::endl;

			//可以接入线程池
			//Task t(d)
			//threadpool->push

			//4.业务处理
			int z = d.x + d.y;

			//5.形成响应报文,序列化转化成为一个字符串
			std::string response;
			response += std::to_string(d.x);
			response += "+";
			response += std::to_string(d.y);
			response += "=";
			response += std::to_string(z);

			//响应报文追加进发送缓冲区
			item->outbuffer += response;
			//5.1 设置响应报文之间的分隔符
			item->outbuffer += "X"; //encode
		}
		//6.写回
		if(!item->outbuffer.empty())
			item->R->EnableReadWrite(item->sock, true, true);


		return 0;
	}

	//return 0 : 写完inbuffer
	//return 1 :缓冲区打满,下次再写
	//return -1 : error
	int sender_helper(int sock, std::string& in) {

		size_t total = 0;

		while (true) {
			ssize_t s = send(sock, in.c_str() + total, in.size() - total, 0);

			if (s > 0) {
				total += s;
				if (total >= in.size()) {
					//已经写完
					return 0;
				}
			}
			else if(s < 0) {
				if (errno == EAGAIN || errno == EWOULDBLOCK) {
					//无论是否发送完inbuffer,都需要将已经发送的数据,全部移出缓冲区
					in.erase(total);

					return  1;//已经将缓冲区写满了,不能再写入了,但是不知道有没有写完
				}
				else if (errno == EINTR) {
					//被信号中断,继续写
					continue;
				}
				else {
					//写失败 ,//TODO
					return -1;
				}
			}
			else {
				//TODO
			}
		}
	}

	int sender(ns_epoll::EventItem* item) {
		int hret = sender_helper(item->sock, item->outbuffer);
		if (hret == 0) {
			//已经发完
			//关闭写
			item->R->EnableReadWrite(item->sock, true, false);
		}
		else if(hret == 1) {
			item->R->EnableReadWrite(item->sock, true, true);
		}
		else {
			//item->error_handler(item);
		}
		return 0;
	}
	int errorer(ns_epoll::EventItem* item) {
		/*close(item->sock);
		item->R->DelEvent(item->sock);*/
		return 0;
	}

}

util.hpp

#pragma once
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
//#include <vector>
namespace ns_until {

	//设置文件描述符为非阻塞状态
	void SetNoBlock(int sock) {
		int f1 = fcntl(sock, F_GETFL);
		fcntl(sock, F_SETFL, f1 | O_NONBLOCK);
	}
	class StringUtil {
	public:
		static void Split(std::string& in, std::vector<std::string>* out,std::string sep) {
			while (true) {
				size_t pos = in.find(sep);
				if (pos == std::string::npos) {
					break;
				}

				std::string s = in.substr(0, pos);
				out->push_back(s);
				in.erase(0,pos+sep.size());

			}
		}
		static void Deserialize(std::string& in, int* x, int* y) {
			ssize_t pos = in.find('+');
			std::string left = in.substr(0, pos);
			std::string right = in.substr(pos + 1);

			*x = atoi(left.c_str());
			*y = atoi(right.c_str());


		}
	};
 }

简易demo版,还有差错处理模块和线程池未接入,功能简易,还有很多bug,主要是练习epoll ET模式和Reactor基本思路。

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-05-11 16:45:38  更:2022-05-11 16:45:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/2 0:37:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码