简述
??1.本文主要由三个部分,第一个是线程池的实现,第二个是RPC的实现,第三个是一些测试的代码。 ??2.ROC网络的部分使用我另一篇文章的网络服务实现,所以这里的RPC实现完全没有一行关于网络的代码。 ??3.因为使用的是现成的网络轮子,很多地方的接口也不得不受限于其提供的接口。
1.线程池
??这个线程池是我直接拿来别人的改改,来自link. ??threadpool.h文件
class CThreadPool
{
private:
std::vector<std::thread> MdPool;
std::queue<std::function<void()>> MdTasks;
std::mutex MdQueueMutex;
std::condition_variable MdQueueCondition;
std::atomic<bool> MdIsStop;
public:
CThreadPool();
~CThreadPool();
void MfStart(size_t threads = 5);
bool MfIsStop() { return MdIsStop; };
private:
void MfTheadFun();
public:
template<class F, class... Args>
auto MfEnqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
};
template<class F, class... Args>
auto CThreadPool::MfEnqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(MdQueueMutex);
if (MdIsStop)
printf("MfEnqueue on MdIsStopped ThreadPool");
MdTasks.emplace([task]() { (*task)(); });
}
MdQueueCondition.notify_one();
return res;
}
??threadpool.cpp文件
CThreadPool::CThreadPool() :MdIsStop(false) {}
CThreadPool::~CThreadPool()
{
MdIsStop = true;
MdQueueCondition.notify_all();
for (std::thread& worker : MdPool)
worker.join();
}
void CThreadPool::MfStart(size_t threads)
{
for (size_t i = 0; i < threads; ++i)
MdPool.push_back(std::thread(&CThreadPool::MfTheadFun, this));
}
void CThreadPool::MfTheadFun()
{
while (1)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(MdQueueMutex);
MdQueueCondition.wait(lock, [this] { return this->MdIsStop || !this->MdTasks.empty(); });
if (this->MdIsStop && this->MdTasks.empty())
return;
task = std::move(this->MdTasks.front());
this->MdTasks.pop();
}
task();
}
}
2.RPC
??RPC的server和client是写在同一个文件里的,但是这里分开来说明。 ??这里是server和client公用的一些东西。 ??过程的函数签名应该像RemoteProc那样,比如:std::shared_ptr<char[]> testf2(std::shared_ptr<char[]> a) ??RpcMsg的定义就像unp卷二里描述的那样,只是一个指针,这个地址上的数据有多长,如何解释,完全自定义。
??RpcCS.h
typedef std::function
<
std::shared_ptr<char[]> (std::shared_ptr<char[]>)
> RemoteProc;
enum RpcNetMsgCmd
{
RpcProc_NOON = 0
};
struct CNetMsgHead
{
int MdLen;
int MdCmd;
CNetMsgHead()
{
MdLen = sizeof(CNetMsgHead);
MdCmd = -1;
}
};
struct RpcMsg :public CNetMsgHead
{
void* MdData;
RpcMsg()
{
MdData = nullptr;
MdCmd = 0;
}
};
2.1RpcServer
??RpcCS.h ????MdCallList :服务器要执行的过程调用,都存放在这里,存储了: ??????①过程号 ??????②对应的过程函数地址 ??????③该过程返回值的长度 ??????④该过程参数的长度 ????MdCallListMutex :因为主线程和复写的网络消息处理线程都会访问该列表,这里使用了读写锁。 ????MdProcRet :每当有一个过程调用被放进线程池执行,代表其结果的对象就会放进这个表,存储了: ??????①对应客户端连接的CSocketObj*对象 ??????②代表执行过程的过程号 ??????③可以异步取得执行结果的std::future<std::shared_ptr<char[]>> ????MdProcRetMutex :MdProcRet是会被异步访问的见,MfCallRetrun ????MfStart :该函数先启动一个线程池,从客户端递送过来的远程调用会被送进该线程池执行 ????MfCallRetrun :这是一个线程,它会在MftSart中被丢到线程池中执行,轮询MdProcRet中是否有结果可用,并把结果写回客户端,相当于MdProcRet的消费者。 ????MfVNetMsgDisposeFun :是基类提供的网络消息处理函数,当收到消息,会调用这个虚函数来处理,另一个身份是MdProcRet的生产者。
class CRpcServer :private CServiceNoBlock
{
private:
CThreadPool MdThreadPool;
std::map<int, std::tuple<RemoteProc, int, int>> MdCallList;
std::shared_mutex MdCallListMutex;
std::map<CSocketObj*, std::tuple<int, std::future<std::shared_ptr<char[]>>>> MdProcRet;
std::shared_mutex MdProcRetMutex;
public:
CRpcServer(int HeartBeatTime = 300, int ServiceMaxPeoples = 100, int DisposeThreadNums = 1);
virtual ~CRpcServer();
void MfStart(const char* ip, unsigned short port, int threadNums = 3);
int MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen);
int MfRemoveCall(int CallNo);
private:
void MfCallRetrun();
virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};
??RpcCS.cpp ????这里主要说明MfVNetMsgDisposeFun 的思路: ??????①从msg参数中取得要调用的过程号,到MdCallList中寻找,如果找不到对应过程,就给客户端发回一个过程号为RpcProc_NOON(0)的包,表示该过程不存在,然后就没事了。 ??????②把可以异步取得结果的future对象放入队列,生产者。
CRpcServer::CRpcServer(int HeartBeatTime, int ServiceMaxPeoples, int DisposeThreadNums) :
CServiceNoBlock(HeartBeatTime, ServiceMaxPeoples, DisposeThreadNums)
{
}
CRpcServer::~CRpcServer()
{
}
void CRpcServer::MfStart(const char* ip, unsigned short port, int threadNums)
{
MdThreadPool.MfStart(threadNums+1);
MdThreadPool.MfEnqueue(&CRpcServer::MfCallRetrun, this);
return CServiceNoBlock::Mf_NoBlock_Start(ip, port);
}
int CRpcServer::MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen)
{
{
std::lock_guard<std::shared_mutex> write_lock(MdCallListMutex);
if (MdCallList.find(CallNo) != MdCallList.end())
{
printf("RempteProcReg CallNo <%d> already existed!\n", CallNo);
LogFormatMsgAndSubmit(std::this_thread::get_id(), ERROR_FairySun, "RempteProcReg CallNo <%d> already existed!\n", CallNo);
return -1;
}
MdCallList[CallNo] = std::make_tuple(Call, ArgLen, RetLen);
}
return 0;
}
int CRpcServer::MfRemoveCall(int CallNo)
{
{
std::lock_guard<std::shared_mutex> write_lock(MdCallListMutex);
auto it = MdCallList.find(CallNo);
if (it != MdCallList.end())
MdCallList.erase(it);
}
return 0;
}
void CRpcServer::MfCallRetrun()
{
while (!MdProcRet.empty() || !MdThreadPool.MfIsStop())
{
RpcMsg ret;
std::vector<CSocketObj*> removelist;
for (auto it = MdProcRet.begin(); it != MdProcRet.end(); ++it)
{
if (std::get<1>(it->second).valid())
{
int procNo = std::get<0>(it->second);
int procRetSize = std::get<2>(MdCallList[procNo]);
ret.MdLen = sizeof(CNetMsgHead) + procRetSize;
ret.MdCmd = procNo;
std::shared_ptr<char[]> data = std::get<1>(it->second).get();
it->first->MfDataToBuffer((char*)&ret, sizeof(CNetMsgHead));
it->first->MfDataToBuffer(data.get(), procRetSize);
removelist.push_back(it->first);
}
}
{
std::unique_lock<std::shared_mutex> write_lock(MdProcRetMutex);
for (auto it = removelist.begin(); it != removelist.end(); ++it)
{
if (MdProcRet.find(*it) != MdProcRet.end())
MdProcRet.erase(*it);
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void CRpcServer::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{
int flag = false;
RpcMsg ret;
{
std::shared_lock<std::shared_mutex> read_lock(MdCallListMutex);
if (MdCallList.find(msg->MdCmd) != MdCallList.end())
flag = true;
}
if (flag == false)
{
ret.MdLen = sizeof(RpcMsg);
ret.MdCmd = RpcProc_NOON;
cli->MfDataToBuffer((char*)&ret, ret.MdLen);
}
else
{
int arglen = std::get<1>(MdCallList[msg->MdCmd]);
std::shared_ptr<char[]> buf(new char[arglen]);
strncpy(buf.get(), ((char*)msg) + sizeof(CNetMsgHead), arglen);
{
std::unique_lock<std::shared_mutex> write_lock(MdProcRetMutex);
MdProcRet[cli] = std::make_tuple
(
msg->MdCmd,
MdThreadPool.MfEnqueue(std::get<0>(MdCallList[msg->MdCmd]), buf)
);
}
}
}
2.2RpcClient
??RpcCs.h ????没有任何新的数据成员,只是简单封装了下CClientLinkManage 的接口 ????MfVNetMsgDisposeFun :基类提供的网络消息处理函数,每收到一条消息就会调用一次,这里虽然复写了它,但是复写成了空函数,它什么都不做,因为客户端的远程过程调用应当是阻塞的。 ????MfRemote: 使用该函数来发起一个远程过程调用,第二三四参数分别指明了调用过程号、过程需要的数据地址、数据的长度。几乎全部的逻辑都集中在该函数中: ??????①声明一个RpcMsg结构,填写包头结构,然后把包头和数据分别写到对应的套接字里。 ??????②循环检查是否有服务端的数据发回。 ??????③有数据发回,检查服务器返回的过程号是否正确,不正确和0都会导致该函数返回空指针来表示失败。 ??????④正确就从套接字中取出数据,然后写到一个智能指针标识的空间中返回。
class CRpcClient :private CClientLinkManage
{
private:
public:
CRpcClient();
virtual ~CRpcClient();
void MfStart();
int MfConnectRpcServer(std::string Linkname, const char* ip, unsigned short port);
void MfCloseRpclink(std::string Linkname);
std::shared_ptr<char[]> MfRemote(std::string Linkname, int CallNo, void* data, int DataSize);
private:
virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};
??RpcCs.cpp
CRpcClient::CRpcClient():
CClientLinkManage()
{
}
CRpcClient::~CRpcClient()
{
}
void CRpcClient::MfStart()
{
CClientLinkManage::MfStart();
}
int CRpcClient::MfConnectRpcServer(std::string Linkname, const char* ip, unsigned short port)
{
return CClientLinkManage::MfCreateAddLink(Linkname, ip, port);
}
void CRpcClient::MfCloseRpclink(std::string Linkname)
{
CClientLinkManage::MfCloseLink(Linkname);
}
std::shared_ptr<char[]> CRpcClient::MfRemote(std::string Linkname, int CallNo, void* data, int DataSize)
{
RpcMsg msg;
msg.MdLen = sizeof(CNetMsgHead) + DataSize;
msg.MdCmd = CallNo;
CClientLinkManage::MfSendData(Linkname, (char*)&msg, sizeof(CNetMsgHead));
CClientLinkManage::MfSendData(Linkname, (char*)data, DataSize);
while (1)
{
if (!CClientLinkManage::MfHasMsg(Linkname))
{
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
else
{
const char * buff = CClientLinkManage::MfGetRecvBufferP(Linkname);
if ( ((RpcMsg*)buff)->MdCmd == RpcProc_NOON )
return nullptr;
else if ( ((RpcMsg*)buff)->MdCmd != CallNo )
return nullptr;
else
{
int retsize = ((RpcMsg*)buff)->MdLen - sizeof(CNetMsgHead);
char* ret = new char[retsize];
strncpy(ret, ((char*)buff) + sizeof(CNetMsgHead) , retsize);
CClientLinkManage::MfPopFrontMsg(Linkname);
return std::shared_ptr<char[]>(ret);
}
}
}
}
void CRpcClient::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{
}
3.测试
??client.cpp
int main()
{
FairySunOfNetBaseStart();
int i = 5;
CRpcClient c;
c.MfStart();
c.MfConnectRpcServer("rpc", "118.31.75.171", 4567);
for(int i = 0; i < 10; ++i)
printf("remote ret:%d\n", *(int*)c.MfRemote("rpc", 2, (void*)&i, sizeof(int)).get());
for (int i = 0; i < 3; ++i)
printf("remote ret:%d\n", *(int*)c.MfRemote("rpc", 1, (void*)&i, sizeof(int)).get());
getchar();
FairySunOfNetBaseOver();
return 0;
}
??server.cpp
std::shared_ptr<char[]> testf1(std::shared_ptr<char[]> a)
{
printf("test11111\n");
char* ret = new char[sizeof(int)];
int s = *((int*)a.get()) + 1;
*(int*)ret = s;
std::this_thread::sleep_for(std::chrono::seconds(5));
return std::shared_ptr<char[]>(ret);
}
std::shared_ptr<char[]> testf2(std::shared_ptr<char[]> a)
{
printf("test22222\n");
char* ret = new char[sizeof(int)];
*(int*)ret = *(int*)a.get() + 2;
return std::shared_ptr<char[]>(ret);
}
int main()
{
FairySunOfNetBaseStart();
CRpcServer s;
s.MfStart(0, 4567);
s.MfRegCall(1, testf1, sizeof(int), sizeof(int));
s.MfRegCall(2, testf2, sizeof(int), sizeof(int));
s.MfRegCall(2, testf2, sizeof(int), sizeof(int));
getchar();
FairySunOfNetBaseOver();
return 0;
}
??????????????????????????????????????2021年11月16日16:19:38
|