IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 用java实现RPC编解码 -> 正文阅读

[网络协议]用java实现RPC编解码



前言

编解码器在RPC框架中处于最基础也是最重要的部分之一,上一篇文章的编写中,并没有对数据进行过深入的编解码,只是使用字符串进行序列化然后进行的传输,在实际的项目中是不能这么做的,所以这篇文章主要对编解码这块进行了优化,为了保证代码的完整性,会将所有的代码在这片文章中进行粘贴

一、jar包引用

由于不在将对象转换成字符串进行传输,所以不在需要使用fastJson,而是使用对象序列化工具类

 		<dependency>
            <groupId>com.caucho</groupId>
            <artifactId>hessian</artifactId>
            <version>3.1.5</version>
        </dependency>

二、公共基础类

1.定义一个请求协议封装类

import java.util.Arrays;

public class RequestObject {
    /**
     * 接口名称
     */
    private String interfaceName;
    /**
     * 方法名称
     */
    private String methodName;
    /**
     * 方法参数签名
     */
    private String[] argsSig;
    /**
     * 方法参数值,方法参数可能是一个复杂的对象,所以添加transient关机字在对对象进行序列化时,参数不
     * 参加序列化,而是单独进行序列化
     */
    private transient Object[] args;
    
    public RequestObject(String interfaceName, Method method, Object[] args) {
        this.interfaceName = interfaceName;
        this.methodName = method.getName();
        this.args = args;
        String[] argsSig = new String[args.length];
        for (int i = 0; i < args.length; i++) {
            argsSig[i] = args[i].getClass().getTypeName();
        }
        this.argsSig = argsSig;
    }
}

2.定义一个RPC请求对象

import java.util.Arrays;

public class RpcRequest {
    private RequestObject requestObject;
    /**
     * 总长度
     */
    private int count = 8;
    /**
     * 接口名称长度
     */
    private short interfaceNameLen;
    /**
     * 方法名称长度
     */
    private short methodNameLen;
    /**
     * 包体长度
     */
    private int contentLen;
    /**
     * 方法名称序列化数据
     */
    private byte[] interfaceNameByte;
    /**
     * 接口名称序列化数据
     */
    private byte[] methodNameByte;
    /**
     * 包体序列化数据
     */
    private byte[] contentByte;
    }

3.定义响应数据对象

public class ResponseObject {
    /**
     * 返回对象签名
     */
    private String responseSig;
    /**
     * 返回值
     */
    private Object result;

    public ResponseObject(Object result) {
        this.result = result;
        this.responseSig = result.getClass().getTypeName();
    }
}

4.定义响应数据包体对象

public class RpcResponse {
    /**
     * 响应长度
     */
    private int responseLen = 4;
    /**
     * 包体
     */
    private byte[] content;

    public void setContent(byte[] content) {
        this.content = content;
        if (null != content) {
            this.responseLen+= content.length;
        }
    }
}

5.定义一个自定义的SerializerFactory

import com.caucho.hessian.io.JavaSerializer;
import com.caucho.hessian.io.Serializer;
import com.caucho.hessian.io.SerializerFactory;

public class MySerializerFactory extends SerializerFactory {
    @Override
    protected Serializer getDefaultSerializer(Class cl) {
        if (this._defaultSerializer != null) {
            return this._defaultSerializer;
        }
        return new JavaSerializer(cl);
    }
}

三、服务消费端代码编写(Consumer)

public class RequestProxyHandler implements InvocationHandler, Serializable {
    /**
     * 发送数据使用的socket
     */
    private final Socket socket;
    /**
     * 被代理的对象信息
     */
    private final String interfaceName;

