IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 一个c++RPC实现 -> 正文阅读

[网络协议]一个c++RPC实现

简述

??1.本文主要由三个部分,第一个是线程池的实现,第二个是RPC的实现,第三个是一些测试的代码。
??2.ROC网络的部分使用我另一篇文章的网络服务实现,所以这里的RPC实现完全没有一行关于网络的代码。
??3.因为使用的是现成的网络轮子,很多地方的接口也不得不受限于其提供的接口。

1.线程池

??这个线程池是我直接拿来别人的改改,来自link.
??threadpool.h文件

class CThreadPool 
{
private:
    std::vector<std::thread>            MdPool;             // 线程池
    std::queue<std::function<void()>>   MdTasks;            // 提交的任务队列
    std::mutex                          MdQueueMutex;       // 队列的锁
    std::condition_variable             MdQueueCondition;   // 队列的条件变量
    std::atomic<bool>                   MdIsStop;           // 队列停止时使用
public:
    CThreadPool();
    ~CThreadPool();
    void MfStart(size_t threads = 5);    
    bool MfIsStop() { return MdIsStop; };
private:
    void MfTheadFun();
public:
    template<class F, class... Args>
    auto MfEnqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
};


// 后置返回类型,提取出参数F的返回值类型
// 模板成员需要写在,h中
template<class F, class... Args>
auto CThreadPool::MfEnqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(MdQueueMutex);
        if (MdIsStop)
            printf("MfEnqueue on MdIsStopped ThreadPool");
        MdTasks.emplace([task]() { (*task)(); });
    }
    MdQueueCondition.notify_one();
    return res;
}

??threadpool.cpp文件

CThreadPool::CThreadPool() :MdIsStop(false) {}

CThreadPool::~CThreadPool()
{
    MdIsStop = true;
    MdQueueCondition.notify_all();
    for (std::thread& worker : MdPool)
        worker.join();
}

void CThreadPool::MfStart(size_t threads)       // 线程不应该在构造函数中启动,因为这些线程使用了数据成员
{
    for (size_t i = 0; i < threads; ++i)
        MdPool.push_back(std::thread(&CThreadPool::MfTheadFun, this));
}

void CThreadPool::MfTheadFun()
{
    while (1)
    {
        std::function<void()> task;     // 要执行的任务
        {
            std::unique_lock<std::mutex> lock(MdQueueMutex);
            MdQueueCondition.wait(lock, [this] { return this->MdIsStop || !this->MdTasks.empty(); });
            if (this->MdIsStop && this->MdTasks.empty())
                return;
            task = std::move(this->MdTasks.front());
            this->MdTasks.pop();
        }
        task();
    }
}

2.RPC

??RPC的server和client是写在同一个文件里的,但是这里分开来说明。
??这里是server和client公用的一些东西。
??过程的函数签名应该像RemoteProc那样,比如:std::shared_ptr<char[]> testf2(std::shared_ptr<char[]> a)
??RpcMsg的定义就像unp卷二里描述的那样,只是一个指针,这个地址上的数据有多长,如何解释,完全自定义。

??RpcCS.h

// 所有远程调用的函数应当遵循此接口
// 返回值是一个智能指针,指向一个char[]
// 参数是一个智能指针,指向一个char[]
// 使用智能指针而不是直接使用char*,是因为很多地方都是跨线程传递的,方便在调用中的内存管理
// 不论是返回值还是参数,都在void的基础上自定义结构解析
typedef std::function
<
	std::shared_ptr<char[]> (std::shared_ptr<char[]>)
> RemoteProc;

enum RpcNetMsgCmd
{
	RpcProc_NOON = 0	// 若CNetMsgHead::MdCmd为该值,
	// 其余自定义的过程号都应该大于0
};

struct CNetMsgHead			// 这个结构是搬过来说明的,他的定义不放在这里
{
	int MdLen;		// 该包总长度
	int MdCmd;		// 该包执行的操作
	CNetMsgHead()
	{
		MdLen = sizeof(CNetMsgHead);
		MdCmd = -1;				// 该值为-1时默认为心跳包
	}
};

struct RpcMsg :public CNetMsgHead
{
	// CNetMsgHead::MdLen 该成员依旧代表整个包的长度
	// CNetMsgHead::MdCmd 该成员不再代表某个操作,而是直接代表要调用的那个过程号CallNo
	void* MdData;	// 数据指针
					// 在server中,收到该结构MdData表示参数,发送该结构MdData表示返回值
					// 在client中,收到该结构MdData表示返回值,发送该结构MdData表示参数
					// 发送时不设置该成员,而是直接写数据到缓冲区
					// 从缓冲区取出时使用这个成员
	RpcMsg()
	{
		MdData = nullptr;
		MdCmd = 0;
	}
};

