IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 手写RPC学习笔记 -> 正文阅读

[网络协议]手写RPC学习笔记

本博客文章(学习笔记)导航 (点击这里访问)
在这里插入图片描述

一、RPC概况

1.1 RPC简介

RPC:远程过程调用,是分布式系统常见的一种通信方法,从跨进程到跨物理机已经有十几年的历史
优点:可以将远程调用变成像调用本地方法一样简单

1.2 系统交互

系统的交互方式有两种:直接交互 和 间接交互(中间件交互),下面介绍一下这两种交互方式

1.2.1直接交互

image.png

1.2.2 间接交互

image.png

1.3 各种RPC框架的对比

image.png

1.4 RPC核心原理

1.4.1 RPC调用的原理

step1 server把自己的服务注册到registery
step2 client定于redistry,获取自己想知道的服务信息
step3 如果server信息发生了改变,registory会通知订阅者信息发生了改变
step4 client要发起调用,就可以根据从registory中获取的信息直接调用即可
image.png

1.4.2 Call的调用过程

step1 client调用接口方法(stub中的接口方法)
step2 将调用信息序列号,以便于在网络上传输
step3 client和server之间建立网络连接
step4 server反序列化传输对象
step5 server的stub查找要调用的方法以及参数
step6 server找到实际实现类的对象,通过反射获取执行结果,再次发送到stub上
step7 stub序列化传输对象
step8 server和client建立网络连接
step9 client反序列化传输对象
step10 client获取调用结果
image.png

1.5 技术栈

image.png
image.png
image.png
image.png

二、RPC手动实现

2.1 创建工程、制定协议、通用工具方法

2.1.1 项目类图

一共5大模块

image.png

2.1.2 项目搭建

step1 新建项目

step2 在项目下,新建6个模块,删除src文件
client:客户端模块
server:服务端模块
codec:编码解码模块
common:通用模块
propto:协议模块
transport:网络通信模块
image.png

2.1.3 父依赖编写以及导入


    <!--子模块依赖管理 子模块如果需要 就自行导入-->
    <dependencyManagement>
        <dependencies>
            <!--io包-->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.5</version>
            </dependency>
            <!--jetty 网络通信-->
            <dependency>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-servlet</artifactId>
                <version>9.4.19.v20190610</version>
            </dependency>
            <!--序列化-->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.44</version>
            </dependency>
        </dependencies>
    </dependencyManagement>


    <!--所有模块都公用的依赖,其他模块不需要导入了-->
    <dependencies>
        <!--单元测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.26</version>
        </dependency>
        <!--日志实现-->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

    </dependencies>

2.1.4 编译的版本控制

    <!--编译版本控制-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.1.5 子模块版本统一配置

    <!--通过properties统一控制版本 通过${}来控制版本--> 
    <properties>
        <java.version>1.8</java.version>
        <common.version>2.5</common.version>
        <jetty.version>9.4.19.v20190610</jetty.version>
        <fastjson.version>1.2.44</fastjson.version>
        <lombok.version>1.18.8</lombok.version>
        <slf4j.version>1.7.26</slf4j.version>
        <logback.version>1.2.3</logback.version>
        <junit.version>4.12</junit.version>
    </properties>

    <!--子模块依赖管理-->
    <dependencyManagement>
        <dependencies>
            <!--io包-->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>${common.version}</version>
            </dependency>
            <!--jetty 网络通信-->
            <dependency>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-servlet</artifactId>
                <version>${jetty.version}</version>
            </dependency>
            <!--序列化-->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <!--所有模块都公用的依赖-->
    <dependencies>
        <!--单元测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!--日志实现-->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>

    </dependencies>

    <!--编译版本控制-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.1.6 Lombok插件下载

去IDEA中的plugin中查找下载即可
image.png

2.1.8 设置annotation

image.png

2.2 协议模块编写

协议模块主要包括 网络结点、请求对象、响应对象、服务类

2.2.1 网络通信端点类

package com.smgeek.gkrpc;


import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * 表示网络传输的一个端点
 */
@Data
@AllArgsConstructor
public class Peer {
    private  String host;
    private int port;
}

2.2.2 服务类

package com.smgeek.gkrpc;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 表示服务
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceDescriptor {
    private String clazz;
    private String method;
    private String returnType;
    private String[] parameterTypes;
}

2.2.3 请求类

package com.smgeek.gkrpc;


