连接回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr& conn)
{
if (!conn->connected())
{
conn->shutdown();
}
}
读写事件回调的设计
对于在网络上接收的字符流,在框架内部,RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型,定义proto的message类型,进行数据头的序列化和反序列化。 比如需要得知:service_name, method_name, args。不定义类型,传过来的字符流,是没办法识别的。
另外,为了防止粘包,可以在头部的message类型里添加 args_size,即参数的长度。
所以需要设计的部分有: header_size(4个字节)(服务名字、方法名字) + header_str + args_str
最后设计出该类型 rpcHeader.proto
syntax = "proto3";
package mprpc;
message RpcHeader
{
bytes service_name = 1;
bytes method_name = 2;
uint32 args_size = 3;
}
OnMessage的实现
该方法表示已建立连接用户的读写事件操作,如果有一个远程RPC服务的调用请求,那么OnMessage方法就会响应。
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buffer,
muduo::Timestamp);
- 首先要从网络上接收的远程rpc调用请求的字符流
std::string recv_buf = buffer->retrieveAllAsString();
- 从字符流中读取前4个字节的内容,上面说到,前四个字节表示header_size,有人问头部只有四字节够吗? 原因是我们将头部的大小转换成二进制存到这四字节里,不可能会超出范围。
uint32_t header_size = 0;
recv_buf.copy((char*)&header_size, 4, 0);
这里用到了std::string的copy方法:
- 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
std::string rpc_header_str = recv_buf.substr(4, header_size);
mprpc::RpcHeader rpcHeader;
std::string service_name;
std::string method_name;
uint32_t args_size;
if (rpcHeader.ParseFromString(rpc_header_str))
{
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
}
else
{
std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
return;
}
- 获取rpc方法参数的字符流数据,略过recv_buf的前面的头部信息(header_size和header_str),4字节加header_size即为开始的位置。 顺便打印一下调试信息。
std::string args_str = recv_buf.substr(4 + header_size, args_size);
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str.c_str() << std::endl;
std::cout << "============================================" << std::endl;
- 获取service对象和method对象
在之前设计过这样一个表:
std::unordered_map<std::string, ServiceInfo> m_serviceMap;
struct ServiceInfo
{
google::protobuf::Service* m_service;
std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> m_methodMap;
};
根据上面表的对应关系,先对其进行判空,然后获取service对象和method对象
auto it = m_serviceMap.find(service_name);
if (it == m_serviceMap.end())
{
std::cout << service_name << " is not exist!" << std::endl;
return;
}
auto mit = it->second.m_methodMap.find(method_name);
if (mit == it->second.m_methodMap.end())
{
std::cout << service_name << ":" << method_name << " is not exist!" << std::endl;
return;
}
google::protobuf::Service *service = it->second.m_service;
const google::protobuf::MethodDescriptor *method = mit->second;
- 生成rpc方法调用的请求request和响应response参数
google::protobuf::Message *request = service->GetRequestPrototype(method).New();
if (!request->ParseFromString(args_str))
{
std::cout << "request parse error, content:" << args_str << std::endl;
return;
}
google::protobuf::Message *response = service->GetResponsePrototype(method).New();
- 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
new UserService().Login(controller, request, response, done)
void SendRpcResponse(const muduo::net::TcpConnectionPtr&, google::protobuf::Message*);
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message* response)
{
std::string response_str;
if (response->SerializeToString(&response_str))
{
conn->send(response_str);
}
else
{
std::cout << "serialize response_str error!" << std::endl;
}
conn->shutdown();
}
google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
const muduo::net::TcpConnectionPtr&,
google::protobuf::Message*>
(this,
&RpcProvider::SendRpcResponse,
conn, response);
service->CallMethod(method, nullptr, request, response, done);
|