IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Netty+ProtoBuf+Jdk 动态代理=手写简单Rpc -> 正文阅读

[网络协议]Netty+ProtoBuf+Jdk 动态代理=手写简单Rpc

  • rpc 远程处理调用

  • 本地客户端 Consumer(客户端)

  • 远程服务提供者 Provider(服务端)

  • consumer 和 provider 需要有相同的接口

    • 我们在使用feign或者之类的rpc都是类似(个人理解)
  • 使用netty完成连接 在连接中使用动态代理

    • 客户端在动态代理的方法调用中(Invoke) 使用netty 所维持的 channel 完成request 的发送 并异步等待返回结果
    • 服务端在接受特定的请求后解析自定义协议中的数据,使用代理完成请求任务,并且返回respons
  • 服务端动态代理没写了(因为cglib不是很熟)

一、Consumer

1.启动程序
public class ConsumerBootStrap {
    EventLoopGroup consumer;
    Bootstrap bootstrap;

    public ConsumerBootStrap() {
        consumer = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
    }

    public void boot(String hostName, int port) {
        boot0(hostName, port);
    }

    private void boot0(String hostName, int port) {
        ChannelFuture channelFuture = bootstrap.group(consumer)
                .handler(new RpcChannelHandlerInitializerClient())
                .channel(NioSocketChannel.class)
                .connect(hostName, port);
        try {
            channelFuture.sync();
        } catch (InterruptedException interruptedException) {
            interruptedException.printStackTrace();
        }

    }

}
2.协议解码
public class RpcConsumerDecoder extends ReplayingDecoder<RpcMessage.RpcMessagePojo> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Charset charset = CharsetUtil.UTF_8;
        RpcMessage.RpcMessagePojo.Builder builder = RpcMessage.RpcMessagePojo.newBuilder();
        RpcMessage.Response.Builder responseBuilder = RpcMessage.Response.newBuilder();
        builder.setMessageType(RpcMessage.RpcMessagePojo.MessageType.RESPONSE);


        int responseHeaderSize = in.readInt();
        String string=in.readBytes(responseHeaderSize).toString(charset);
        responseBuilder.setResponseHeader(string);

        int descriptionSize = in.readInt();
        string=in.readBytes(descriptionSize).toString(charset);
        System.out.println(string+"----");
        responseBuilder.setDescription(string);
        int resultSize = in.readInt();
        responseBuilder.setResult(in.readBytes(resultSize).toString(charset));
        out.add(builder.setResponse(responseBuilder.build()).build());
        //自由返回对应类型才会被指定了对应类型的handle 捕捉处理
    }
}
3、业务handler
public class RpcConsumerHandler extends SimpleChannelInboundHandler<RpcMessage.RpcMessagePojo> implements Callable<RpcMessage.RpcMessagePojo> {


    ChannelHandlerContext ctx;
    RpcMessage.RpcMessagePojo request;
    RpcMessage.RpcMessagePojo response;

    @Override
    protected synchronized void channelRead0(ChannelHandlerContext ctx, RpcMessage.RpcMessagePojo msg) throws Exception {
        response = msg;
        notify();
        //唤醒线程继续执行call
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("active");
        this.ctx = ctx;
    }



    @Override
    public synchronized RpcMessage.RpcMessagePojo call() throws Exception {
        ctx.writeAndFlush(request);
        //传送到服务端
        //开始等待
        wait();
        return response;
    }

    public void setRequest(RpcMessage.RpcMessagePojo request) {
        this.request = request;
        //设置请求
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("register");

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Writability");
    }
}
4、高层装类
public class Consumer {




    public Consumer (String hostName,int port)
    {
        ConsumerBootStrap consumerBootStrap=new ConsumerBootStrap();
        consumerBootStrap.boot(hostName,port);
    }


    void doFun(Object[] parameters, Class<?> clazz, Method method,Class<?> parameterTypes, Class<?>... interfaces) {

    }

   public RpcMessage.RpcMessagePojo  doFun(RpcMessage.RpcMessagePojo pojo, Object target) {
        InvocationHandle handle = new InvocationHandle(target);
        //为服务对象动态生成真实对象准备
        UserService userService = (UserService) ProxyFactory.getProxy(ClassLoader.getSystemClassLoader(), handle, UserService.class);
        return userService.login(pojo);
    }
}
5、代理工厂
public class ProxyFactory<T> {


  public static Object getProxy(ClassLoader classLoader, InvocationHandle invocationHandle, Class<?>... interfaces) {
        return Proxy.newProxyInstance(classLoader, interfaces, invocationHandle);
        //创建代理对象

    }
}
6、真实代理对象生成
public class InvocationHandle implements InvocationHandler {