import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * 表示请求的类
 */

@Data
public class Request {
    private ServiceDescriptor service;
    private  Object[] parameters;
}

2.2.4 响应类

package com.smgeek.gkrpc;

import lombok.Data;

/**
 * 表示响应的类
 */
@Data
public class Response {
    /**
     * 服务返回编码
     * 0 成功
     * 非0 失败
     */
    private  int code=0;

    /**
     * 具体错误的响应信息
     */
    private  String message="ok";

    /**
     * 返回数据
     */
    private  Object data;

}

2.3 通用模块编写

通用模块只有一个动态代理

2.3.1 JDK动态代理类

动态代理

package com.smgeek.gkrpc.common.utils;


import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/**
 * 反射工具类
 */

public class ReflectionUtils {

    /**
     * 根据class创建对象
     * @param clazz 带创建的对象类
     * @param <T> 对象类型
     * @return 创建好的对象
     */
    public static <T>T newInstance(Class<T> clazz){
        try{
            return  clazz.newInstance();
        }catch (Exception e){
            throw new IllegalStateException(e);
        }
    }

    /**
     * 获取某个clazz的共有方法
     * @param clazz 任意类
     * @return 当前类的所有方法
     */
    public static Method[] getPublicMethods(Class clazz){
        //获取这个对象的所有方法
        Method[] methods=clazz.getDeclaredMethods();
        List<Method> pmethods=new ArrayList<>();
        for(Method m:methods){
            if(Modifier.isPublic(m.getModifiers())){
                pmethods.add(m);
            }
        }
        return pmethods.toArray(new Method[0]);

    }

    /**
     *  调用指定对象的指定方法
     *
     * @param obj 被调用方法的对象
     * @param method 被调用的方法
     * @param args 方法参数
     * @return 返回代理生成的对象
     */
    public static Object invoke(Object obj,Method method,Object... args){
        try{
            return method.invoke(obj,args);
        }catch (Exception e){
            throw  new IllegalStateException(e);
        }
    }
    
}

2.3.2 代理类测试

package com.smgeek.gkrpc.common.utils;


import org.junit.Test;

public class TestClass {


    private String a(){
        return "a";
    }


    public String b(){
        return "b";
    }


    protected  String c(){
        return "c";
    }
}
package com.smgeek.gkrpc.common.utils;

import org.junit.Test;

import java.lang.reflect.Method;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class ReflectionUtilsTest {

    @Test
    public void newInstance(){
        TestClass t=ReflectionUtils.newInstance(TestClass.class);
        assertNotNull(t);
    }

    @Test
    public void getPublicMethods(){
        Method[] methods=ReflectionUtils.getPublicMethods(TestClass.class);
        assertEquals(1,methods.length);
        String mname=methods[0].getName();
        assertEquals("b",mname);
    }

    @Test
    public void invoke(){
        Method[] methods=ReflectionUtils.getPublicMethods(TestClass.class);
        Method b=methods[0];
        TestClass t=new TestClass();
        Object r = ReflectionUtils.invoke(t, b);
        assertEquals("b",r);
    }
}

2.4 序列化模块编写

便于对象在网络上传输,需要序列化和反序列化

2.4.1 编码和解码接口

package com.smgeek.gkrpc.codec;
/**
 * 序列化 对象转化为byte数组
 */
public interface Encoder {
    byte[] encode(Object obj);
}

package com.smgeek.gkrpc.codec;
/**
 * 反序列化 二进制数组转对象
 */
public interface Decoder {
    <T>T decode(byte[] bytes,Class<T> clazz);
}

2.4.2 pom中引入fastjson

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
    </dependencies>

2.4.3 序列化、反序列化接口实现

package com.smgeek.gkrpc.codec;
import com.alibaba.fastjson.JSON;
/**
* 基于json的序列化实现
*/
public class JSONEncoder implements Encoder {
    @Override
    public byte[] encode(Object obj) {
        return JSON.toJSONBytes(obj);
    }
}
package com.smgeek.gkrpc.codec;
import com.alibaba.fastjson.JSON;
/**
* 基于JSON 的反序列化
*/
public class JSONDecoder implements Decoder{
    @Override
    public <T> T decode(byte[] bytes, Class<T> clazz) {
        return JSON.parseObject(bytes,clazz);
    }
}

2.4.4 接口实现的测试

package com.smgeek.gkrpc.codec;


