在自己大四的实习中,对于计算机知识的理解有了更大的认知,对于网络协议、数据传输这块有了一定的认知,在快过22年的春节的时候,公司在年前的工作比较少,我闲来手写一个RPC框架来学习学习。
在选取网络传输技术上层API的时候,刚开始我打算使用NIO来调用socket从而去调用传输层的TCP协议,但是鉴于我在面试过程中遇到了面试官问我Netty的知识点,并且我从来没有使用过Netty,那么我这次的手写RPC我就是用Netty来做,当然Netty其实也是NIO框架,原理是一样的,只是Netty框架简化了开发。在这个博客中我应该会加一些Netty的知识点,就当学习了。
首先来介绍一下Netty。
Netty
etty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
接着我们介绍以下RPC。
RPC
当我们的服务端从单机变为多机时,我们想要调用远程服务的方法时不能直接调用,RPC可以使我们调用远程服务的方法相当于调用本地方法那样直接调用,其实说到底也是一个网络通信的问题,我们在本地伪造一个本地方法(代理),再通过网络将自己调用的方法的信息传输到存在该服务的远程服务器,之后找到该方法,再将该方法的返回值通过网络传输到本地方法,这样操作上就是本地直接调用伪造方法。在实现技术上,通信方面就是使用Netty,调用的话使用反射(服务端)与代理(客户端)。
接着我们来进行rpc框架的搭建
搭建RPC框架
搭建简介:
服务端:自定义一个@RPCService注解,统一标识提供服务方法的类(Bean)。在服务启动的时候将提供服务方法的Bean创建好并且注册到一个容器(类似于IOC容器)中。
客户端:自定义一个@RpcReference注解,将其加到消费类的实例对象中的属性中标识想要调用的对象,调用启动后通过java proxy创建服务端对象的代理对象,并且重写调用逻辑,其实就是将调用方的信息封装,之后通过Netty将调用信息传输到服务端。服务端拿到信息通过上面的@RPCService注解信息找到对应的类调用方法返回传输。
搭建过程
首先要将我们使用的依赖,通过Maven引入到我们的项目中。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.55.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>8.0.11</version>
</dependency>
接下来我们把我们需要的实体类、实现类、注解、编解码器以及服务接口编写好。
实体类:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
//服务的实体类
public class Book {
private Integer id;
private String bookName;
}
服务接口:
public interface BookService {
Book getBook(Integer id);
Book updateBook(Book book);
Integer delBook(Integer id);
Integer insertBook(Book book);
}
注解:
//客户端标识作用的注解
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcReference {
public String referenceName();
}
//服务端标识作用的注解
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {
public String serviceName();
}
传输的消息体:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageContext {
private String messageId;
private Boolean isSuccess;
private Object context;
}
请求消息体、响应消息体,会被封装到传输消息体中:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Request {
private String className;
private String methodName;
private Class<?>[] parameterType;
private Object[] parameterValue;
private String serviceNameByAnnotated;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Response {
private Boolean isSuccess;
private Object context;
private Class<?> contextType;
}
重写Netty中的编解码器(二进制与数据之间的转换):
//解码器
public class MessageDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
byte[] body = new byte[byteBuf.readInt()];
byteBuf.readBytes(body);
String context = new String(body);
System.out.println("decoder出来的字符串"+context);
MessageContext messageContext = JSON.parseObject(context,MessageContext.class);
list.add(messageContext);
byteBuf.skipBytes(byteBuf.readableBytes());
}
}
public class MessageEncoder extends MessageToByteEncoder {
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
String s = JSONObject.toJSONString(o);
byte[] bytes = s.getBytes();
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
System.out.println("已经成功写入通道"+s);
}
}
Netty服务端:
public class NettyServer {
public static void main(String[] args) {
final NettyServerHandler nettyServerHandler = new NettyServerHandler();
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup,childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new MessageEncoder());
pipeline.addLast(nettyServerHandler);
}
});
ChannelFuture future = bootstrap.bind(8000).sync();
if (future.isSuccess()){
nettyServerHandler.setBeans(InitAnnotatedObject.init());
System.out.println("成功加载bean,数量"+nettyServerHandler.getBeans().size());
System.out.println("已加载的beans:--["+nettyServerHandler.getBeans().toString()+"]--");
System.out.println("服务器已启动。。。");
}
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
服务端Handler:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private Map<String,Object> beans;
public Map<String,Object> getBeans(){
return beans;
}
public void setBeans(Map<String,Object> beans){
this.beans = beans;
}
public NettyServerHandler() {
super();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客户端的消息 ===== " + msg);
if (msg instanceof MessageContext){
channelRead0(ctx,(MessageContext)msg);
}
}
protected void channelRead0(ChannelHandlerContext channelHandlerContext,MessageContext messageContext) throws Exception{
Object context = messageContext.getContext();
Request request = JSON.parseObject(JSONObject.toJSONString(context),Request.class);
//请求参数的类型
Class<?>[] parameterType = request.getParameterType();
//请求参数的值
Object[] parameterValue = request.getParameterValue();
if (parameterType.length == parameterValue.length){
for (int i = 0; i < parameterValue.length; i++) {
Object o = JSON.parseObject(JSONObject.toJSONString(parameterValue[i]),parameterType[i]);
parameterValue[i] = o;
}
}
Object result = null;
Class<?> aClass = Class.forName(request.getClassName());
if (beans == null || beans.size() == 0) {
System.out.println("没有找到被调用的bean实例");
channelHandlerContext.writeAndFlush(new MessageContext(String.valueOf(new Random()
.nextInt(9999)),false,new Response(true,new Object(),Object.class)));
return;
}
Object o = beans.get(request.getServiceNameByAnnotated());
if (o == null){
System.out.println("beans里没有找到实例,根据class创建");
o = aClass.newInstance();
}
Method method = aClass.getMethod(request.getMethodName(),request.getParameterType());
Class<?> returnType = method.getReturnType();
Object invoke = method.invoke(o,request.getParameterValue());
System.out.println("反射调用的结果为"+invoke.toString());
if (returnType.isInstance(invoke)){
result = returnType.cast(invoke);
}
System.out.println("调用完成,返回结果"+result.toString());
channelHandlerContext.writeAndFlush(new MessageContext(String.valueOf(new Random().nextInt(9999)),
true,new Response(true,result,returnType)));
System.out.println("写入通道正常");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生了异常");
cause.printStackTrace();
ctx.close();
}
}
提供服务的类如何被注册到“Bean容器中的”:
public class InitAnnotatedObject {
public static Map<String,Object> init(){
Map<String, Object> map = new HashMap<String,Object>();
try{
Reflections reflections = new Reflections("com.haina.rpc");//创建一个Reflections 对象,并指定要扫描的包路径
Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcService.class);//该方法会扫描指定包路径下带有@RpcService注解的类的Class对象,并封装成一个set集合
typesAnnotatedWith.stream().forEach(aClass -> {
if (aClass.isAnnotationPresent(RpcService.class)){
RpcService annotation = aClass.getAnnotation(RpcService.class);
try{
Object o = aClass.newInstance();
map.put(annotation.serviceName(),o);
}catch (Exception e){
e.printStackTrace();
}
}
});
}catch (Exception e){
e.printStackTrace();
}
return map;
}
}
Netty客户端:
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NettyClientHandler nettyClientHandler = new NettyClientHandler();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
io.netty.bootstrap.Bootstrap bootstrap = new io.netty.bootstrap.Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new MessageEncoder());
pipeline.addLast(nettyClientHandler);
}
});
ChannelFuture future = bootstrap.connect("localhost",8000).sync();
if (future.isSuccess()){
System.out.println("链接到服务端,开始调用服务");
InvockService invockService = new InvockService(BookService.class,nettyClientHandler);
Book book = invockService.getBook(1);
System.out.println("---getBook返回结果"+book.toString());
Integer integer = invockService.insertBook(book);
System.out.println("---insertBook返回结果"+integer.toString());
Book book1 = invockService.updateBook(book);
System.out.println("---updateBook返回结果"+book1.toString());
Integer integer1 = invockService.delBook(integer);
System.out.println("delBook返回结果"+integer1.toString());
}
ChannelFuture sync = future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
if (eventLoopGroup != null){
eventLoopGroup.shutdownGracefully();
}
}
}
}
代理对象:
public class InvockService {
@RpcReference(referenceName = "BookServiceImpl")
private BookService bookService;
private ExecutorService executorService = Executors.newFixedThreadPool(8);
public InvockService(Class<?> className,NettyClientHandler nettyClientHandler) {
this.bookService = (BookService) getBean(className,nettyClientHandler);
}
public InvockService(BookService bookService){
this.bookService = bookService;
}
public Object getBean(Class<?> className,NettyClientHandler nettyClientHandler){
//通过proxy创建被调用者className的代理对象。
Object proxyInstance = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{className}, ((proxy, method, args) -> {
//并重写调用逻辑。当对象被代理后,你调用原对象方法,会拦截进入到该逻辑处理
//封装请求体数据
Request request = new Request();
request.setClassName("com.haina.rpc.server.Impl.BookServiceImpl");
request.setMethodName(method.getName());
request.setParameterType(method.getParameterTypes());
request.setParameterValue(args);
// Field userServiceField = null;
// try{
// userServiceField = InvockService.class.getDeclaredField("bookService");
// }catch (Exception e){
// }
//获取bookService字段的@RpcReference注解的name属性传给服务端,服务端通过这个bean的name属性去bean容器查找对象
request.setServiceNameByAnnotated("BookServiceImpl");
MessageContext messageContext = new MessageContext(String.valueOf(new Random().nextInt(9999)),true,request);
//把消息体放入到nettyClientHandler中,以供nettyClientHandler去发送数据
nettyClientHandler.setMessageContext(messageContext);
//把具体远程调用处理传给线程池去异步处理。
Object o = executorService.submit(nettyClientHandler).get();
System.out.println("调用返回的o:"+o);
return o;
}));
return proxyInstance;
}
Book getBook(Integer id){
return bookService.getBook(id);
}
Book updateBook(Book book){
return bookService.updateBook(book);
}
Integer delBook(Integer id){
return bookService.delBook(id);
}
Integer insertBook(Book book){
return bookService.insertBook(book);
}
}
客户端Handler:
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext channelHandlerContext;
private MessageContext messageContext;
public NettyClientHandler() {
super();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("调用ctx方法");
this.channelHandlerContext = ctx;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("来自服务端的消息"+msg);
if (msg instanceof MessageContext){
System.out.println("调用read0方法");
channelRead0(ctx,(MessageContext)msg);
}
}
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext,MessageContext response)throws Exception{
System.out.println("调用服务返回值"+response.toString());
this.messageContext = response;
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生了异常");
cause.printStackTrace();
ctx.close();
}
@Override
public synchronized Object call() throws Exception {
System.out.println("调用call方法");
channelHandlerContext.writeAndFlush(messageContext);
this.wait();
Response response = JSON.parseObject(JSONObject.toJSONString(messageContext.getContext()),Response.class);
return JSON.parseObject(JSONObject.toJSONString(response.getContext()),response.getContextType());
}
public MessageContext getMessageContext(){
return messageContext;
}
public void setMessageContext(MessageContext messageContext){
this.messageContext = messageContext;
}
}
以上就是我这次手写RPC框架的所有内容。
|