    Object target;
    ExecutorService executors = Executors.newFixedThreadPool(NettyRuntime.availableProcessors());
    RpcConsumerHandler client = RpcChannelHandlerInitializerClient.handler;

    //传入我们的 业务handler 动态生成 真实代理对象
    public InvocationHandle(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("do proxy");
        // Object result = method.invoke(target, args);
        //动态代理的本来功能 但是我们本地没有真实服务
        //启动代理类上的方法 (远程调用 rpc 从这里开始)
        client.setRequest((RpcMessage.RpcMessagePojo) args[0]);
        //传入参数

        RpcMessage.RpcMessagePojo response = executors.submit(client).get();
        return response;
        //进入线程池调用
        //返回结果

    }
}
7、handlerChain初始化
public class RpcChannelHandlerInitializerClient  extends ChannelInitializer<SocketChannel> {
public static RpcConsumerHandler handler;
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        handler=new RpcConsumerHandler();
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new RpcEncoder());
        pipeline.addLast(new RpcConsumerDecoder());
        pipeline.addLast(handler);

    }
}
8、测试类
public class NettyConsumerRpcTest {
    public static void main(String[] args) throws InstantiationException, IllegalAccessException {

        RpcMessage.RpcMessagePojo request= RpcMessage
                .RpcMessagePojo.newBuilder()
                .setRequest(RpcMessage.RpcRequest.newBuilder().setRequestHeader("0x8848")
                        .setReqClassName("")
                        .setMethodName("login")
                        .addParameterTypes(String.class.getName())
                        .addParameterTypes(Integer.class.getName())
                        .addParameters("hey lxc")
                        .addParameters("123456.aas")
                        .build())
                .setMessageType(RpcMessage.RpcMessagePojo.MessageType.REQUEST)
                .build();
        //为集合添加参数 使用add 方法

        Consumer consumer=new Consumer("127.0.0.1",9920);
        RpcMessage.RpcMessagePojo response = consumer.doFun(request, new UserServiceImpl());
        System.out.println(response.getResponse().getResult());
        System.out.println(response.getResponse().getDescription());
        System.out.println(response);
        //很奇怪的 为什么服务端直接输出没问题 客户端直接输出又不行 明明存进去是String

    }
}

二、provider

1、启动程序
public class ServerBootStrap {

    ServerBootstrap serverBootStrap;
    EventLoopGroup acceptor;
    EventLoopGroup worker;
    public ServerBootStrap() {
        serverBootStrap=new ServerBootstrap();
        acceptor=new NioEventLoopGroup();
        worker=new NioEventLoopGroup();
    }




     public void boot(String hostName,int port)
     {
         boot0(hostName,port,NioServerSocketChannel.class);
     }

    private void boot0(String hostName, int port, Class<? extends ServerChannel> channel) {
        try {
            ChannelFuture channelFuture = serverBootStrap.group(acceptor, worker)
                    .channel(channel)
                    .childHandler(new RpcChannelHandlerInitializer())
                    .bind(hostName, port);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException interruptedException) {
            interruptedException.printStackTrace();
        } finally {
            acceptor.shutdownGracefully();
            worker.shutdownGracefully();
        }

    }


}
3、协议解码
public class RpcProviderDecoder extends ReplayingDecoder<RpcMessage.RpcMessagePojo> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        RpcMessage.RpcMessagePojo.Builder builder = RpcMessage.RpcMessagePojo.newBuilder();
        RpcMessage.RpcRequest.Builder requestBuilder = RpcMessage.RpcRequest.newBuilder();
        Charset charset = CharsetUtil.UTF_8;
        //请求头
        int requestHeaderSize = in.readInt();
        requestBuilder.setRequestHeader(in.readBytes(requestHeaderSize).toString(charset));
        //类名
        int classNameSize = in.readInt();
        requestBuilder.setReqClassName(in.readBytes(classNameSize).toString(charset));
        //方法名
        int methodNameSize = in.readInt();
        requestBuilder.setMethodName(in.readBytes(methodNameSize).toString(charset));
        //参数类型
        int parameterTypesSize = in.readInt();

        String string = in.readBytes(parameterTypesSize).toString(charset);
        requestBuilder.addAllParameterTypes(Arrays.asList(string.replace("[", "").replace("]", "").trim().split(",")));
        //参数类型列表
        int parametersSize = in.readInt();
        string = in.readBytes(parametersSize).toString(charset);
        requestBuilder.addAllParameters(Arrays.asList(string.replace("[", "").replace("]", "").trim().split(",")));
        out.add(builder.setRequest(requestBuilder.build()).build());
        //转完传回handler chain
    }
}
4、handlerChain初始化
public class RpcChannelHandlerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new RpcEncoder());
        pipeline.addLast(new RpcProviderDecoder());
        pipeline.addLast(new RpcProviderHandler());
    }
}
6、测试类
public class NettyProviderRpcTest {

