IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> RPC框架实战之手写RPC框架 第一章 -> 正文阅读

[网络协议]RPC框架实战之手写RPC框架 第一章

第一章

第一部分首先实现简单的RPC远程通信,流程如下:

  • 客户端调用接口的方法,通过代理将要调用的方法信息传输给服务端
  • 服务端通过socket监听,当接收到数据后,就创建一个线程去执行
  • 通过客户端传输过来的数据反射找到对应的方法,并执行获取到对应的数据
  • 将数据封装进response中返回给客户端
  • 客户端收到数据后打印。

因为是简单的实现,因此直接指定了服务端的地址,后续会进行优化完善。

让我们开始吧!

项目的整体模块如下:

  • myrpc
    • rpc-api:接口相关的类
    • rpc-common:通用模块,例如服务端和消费端传输的RpcRequest
    • rpc-core:项目的核心模块
    • test-client:客户端相关的类
    • test-server:服务端相关的类

定义接口

首先定义接口,也就客户端调用的接口

package com.lany.api;

/**
 * @author liuyanyan
 * @date 2021/12/21 14:19
 * 声明一个可以被外界调用的接口
 */
public interface HelloService {
    String hello(HelloObject helloObject);
}

参数我们用的是对象,因此还需要创建HelloObject,同时需要实现Serializable接口,因为后面传输时需要接口类型,因此需要序列化,没有序列化会报错。

package com.lany.api;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

/**
 * @author liuyanyan
 * @date 2021/12/21 14:20
 */
@Data
@AllArgsConstructor
public class HelloObject implements Serializable {
    private Integer id;
    private String message;
}

传输对象

接着需要定义传输对象

定义该对象来让服务端唯一确定一个方法,因此需要的参数有接口的名字、方法的名字、参数列表以及参数的类型。

package com.lany.rpc.entity;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * @author liuyanyan
 * @date 2021/12/21 14:49
 * 数据传输时服务端和消费端通过该对象来确定消费端调用的是哪个方法
 */
@Data
@Builder
public class RpcRequest implements Serializable {
    /**
     * 待调用的接口名称
     */
    private String interfaceName;
    /**
     * 待调用方法名称
     */
    private String methodName;
    /**
     * 调用方法的参数
     */
    private Object[] parameters;
    /**
     * 调用方法的参数类型
     */
    private Class<?>[] paramTypes;
}

服务端通过这个对象找到对应的方法后执行将结果封装进RpcResponse中返回给消费因此还需要一个RpcResponse来接收方法执行的结果,返回时成功还是失败。

package com.lany.rpc.entity;

import com.lany.rpc.enumeration.ResponseCode;
import lombok.Data;

import java.io.Serializable;

/**
 * @author liuyanyan
 * @date 2021/12/21 14:57
 * 服务调用成功或者失败返回的响应信息
 */
@Data
public class RpcResponse<T> implements Serializable {

    /**
     * 响应状态码
     */
    private Integer statusCode;

    /**
     * 响应状态补充信息
     */
    private String message;

    /**
     * 响应数据
     */
    private T data;

    /**
     * 返回成功的相应数据
     *
     * @param data
     * @param <T>
     * @return
     */
    public static <T> RpcResponse<T> success(T data) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setStatusCode(ResponseCode.SUCCESS.getCode());
        response.setData(data);
        return response;
    }

    /**
     * 返回失败的响应信息
     *
     * @param code
     * @param <T>
     * @return
     */
    public static <T> RpcResponse<T> fail(ResponseCode code) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setStatusCode(code.getCode());
        response.setMessage(code.getMessage());
        return response;
    }
}

动态代理

消费端通过动态代理来将想要调用的数据传输给服务端,让消费端只管调用,而不用管具体的怎么实现,就像调用本地方法一样。这里使用jdk动态代理来实现。

package com.lany.rpc.client;

import com.lany.rpc.entity.RpcRequest;
import com.lany.rpc.entity.RpcResponse;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @author liuyanyan
 * @date 2021/12/21 15:13
 */
