本项目将从零开始搭建出一个基于协程的异步RPC框架。
学习本项目需要有一定的C++,网络,RPC知识
项目地址:zavier-wong/acid: A high performance fiber RPC network framework. 高性能协程RPC网络框架
RPC框架设计
通过本项目你将学习到:
- 序列化协议
- 通信协议
- 服务注册
- 服务发现
- 负载均衡
- 心跳机制
- 三种异步调用方式
相信大家对RPC的相关概念都已经很熟悉了,这里不做过多介绍,直接进入重点。 本文档将简单介绍框架的设计,在最后的 examples 会给出完整的使用范例。 更多的细节需要仔细阅读源码。
本RPC框架主要有网络模块, 序列化模块,通信协议模块,客户端模块,服务端模块,服务注册中心模块,负载均衡模块
主要有以下三个角色:
注册中心 Registry
主要是用来完成服务注册和服务发现的工作。同时需要维护服务下线机制,管理了服务器的存活状态。
服务提供方 Service Provider
其需要对外提供服务接口,它需要在应用启动时连接注册中心,将服务名发往注册中心。服务端还需要启动Socket服务监听客户端请求。
服务消费方 Service Consumer
客户端需要有从注册中心获取服务的基本能力,它需要在发起调用时, 从注册中心拉取开放服务的服务器地址列表存入本地缓存, 然后选择一种负载均衡策略从本地缓存中筛选出一个目标地址发起调用, 并将这个连接存入连接池等待下一次调用。
序列化协议
序列化有以下规则:
- 默认情况下序列化,8,16位类型以及浮点数不压缩,32,64位有符号/无符号数采用 zigzag 和 varints 编码压缩
- 针对 std::string 会将长度信息压缩序列化作为元数据,然后将原数据直接写入。char数组会先转换成 std::string 后按此规则序列化
- 调用 writeFint 将不会压缩数字,调用 writeRowData 不会加入长度信息
对于任意类型,只要实现了以下的重载,即可参与传输时的序列化。
template<typename T>
Serializer &operator >> (Serializer& in, T& i){
return *this;
}
template<typename T>
Serializer &operator << (Serializer& in, T i){
return *this;
}
本模块同时提供了将tuple序列化和反序列化回tuple的功能。
调用方发起过程调用时,先将参数打包成tuple,再进行序列化传输。 被调用方收到调用请求时,先将参数包反序列回tuple,再解包转发给函数。
通信协议
封装通信协议,使RPC Server和RPC Client 可以基于同一协议通信。
采用私有通信协议,协议如下:
第一个字节是魔法数。
第二个字节代表协议版本号,以便对协议进行扩展,使用不同的协议解析器。
第三个字节是请求类型,如心跳包,请求,响应等。
第四个字节表示消息长度,占四个字节,表示后面还有多少个字节的数据。
除了协议头固定7字节,消息不定长。
目前提供了以下几种请求
enum class MsgType : uint8_t {
HEARTBEAT_PACKET,
RPC_PROVIDER,
RPC_CONSUMER,
RPC_REQUEST,
RPC_RESPONSE,
RPC_METHOD_REQUEST ,
RPC_METHOD_RESPONSE,
RPC_SERVICE_REGISTER,
RPC_SERVICE_REGISTER_RESPONSE,
RPC_SERVICE_DISCOVER,
RPC_SERVICE_DISCOVER_RESPONSE
};
服务注册
每一个服务提供者对应的机器或者实例在启动运行的时候, 都去向注册中心注册自己提供的服务以及开放的端口。 注册中心维护一个服务名到服务地址的多重映射,一个服务下有多个服务地址, 同时需要维护连接状态,断开连接后移除服务。
服务发现
虽然服务调用是服务消费方直接发向服务提供方的,但是分布式的服务,都是集群部署的, 服务的提供者数量也是动态变化的, 所以服务的地址也就无法预先确定。 因此如何发现这些服务就需要一个统一注册中心来承载。
客户端从注册中心获取服务,它需要在发起调用时, 从注册中心拉取开放服务的服务器地址列表存入本地缓存,
RPC连接池采用LRU淘汰算法,关闭最旧未使用的连接。
负载均衡
实现通用类型负载均衡路由引擎(工厂)。 通过路由引擎获取指定枚举类型的负载均衡器, 降低了代码耦合,规范了各个负载均衡器的使用,减少出错的可能。
提供了三种路由策略(随机、轮询、哈希), 由客户端使用,在客户端实现负载均衡
template<class T>
class RouteEngine {
public:
static typename RouteStrategy<T>::ptr queryStrategy(typename RouteStrategy<T>::Strategy routeStrategyEnum) {
switch (routeStrategyEnum){
case RouteStrategy<T>::Random:
return s_randomRouteStrategy;
case RouteStrategy<T>::Polling:
return std::make_shared<impl::PollingRouteStrategyImpl<T>>();
case RouteStrategy<T>::HashIP:
return s_hashIPRouteStrategy ;
default:
return s_randomRouteStrategy ;
}
}
private:
static typename RouteStrategy<T>::ptr s_randomRouteStrategy;
static typename RouteStrategy<T>::ptr s_hashIPRouteStrategy;
};
选择客户端负载均衡策略,根据路由策略选择服务地址。默认随机策略。
acid::rpc::RouteStrategy<std::string>::ptr strategy =
acid::rpc::RouteEngine<std::string>::queryStrategy(
acid::rpc::RouteStrategy<std::string>::Random);
客户端同时会维护RPC连接池,以及服务发现结果缓存,减少频繁建立连接。
通过上述策略尽量消除或减少系统压力及系统中各节点负载不均衡的现象。
心跳机制
服务中心必须管理服务器的存活状态,也就是健康检查。 注册服务的这一组机器,当这个服务组的某台机器如果出现宕机或者服务死掉的时候, 就会剔除掉这台机器。这样就实现了自动监控和管理。
项目采用了心跳发送的方式来检查健康状态。
服务器端: 开启一个定时器,定期给注册中心发心跳包,注册中心会回一个心跳包。
注册中心: 开启一个定时器倒计时,每次收到一个消息就更新一次定时器。如果倒计时结束还没有收到任何消息,则判断服务掉线。
三种异步调用方式
整个框架最终都要落实在服务消费者。为了方便用户,满足用户的不同需求,项目设计了三种异步调用方式。 三种调用方式的模板参数都是返回值类型,对void类型会默认转换uint8_t 。
- 以同步的方式异步调用
整个框架本身基于协程池,所以在遇到阻塞时会自动调度实现以同步的方式异步调用
auto sync_call = con->call<int>("add", 123, 321);
ACID_LOG_INFO(g_logger) << sync_call.getVal();
- future 形式的异步调用
调用时会立即返回一个future
auto async_call_future = con->async_call<int>("add", 123, 321);
ACID_LOG_INFO(g_logger) << async_call_future.get().getVal();
- 异步回调
async_call的第一个参数为函数时,启用回调模式,回调参数必须是返回类型的包装。收到消息时执行回调。
con->async_call<int>([](acid::rpc::Result<int> res){
ACID_LOG_INFO(g_logger) << res.getVal();
}, "add", 123, 321);
对调用结果及状态的封装如下
enum RpcState{
RPC_SUCCESS = 0,
RPC_FAIL,
RPC_NO_METHOD,
RPC_CLOSED,
RPC_TIMEOUT
};
template<typename T = void>
class Result {
...
private:
code_type m_code = 0;
msg_type m_msg;
type m_val;
}
最后
通过以上介绍,我们粗略地了解了分布式服务的大概流程。但篇幅有限,无法面面俱到, 更多细节就需要去阅读代码来理解了。
这并不是终点,项目只是实现了简单的服务注册、发现。后续将考虑加入注册中心集群,限流,熔断,监控节点等。
示例
rpc服务注册中心
#include "acid/rpc/rpc_service_registry.h"
void Main() {
acid::Address::ptr address = acid::Address::LookupAny("127.0.0.1:8080");
acid::rpc::RpcServiceRegistry::ptr server = std::make_shared<acid::rpc::RpcServiceRegistry>();
while (!server->bind(address)){
sleep(1);
}
server->start();
}
int main() {
acid::IOManager::ptr loop = std::make_shared<acid::IOManager>();
loop->submit(Main);
}
rpc 服务提供者
#include "acid/rpc/rpc_server.h"
int add(int a,int b){
return a + b;
}
std::string getStr() {
return "hello world";
}
void Main() {
int port = 9000;
acid::Address::ptr local = acid::IPv4Address::Create("127.0.0.1",port);
acid::Address::ptr registry = acid::Address::LookupAny("127.0.0.1:8080");
acid::rpc::RpcServer::ptr server = std::make_shared<acid::rpc::RpcServer>();
server->registerMethod("add",add);
server->registerMethod("getStr",getStr);
server->registerMethod("echo", [](std::string str){
return str;
});
while (!server->bind(local)){
sleep(1);
}
server->bindRegistry(registry);
server->start();
}
int main() {
acid::IOManager::ptr loop = std::make_shared<acid::IOManager>();
loop->submit(Main);
}
rpc 服务消费者,并不直接用RpcClient,而是采用更高级的封装,RpcConnectionPool。 提供了连接池和服务地址缓存。
#include "acid/log.h"
#include "acid/rpc/rpc_connection_pool.h"
static acid::Logger::ptr g_logger = ACID_LOG_ROOT();
void Main() {
acid::Address::ptr registry = acid::Address::LookupAny("127.0.0.1:8080");
acid::rpc::RpcConnectionPool::ptr con = std::make_shared<acid::rpc::RpcConnectionPool>(5);
con->connect(registry);
acid::rpc::Result<int> sync_call = con->call<int>("add", 123, 321);
ACID_LOG_INFO(g_logger) << sync_call.getVal();
std::future<acid::rpc::Result<int>> async_call_future = con->async_call<int>("add", 123, 321);
ACID_LOG_INFO(g_logger) << async_call_future.get().getVal();
con->async_call<int>([](acid::rpc::Result<int> res){
ACID_LOG_INFO(g_logger) << res.getVal();
}, "add", 123, 321);
}
int main() {
acid::IOManager::ptr loop = std::make_shared<acid::IOManager>();
loop->submit(Main);
}
|