IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 自己动手实现一个 RPC 框架 -> 正文阅读

[网络协议]自己动手实现一个 RPC 框架

为了加深对 RPC 框架的理解,自己动手做了个简单的 RPC 框架,名字随便起个,就叫 lsf 吧。

lsf GitHub 地址:https://github.com/buyulian/lsf

目录

一、整体架构

二、各模块含义

三、提供方demo

1、引入客户端 jar 包

2、api 包定义

3、 接口实现

?4、提供者 spring bean 配置

5、启动类

四、调用方 demo

?1、引入客户端 jar 包和提供者的 api 包

2、消费者 spring bean 配置

3、启动类

?五、具体实现

1、注册中心、消费者、生产证 spring bean标签定义

(1) lsf.xsd 文件

(2) 标签解析

2、核心数据结构

(1) rpc参数

?(2) 消费者bean

3、核心处理逻辑

(1) 消费核心处理逻辑

(2) 生产者核心处理逻辑

(3) 序列化接口定义

(4) FastJson autoType 序列化实现

六、代码启动方式


一、整体架构

二、各模块含义

?

三、提供方demo

1、引入客户端 jar 包

        <dependency>
            <groupId>com.me</groupId>
            <artifactId>lsf-client</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

2、api 包定义

3、 接口实现

?4、提供者 spring bean 配置

    <lsf:registry id="registry" host="127.0.0.1" port="25000"/>

    <lsf:provider id="helloWorldServiceLsf" alias="test"
                  interface="com.me.lsf.provider.api.HelloWorldService"
                  registry="registry"
                  ref="helloWorldService"/>

5、启动类

四、调用方 demo

?1、引入客户端 jar 包和提供者的 api 包

        <dependency>
            <groupId>com.me</groupId>
            <artifactId>lsf-provider-api</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.me</groupId>
            <artifactId>lsf-client</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

2、消费者 spring bean 配置

    <lsf:registry id="registry" host="127.0.0.1" port="25000"/>

    <lsf:consumer id="helloWorldService"
                  interface="com.me.lsf.provider.api.HelloWorldService"
                  registry="registry"
                  alias="test"/>

3、启动类

?五、具体实现

1、注册中心、消费者、生产证 spring bean标签定义

(1) lsf.xsd 文件

<?xml version="1.0" encoding="UTF-8" ?>
<schema xmlns="http://www.w3.org/2001/XMLSchema"
        targetNamespace="http://www.me.com/schema/lsf"
        elementFormDefault="qualified">

    <element name="registry">
        <complexType>
            <attribute name="id" type="string"/>
            <attribute name="host" type="string"/>
            <attribute name="port" type="string"/>
        </complexType>
    </element>
    <element name="provider">
        <complexType>
            <attribute name="id" type="string"/>
            <attribute name="alias" type="string"/>
            <attribute name="interface" type="string"/>
            <attribute name="ref" type="string"/>
            <attribute name="registry" type="string"/>
        </complexType>
    </element>
    <element name="consumer">
        <complexType>
            <attribute name="id" type="string"/>
            <attribute name="alias" type="string"/>
            <attribute name="interface" type="string"/>
            <attribute name="registry" type="string"/>
        </complexType>
    </element>
</schema>

(2) 标签解析

2、核心数据结构

(1) rpc参数

public class RpcParam {

    /**
     * 调用类
     */
    private String rClass;
    
    /**
     * 调用方法
     */
    private String method;
    
    /**
     * 参数列表
     */
    private String[] args;
    
    /**
     * 序列化方式
     */
    private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();
}

?(2) 消费者bean


public class Consumerbean {

    /**
     * 调用接口
     */
    private Class interfaceClass;

    /**
     * 调用接口名
     */
    private String interfaceName;

    /**
     * 别名
     */
    private String alias;

    /**
     * 预留
     */
    private Boolean register;

    /**
     * 存活生产者连接
     */
    private List<LsfConnection> aliveConnectionList;