import lombok.Data;

@Data
public class TestBean {
    private  String name;
    private int age;
}

package com.smgeek.gkrpc.codec;

import org.junit.Test;

import java.lang.reflect.Method;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class JSONEncoderTest {

    @Test
    public void encode(){
        Encoder encoder=new JSONEncoder();
        TestBean bean=new TestBean();
        bean.setName("smgeek");
        bean.setAge(18);
        byte[] bytes=encoder.encode(bean);
        assertNotNull(bytes);
    }
}
package com.smgeek.gkrpc.codec;

import java.lang.reflect.Method;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;

public class JSONDecoderTest {

    @Test
    public void decode() {
        Encoder encoder=new JSONEncoder();
        TestBean bean=new TestBean();
        bean.setName("smgeek");
        bean.setAge(18);
        byte[] bytes=encoder.encode(bean);

        Decoder decoder=new JSONDecoder();
        TestBean bean2 = decoder.decode(bytes, TestBean.class);
        assertEquals(bean.getName(),bean2.getName());
        assertEquals(bean.getAge(),bean2.getAge());
    }

}

2.5 网络模块

2.5.1 依赖引入

    <dependencies>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-servlet</artifactId>
        </dependency>
        <!--引入协议模块-->
        <dependency>
            <!--父模块的group id-->
            <groupId>hand.candy</groupId>
            <!--父模块下的 协议模块-->
            <artifactId>gk-rpc-proto</artifactId>
            <!--使用工程版本号-->
            <version>${project.version}</version>
        </dependency>
    </dependencies>

2.5.2 客户端网络传输接口

客户端主要作用

  • 1 创建连接
  • 2 发送数据 并且等待响应
  • 3 关闭连接
package com.smgeek.gkrpc.transport;

import com.smgeek.gkrpc.Peer;

import java.io.InputStream;

/**
* 1 创建连接
* 2 发送数据 并且等待响应
* 3 关闭连接
*/

public interface TransportClient {
    
    void connect(Peer peer);
    
    InputStream write(InputStream data);
    
    void close();
}

2.5.3 服务端网络传输接口

服务端主要作用

  • 1 启动 监听端口
  • 2 接受请求
  • 3 关闭监听
package com.smgeek.gkrpc.transport;


/**
* 1 启动 监听端口
* 2 接受请求
* 3 关闭监听
*/
public interface TransportServer {
    void init(int port,RequestHandler handler);
    void start();
    void stop();
}

2.5.4 请求处理类接口

package com.smgeek.gkrpc.transport;

import java.io.InputStream;
import java.io.OutputStream;

/**
* 处理网络请求的handler
*/
public interface RequestHandler {
    void onRequest(InputStream recive, OutputStream toResp);
}

2.5.5 接口实现

package com.smgeek.gkrpc.transport;

import com.smgeek.gkrpc.Peer;
import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;


/**
 * HTTP 客户端
 */
public class HTTPTransportClient implements TransportClient {

    private String url;

    @Override
    public void connect(Peer peer) {
        this.url="http://"+peer.getHost()+":"+peer.getPort();
    }

