手写一个 RPC 远程调用(C++)
版权声明
个人作品,禁止转载
参考资料
- Build Remote Procedure Calls (RPC) - from scratch in C(https://www.udemy.com/course/linuxrpc/)
- Linux manual page(https://man7.org/linux/man-pages/)
- Cpp reference(https://en.cppreference.com/w/)
代码下载
https://github.com/541380000/write_rpc_from_scratch.git 或者从网盘下载 链接:https://pan.baidu.com/s/1BjTK3aH_T80umXsF9Yzqmg?pwd=5mof 提取码:5mof
推荐进阶工具
- protobuf(对象序列化工具,按字节序列化)
- msgpack(对象序列化工具,类似 json,但更加精简)
- Boost 的序列化库
Overview
- 这是一个学习型项目,旨在学习 RPC 如何运行
- 这是一个 C++ 项目,只使用 C++ 标准库,不使用第三方库
- 各个步骤纯手写,代码无法用于商业项目
- 本项目不考虑机器大小端
预备知识
- 基础的 c/c++知识
- 基础的 socket 编程
- 基础的 debug 能力
RPC 的步骤
- RPC 客户端,序列化 RPC 头和 RPC 参数
- RPC 客户端,将 RPC 头和参数打包成数据包,通过网络(等其他通信手段)发送
- RPC 服务器,反序列化 RPC 头和 RPC 参数
- RPC 服务器,执行远程调用
- RPC 服务器,将 RPC 头和返回值打包成数据包,发送给客户端
- RPC 客户端,反序列化 RPC 头和返回值,得到 RPC 结果
序列化和反序列化
基础数据类型:SerBuffer
结构体定义
SerBuffer是数据序列化和反序列化的缓冲区
```
// Raw byte type shared by every (de)serialization helper; a plain char.
using byte = char;

// Growable byte buffer: serializers append to it, deserializers read from it.
struct SerBuffer {
    vector<byte> buffer{};  // serialized bytes, in the order they were appended
};
```
方法
创建新的 SerBuffer,预留若干字节的空间
// Create an empty SerBuffer with `buffer_size` bytes of capacity reserved up front,
// so the first serializations into it do not have to reallocate.
// @param buffer_size  number of bytes to reserve (capacity only; the size stays 0)
// @return             owning pointer to the new, empty buffer
unique_ptr<SerBuffer> creatSerBuffer(const uint32_t buffer_size=DEFAULT_INIT_SERIALIZE_BUFFER_SIZE){
    auto ptr = make_unique<SerBuffer>();
    ptr->buffer.reserve(buffer_size);
    // Return the local directly: wrapping it in move() is redundant and
    // prevents copy elision.
    return ptr;
}
将 nBytes 的数据复制到 buffer 中
// Append the `nBytes` raw bytes starting at `data` to the end of `buffer`.
// Bytes are copied verbatim, with no byte-order conversion.
void serializeData(const unique_ptr<SerBuffer>& buffer, const byte* data, const uint32_t nBytes){
    auto& dst = buffer->buffer;
    const byte* const last = data + nBytes;
    dst.insert(dst.end(), data, last);
}
跳过 nBytes 字节:nBytes 为负则从缓冲区末尾删除 |nBytes| 字节,为正则追加 nBytes 个值为 0 的字节
// Move the end of `buffer` by `nBytes`:
//   nBytes < 0  : drop the last |nBytes| bytes; a rewind larger than the buffer
//                 empties it instead of popping from an empty vector (UB),
//   nBytes > 0  : append nBytes zero bytes ('\0') as padding,
//   nBytes == 0 : no-op.
void serializeBufferSkip(const unique_ptr<SerBuffer>& buffer, int32_t nBytes){
    auto& bytes = buffer->buffer;
    if (nBytes < 0) {
        // Widen before negating: -INT32_MIN does not fit in int32_t.
        const uint64_t requested = static_cast<uint64_t>(-static_cast<int64_t>(nBytes));
        const size_t drop = requested < bytes.size() ? static_cast<size_t>(requested) : bytes.size();
        bytes.resize(bytes.size() - drop);
    } else if (nBytes > 0) {
        // Pad with real zero bytes, not the ASCII digit '0' (0x30).
        bytes.insert(bytes.end(), static_cast<size_t>(nBytes), byte{0});
    }
}
清空缓冲区
// Remove every byte from `buffer`, leaving it empty and ready for reuse.
void resetSerBuffer(const unique_ptr<SerBuffer>& buffer){
    auto& bytes = buffer->buffer;
    bytes.erase(bytes.begin(), bytes.end());
}
填充一个空指针,空指针用 0xFFFFFFFF 表示
// Append the 4-byte null marker 0xFFFFFFFF; the deserializers in this file compare
// the leading 4 bytes of a record against it to recognize "no object".
void fillNull(const unique_ptr<SerBuffer>& buffer){
    constexpr uint32_t kNullMarker = 0xFFFFFFFFu;
    serializeData(buffer, reinterpret_cast<const byte*>(&kNullMarker), sizeof(kNullMarker));
}
将 RPC 头序列化到 buffer 中(RPC 头定义见下文)
// Append the raw bytes of `*header` (procedure id, then payload size) to `buffer`.
void serializeRPCHeader(const unique_ptr<SerBuffer>& buffer, const RPCHeader* const header){
    // sizeof(*header), not sizeof(header): the latter is the size of the pointer, which
    // only happens to equal sizeof(RPCHeader) (8 bytes) on 64-bit targets.
    serializeData(buffer, reinterpret_cast<const byte*>(header), sizeof(*header));
}
简单对象(反)序列化
简单对象定义为:只包含基础数据类型的结构体(不包含指针和其他结构体)
对象定义
using byte=char;
// "Simple object": only fixed-size fundamental fields, no pointers or nested structs.
// serialize_simple_person writes the fields back to back: 30 + 4 + 4 = 38 bytes.
// sizeof(SimplePerson) can be larger (typically 40) because of alignment padding
// after `name`, so the struct must not be copied as a whole using sizeof.
struct SimplePerson{
byte name[30];  // fixed-size name buffer; all 30 bytes are serialized
uint32_t age;
uint32_t weight;
};
序列化方法
- 按字段依次将数据拷贝到缓冲区中,不考虑内存对齐(字段之间不写入对齐填充字节)。
- 对于 SimplePerson,序列化后的缓冲区中应该包含:30 字节的 name,4 字节的 age,4 字节的 weight,共 38 字节。注意 sizeof(SimplePerson) 由于对齐填充通常是 40 字节,因此不能按 sizeof 直接拷贝整个结构体。
- 如果传入参数为 NULL,则填充空指针,表示为 0xFFFFFFFF,否则填充数据
// Serialize `obj` into `buffer` as name (30 bytes) | age (4) | weight (4), with no
// padding between fields. A null `obj` is written as the 4-byte null marker instead.
void serialize_simple_person(const SimplePerson* const obj, const unique_ptr<SerBuffer>& buffer){
    if (obj == nullptr) {
        fillNull(buffer);
        return;
    }
    // Append one field's raw bytes.
    const auto appendField = [&buffer](const void* field, uint32_t size){
        serializeData(buffer, static_cast<const byte*>(field), size);
    };
    appendField(obj->name, sizeof(obj->name));
    appendField(&obj->age, sizeof(obj->age));
    appendField(&obj->weight, sizeof(obj->weight));
}
反序列化方法
- 相反的过程,将数据提取出来即可
- offset 为:从 buffer 的第几个字节开始,反序列化出来一个对象
unique_ptr<SimplePerson> deserialize_simple_person(const unique_ptr<SerBuffer>& buffer, uint32_t offset = 0){
if (*(uint32_t*)buffer->buffer.data() == 0xFFFFFFFF)
return {nullptr};
auto ptr = make_unique<SimplePerson>();
auto& person = *ptr;
copy(buffer->buffer.begin() + offset, buffer->buffer.begin() + sizeof(person) + offset, person.name);
offset += sizeof(person.name);
person.age = *(decltype(person.age)*)(buffer->buffer.data() + offset);
offset += sizeof(person.age);
person.weight = *(decltype(person.weight)*)(buffer->buffer.data() + offset);
offset += sizeof(person.weight);
return move(ptr);
}
测试
void testSimplePerson(){
SimplePerson p{"1234", 1, 1};
auto buf = make_unique<SerBuffer>();
printf("-------------------> SimplePerson\n");
serialize_simple_person(&p, buf);
printf("Full size of person after serialize: %zu, expected is %d\n", buf->buffer.size(), 30+4+4);
cout << "Before serialize: " << p << "\nAfter deserialize: " << *deserialize_simple_person(buf) << endl << endl;
}
嵌套对象(反)序列化
嵌套对象定义为:只包含基础数据类型和其他结构体的结构体(不包含指针)
对象定义
// Job description, embedded in NestedPerson and pointed to by PointerPerson.
// Serialized as title (30 bytes) followed by id (4 bytes): 34 bytes in total.
struct Occupation{
    char title[30];
    uint32_t id;
};  // the terminating ';' is required after a class definition
// Person with an embedded Occupation (no pointers). Serialized as
// name (30) | occupation (34) | age (4) | weight (4) = 72 bytes.
struct NestedPerson{
    char name[30];
    Occupation occupation;
    uint32_t age;
    uint32_t weight;
};  // the terminating ';' is required after a class definition
Occupation 的(反)序列化方法(简单对象序列化)
void serialize_simple_occupation(const Occupation* const obj, const unique_ptr<SerBuffer>& buffer){
serializeData(buffer, obj->title, sizeof(obj->title));
serializeData(buffer, (char*)&obj->id, sizeof(obj->id));
}
// Rebuild an Occupation from the 34 bytes (title 30 | id 4) starting at byte `offset`
// of `buffer`. Always returns a non-null object.
// Precondition: the caller must ensure the buffer holds at least offset + 34 bytes.
unique_ptr<Occupation> deserialize_simple_occupation(const unique_ptr<SerBuffer>& buffer, uint32_t offset = 0){
    auto ptr = make_unique<Occupation>();
    auto& occupation = *ptr;
    const auto first = buffer->buffer.begin() + offset;
    copy(first, first + sizeof(occupation.title), occupation.title);
    // Copy id byte by byte: its position in the buffer may be unaligned, and reading it
    // through a uint32_t* is undefined behavior.
    const auto idFirst = first + sizeof(occupation.title);
    copy(idFirst, idFirst + sizeof(occupation.id), reinterpret_cast<char*>(&occupation.id));
    return ptr;
}
序列化与反序列化方法
- 如果遇到嵌套结构体,则调用嵌套结构体的序列化方法,将内层结构体序列化到 buffer 中
void serialize_nested_person(const NestedPerson* const obj, const unique_ptr<SerBuffer>& buffer){
if (!obj){
fillNull(buffer);
return ;
}
serializeData(buffer, obj->name, sizeof(obj->name));
serialize_simple_occupation(&obj->occupation, buffer);
serializeData(buffer, (char*)&obj->age, sizeof(obj->age));
serializeData(buffer, (char*)&obj->weight, sizeof(obj->weight));
}
unique_ptr<NestedPerson> deserialize_nested_person(const unique_ptr<SerBuffer>& buffer, uint32_t offset = 0){
if (*(uint32_t*)buffer->buffer.data() == 0xFFFFFFFF)
return {nullptr};
auto ptr = make_unique<NestedPerson>();
auto& person = *ptr;
copy(buffer->buffer.begin() + offset, buffer->buffer.begin() + sizeof(person) + offset, person.name);
offset += sizeof(person.name);
person.occupation = *deserialize_simple_occupation(buffer, offset);
offset += sizeof(person.occupation.title) + sizeof(person.occupation.id);
person.age = *(decltype(person.age)*)(buffer->buffer.data() + offset);
offset += sizeof(person.age);
person.weight = *(decltype(person.weight)*)(buffer->buffer.data() + offset);
offset += sizeof(person.weight);
return move(ptr);
}
测试
void testNestedPerson(){
NestedPerson p{"1234", {"doctor", 294}, 1, 1};
auto buf = make_unique<SerBuffer>();
printf("-------------------> NestedPerson\n");
serialize_nested_person(&p, buf);
printf("Full size of nested person after serialize: %zu, expected is %d\n", buf->buffer.size(), 30+4+4+30+4);
cout << "Before serialize: " << p << "\nAfter deserialize: " << *deserialize_nested_person(buf) << endl << endl;
}
含有指针的结构体的(反)序列化
结构体定义
// Person that refers to its Occupation through a pointer, which may be null.
// deserialize_pointer_person heap-allocates the Occupation it stores in `occupation`;
// whoever receives that result is responsible for deleting it.
struct PointerPerson{
    char name[30];
    Occupation* occupation;
    uint32_t age;
    uint32_t weight;
};  // the terminating ';' is required after a class definition
序列化与反序列化方法
如果有指针
- 指针为空,则填充 0xFFFFFFFF(如你所见,这会导致问题)
- 指针不为空,调用内层对象的序列化方法,将内层对象序列化到 buffer 中
// Serialize `obj` as name (30) | occupation | age (4) | weight (4), where occupation is
// either the 34-byte title+id record or, when obj->occupation is null, the 4-byte null
// marker. A null `obj` is written as the null marker alone.
void serialize_pointer_person(const PointerPerson* const obj, const unique_ptr<SerBuffer>& buffer){
    if (!obj){
        fillNull(buffer);
        return ;
    }
    serializeData(buffer, obj->name, sizeof(obj->name));
    if (obj->occupation)
        serialize_simple_occupation(obj->occupation, buffer);
    else
        fillNull(buffer);  // never hand a null pointer to the occupation serializer
    serializeData(buffer, reinterpret_cast<const char*>(&obj->age), sizeof(obj->age));
    serializeData(buffer, reinterpret_cast<const char*>(&obj->weight), sizeof(obj->weight));
}
unique_ptr<PointerPerson> deserialize_pointer_person(const unique_ptr<SerBuffer>& buffer, uint32_t offset = 0){
if (*(uint32_t*)buffer->buffer.data() == 0xFFFFFFFF)
return {nullptr};
auto ptr = make_unique<PointerPerson>();
auto& person = *ptr;
copy(buffer->buffer.begin() + offset, buffer->buffer.begin() + sizeof(person) + offset, person.name);
offset += sizeof(person.name);
person.occupation = deserialize_simple_occupation(buffer, offset).release();
offset += sizeof(person.occupation->title) + sizeof(person.occupation->id);
person.age = *(decltype(person.age)*)(buffer->buffer.data() + offset);
offset += sizeof(person.age);
person.weight = *(decltype(person.weight)*)(buffer->buffer.data() + offset);
offset += sizeof(person.weight);
return move(ptr);
}
测试
// Round-trip one PointerPerson (occupation held by pointer), then print the serialized
// size (expected 30 + 4 + 4 + 30 + 4 bytes) and the person before and after.
void testPointerPerson(){
    Occupation occ{"doctor", 294};
    PointerPerson p{"1234", &occ, 1, 1};
    auto buf = make_unique<SerBuffer>();
    printf("-------------------> PointerPerson\n");
    serialize_pointer_person(&p, buf);
    printf("Full size of pointer person after serialize: %zu, expected is %d\n", buf->buffer.size(), 30+4+4+30+4);
    // Keep the result alive past the print so its heap-allocated occupation can be freed.
    auto restored = deserialize_pointer_person(buf);
    cout << "Before serialize: " << p << "\nAfter deserialize: " << *restored << endl << endl;
    // deserialize_pointer_person allocates the occupation with new and PointerPerson
    // does not own it, so release it here instead of leaking it.
    delete restored->occupation;
}
还有很多其他数据结构需要序列化,但实际项目中不应像这样逐个手动实现,而应使用上文推荐的 protobuf、msgpack 等序列化工具
RPC 头
RPC 头应该包含如下信息
- 远程调用 id,这里用 uint32_t 表示
- payload 大小,即参数大小,用 uint32_t 表示
定义为:
// Fixed-size header that precedes every RPC request and response.
// It is sent as its raw in-memory bytes (no byte-order conversion), so both peers
// must agree on the layout of these two uint32_t fields.
struct RPCHeader{
uint32_t id; // id of procedure; handle_rpc answers request id N with id 100 + N
uint32_t size; // size in bytes of the payload (arguments or return value) following the header
};
RPC 服务器实现
下方 id 为 1 的 RPC 实现如下功能:接受两个 SimplePerson 对象,给他们办结婚证。返回值的 name 字段为 name1 -Merry- name2,age 取两者的最大值,weight 取两者的最小值
// socket相关代码省略
// 从socket_fd读取指定字节的数据,存入buffer
// Read exactly `n_bytes` bytes from `connfd` into `buffer`, looping over short reads.
// @return 0 on success; -1 if read() fails or the peer closes the connection before
//         `n_bytes` bytes have arrived.
int read_n_bytes(int connfd, char* buffer, int n_bytes){
    int cnt = 0;
    while (cnt < n_bytes) {
        const ssize_t got = read(connfd, buffer + cnt, n_bytes - cnt);
        if (got < 0) {cout << "failed reading" << endl; return -1;}
        // read() returns 0 at end-of-stream; without this check the loop never ends.
        if (got == 0) {cout << "connection closed before all bytes were read" << endl; return -1;}
        cnt += static_cast<int>(got);
    }
    return 0;
}
void handle_rpc(int connfd)
{
char buff[BUFFER_SIZE];
RPCHeader header{};
for (;;) {
bzero(buff, BUFFER_SIZE);
// 读取RPC头
if (read_n_bytes(connfd, buff, sizeof(RPCHeader))){
printf("Error reading nbytes header\n");
exit(-1);
}
memcpy(&header, buff, sizeof(RPCHeader));
printf("Get header: id: %d\t\tpayload: %d\n", header.id, header.size);
// 根据RPC头中指示的payload大小,读取参数
bzero(buff, BUFFER_SIZE);
if (read_n_bytes(connfd, buff, header.size)) {
printf("Error reading nbytes payload\n");
exit(-1);
}
auto buffer = make_unique<SerBuffer>();
buffer->buffer.assign(buff, buff + header.size);
cout << "Get header: " << header << endl;
// 根据不同的远程调用id,执行远程调用
switch (header.id) {
// get merry of two person
case 1:{
// 解析两个参数
auto p1 = deserialize_simple_person(buffer);
auto p2 = deserialize_simple_person(buffer, header.size / 2);
// 产生新的对象
auto res = string(p1->name) + string(" -Merry- ") + string(p2->name);
auto resp_buf = make_unique<SerBuffer>();
SimplePerson p{};
memcpy(p.name, res.c_str(), res.length());
p.age = max(p1->age, p2->age);
p.weight = min(p1->weight, p2->weight);
// 产生RPC头,id为1的RPC请求,其返回的id为100+1=101
RPCHeader resp_header{100+header.id, sizeof(p.weight) + sizeof(p.name) + sizeof(p.age)};
// 序列化返回值并发送
serializeRPCHeader(resp_buf, &resp_header);
serialize_simple_person(&p, resp_buf);
// send it back
if (send(connfd, resp_buf->buffer.data(), resp_buf->buffer.size(), 0) == -1){
return ;
}
break;
}
case 2:{
// .... do something
}
default:{
printf("Invalid rpc id: %d\n", header.id);
break;
}
}
}
}
RPC 客户端实现
如下代码将调用 id 为 1 的 RPC,并接收结果
// socket相关代码省略
// Client side of RPC 1: send two SimplePersons to the server on `sockfd` and print the
// merged person it returns. Reports an error and returns early if the request cannot be
// sent, the response cannot be read, or the response does not fit in the local buffer.
void func(int sockfd)
{
    char buff[BUFFER_SIZE];
    bzero(buff, sizeof(buff));
    /**
     * to run rpc, we will follow the steps:
     * 1. serialize RPC header to a buffer
     * 2. serialize RPC argument to a buffer
     * 3. send RPC package through socket
     * 4. receive the response
     * 5. deserialize RPC package
     */
    // 1. serialize RPC header to a buffer
    RPCHeader rpcHeader{};
    rpcHeader.id = 1;
    // rpcHeader.size is assigned once the arguments have been serialized
    // 2. serialize RPC argument to a buffer
    SimplePerson p1{"Tony", 23, 160};
    SimplePerson p2{"Jenny", 21, 100};
    auto argumentBuffer = make_unique<SerBuffer>();
    serialize_simple_person(&p1, argumentBuffer);
    serialize_simple_person(&p2, argumentBuffer);
    rpcHeader.size = static_cast<uint32_t>(argumentBuffer->buffer.size());
    // combine 1 & 2
    auto buffer = make_unique<SerBuffer>();
    serializeRPCHeader(buffer, &rpcHeader);
    copy(argumentBuffer->buffer.begin(), argumentBuffer->buffer.end(), back_inserter(buffer->buffer));
    cout << "send request -> p1: " << p1 << "\t\tp2: " << p2 << endl;
    // 3. send the package; send() may accept fewer bytes than requested, so loop.
    const char* pending = buffer->buffer.data();
    size_t remaining = buffer->buffer.size();
    while (remaining > 0) {
        const ssize_t sent = send(sockfd, pending, remaining, 0);
        if (sent <= 0) {
            printf("Error sending request\n");
            return;
        }
        pending += sent;
        remaining -= static_cast<size_t>(sent);
    }
    // 4. receive the response header, then its payload
    rpcHeader = {};
    if (read_n_bytes(sockfd, reinterpret_cast<char*>(&rpcHeader), sizeof(rpcHeader))) {
        printf("Error reading response header\n");
        return;
    }
    // The size comes from the peer: never read more than buff can hold.
    if (rpcHeader.size > sizeof(buff)) {
        printf("Response payload too large: %u bytes\n", rpcHeader.size);
        return;
    }
    if (read_n_bytes(sockfd, buff, static_cast<int>(rpcHeader.size))) {
        printf("Error reading response payload\n");
        return;
    }
    // 5. deserialize the response, reusing the argument buffer
    resetSerBuffer(argumentBuffer);
    copy(buff, buff+rpcHeader.size, back_inserter(argumentBuffer->buffer));
    auto resp = deserialize_simple_person(argumentBuffer, 0);
    if (!resp) {
        printf("Invalid response payload\n");
        return;
    }
    cout << "Get response -> " << *resp << endl;
}
|