    /**
     * 手工指定生产者连接
     */
    private List<LsfConnection> fixedConnectionList;

    /**
     * 父对象
     */
    private ParentObject parentObject;

    /**
     * 序列化方式
     */
    private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();

    /**
     * 注册中心 bean
     */
    private RegistryBean registryBean;
}

3、核心处理逻辑

(1) 消费核心处理逻辑

public class ConsumerBeanInvocationHandler implements InvocationHandler {

    private static Logger logger = LoggerFactory.getLogger(ConsumerBeanInvocationHandler.class);

    private Consumerbean consumerbean;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        //获取调用的类
        Class tClass = consumerbean.getInterfaceClass();

        String canonicalName = tClass.getCanonicalName();
        String methodName = method.getName();

        ParentObject parentObject = consumerbean.getParentObject();

        boolean isNoRpc = isNoRpc(methodName, parentObject);

        if (isNoRpc) {
            return method.invoke(parentObject, args);
        }
        logger.debug("执行了 rpc 调用, class {}, method {}, args {}",canonicalName, methodName, Arrays.toString(args));

        //获取序列化方式
        String serializeType = consumerbean.getSerializeType();
        
        //组装 rpc 参数
        RpcParam rpcParam = getRpcParam(args, canonicalName, methodName, serializeType, method);

        //获取可用的生产者连接
        LsfConnection connection = consumerbean.getConnection();

        //调用并得到字符串结果
        String rpcResponseParamStr = getBody(rpcParam, connection);

        RpcResponseParam rpcResponseParam = JSON.parseObject(rpcResponseParamStr, RpcResponseParam.class);

        if (ErrorCodeEnum.SUCCESS.getCode().equals(rpcResponseParam.getCode())) {
            //获取序列化处理类
            LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);
            //反序列化结果
            Object result = lsfSerialize.deSerializeResult(method, rpcResponseParam.getResult());
            return result;
        } else {
            //生产者抛出的异常处理
            throw new RuntimeException(rpcResponseParam.getException());
        }

    }

    private String getBody(RpcParam rpcParam, LsfConnection connection) {
        LsfClient client = LsfHttpClientFactory.getClient();
        String host = connection.getHost();
        int port = connection.getPort();
        ClientParam clientParam = new ClientParam();
        clientParam.setHost(host);
        clientParam.setPort(port);
        clientParam.setUrl("/");

        String rpcBody = JSON.toJSONString(rpcParam);
        clientParam.setBody(rpcBody);
        // netty 执行网络调用
        return client.post(clientParam);
    }


    private RpcParam getRpcParam(Object[] args, String canonicalName, String methodName, String serializeType, Method method) {
        RpcParam rpcParam = new RpcParam();

        rpcParam.setrClass(canonicalName);
        rpcParam.setMethod(methodName);
        rpcParam.setSerializeType(serializeType);

        //获取序列化方式
        LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);

        //序列化参数
        String[] argsStrs = lsfSerialize.serializeParam(method, args);

        rpcParam.setArgs(argsStrs);
        return rpcParam;
    }

    private boolean isNoRpc(String methodName, ParentObject parentObject) {
        boolean isNoRpc = false;
        Method[] declaredMethods = parentObject.getClass().getDeclaredMethods();
        for (Method declaredMethod : declaredMethods) {
            if (declaredMethod.getName().equals(methodName)) {
                isNoRpc = true;
                break;
            }
        }
        return isNoRpc;
    }
}

(2) 生产者核心处理逻辑