    @Override
    public InputStream write(InputStream data) {
        try {
            HttpURLConnection httpConn =(HttpURLConnection)new URL(url).openConnection();
            httpConn.setDoOutput(true);
            httpConn.setDoInput(true);
            httpConn.setRequestMethod("POST");
            httpConn.connect();
            IOUtils.copy(data,httpConn.getOutputStream());
            int resultCode=httpConn.getResponseCode();

            if(resultCode==HttpURLConnection.HTTP_OK){
                return httpConn.getInputStream();
            }else{
                return  httpConn.getErrorStream();
            }

        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public void close() {

    }
}

package com.smgeek.gkrpc.transport;


import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * HTTP 服务端
 */
@Slf4j
public class HTTPTransportServer implements TransportServer{

    private RequestHandler handler;
    private Server server;


    @Override
    public void init(int port, RequestHandler handler) {
        this.handler=handler;
        this.server=new Server(port);

        //servlet 接收请求
        ServletContextHandler ctx=new ServletContextHandler();
        server.setHandler(ctx);

        ServletHolder holder=new ServletHolder(new ResquestServlet());
        ctx.addServlet(holder,"/*");


    }

    @Override
    public void start() {
        try {
            server.start();
            server.join();
        } catch (Exception e) {
            log.error(e.getMessage(),e);
        }

    }

    @Override
    public void stop() {
        try {
            server.stop();
        } catch (Exception e) {
            log.error(e.getMessage(),e);
        }
    }


    class  ResquestServlet extends HttpServlet{

        @Override
        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws  IOException {
            log.info("client connect");

            InputStream in=req.getInputStream();
            OutputStream out=resp.getOutputStream();

            if(handler!=null){
                handler.onRequest(in,out);
            }

            out.flush();
        }
    }
}

2.6 服务端模块

2.6.1 依赖引入

<dependencies>
  <dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-proto</artifactId>
    <version>1.0-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-codec</artifactId>
    <version>1.0-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-transport</artifactId>
    <version>1.0-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-common</artifactId>
    <version>1.0-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
  </dependency>
 </dependencies>

2.6.2 重写ServiceDescriptor

package com.smgeek.gkrpc;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Objects;

/**
 * 表示服务
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceDescriptor {
    private String clazz;
    private String method;
    private String returnType;
    private String[] parameterTypes;

    public static  ServiceDescriptor from(Class clazz, Method method){
        ServiceDescriptor sdp=new ServiceDescriptor();
        sdp.setClazz(clazz.getName());
        sdp.setMethod(method.getName());
        sdp.setReturnType(method.getReturnType().getName());
        Class[] parameterClasses=method.getParameterTypes();
        String[] parameterTypes=new String[parameterClasses.length];
        for(int i=0;i<parameterClasses.length;i++){
            parameterTypes[i]=parameterClasses[i].getName();
        }
        sdp.setParameterTypes(parameterTypes);
        return sdp;
    }

    @Override
    public String toString() {
        return "ServiceDescriptor{" +
                "clazz='" + clazz + '\'' +
                ", method='" + method + '\'' +
                ", returnType='" + returnType + '\'' +
                ", parameterTypes=" + Arrays.toString(parameterTypes) +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ServiceDescriptor that = (ServiceDescriptor) o;
        return Objects.equals(clazz, that.clazz) &&
                Objects.equals(method, that.method) &&
                Objects.equals(returnType, that.returnType) &&
                Arrays.equals(parameterTypes, that.parameterTypes);
    }

    @Override
    public int hashCode() {
        return toString().hashCode();
    }
}

2.6.3 写服务端配置

package com.smgeek.gkrpc.server;


import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.codec.JSONEncoder;
import com.smgeek.gkrpc.transport.HTTPTransportServer;
import com.smgeek.gkrpc.transport.TransportServer;
import lombok.Data;

/**
 * 配置
 */
@Data
public class RpcServerConfig {

    private Class<? extends TransportServer> transportClass = HTTPTransportServer.class;
    private  Class<? extends Encoder> encoderClass= JSONEncoder.class;
    private  Class<? extends Decoder> decoderClass=Decoder.class;
    private  int port=3000;

}

2.6.4 写服务实例

package com.smgeek.gkrpc.server;


import lombok.AllArgsConstructor;
import lombok.Data;

import java.lang.reflect.Method;

/**
 * 表示一个具体服务
 */
@Data
@AllArgsConstructor
public class ServiceInstance {

    private  Object target;
    private Method method;

}

2.6.5 写服务管理

package com.smgeek.gkrpc.server;


import com.smgeek.gkrpc.Request;
import com.smgeek.gkrpc.ServiceDescriptor;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 管理rpc暴漏的服务
 */
@Slf4j
public class ServiceManager {

    private Map<ServiceDescriptor ,ServiceInstance> sercices;

    public ServiceManager(){
        this.sercices=new ConcurrentHashMap<>();
    }

    public <T> void register(Class<T> interfaceClass,T bean ){
        Method[] methods = ReflectionUtils.getPublicMethods(interfaceClass);

        for(Method method:methods){
            ServiceInstance sis=new ServiceInstance(bean,method);
            ServiceDescriptor sdp=ServiceDescriptor.from(interfaceClass,method);
            sercices.put(sdp,sis);
            log.info("register service: {} {}",sdp.getClazz(),sdp.getMethod());
        }
    }

    public ServiceInstance lookup(Request request){
        ServiceDescriptor sdp=request.getService();
        return sercices.get(sdp);
    }

}

2.6.6 注册与发现测试

package com.smgeek.gkrpc.server;

public interface TestInterface {
    void hello();
}

package com.smgeek.gkrpc.server;

public class TestClass implements TestInterface {
    @Override
    public void hello() {

    }
}

package com.smgeek.gkrpc.server;

import com.smgeek.gkrpc.Request;
import com.smgeek.gkrpc.ServiceDescriptor;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import org.junit.Before;
import org.junit.Test;

import java.lang.reflect.Method;

import static org.junit.Assert.assertNotNull;

public class ServiceManagerTest {
    
    ServiceManager sm;

    @Before
    public void init(){
        sm=new ServiceManager();
        TestInterface bean=new TestClass();
        sm.register(TestInterface.class,bean);
    }

    @Test
    public void register(){
        TestInterface bean=new TestClass();
        sm.register(TestInterface.class,bean);
    }
    
    @Test
    public void lookup(){
        Method method = ReflectionUtils.getPublicMethods(TestInterface.class)[0];
        ServiceDescriptor sdp=ServiceDescriptor.from(TestInterface.class,method);
        Request request=new Request();
        request.setService(sdp);
        ServiceInstance sis = sm.lookup(request);
        assertNotNull(sis);

    }
    
}

2.6.7 RPC Server代码实现

package com.smgeek.gkrpc.server;

import com.smgeek.gkrpc.Request;
import com.smgeek.gkrpc.Response;
import com.smgeek.gkrpc.ServiceDescriptor;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import com.smgeek.gkrpc.transport.RequestHandler;
import com.smgeek.gkrpc.transport.TransportServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;

/**
 *
 * @author CandyDingDing
 * @version 1.1.0
 * @date 2022/4/3
 */
@Slf4j
public class RpcServer {
    private RpcServerConfig config;
    private TransportServer net;
    private Encoder encoder;
    private Decoder decoder;
    private ServiceManager serviceManager;
    private ServiceInvoker serviceInvoker;

    public RpcServer(RpcServerConfig config) {
        this.config = config;
        this.net = ReflectionUtils.newInstance(config.getTransportClass());
        this.net.init(config.getPort(),handler);
        this.encoder = ReflectionUtils.newInstance(config.getEncoderClass());;
        this.decoder = ReflectionUtils.newInstance(config.getDecoderClass());
        this.serviceManager = new ServiceManager();
        this.serviceInvoker = new ServiceInvoker();
    }

    public <T> void register(Class<T> interfaceClass,T bean ){
        serviceManager.register(interfaceClass,bean);
    }

    public void start(){
        this.net.start();;
    }

    public void stop(){
        this.net.stop();
    }

    private RequestHandler handler= new RequestHandler() {
        @Override
        public void onRequest(InputStream recive, OutputStream toResp) {
            Response resp=new Response();

            try {
                byte[] inBytes= IOUtils.readFully(recive,recive.available());
                Request request=decoder.decode(inBytes,Request.class);
                log.info("get request:{}",request);

                ServiceInstance sis=serviceManager.lookup(request);
                Object ret=serviceInvoker.invoke(sis,request);
                resp.setData(ret);

            } catch (Exception e) {
                log.warn(e.getMessage(),e);
                resp.setCode(1);
                resp.setMessage("RescServer got error:"+e.getClass().getName()+":"+e.getMessage());
            }finally {
                try {
                    byte[] outBytes=encoder.encode(resp);
                    toResp.write(outBytes);
                } catch (IOException e) {
                   log.warn(e.getMessage(),e);
                }

            }
        }
    };
}

2.7 客户端模块

2.7.1 依赖引入

    <dependencies>
        <dependency>
            <groupId>hand.candy</groupId>
            <artifactId>gk-rpc-proto</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>hand.candy</groupId>
            <artifactId>gk-rpc-codec</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>hand.candy</groupId>
            <artifactId>gk-rpc-transport</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>hand.candy</groupId>
            <artifactId>gk-rpc-common</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
    </dependencies>

2.7.2 网络连接接口以及实现

package com.smgeek.gkrpc.client;

import com.smgeek.gkrpc.Peer;
import com.smgeek.gkrpc.transport.TransportClient;

import java.util.List;

/**
* @Describe 选择哪个Server来服务
* @Author CandyDingDing
* @Version 1.0
* @Motto 且视他人之疑目如盏盏鬼火,大胆地去走吾之夜路
* @Date 2022/4/3
*/
public interface TransportSelector {
    
    /**
    * 初始化selector
    * @param peers 可以链接的server端点信息
    * @param count client和server的建立多少连接
    * @param clazz client实现的class
    */
    void init(List<Peer> peers, int count,Class<? extends TransportClient> clazz);
    
    /**
    * 选择一个transport 与 server交互
    *
    * @return 网络client
    */
    TransportClient select();
    
    
    /**
    * 释放用完的client
    * @param client TransportClient
    */
    void release(TransportClient client);
    
    void close();
    
    
}

package com.smgeek.gkrpc.client;

import com.smgeek.gkrpc.Peer;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import com.smgeek.gkrpc.transport.TransportClient;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;


@Slf4j
public class RandomTransportSelector implements TransportSelector {


    /**
     * 存储连介绍的客户端
     */
    private  List<TransportClient> clients;

    public RandomTransportSelector() {
        clients=new ArrayList<>();
    }

    @Override
    public synchronized void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz) {
        count=Math.max(count,1);

        for(Peer peer:peers){
            for(int i=0;i<count;i++){
                TransportClient client= ReflectionUtils.newInstance(clazz);
                client.connect(peer);
                clients.add(client);
                log.info("connect server:{} ",peer);
            }
        }
    }

    @Override
    public synchronized TransportClient select() {
        int i=new Random().nextInt(clients.size());
        return  clients.remove(i);
    }

    @Override
    public synchronized void release(TransportClient client) {
        clients.add(client);
    }

    @Override
    public synchronized void close() {
        for(TransportClient client:clients){
            client.close();
        }
        clients.clear();
    }
}

2.7.3 配置类

package com.smgeek.gkrpc.client;


import com.smgeek.gkrpc.Peer;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.codec.JSONDecoder;
import com.smgeek.gkrpc.codec.JSONEncoder;
import com.smgeek.gkrpc.transport.HTTPTransportClient;
import com.smgeek.gkrpc.transport.TransportClient;
import lombok.Data;

import java.util.Arrays;
import java.util.List;

/**
 * @Author CandyDingDing
 * @Version 1.0
 * @Motto 且视他人之疑目如盏盏鬼火,大胆地去走吾之夜路
 * @Describe
 * @Date 2022/4/4
 */
@Data
public class RpcClientConfig {