public class RpcClientProxy implements InvocationHandler {

    private String host;
    private int port;

    public RpcClientProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);

    }

    /**
     * 代理类在被调用时的动作,通过代理将RPCRequest对象发送出去
     * 然后获取到返回的数据
     * 用户只需要调用就行了,代理来帮忙发送数据给服务端。
     *
     * @param proxy
     * @param method
     * @param args
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest rpcRequest = RpcRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameters(args)
                .paramTypes(method.getParameterTypes())
                .build();
        RpcClient rpcClient = new RpcClient();
        return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host, port)).getData();
    }
}

封装了一个getProxy()方法,来返回代理对象。在代理对象调用具体的方法的时候调用了invoke()方法来把消费端想要调用的方法数据传输给服务端并接受到服务端传回来的数据。

具体与服务端通信的逻辑在RpcClient类中的sendRequest方法中实现。

package com.lany.rpc.client;

import com.lany.rpc.entity.RpcRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;


/**
 * @author liuyanyan
 * @date 2021/12/21 15:12
 */
public class RpcClient {
    private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);

    /**
     * 通过socket发送给服务端,并接受到返回的数据
     * 通过Java的序列化方式在socket中传输
     *
     * @param rpcRequest
     * @param host
     * @param port
     * @return
     */
    public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
        try (Socket socket = new Socket(host, port)) {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
            return objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("调用时有错误发生:" + e);
            return null;
        }
    }
}

反射调用

服务端通过反射进行调用对应的方法

服务端一直监听9000端口,当有请求连接时通过线程池创建线程让其执行通信的逻辑

package com.lany.rpc.server;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;

/**
 * @author liuyanyan
 * @date 2021/12/21 15:37
 */
public class RpcServer {

    private final ExecutorService threadPool;
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

    /**
     * 初始化线程池,当有连接时就新建一个线程去执行
     */
    public RpcServer() {
        // 核心线程数
        int corePoolSize = 5;
        // 最大线程数
        int maximumPoolSize = 50;
        // 空闲线程的等待时间
        long keepAliveTime = 60;
        // 阻塞队列
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
        // 线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();

        threadPool = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                workingQueue,
                threadFactory);
    }

    /**
     * 有连接建立时创建一个线程去执行 进行数据的传输
     *
     * @param service
     * @param port
     */
    public void register(Object service, int port) {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            logger.info("服务器正在启动...");
            Socket socket;
            while ((socket = serverSocket.accept()) != null) {
                logger.info("客户端连接!Ip为:" + socket.getInetAddress());
                threadPool.execute(new WorkerThread(socket, service));
            }
        } catch (IOException e) {
            logger.error("连接时有错误发生:", e);
        }
    }
}

这里RpcServer咱叔只能注册一个接口,只能对外提供一个接口的调用方法,下一章会进行优化。

测试运行

消费端测试代码:

package com.lany.test;

import com.lany.api.HelloObject;
import com.lany.api.HelloService;
import com.lany.rpc.client.RpcClientProxy;

/**
 * @author liuyanyan
 * @date 2021/12/21 16:04
 */
public class TestClient {
    public static void main(String[] args) {
        RpcClientProxy rpcClientProxy = new RpcClientProxy("127.0.0.1", 9000);
        HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
        HelloObject helloObject = new HelloObject(12, "this is a message");
        String hello = helloService.hello(helloObject);
        System.out.println(hello);
    }
}

服务端测试代码:

package com.lany.test;

import com.lany.rpc.server.RpcServer;

/**
 * @author liuyanyan
 * @date 2021/12/21 16:04
 */
public class TestServer {

    public static void main(String[] args) {
        HelloServiceImpl helloService = new HelloServiceImpl();
        RpcServer rpcServer = new RpcServer();
        rpcServer.register(helloService, 9000);

    }
}

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-01-14 02:22:35  更:2022-01-14 02:24:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/8 5:35:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码