2.1RpcServer

??RpcCS.h
????MdCallList:服务器要执行的过程调用,都存放在这里,存储了:
??????①过程号
??????②对应的过程函数地址
??????③该过程返回值的长度
??????④该过程参数的长度
????MdCallListMutex:因为主线程和复写的网络消息处理线程都会访问该列表,这里使用了读写锁。
????MdProcRet:每当有一个过程调用被放进线程池执行,代表其结果的对象就会放进这个表,存储了:
??????①对应客户端连接的CSocketObj*对象
??????②代表执行过程的过程号
??????③可以异步取得执行结果的std::future<std::shared_ptr<char[]>>
????MdProcRetMutex:MdProcRet是会被异步访问的见,MfCallRetrun
????MfStart:该函数先启动一个线程池,从客户端递送过来的远程调用会被送进该线程池执行
????MfCallRetrun:这是一个线程,它会在MftSart中被丢到线程池中执行,轮询MdProcRet中是否有结果可用,并把结果写回客户端,相当于MdProcRet的消费者
????MfVNetMsgDisposeFun:是基类提供的网络消息处理函数,当收到消息,会调用这个虚函数来处理,另一个身份是MdProcRet的生产者

class CRpcServer :private CServiceNoBlock
{
private:
	CThreadPool											MdThreadPool;		// 执行过程时的线程池
	std::map<int, std::tuple<RemoteProc, int, int>>		MdCallList;			// 注册的过程表,分别描述过程号、过程函数,参数的长度、返回值的长度
	std::shared_mutex									MdCallListMutex;	// 该表的互斥元
	std::map<CSocketObj*, std::tuple<int, std::future<std::shared_ptr<char[]>>>>	MdProcRet;		// 过程放入线程池执行时会返回一个std::future<void*>,以便后续异步取得该过程的返回值
																									// 分别描述客户端连接、执行的过程号、可以取得返回值的future对象
	std::shared_mutex																MdProcRetMutex; // 过程结果集的锁
																			
public:
	CRpcServer(int HeartBeatTime = 300, int ServiceMaxPeoples = 100, int DisposeThreadNums = 1);
	virtual ~CRpcServer();
	void MfStart(const char* ip, unsigned short port, int threadNums = 3);	// 启动收发线程和线程池,threadNums代表线程池线程数量
	int MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen);		// 注册一个过程
	int MfRemoveCall(int CallNo);											// 移除一个过程
private:
	void MfCallRetrun();
	virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};

??RpcCS.cpp
????这里主要说明MfVNetMsgDisposeFun的思路:
??????①从msg参数中取得要调用的过程号,到MdCallList中寻找,如果找不到对应过程,就给客户端发回一个过程号为RpcProc_NOON(0)的包,表示该过程不存在,然后就没事了。
??????②把可以异步取得结果的future对象放入队列,生产者。

CRpcServer::CRpcServer(int HeartBeatTime, int ServiceMaxPeoples, int DisposeThreadNums) :
	CServiceNoBlock(HeartBeatTime, ServiceMaxPeoples, DisposeThreadNums)
{

}

CRpcServer::~CRpcServer()
{

}

void CRpcServer::MfStart(const char* ip, unsigned short port, int threadNums)
{
	MdThreadPool.MfStart(threadNums+1);						// 线程池启动
	MdThreadPool.MfEnqueue(&CRpcServer::MfCallRetrun, this);// 给对应客户端写回结果的线程丢尽线程池执行
	return CServiceNoBlock::Mf_NoBlock_Start(ip, port);		// 网络收发处理启动
}

int CRpcServer::MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen)
{
	{
		std::lock_guard<std::shared_mutex> write_lock(MdCallListMutex);
		if (MdCallList.find(CallNo) != MdCallList.end())
		{
			printf("RempteProcReg CallNo <%d> already existed!\n", CallNo);
			LogFormatMsgAndSubmit(std::this_thread::get_id(), ERROR_FairySun, "RempteProcReg CallNo <%d> already existed!\n", CallNo);
			return -1;
		}
		MdCallList[CallNo] = std::make_tuple(Call, ArgLen, RetLen);
	}
	return 0;
}