    private Class<? extends TransportClient> transportClass= HTTPTransportClient.class;

    private  Class<? extends Encoder> encoderClass =JSONEncoder.class;

    private  Class<? extends Decoder> decoderClass = JSONDecoder.class;

    private Class<? extends TransportSelector> selectorClass=RandomTransportSelector.class;

    private  int connectCount=1;

    private List<Peer> severs=Arrays.asList(new Peer("127.0.0.1",3000));

}

2.7.4 Rpc客户端

package com.smgeek.gkrpc.client;


import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;

import java.lang.reflect.Proxy;

/**
 * @Author CandyDingDing
 * @Version 1.0
 * @Motto 且视他人之疑目如盏盏鬼火,大胆地去走吾之夜路
 * @Describe
 * @Date 2022/4/4
 */
public class RpcClient {

    private  RpcClientConfig config;
    private Encoder encoder;
    private Decoder decoder;
    private  TransportSelector selector;

    public RpcClient() {

    }

    public RpcClient(RpcClientConfig config) {
        this.config = config;
        this.encoder= ReflectionUtils.newInstance(this.config.getEncoderClass());
        this.decoder= ReflectionUtils.newInstance(this.config.getDecoderClass());
        this.selector= ReflectionUtils.newInstance(this.config.getSelectorClass());
        this.selector.init(this.config.getSevers(),this.config.getConnectCount(),this.config.getTransportClass());
    }