public static String dealRequest(String body) {

        logger.info("center asyncDeal request {}",body);

        //解析 rpc 调用参数
        RpcParam rpcParam = JSON.parseObject(body, RpcParam.class);

        String rClassStr = rpcParam.getrClass();
        Object provider = getProvider(rClassStr);

        if (provider == null) {
            throw new RuntimeException("没有 对应的 provider");
        }

        String method = rpcParam.getMethod();
        String[] argsStr = rpcParam.getArgs();

        Class<?> aClass = provider.getClass();

        String resultStr = "error";
        RpcResponseParam rpcResponseParam = new RpcResponseParam();
        try {
            Method declaredMethod = null;

            //通过反射获取rpc调用的方法
            Method[] declaredMethods = aClass.getDeclaredMethods();
            for (Method declaredMethod1 : declaredMethods) {
                if (declaredMethod1.getName().equals(method)) {
                    declaredMethod = declaredMethod1;
                    break;
                }
            }

            if (declaredMethod == null) {
                throw new RuntimeException("没有这个方法 " + method);
            }

            //获取序列化实现类
            LsfSerialize lsfSerialize = LsfSerializeFactory.get(rpcParam.getSerializeType());

            //反序列参数
            Object[] inArgs = lsfSerialize.deSerializeParam(declaredMethod, argsStr);

            //调用实现类
            Object result = declaredMethod.invoke(provider, inArgs);
            
            //序列化执行结果
            String result1 = lsfSerialize.serializeResult(declaredMethod, result);
            rpcResponseParam.setCode(ErrorCodeEnum.SUCCESS.getCode());
            rpcResponseParam.setResult(result1);

        } catch (Exception e) {
            //若原始方法发生异常,则封装异常信息并返回给消费者
            rpcResponseParam.setCode(ErrorCodeEnum.EXCEPTION.getCode());
            rpcResponseParam.setException(e.toString());
            logger.error("lsf rpc exception rpc param {}", JSON.toJSONString(rpcParam), e);
        }
        resultStr = JSON.toJSONString(rpcResponseParam);

        logger.info("center asyncDeal result {}", resultStr);
        return resultStr;
    }

(3) 序列化接口定义

对扩展开放。新的的序列化方式可通过实现这个接口,并注册到序列化工厂里去实现。

public interface LsfSerialize {

    String[] serializeParam(Method method, Object[] args);

    Object[] deSerializeParam(Method method, String[] contents);

    String serializeResult(Method method, Object result);

    Object deSerializeResult(Method method, String content);

}

(4) FastJson autoType 序列化实现


public class JsonAutoTypeSerialize implements LsfSerialize {

    private static Logger logger = LoggerFactory.getLogger(JsonAutoTypeSerialize.class);

    {
        ParserConfig.getGlobalInstance().setSafeMode(false);
        ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
    }

    @Override
    public String[] serializeParam(Method method, Object[] args) {

        String[] argsStrs = null;
        if (args != null) {
            argsStrs = new String[args.length];
            for (int i = 0; i < args.length; i++) {
                argsStrs[i] = JSON.toJSONString(args[i], SerializerFeature.WriteClassName);
            }
        }
        return argsStrs;
    }

    @Override
    public Object[] deSerializeParam(Method method, String[] contents) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        return getInArgs(contents, parameterTypes, method);
    }

    private Object[] getInArgs(String[] strs, Class<?>[] parameterTypes, Method method) {

        if (strs == null) {
            return null;
        }
        Type[] genericParameterTypes = method.getGenericParameterTypes();
        Object[] inArgs = new Object[strs.length];
        for (int i = 0; i < parameterTypes.length; i++) {
            Class<?> parameterType = parameterTypes[i];
            inArgs[i] = getObjectSuper(strs[i], parameterType, genericParameterTypes[i]);
        }
        return inArgs;
    }

    @Override
    public String serializeResult(Method method, Object result) {
        return JSON.toJSONString(result, SerializerFeature.WriteClassName);
    }

    @Override
    public Object deSerializeResult(Method method, String content) {
        Type genericReturnType = method.getGenericReturnType();
        Class<?> returnType = method.getReturnType();
        return getObjectSuper(content, returnType, genericReturnType);
    }

    private Object getObjectSuper(String content, Class<?> returnType, Type genericReturnType) {
        Object result = JSON.parseObject(content, returnType);
        return result;
    }
}

六、代码启动方式

先启动注册中心,然后启动生产者,最后启动消费者。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-09-13 11:52:32  更:2022-09-13 11:53:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/19 6:04:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码