int CRpcServer::MfRemoveCall(int CallNo)
{
	{
		std::lock_guard<std::shared_mutex> write_lock(MdCallListMutex);
		auto it = MdCallList.find(CallNo);
		if (it != MdCallList.end())
			MdCallList.erase(it);
	}
	return 0;
}

void CRpcServer::MfCallRetrun()
{
	while (!MdProcRet.empty() || !MdThreadPool.MfIsStop())
	{
		RpcMsg ret;
		// for循环中 执行完成的远程调用 的CSocketObj*会被加入该队列,结束后统一从MdProcRet中移除
		std::vector<CSocketObj*> removelist;

		// 遍历MdProcRet,如果有一个过程可以取得结果,就把结果发回对应的socket
		for (auto it = MdProcRet.begin(); it != MdProcRet.end(); ++it)
		{
			if (std::get<1>(it->second).valid())		// 如果结果可用
			{
				int procNo = std::get<0>(it->second);
				int procRetSize = std::get<2>(MdCallList[procNo]);
				ret.MdLen = sizeof(CNetMsgHead) + procRetSize;
				ret.MdCmd = procNo;
				std::shared_ptr<char[]> data = std::get<1>(it->second).get();
				it->first->MfDataToBuffer((char*)&ret, sizeof(CNetMsgHead));	// 先写包头
				it->first->MfDataToBuffer(data.get(), procRetSize);				// 再写数据
				removelist.push_back(it->first);
			}
		}

		// 将执行完成的远程调用统一移除
		{
			std::unique_lock<std::shared_mutex> write_lock(MdProcRetMutex);
			for (auto it = removelist.begin(); it != removelist.end(); ++it)
			{
				if (MdProcRet.find(*it) != MdProcRet.end())
					MdProcRet.erase(*it);
			}
		}
				
		std::this_thread::sleep_for(std::chrono::seconds(1));	// 暂停一秒防止执行过快
	}
}

void CRpcServer::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{
	// 注册列表中没有找到对应的过程号,发回一个为0的消息,表示找不到对应过程
	int flag = false;
	RpcMsg ret;
	{
		std::shared_lock<std::shared_mutex> read_lock(MdCallListMutex);
		if (MdCallList.find(msg->MdCmd) != MdCallList.end())
			flag = true;
	}
	if (flag == false)
	{
		ret.MdLen = sizeof(RpcMsg);
		ret.MdCmd = RpcProc_NOON;
		cli->MfDataToBuffer((char*)&ret, ret.MdLen);
	}
	else		// 否则就是找到了对应的过程,保存客户端对象和对应的future,放进结果表
	{
		int arglen = std::get<1>(MdCallList[msg->MdCmd]);
		std::shared_ptr<char[]> buf(new char[arglen]);
		strncpy(buf.get(), ((char*)msg) + sizeof(CNetMsgHead), arglen);
		{
			std::unique_lock<std::shared_mutex> write_lock(MdProcRetMutex);
			MdProcRet[cli] = std::make_tuple
			(
				msg->MdCmd,
				MdThreadPool.MfEnqueue(std::get<0>(MdCallList[msg->MdCmd]), buf)
			);
		}
	}
}

2.2RpcClient

??RpcCs.h
????没有任何新的数据成员,只是简单封装了下CClientLinkManage的接口
????MfVNetMsgDisposeFun:基类提供的网络消息处理函数,每收到一条消息就会调用一次,这里虽然复写了它,但是复写成了空函数,它什么都不做,因为客户端的远程过程调用应当是阻塞的。
????MfRemote:使用该函数来发起一个远程过程调用,第二三四参数分别指明了调用过程号、过程需要的数据地址、数据的长度。几乎全部的逻辑都集中在该函数中:
??????①声明一个RpcMsg结构,填写包头结构,然后把包头和数据分别写到对应的套接字里。
??????②循环检查是否有服务端的数据发回。
??????③有数据发回,检查服务器返回的过程号是否正确,不正确和0都会导致该函数返回空指针来表示失败。
??????④正确就从套接字中取出数据,然后写到一个智能指针标识的空间中返回。

class CRpcClient :private CClientLinkManage
{
private:
public:
	CRpcClient();
	virtual ~CRpcClient();
	void MfStart();																					// 启动收发线程
	int MfConnectRpcServer(std::string Linkname, const char* ip, unsigned short port);				// 连接Rpc服务
	void MfCloseRpclink(std::string Linkname);														// 关闭和Rpc的连接
	std::shared_ptr<char[]> MfRemote(std::string Linkname, int CallNo, void* data, int DataSize);	// 远程过程调用,阻塞等待
private:
	virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};