    public <T> T getProxy(Class<T> clazz){
        return (T) Proxy.newProxyInstance(
                getClass().getClassLoader(),
                new Class[]{clazz},
                new RemoteInvoker(clazz,encoder,decoder,selector)
                );
    }
}

package com.smgeek.gkrpc.client;

import com.smgeek.gkrpc.Request;
import com.smgeek.gkrpc.Response;
import com.smgeek.gkrpc.ServiceDescriptor;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.transport.TransportClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;


/**
 * @Author CandyDingDing
 * @Version 1.0
 * @Motto 且视他人之疑目如盏盏鬼火,大胆地去走吾之夜路
 * @Describe 调用远程服务的代理类
 * @Date 2022/4/4
 */
@Slf4j
public class RemoteInvoker implements InvocationHandler {

    private Class clazz;
    private Encoder encoder;
    private Decoder decoder;
    private  TransportSelector selector;

    public  RemoteInvoker(Class clazz, Encoder encoder, Decoder decoder,TransportSelector selector){
        this.decoder=decoder;
        this.encoder=encoder;
        this.selector=selector;
        this.clazz=clazz;
    };

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request request=new Request();
        request.setService(ServiceDescriptor.from(clazz,method));
        request.setParameters(args);

        Response resp = invokeRemote(request);
        if(resp==null || resp.getCode()!=0){
            throw  new IllegalStateException("fail to invoke remotr"+resp);
        }