    public RequestProxyHandler(Socket socket, String interfaceName) {
        this.socket = socket;
        this.interfaceName = interfaceName;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        /**
         * 1.构建出请求协议对象
         */
        RequestObject requestObject = new RequestObject(interfaceName, method, args);
        RpcRequest rpcRequest = new RpcRequest(requestObject);
        /**
         * 获取socket的输出流
         */
        OutputStream outputStream = socket.getOutputStream();
        /**
         * 序列化接口名称
         */
        String interfaceName = requestObject.getInterfaceName();
        rpcRequest.setInterfaceNameByte(interfaceName.getBytes(StandardCharsets.UTF_8));
        /**
         * 序列化接口名称
         */
        String methodName = requestObject.getMethodName();
        rpcRequest.setMethodNameByte(methodName.getBytes(StandardCharsets.UTF_8));
        /**
         * 序列化包体
         */
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Hessian2Output hessian2Output = new Hessian2Output(byteArrayOutputStream);
        /**
         * 由于Hessian2Output默认的序列化工厂会检查类是否实现了Serializable接口,所采用自己实现的序列化工厂
         */
        hessian2Output.setSerializerFactory(new MySerializerFactory());
        try {
            hessian2Output.writeObject(requestObject);
            if (args.length > 0) {
                for (Object arg : requestObject.getArgs()) {
                    hessian2Output.writeObject(arg);
                }
            }
            hessian2Output.close();
            rpcRequest.setContentByte(byteArrayOutputStream.toByteArray());
            byteArrayOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        /**
         * 使用byteBuffer对byte[]进行拼接
         */
        ByteBuffer byteBuffer = ByteBuffer.allocate(rpcRequest.getCount());
        byteBuffer.putShort(rpcRequest.getInterfaceNameLen())
                .putShort(rpcRequest.getMethodNameLen()).putInt(rpcRequest.getContentLen())
                .put(rpcRequest.getInterfaceNameByte()).put(rpcRequest.getMethodNameByte()).put(rpcRequest.getContentByte());
        /**
         * 将转换好的byte[]写入到输出流中
         */
        outputStream.write(byteBuffer.array());
        /**
         * 调用flush方法将数据全部发送出去
         */
        outputStream.flush();
        /**
         * 阻塞获取服务端返回的数据流
         */
        InputStream inputStream = socket.getInputStream();
        /**
         * 读取包体长度
         */
        byte[] contentLength = new byte[4];
        inputStream.read(contentLength);
        ByteBuffer lengthBuffer = ByteBuffer.wrap(contentLength);
        /**
         * 读取整个包体
         */
        byte[] content = new byte[lengthBuffer.getInt()];
        inputStream.read(content);
        /**
         * 进行反序列化
         */
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
        Hessian2Input hessian2Input = new Hessian2Input(byteArrayInputStream);
        hessian2Input.setSerializerFactory(new MySerializerFactory());
        Object object = hessian2Input.readObject();
        if (object instanceof ResponseObject) {
            ResponseObject responseObject = (ResponseObject) object;
            return responseObject.getResult();
        }
        /**
         * 如果没有拿到返回值则直接返回空,这里也可以根据实际业务需求做其他操作,或者抛出异常
         */
        return null;
    }
}

消费端测试代码

public class Consumer {
    public static void main(String[] args) throws IOException {
        /**
         * 创建socket链接
         */
        Socket socket = new Socket("127.0.0.1", 8888);
        /**
         * 创建动态代理对象
         */
        HelloRpc helloRpc = (HelloRpc) Proxy.newProxyInstance(HelloRpc.class.getClassLoader()
                , new Class[]{HelloRpc.class}, new RequestProxyHandler(socket, HelloRpc.class.getTypeName()));
        /**
         * 获取到返回结果并打印
         */
        String result = helloRpc.hello("张三");
        System.out.println(result);
    }
}

四.生产者端代码

public class Provider {
    public static void main(String[] arg) throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException {
        /**
         * 用map存储接口和实现类的对于信息
         */
        Map<String, Object> providerCache = new HashMap<>();
        /**
         * 将测试接口和实现类信息,注册到map中
         */
        providerCache.put(HelloRpc.class.getName(), new HelloRpcImpl());
        /**
         * 创建ServerSocket,绑定8888端口
         */
        ServerSocket serverSocket = new ServerSocket(8888);
        while (true) {
            /**
             * 等待客户端链接
             */
            Socket clientSocket = serverSocket.accept();
            /**
             * 获取数据流
             */
            InputStream inputStream = clientSocket.getInputStream();
            /**
             * 读取字段长度信息
             */
            byte[] length = new byte[8];
            inputStream.read(length);
            ByteBuffer allLengthByte = ByteBuffer.wrap(length);
            /**
             * 读取接口名称
             */
            byte[] interfaceNameByte = new byte[allLengthByte.getShort()];
            inputStream.read(interfaceNameByte);
            String interfaceName = new String(interfaceNameByte, StandardCharsets.UTF_8);
            /**
             * 如果接口名称为空字符串或者不在缓存中则结束本次请求处理
             */
            if ("".equals(interfaceName) || null == providerCache.get(interfaceName)) {
                continue;
            }
            /**
             * 读取接口名称
             */
            byte[] methodNameByte = new byte[allLengthByte.getShort()];
            String methodName = new String(methodNameByte, StandardCharsets.UTF_8);
            /**
             * 读取包体
             */
            byte[] content = new byte[allLengthByte.getInt()];
            Hessian2Input hessian2Input = new Hessian2Input(new ByteArrayInputStream(content));
            hessian2Input.setSerializerFactory(new MySerializerFactory());
            Object object = hessian2Input.readObject();
            if (object instanceof RequestObject) {
                RequestObject requestObject = (RequestObject) object;
                String[] argsSig = requestObject.getArgsSig();
                Object[] args = new Object[argsSig.length];
                Class<?>[] argsClazz = new Class<?>[argsSig.length];
                for (int i = 0; i < argsSig.length; i++) {
                    /**
                     * 这里copy spring的ClassUtils进行类的加载
                     */
                    Class<?> aClass = ClassUtils.forName(argsSig[i], Thread.currentThread().getContextClassLoader());
                    argsClazz[i] = aClass;
                    args[i] = hessian2Input.readObject(aClass);
                }
                requestObject.setArgs(args);
                Object o = providerCache.get(interfaceName);
                Method declaredMethod = o.getClass().getDeclaredMethod(methodName, argsClazz);
                Object result = declaredMethod.invoke(o, args);
                /**
                 * 构建响应对象
                 */
                ResponseObject responseObject = new ResponseObject(result);
                RpcResponse rpcResponse = new RpcResponse();
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                Hessian2Output hessian2Output = new Hessian2Output(byteArrayOutputStream);
                hessian2Output.setSerializerFactory(new MySerializerFactory());
                hessian2Output.writeObject(responseObject);
                hessian2Output.close();
                rpcResponse.setContent(byteArrayOutputStream.toByteArray());
                byteArrayOutputStream.close();
                ByteBuffer byteBuffer = ByteBuffer.allocate(rpcResponse.getResponseLen());
                ByteBuffer put = byteBuffer.putInt(rpcResponse.getResponseLen()).put(rpcResponse.getContent());
                OutputStream outputStream = clientSocket.getOutputStream();
                outputStream.write(put.array());
                outputStream.flush();
            }

        }
    }
}

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-07-04 23:19:41  更:2022-07-04 23:19:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 15:40:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码