Netty搭建简易RPC
什么是RPC
-
RPC 全称 Remote Procedure Call——远程过程调用。 -
在学校学编程,我们写一个函数都是在本地调用就行了。 -
但是在互联网公司,服务都是部署在不同服务器上的分布式系统,如何调用呢? -
RPC技术简单说就是为了解决远程调用服务的一种技术,使得调用者像调用本地服务一样方便透明。 -
下图是客户端调用远端服务的过程:
图片来源网络
- 客户端(Client),服务的调用方。
- 服务端(Server),真正的服务提供者。
- 客户端存根,存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
- 服务端存根,接收客户端发送过来的消息,将消息解包,并调用本地的方法。
图片来源网络
为什么需要RPC
- 1、首先要明确一点:RPC可以用HTTP协议实现,并且用HTTP是建立在 TCP 之上最广泛使用的 RPC,但是互联网公司往往用自己的私有协议,比如鹅厂的JCE协议,私有协议不具备通用性为什么还要用呢?因为相比于HTTP协议,RPC采用二进制字节码传输,更加高效也更加安全。
- 2、现在业界提倡“微服务“的概念,而服务之间通信目前有两种方式,RPC就是其中一种。RPC可以保证不同服务之间的互相调用。即使是跨语言跨平台也不是问题,让构建分布式系统更加容易。
- 3、RPC框架都会有服务降级、流量控制的功能,保证服务的高可用。
代码实现
项目说明
-
使用SpringBoot 2.6.6 -
netty 4 -
jdk8
项目结构图:
依赖导入
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.14</version>
</dependency>
一、消息实体
Message抽象类
package cn.netty.netty_rpc.message;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public abstract class Message implements Serializable {
public static final Map<Integer,Class<?>> messageClasses = new HashMap<>();
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
static {
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST,RpcRequestMessage.class); messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE,RpcResponseMessage.class);
}
private int sequenceId;
private int messageType;
public static Class<?> getMessageClass(int messageType){
return messageClasses.get(messageType);
}
public abstract int getMessageType();
public int getSequenceId() {
return sequenceId;
}
public void setSequenceId(int sequenceId) {
this.sequenceId = sequenceId;
}
public void setMessageType(int messageType) {
this.messageType = messageType;
}
}
RpcRequestMessage
package cn.netty.netty_rpc.message;
public class RpcRequestMessage extends Message{
private String interfaceName;
private String methodName;
private Class<?> returnType;
private Class[] parameterTypes;
private Object[] parameterValue;
public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}
public String getInterfaceName() {
return interfaceName;
}
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?> getReturnType() {
return returnType;
}
public void setReturnType(Class<?> returnType) {
this.returnType = returnType;
}
public Class[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameterValue() {
return parameterValue;
}
public void setParameterValue(Object[] parameterValue) {
this.parameterValue = parameterValue;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}
RpcResponseMessage
package cn.netty.netty_rpc.message;
public class RpcResponseMessage extends Message{
private Object returnValue;
private Exception exceptionValue;
public Object getReturnValue() {
return returnValue;
}
public void setReturnValue(Object returnValue) {
this.returnValue = returnValue;
}
public Exception getExceptionValue() {
return exceptionValue;
}
public void setExceptionValue(Exception exceptionValue) {
this.exceptionValue = exceptionValue;
}
@Override
public int getMessageType() {
return 0;
}
@Override
public String toString() {
return "RpcResponseMessage{" +
"returnValue=" + returnValue +
", exceptionValue=" + exceptionValue +
'}';
}
}
二、序列化算法
Serializer接口
package cn.netty.netty_rpc.utils.serializer;
public interface Serializer {
<T> byte[] serialize(T object);
<T> T deserialize(Class<T> clazz, byte[] bytes);
}
枚举类实现
package cn.netty.netty_rpc.utils.serializer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.io.*;
import java.nio.charset.StandardCharsets;
public enum SerializerAlgorithm implements Serializer{
Java{
@Override
public <T> byte[] serialize(T object) {
byte[] bytes = null;
try{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
bytes = bos.toByteArray();
}catch (IOException e){
e.printStackTrace();
}
return bytes;
}
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
T target = null;
try{
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis);
target =(T) ois.readObject();
}catch (IOException | ClassNotFoundException e){
e.printStackTrace();
}
return target;
}
},
Json{
@Override
public <T> byte[] serialize(T object) {
if( object == null){
return new byte[0];
}
return JSON.toJSONString(object).getBytes(StandardCharsets.UTF_8);
}
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
if (null == bytes || bytes.length <= 0) {
return null;
}
String str = new String(bytes, StandardCharsets.UTF_8);
return (T) JSON.parseObject(str, clazz);
}
}
}
三、自定义协议与解析
自定义可共享MessageCodecSharable
package cn.netty.netty_rpc.protocol;
import cn.netty.netty_rpc.message.Message;
import cn.netty.netty_rpc.utils.serializer.SerializerAlgorithm;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.util.List;
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf,Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message msg, List<Object> outList) throws Exception {
ByteBuf out = channelHandlerContext.alloc().buffer();
out.writeBytes(new byte[]{1,0,1,6});
out.writeByte(1);
out.writeByte(1);
out.writeByte(msg.getMessageType());
out.writeInt(msg.getSequenceId());
out.writeByte(0xff);
SerializerAlgorithm[] values = SerializerAlgorithm.values();
byte[] bytes = values[out.getByte(5)-1].serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> outList) throws Exception {
int magic = in.readInt();
byte version = in.readByte();
byte seqType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
SerializerAlgorithm[] values = SerializerAlgorithm.values();
Message message = (Message) values[seqType-1].deserialize(Message.getMessageClass(messageType), bytes);
outList.add(message);
}
}
四、远程调用接口
IHelloService
package cn.netty.netty_rpc.service;
public interface IHelloService {
String sayHello(String name);
}
HelloServiceImpl
package cn.netty.netty_rpc.service.impl;
import cn.netty.netty_rpc.service.IHelloService;
public class HelloServiceImpl implements IHelloService {
@Override
public String sayHello(String name) {
return "hello, "+name;
}
}
ServiceFactory
package cn.netty.netty_rpc.factory;
import java.util.HashMap;
import java.util.Map;
public class ServiceFactory {
static Map<Class<?>,Object> map = new HashMap<>(16);
public static Object getInstance(Class<?> interfaceClass){
try{
Class<?> clazz = Class.forName("cn.netty.netty_rpc.service.IHelloService");
Object instance = Class.forName("cn.netty.netty_rpc.service.impl.HelloServiceImpl").newInstance();
map.put(clazz, instance);
}catch ( ClassNotFoundException | InstantiationException | IllegalAccessException e){
e.printStackTrace();
}
return map.get(interfaceClass);
}
}
五、Handler
RpcRequestMessageHandler
package cn.netty.netty_rpc.handler;
import cn.netty.netty_rpc.factory.ServiceFactory;
import cn.netty.netty_rpc.message.RpcRequestMessage;
import cn.netty.netty_rpc.message.RpcResponseMessage;
import cn.netty.netty_rpc.service.IHelloService;
import cn.netty.netty_rpc.service.impl.HelloServiceImpl;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.lang.reflect.Method;
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage rpcRequestMessage) throws Exception {
RpcResponseMessage rpcResponseMessage = new RpcResponseMessage();
try{
rpcResponseMessage.setSequenceId(rpcRequestMessage.getSequenceId());
IHelloService helloService = (IHelloService)ServiceFactory.getInstance(Class.forName(rpcRequestMessage.getInterfaceName()));
Method method = helloService.getClass().getMethod(rpcRequestMessage.getMethodName(),rpcRequestMessage.getParameterTypes());
Object invoke = method.invoke(helloService, rpcRequestMessage.getParameterValue());
rpcResponseMessage.setReturnValue(invoke);
}catch (Exception e){
e.printStackTrace();
rpcResponseMessage.setExceptionValue(e);
}
ctx.writeAndFlush(rpcResponseMessage);
}
}
RpcResponseMessageHandler
package cn.netty.netty_rpc.handler;
import cn.netty.netty_rpc.message.RpcResponseMessage;
import cn.netty.netty_rpc.server.RpcServer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
public static Map<Integer, Promise<Object>> promiseMap = new ConcurrentHashMap<>(16);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponseMessage rpcResponseMessage) throws Exception {
Promise<Object> promise = promiseMap.remove(rpcResponseMessage.getSequenceId());
if(promise != null){
Object returnValue = rpcResponseMessage.getReturnValue();
Exception exceptionValue = rpcResponseMessage.getExceptionValue();
if(exceptionValue != null){
promise.setFailure(exceptionValue);
}else {
promise.setSuccess(returnValue);
}
}
System.out.println(rpcResponseMessage);
}
}
六、服务端
package cn.netty.netty_rpc.server;
import cn.netty.netty_rpc.handler.RpcRequestMessageHandler;
import cn.netty.netty_rpc.protocol.MessageCodecSharable;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable messageSharableCodec = new MessageCodecSharable();
RpcRequestMessageHandler rpcRequestMessageHandler = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024*1024,12,4,0,0));
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageSharableCodec);
ch.pipeline().addLast(rpcRequestMessageHandler);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
七、客户端
省略无数个包...
public class RpcClient {
public static AtomicInteger sequenceId = new AtomicInteger(0);
public static volatile Channel channel = null;
public static final Object lock = new Object();
public static void main(String[] args) {
IHelloService service = (IHelloService) getProxy(IHelloService.class);
System.out.println(service.sayHello("小翟"));
System.out.println(service.sayHello("甲粒子"));
}
public static Channel getChannel(){
if (channel == null){
synchronized (lock){
if (channel == null){
init();
}
}
}
return channel;
}
public static Object getProxy(Class<?> serviceClass){
Class<?>[] classes = new Class<?>[]{serviceClass};
Object proxy = Proxy.newProxyInstance(serviceClass.getClassLoader(), classes, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
int id = sequenceId.getAndIncrement();
RpcRequestMessage message = new RpcRequestMessage(id,serviceClass.getName(),
method.getName(),method.getReturnType(),method.getParameterTypes(),args);
getChannel().writeAndFlush(message);
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.promiseMap.put(id,promise);
promise.await();
if (promise.isSuccess()) {
return promise.getNow();
} else {
throw new RuntimeException(promise.cause());
}
}
});
return proxy;
}
private static void init() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
RpcResponseMessageHandler rpcResponseMessageHandler = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024*1024,12,4,0,0));
socketChannel.pipeline().addLast(loggingHandler);
socketChannel.pipeline().addLast(messageCodecSharable);
socketChannel.pipeline().addLast(rpcResponseMessageHandler);
}
});
try {
channel = bootstrap.connect(new InetSocketAddress("localhost",8080)).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
客户端 服务器端: 与客户端一样会触发四次 日志handler
|