        return resp.getData();
    }

    private Response invokeRemote(Request request) {
        TransportClient client=null;
        Response resp=null;
        try{
            client=selector.select();
            byte[] outBytes=encoder.encode(request);
            InputStream revice = client.write(new ByteArrayInputStream(outBytes));

            byte[]  inBytes=IOUtils.readFully(revice,revice.available());
            resp=decoder.decode(inBytes,Response.class);


        }catch (Exception e){
            log.warn(e.getMessage(),e);
            resp=new Response();
            resp.setCode(1);
            resp.setMessage("RpcClient got error :"+e.getClass()+":"+e.getMessage());

        } finally {
            if(client!=null){
                selector.release(client);
            }

        }


        return  resp;
    }
}

2.8 RPC使用

2.8.1 新建模块example

image.png

2.8.2 引入依赖

  <dependencies>
        <dependency>
            <groupId>hand.candy</groupId>
            <artifactId>gk-rpc-client</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>hand.candy</groupId>
            <artifactId>gk-rpc-server</artifactId>
            <version>${project.version}</version>
        </dependency>
    </dependencies>

2.8.3 Client

package single.rpc.example;

import single.rpc.client.RpcClient;

/**
 * @Author CandyDingDing
 * @Version 1.0
 * @Motto 且视他人之疑目如盏盏鬼火,大胆地去走吾之夜路
 * @Describe 
 * @Date 2022/4/4
 */
public class Client {
    public static void main(String[] args) {
        RpcClient client = new RpcClient();
        CalcService service = client.getProxy(CalcService.class);

        int add = service.add(1, 2);
        int minus = service.minus(1, 2);
        System.out.println(add);
        System.out.println(minus);
    }
}

2.8.4 Server

package single.rpc.example;

import single.rpc.server.RpcServer;
import single.rpc.server.RpcServerConfig;


/**
 * @Author CandyDingDing
 * @Version 1.0
 * @Motto 且视他人之疑目如盏盏鬼火,大胆地去走吾之夜路
 * @Describe 
 * @Date 2022/4/4
 */
public class Server {
    public static void main(String[] args) {
        RpcServer server = new RpcServer(new RpcServerConfig());
        server.register(CalcService.class, new CalcServiceImpl());
        server.start();
    }
}

2.8.5 CascService

package single.rpc.example;

/**
 * @Author CandyDingDing
 * @Version 1.0
 * @Motto 且视他人之疑目如盏盏鬼火,大胆地去走吾之夜路
 * @Describe 
 * @Date 2022/4/4
 */
public interface CalcService {
    int add(int a, int b);
    int minus(int a, int b);
}

2.8.6 CalcServiceImpl

package single.rpc.example;

/**
 * @Author CandyDingDing
 * @Version 1.0
 * @Motto 且视他人之疑目如盏盏鬼火,大胆地去走吾之夜路
 * @Describe  git镜像:https://hub.fastgit.xyz/
 * @Date 2022/4/4
 */
public class CalcServiceImpl implements CalcService {
    @Override
    public int add(int a, int b) {
        return a + b;
    }

    @Override
    public int minus(int a, int b) {
        return a - b;
    }
}

2.8.7 运行结果

视频课里面的代码有问题:

修正后的代码可以参考https://https://gitee.com/candydingding/rpc

先启动server 再启动client

server结果

image.png
client结果

image.png

三、参考

视频连接:https://www.imooc.com/video/20219
文档笔记:https://www.yuque.com/lililil-9bxsv/kb/tg9xha
代码地址:https://gitee.com/candydingding/rpc

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-06 16:25:55  更:2022-04-06 16:28:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 15:49:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码