RPC简单实现
用的netty实现,没搭建注册中心,就是只有客户端和服务端,实现客户端远程调用服务端的sevice 仓库地址,https://github.com/wuhene/rpc-demo 结合代码看文章会更好理解,点个star支持下八~
模块划分
- rpc-common:公共模块,存放remote接口、公共bean、工具包,编/解码器
- rpc-server:服务端模块,remote接口的具体实现类,–provider
- rpc-client:客户端 --consumer
rpc-common模块
依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
rpc消息定义
-
所有的网络传输涉及到的消息对象都实现Message接口
-
MessageType类:定义消息类型和消息类型映射的class对象,用于decode时反序列化
-
public class MessageType {
public static final byte RPC_REQUEST = 1;
public static final byte RPC_RESPONSE = 2;
private static Map<Byte,Class> map = new HashMap<>();
static {
map.put(RPC_REQUEST, RpcRequest.class);
map.put(RPC_RESPONSE, RpcResponse.class);
}
public static Class type2Class(byte type){
return map.get(type);
}
}
rpc消息对象
-
RpcRequest:客户端发起Rpc请求消息
-
RpcResponse:服务端响应给客户端这次远程调用的结果
编/解码处理器编写
-
客户端和服务器端消息通讯的处理器,所以应该既是入站处理器也是出站处理器;入站就作为首个处理器进行解码,出站就作为最后一个处理器进行编码 -
@ChannelHandler.Sharable
@Component
@Slf4j
public class MessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
private byte[] procotl = new byte[]{'w','h','r','p','c'};
}
- 继承netty提供的MessageToMessageCodec类,这个类间接继承了ChannelInboundHandlerAdapter和间接实现了ChannelOutboundHandler,符合既是入站又是出站的要求
- Message是我们定义的类,之后我们调用channel的write方法传输数据只要写入Message对象就可以走这个处理器编码
- ByteBuf是netty提供的类,就是字节流存储的对象,我们最后对象传输都得转为字节形式写进ByteBuf,netty再进行传输,同样解码的时候也是解的字节,还是ByteBuf对象
- 这两个泛型对应的是进站和出站;
- 进站ByteBuf:表示我们接收到的数据,数据是ByteBuf类或子类,就会进MessageCodeC的decode方法;所以是每次接收数据都会进,因为netty最底层传输单位可以理解为就是这个ByteBuf
- 出站Message:表示我们write出数据,数据是Message类或子类,就会进MessageCodeC的encode方法
-
encode方面:
-
一次请求包含以下内容:请求头(协议、消息类型、内容大小)和消息体;填充内容只是为了把请求头填满16字节,强迫症 -
实际内容序列化为json,目前没写其他类型 -
@Override
protected void encode(ChannelHandlerContext ctx, Message message, List<Object> list) throws Exception {
try {
ByteBuf byteBuf = ctx.alloc().buffer();
byteBuf.writeBytes(procotl);
byteBuf.writeByte(message.getType());
byte[] content = JSONUtils.toJson(message).getBytes();
byteBuf.writeBytes(new byte[]{'h','c','s','c','s','s'});
byteBuf.writeInt(content.length);
byteBuf.writeBytes(content);
list.add(byteBuf);
} catch (Exception e){
log.error("编码异常:",e);
}
}
-
decode方面:
-
根据encode的内容进行解码,将消息体反序列化为对象传递给下一逻辑处理器 -
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
try {
byte[] protocol = new byte[5];
byteBuf.readBytes(protocol);
if (!protocolCheck(protocol)) return;
byte type = byteBuf.readByte();
byteBuf.readBytes(new byte[6]);
int length = byteBuf.readInt();
byte[] content = new byte[length];
byteBuf.readBytes(content);
Object obj = JSONUtils.fromJson(new String(content), MessageType.type2Class(type));
list.add(obj);
} catch (Exception e) {
log.error("编码异常:",e);
}
}
private boolean protocolCheck(byte[] bytes){
for (int i = 0; i < procotl.length; i++) {
if (bytes[i] != procotl[i]) return false;
}
return true;
}
rpc远程调用api定义
这个就是定义出remote接口,用来测试客户端能否根据接口调用到服务端的具体实现类
public interface UserServiceRemote {
User getUser(long userId);
void saveUser(User user);
}
工具类
public class JSONUtils {
public static String toJson(Object object){
return JSON.toJSONString(object);
}
public static <T> T fromJson(String json,Class<T> tClass){
return JSON.parseObject(json,tClass);
}
public static <T> List<T> fromJson2List(String json,Class<T> tClass){
return JSON.parseArray(json,tClass);
}
}
配置类
用于其他服务引入此公共包时扫描作用,主要也就是扫一个编/解码处理器
@ComponentScan(basePackages = {"com.wuhen.common"})
public class Config {
@Bean
public LoggingHandler loggingHandler(){
return new LoggingHandler();
}
}
启动注解
为容器导入配置类
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(Config.class)
public @interface EnableRpc {
}
rpc-server模块
依赖
<dependencies>
<dependency>
<groupId>com.wuhen</groupId>
<artifactId>rpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
Service工厂
根据客户端提供的接口全限定名获取服务端具体的实现类实例
public class ServiceFactory {
private static final Map<String,Object> SERVICE_MAP = new ConcurrentHashMap<>();
public static void register(Object bean){
SERVICE_MAP.put(bean.getClass().getInterfaces()[0].getName(),bean);
}
public static Object getService(String interfaceName){
return SERVICE_MAP.get(interfaceName);
}
}
Rpc请求处理器
- 只需要继承入站处理器,因为rpc请求对服务端来说只有入站;
- 在编/解码处理器处理完后,rpc请求消息会被反序列RpcRequest对象,就会执行本处理器
- 原理就是根据客户端请求过来的具体接口全限定名,去我们Service工厂获取到具体实现类实例,然后利用反射调用方法,最后将返回值write,由编/解码处理器编码成ByteBuf传输给客户端
@Slf4j
@ChannelHandler.Sharable
@Component
@Order(1)
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequest> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {
log.info("rpc 请求信息:{}", JSONUtils.toJson(rpcRequest));
RpcResponse response = new RpcResponse();
try {
Object service = ServiceFactory.getService(rpcRequest.getInterfaceName());
if (service == null) throw new Exception("未找到service对象");
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Class[] parameterTypes = rpcRequest.getParameterTypes();
Object[] parameterValues = rpcRequest.getParameterValues();
for (int i = 0; i < parameterTypes.length; i++) {
parameterValues[i] = JSONUtils.fromJson(JSONUtils.toJson(parameterValues[i]),parameterTypes[i]);
}
response.setRequestId(rpcRequest.getRequestId());
response.setRes(method.invoke(service, rpcRequest.getParameterValues()));
response.setSuccess(true);
} catch (Exception e) {
response.setSuccess(false);
response.setException(e);
log.error("rpc请求发生异常,请求id:{}, 异常信息为:",rpcRequest.getRequestId(),e);
}
log.info("rpc 响应信息:{}", JSONUtils.toJson(response));
channelHandlerContext.channel().writeAndFlush(response);
}
}
Rpc远程调用api实现类
就是测试用的类,随便写写逻辑
@Service
@Slf4j
public class UserServiceRemoteImpl implements UserServiceRemote {
@PostConstruct
public void postConstruct(){
ServiceFactory.register(this);
}
@Override
public User getUser(long userId) {
User user = new User();
user.setUserId(userId);
user.setUsername("紧张的无痕");
return user;
}
@Override
public void saveUser(User user,long pid) {
log.info("保存用户:{}", JSONUtils.toJson(user));
log.info("视频pid:{}",pid);
}
}
Server服务主类
@ConfigurationProperties(prefix = "rpc.server")
@Component
@Slf4j
@Data
public class Server {
private int port;
private int maxFrameLength;
@Autowired
private LoggingHandler loggingHandler;
@Autowired
private List<SimpleChannelInboundHandler> hanlders;
@Autowired
private MessageCodec messageCodec;
@PostConstruct
public void start(){
new Thread(() -> {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast(new LengthFieldBasedFrameDecoder(maxFrameLength,
12,4,0,0));
sc.pipeline().addLast(loggingHandler);
sc.pipeline().addLast(messageCodec);
for (SimpleChannelInboundHandler hanlder : hanlders) {
sc.pipeline().addLast(hanlder);
}
}
});
log.info("启动RPC服务端...");
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
log.info("RPC服务端启动成功,port:{}",port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("发生异常,信息为:",e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
},"rpc-server").start();
}
}
application.yaml
rpc:
server:
port: 8080 #端口号
maxFrameLength: 102400 #最大帧长度 单位byte
启动类
@EnableRpc
@SpringBootApplication
public class ServerApp {
public static void main(String[] args) {
SpringApplication.run(ServerApp.class,args);
}
}
rpc-client模块
依赖
<dependencies>
<dependency>
<groupId>com.wuhen</groupId>
<artifactId>rpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
Rpc代理工厂
根据接口的interfaceClass创建代理对象,主类RpcProxyFactory
-
RpcProxyFactory
-
@Slf4j
public class RpcProxyFactory {
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
private static final Map<Long,Promise<Object>> PROMISE_MAP = new ConcurrentHashMap<>();
private static Channel channel;
public static <T> T getProxy(Class<T> interfaceClass){
Object o = Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
new RpcProxyHandler(interfaceClass.getName()));
return (T)o;
}
public static void setChannel(Channel channel) {
RpcProxyFactory.channel = channel;
}
public static Promise<Object> getPromise(long requestId) {
return PROMISE_MAP.remove(requestId);
}
}
-
RpcProxyHandler:RpcProxyFactory的静态内部类,InvocationHandler实现类,主要方法调用逻辑在这,发起rpc请求后,创建一个Promise,然后等待后端响应后唤醒
-
static class RpcProxyHandler implements InvocationHandler {
private String interfaceName;
public RpcProxyHandler(String interfaceName) {
this.interfaceName = interfaceName;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
long requestId = ID_GENERATOR.incrementAndGet();
rpcRequest.setRequestId(requestId);
rpcRequest.setInterfaceName(interfaceName);
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameterValues(args);
channel.writeAndFlush(rpcRequest);
DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());
PROMISE_MAP.put(requestId,promise);
promise.await();
if (promise.isSuccess()){
Object res = promise.getNow();
res = JSONUtils.fromJson(JSONUtils.toJson(res),method.getReturnType());
log.info("远程调用结果为:{}",res);
return res;
}
log.info("远程调用失败,异常为:",promise.cause());
return null;
}
}
Rpc响应处理器
- 就是将解码处理器解完的数据存放到rpc请求创建的Promise,有数据后就会唤醒上面代理对象方法,得到方法响应结果
@ChannelHandler.Sharable
@Component
@Slf4j
@Order(1)
public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception {
log.info("远程调用结果,response:{}", JSONUtils.toJson(rpcResponse));
Promise<Object> promise = RpcProxyFactory.getPromise(rpcResponse.getRequestId());
if (promise == null) return;
if (!rpcResponse.isSuccess()) {
promise.setFailure(rpcResponse.getException());
}else {
promise.setSuccess(rpcResponse.getRes());
}
}
}
配置类
生成代理对象注入到spring容器中,这里后面可以换成递归加载包的方式,获取到class对象,然后调用工厂方法创建,再注入bean,就真的可以提供成包投入使用了
@Configuration
public class ClientConfig {
@Bean
public UserServiceRemote userServiceRemote(){
return RpcProxyFactory.getProxy(UserServiceRemote.class);
}
}
Service测试类编写
就是我们最后调用这个类进行测试,能否远程调用server端的userServiceRemote成功
@Component
@Slf4j
public class VideoServiceImpl implements VideoService {
@Autowired
private UserServiceRemote userServiceRemote;
@Override
public void get() {
User user = userServiceRemote.getUser(10001);
log.info("VideoSevice get Result:{}",JSONUtils.toJson(user));
}
@Override
public void save(User user,long pid) {
userServiceRemote.saveUser(user,pid);
}
}
Client主类
-
@ConfigurationProperties(prefix = "rpc.client")
@Component
@Data
@Slf4j
public class Client {
private String host;
private int port;
private int maxFrameLength;
@Autowired
private LoggingHandler loggingHandler;
@Autowired
private List<SimpleChannelInboundHandler> handlers;
@Autowired
private MessageCodec messageCodec;
@Autowired
private VideoService videoService;
@PostConstruct
public void start(){
new Thread(() -> {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast(new LengthFieldBasedFrameDecoder(maxFrameLength,
12,4,0,0));
sc.pipeline().addLast(loggingHandler);
sc.pipeline().addLast(messageCodec);
for (SimpleChannelInboundHandler handler : handlers) {
sc.pipeline().addLast(handler);
}
}
});
log.info("启动RPC客户端...");
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
log.info("RPC客户端启动成功,host:{},port:{}",host,port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("发生异常,信息为:",e);
} finally {
worker.shutdownGracefully();
}
},"rpc-client").start();
}
}
-
这段代码就是我们测试的起点了,键盘输入1就请求videoService的get,2就请求save,这两个方法都会远程调用UserServiceRemote
-
sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接成功");
RpcProxyFactory.setChannel(ctx.channel());
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String s = scanner.nextLine();
if (s.equals("1")){
videoService.get();
}else if (s.equals("2")){
videoService.save(new User(10011,"saber"),26);
}
}
}).start();
}
});
application.yaml
rpc:
client:
host: 127.0.0.1
port: 8080
maxFrameLength: 102400 #最大帧长度 单位byte
启动类
@EnableRpc
@SpringBootApplication
public class ClientApp {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ClientApp.class, args);
}
}
测试
-
启动ServerApp -
启动ClientApp -
键盘键入1,结果如下
- client
- server
-
键盘键入2,结果如下
- client
- server
-
与预想结果一致,算是简单实现了Rpc远程调用
扩展
- 还可再写一个注册中心,将所有server注册到注册中心,然后客户端拿着全限定名请求注册中心,注册中心再请求server,这样可以在注册中心做负载均衡
- client的配置类可以改进,新增一个注解,写上要扫描的远程调用api接口包,然后根据这些接口全限定名去代理工厂创建对象并注入容器,就不用每新增一个就手写一个创建代理对象了
|