??RpcCs.cpp

CRpcClient::CRpcClient():
	CClientLinkManage()
{

}

CRpcClient::~CRpcClient()
{

}

void CRpcClient::MfStart()
{
	CClientLinkManage::MfStart();
}

int CRpcClient::MfConnectRpcServer(std::string Linkname, const char* ip, unsigned short port)
{
	return CClientLinkManage::MfCreateAddLink(Linkname, ip, port);
}

void CRpcClient::MfCloseRpclink(std::string Linkname)
{
	CClientLinkManage::MfCloseLink(Linkname);
}

std::shared_ptr<char[]> CRpcClient::MfRemote(std::string Linkname, int CallNo, void* data, int DataSize)
{
	RpcMsg msg;
	msg.MdLen = sizeof(CNetMsgHead) + DataSize;
	msg.MdCmd = CallNo;
	CClientLinkManage::MfSendData(Linkname, (char*)&msg, sizeof(CNetMsgHead));	// 先写包头
	CClientLinkManage::MfSendData(Linkname, (char*)data, DataSize);				// 再写数据	

	while (1)
	{
		if (!CClientLinkManage::MfHasMsg(Linkname))		// 如果没有数据
		{
			std::this_thread::sleep_for(std::chrono::seconds(1));
			continue;
		}
		else
		{
			const char * buff = CClientLinkManage::MfGetRecvBufferP(Linkname);
			if (  ((RpcMsg*)buff)->MdCmd == RpcProc_NOON  )						// 收到了为0的callno
				return nullptr;
			else if (  ((RpcMsg*)buff)->MdCmd != CallNo  )						// 收到的callno和发出去的callno不一样
				return nullptr;
			else
			{
				int retsize = ((RpcMsg*)buff)->MdLen - sizeof(CNetMsgHead);		// 计算返回的数据长度
				char* ret = new char[retsize];									// 申请空间
				strncpy(ret, ((char*)buff) + sizeof(CNetMsgHead) , retsize);	// 复制到新申请的空间
				CClientLinkManage::MfPopFrontMsg(Linkname);						// 缓冲区的消息弹出
				return std::shared_ptr<char[]>(ret);							// 转换成智能指针返回
			}
		}
	}
}

void CRpcClient::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{

}

3.测试

??client.cpp

int main()
{
	FairySunOfNetBaseStart();				// https://blog.csdn.net/qq_43082206/article/details/110383165
	int i = 5;
	CRpcClient c;
	c.MfStart();
	c.MfConnectRpcServer("rpc", "118.31.75.171", 4567);
	for(int i = 0; i < 10; ++i)
		printf("remote ret:%d\n", *(int*)c.MfRemote("rpc", 2, (void*)&i, sizeof(int)).get());
	for (int i = 0; i < 3; ++i)
	printf("remote ret:%d\n", *(int*)c.MfRemote("rpc", 1, (void*)&i, sizeof(int)).get());
	getchar();
	FairySunOfNetBaseOver();
	return 0;
}

??server.cpp

std::shared_ptr<char[]> testf1(std::shared_ptr<char[]> a)
{
	printf("test11111\n");
	char* ret = new char[sizeof(int)];
	int s = *((int*)a.get()) + 1;
	*(int*)ret = s;
	std::this_thread::sleep_for(std::chrono::seconds(5));
	return std::shared_ptr<char[]>(ret);
}
std::shared_ptr<char[]> testf2(std::shared_ptr<char[]> a)
{
	printf("test22222\n");
	char* ret = new char[sizeof(int)];
	*(int*)ret = *(int*)a.get() + 2;
	return std::shared_ptr<char[]>(ret);
}

int main()
{
	FairySunOfNetBaseStart();			// https://blog.csdn.net/qq_43082206/article/details/110383165
	CRpcServer s;
	s.MfStart(0, 4567);
	s.MfRegCall(1, testf1, sizeof(int), sizeof(int));
	s.MfRegCall(2, testf2, sizeof(int), sizeof(int));
	s.MfRegCall(2, testf2, sizeof(int), sizeof(int));
	//s.MfRemoveCall(1);
	//s.MfRemoveCall(2);
	getchar();
	FairySunOfNetBaseOver();
	return 0;
}

??????????????????????????????????????2021年11月16日16:19:38

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-11-17 13:06:20  更:2021-11-17 13:06:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年7日历 -2024/7/3 20:37:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码