    public static void main(String[] args) throws ClassNotFoundException {
        ServerBootStrap serverBootStrap=new ServerBootStrap();
        serverBootStrap.boot("127.0.0.1",9920);
    }
}

7、动态代理

这里因为不会、省略了,是可以用cgli基于二进制去根据类名从类加载器生成的

三、通用类

1、协议编码handler
package handler;

import com.google.protobuf.ProtocolStringList;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import protobuf.RpcMessage;

import java.nio.charset.Charset;

public class RpcEncoder extends MessageToByteEncoder<RpcMessage.RpcMessagePojo> {
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcMessage.RpcMessagePojo msg, ByteBuf out) throws Exception {
        Charset charset = CharsetUtil.UTF_8;
        if(msg.getMessageType().equals(RpcMessage.RpcMessagePojo.MessageType.REQUEST)) {
            //请求
            RpcMessage.RpcRequest request = msg.getRequest();
            out.writeInt(request.getRequestHeader().getBytes(charset).length);
            out.writeBytes(request.getRequestHeader().getBytes(charset));
            //请求头
            out.writeInt(request.getReqClassName().getBytes(charset).length);
            out.writeBytes(request.getReqClassName().getBytes(charset));
            //类名
            out.writeInt(request.getMethodName().getBytes().length);
            out.writeBytes(request.getMethodName().getBytes());
            //方法名
            ProtocolStringList requestParameterTypesList = request.getParameterTypesList();
            out.writeInt(requestParameterTypesList.toString().getBytes(charset).length);
            out.writeBytes(requestParameterTypesList.toString().getBytes(charset));
            //写入参数列表
            out.writeInt(request.getParametersList().toString().getBytes(charset).length);
            out.writeBytes(request.getParametersList().toString().getBytes(charset));
            //写入参数列表
        }
        else {
            //响应
            RpcMessage.Response response=msg.getResponse();
            out.writeInt(response.getResponseHeader().getBytes(charset).length);
            out.writeBytes(response.getResponseHeader().getBytes(charset));
            //响应头
            out.writeInt(response.getDescription().getBytes(charset).length);
            out.writeBytes(response.getDescription().getBytes(charset));
            //响应描述
            out.writeInt(response.getResult().getBytes(charset).length);
            out.writeBytes(response.getResult().getBytes(charset));
            //响应结果
        }
    }
}
2、服务接口
public interface UserService {
    RpcMessage.RpcMessagePojo login(RpcMessage.RpcMessagePojo msg);
}
3、服务实现类
public class UserServiceImpl implements UserService {
    @Override
    public RpcMessage.RpcMessagePojo login(RpcMessage.RpcMessagePojo msg) {
        //返回一个响应体
        return
                RpcMessage.RpcMessagePojo.newBuilder()
                        .setResponse(RpcMessage.Response.newBuilder()
                                .setResult("成功")
                                .setDescription("账号密码验证通过")
                                .setResponseHeader("0xff1")
                                .build())
                        .setMessageType(RpcMessage.RpcMessagePojo.MessageType.RESPONSE)
                        .build();

    }
}
4、协议结构
syntax = "proto3";

option java_package = 'protobuf';
option java_outer_classname = "RpcMessage";

//编译器的版本和依赖的版本必须保持一致

message RpcRequest {
    //请求头
    string request_header = 1;
    string req_class_name = 2;
    string method_name = 3;
    //参数类型
    repeated string parameter_types = 4;
    //    参数列表
    repeated string parameters = 5;
    int32 header_size=6;
    int32 class_name_size=7;
    int32 method_name_size=8;
    int32 parameter_types_size=9;
    int32 parameters_size=10;
}

message Response
{
    string response_header = 1;
    string description = 2;
    string result = 3;
    int32 response_header_size=4;
    int32 description_size=5;
    int32 result_size=6;
}

message RpcMessagePojo
{
    enum MessageType{
        REQUEST=0;
        RESPONSE=1;
    }
    MessageType message_type = 1;
    oneof message_body{
        Response response = 2;
        RpcRequest request = 3;
    }
}

四、测试结果

  • 启动这两个

  • image-20210925181059614

  • Provider控制台

image-20210925181210800

这里输出的是请求体

  • Consumer控制台
  • image-20210925181304207
    • 这里输出的是响应体
      • 不知道为什么直接输出对象输出的是码元字符串
      • 但是使用get方法拿取是可以获得正确结果的(一度怀疑解码器错了)
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-09-26 10:33:32  更:2021-09-26 10:33:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年6日历 -2024/6/27 2:06:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码