IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 【自己动手实现一个简单的RPC框架】7、[v4.0-4.2]增加Netty心跳机制与自动扫描注册服务 -> 正文阅读

[网络协议]【自己动手实现一个简单的RPC框架】7、[v4.0-4.2]增加Netty心跳机制与自动扫描注册服务

本文代码地址:https://github.com/MSC419/msc-rpc-framework

0.实现的改进

1.重构部分代码,将RPC消息包装成RpcMessage

2.增加Netty心跳机制确保TCP长连接有效

3.通过注解自动扫描并注册服务

1.RPC消息通用格式RpcMessage

这一部分是为了给后续增加Netty心跳机制做准备。有了RPC消息通用格式之后,可以将RpcRequest、RpcResponse、心跳信息都封装成RpcMessage

具体步骤如下:

1.增加RpcMessage类

2.修改Netty客户端发送RpcRequest与接收RpcResponse的NettyClientClientTransport,将RpcRequest包装成RpcMessage再发送

3.修改Netty客户端与服务端处理业务的Handler:NettyClientHandler与NettyServerHandler,入站数据先是RpcMessage,再根据MessageType判断是RpcRequest、RpcResponse或心跳信息,做对应处理

4.修改序列化与反序列化过程:NettyEncoder与NettyDecoder

2.Netty心跳机制

本次改进的代码参考自:从零开始实现简单 RPC 框架 9:网络通信之心跳与重连机制_小新是也的博客-CSDN博客_rpc心跳机制

2.1 心跳

在TCP保持长连接的过程中,可能会由于突发情况例如断电或者网线被拔出等因素,导致服务端和客户端的连接中断。在这种情况下,如果服务端和客户端之间没有交互的话,就不能发现连接已经中断。为了解决这一问题,我们引入心跳机制。

心跳机制:为了确保长连接有效,客户端与服务端之间一种通知机制告知对方存活状态的机制。

心跳机制的工作原理:客户端和服务端在一定时间段内没有交互后,某一端向另一端发送消息类型为PING消息,另一端接收到PING消息,返回PONG消息,此即一个PING-PONG交互,说明对方仍然在线,长连接有效;如果没有收到PONG消息,则长连接无效,断开该连接。

2.2 IdleStateHandler

Netty提供了实现心跳机制的Handler:IdleStateHandler

构造函数

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }
  • readerIdleTime, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTime, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTime, 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
  • unit,前三者的单位

IdleStateHandler的源码阅读参考:从零开始实现简单 RPC 框架 9:网络通信之心跳与重连机制_小新是也的博客

2.3 实现Netty心跳机制

2.3.1 客户端

IdleStateHandler 放到启动类的 PipleLine 注册上

public class NettyClient {
    static {
        ...
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                // 设定 IdleStateHandler 心跳检测每 5 秒进行一次写检测
                // write()方法超过 5 秒没调用,就调用 userEventTrigger
                ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
                /*自定义序列化编解码器*/
                ...

NettyClientHandler 设置处理心跳事件

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    ...
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
        if (evt instanceof IdleStateEvent) {
            // 根据上面的配置,超过 5 秒没有写请求,会触发 WRITER_IDLE 事件
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                log.info("write idle happen [{}]", ctx.channel().remoteAddress());
                Channel channel = ctx.channel();
                // 触发写空闲事件后,就应该发心跳了。
                // 组装消息
                RpcMessage rpcMessage = new RpcMessage();
                rpcMessage.setSerializeType(SerializerCode.KRYO.getCode());
                rpcMessage.setMessageType(PackageType.HEARTBEAT_PACK.getCode());
                // 发心跳消息
                channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
    ...

2.3.3 服务端

服务端的 IdleStateHandler 放到启动类的 PipleLine 注册上

public class NettyServer {
    ...
    public void start(){
        ...
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                //给workerGroup的EventLoop对应的管道设置处理器
                //给pipeline管道设置处理器

                // 30 秒之内没有收到客户端请求的话就关闭连接
                ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                ch.pipeline().addLast(new NettyDecoder());
                ...

服务端收到超过 30 秒没有读请求的事件后,调用 ctx.close 将连接关闭。

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcMessage requestMsg) {
    // 不处理心跳消息,只处理RpcRequest
    if (requestMsg.getMessageType() != MessageType.REQUEST_PACK.getType()) {
        return;
    }
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // 处理空闲状态的
    if (evt instanceof IdleStateEvent) {
        IdleState state = ((IdleStateEvent) evt).state();
        if (state == IdleState.READER_IDLE) {
            log.info("idle check happen, so close the connection");
            ctx.close();
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

案例里单位是秒(TimeUnit.SECONDS),如果把单位换成TimeUnit.MILLISECONDS,就能在控制台看见效果

image-20220523161545061

3.通过注解自动扫描注册服务

参考:

(1条消息) 使用注解的方式来自动注册服务_米拉娜的博客-CSDN博客

一起写个Dubbo——7. 服务端自动注册服务_何人听我楚狂声的博客-CSDN博客_dubbo自动注册服务

现在我们是在服务端手动注册服务,要想通过注解自动注册服务,首先我们需要一个注解@RpcService来标识一个类是服务类。还需要一个注解@RpcScan 放在启动的入口类上(main 方法所在的类),标识服务的扫描的包的范围。

@RpcService注解的值定义为该服务的名称,默认值是该类的完整类名。

@RpcScan的值定义为扫描范围的根包,默认值为空(如果为空,其值被赋为入口类所在的包名),扫描时会扫描该包及其子包下所有的类,找到标记有 Service 的类,并注册。

3.1 注解类

@RpcScan

/**
 * @Description 指示哪里需要扫描
 * @Author MSC419
 * @Date 2022/5/23 17:34
 * @Version 1.0
 */
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcScan {
    /**
     * 扫描范围的根包名,默认为启动类的包名
     * (如果为空,会将启动类的包名赋给它)
     */
    String value() default "";
}

@RpcService

/**
 * @Description RPC 服务注解,打上这个注解的类会被扫描并且实例化后注册到注册中心
 * @Author MSC419
 * @Date 2022/5/23 17:22
 * @Version 1.0
 */
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {

    /**
     * @Description 服务名,默认值是该类的完整类名
     * @Author MSC419
     * @Date 2022/5/23 19:31
     */
    String name() default "";
}

3.2 工具类 ReflectUtil

主要作用是扫描该包及其子包下所有的类,并将其 Class 对象放入一个 Set 中返回。

/**
 * @Description 主要就是 getClasses 方法
 * 传入一个包名,用于扫描该包及其子包下所有的类,并将其 Class 对象放入一个 Set 中返回。
 * @author ziyang
 */
public class ReflectUtil {

    //获取栈底方法也就是main方法的类名
    public static String getStackTrace() {
        StackTraceElement[] stack = new Throwable().getStackTrace();
        return stack[stack.length - 1].getClassName();
    }

    //传入一个包名,用于扫描该包及其子包下所有的类,并将其 Class 对象放入一个 Set 中返回。
    public static Set<Class<?>> getClasses(String packageName) {
        Set<Class<?>> classes = new LinkedHashSet<>();
        boolean recursive = true;
        String packageDirName = packageName.replace('.', '/');
        Enumeration<URL> dirs;
        try {
            dirs = Thread.currentThread().getContextClassLoader().getResources(
                    packageDirName);
            // 循环迭代下去
            while (dirs.hasMoreElements()) {
                // 获取下一个元素
                URL url = dirs.nextElement();
                // 得到协议的名称
                String protocol = url.getProtocol();
                // 如果是以文件的形式保存在服务器上
                if ("file".equals(protocol)) {
                    // 获取包的物理路径
                    String filePath = URLDecoder.decode(url.getFile(), "UTF-8");
                    // 以文件的方式扫描整个包下的文件 并添加到集合中
                    findAndAddClassesInPackageByFile(packageName, filePath,
                            recursive, classes);
                } else if ("jar".equals(protocol)) {
                    // 如果是jar包文件
                    // 定义一个JarFile
                    JarFile jar;
                    try {
                        // 获取jar
                        jar = ((JarURLConnection) url.openConnection())
                                .getJarFile();
                        // 从此jar包 得到一个枚举类
                        Enumeration<JarEntry> entries = jar.entries();
                        // 同样的进行循环迭代
                        while (entries.hasMoreElements()) {
                            // 获取jar里的一个实体 可以是目录 和一些jar包里的其他文件 如META-INF等文件
                            JarEntry entry = entries.nextElement();
                            String name = entry.getName();
                            // 如果是以/开头的
                            if (name.charAt(0) == '/') {
                                // 获取后面的字符串
                                name = name.substring(1);
                            }
                            // 如果前半部分和定义的包名相同
                            if (name.startsWith(packageDirName)) {
                                int idx = name.lastIndexOf('/');
                                // 如果以"/"结尾 是一个包
                                if (idx != -1) {
                                    // 获取包名 把"/"替换成"."
                                    packageName = name.substring(0, idx)
                                            .replace('/', '.');
                                }
                                // 如果可以迭代下去 并且是一个包
                                if ((idx != -1) || recursive) {
                                    // 如果是一个.class文件 而且不是目录
                                    if (name.endsWith(".class")
                                            && !entry.isDirectory()) {
                                        // 去掉后面的".class" 获取真正的类名
                                        String className = name.substring(
                                                packageName.length() + 1, name
                                                        .length() - 6);
                                        try {
                                            // 添加到classes
                                            classes.add(Class
                                                    .forName(packageName + '.'
                                                            + className));
                                        } catch (ClassNotFoundException e) {
                                            // log
                                            // .error("添加用户自定义视图类错误 找不到此类的.class文件");
                                            e.printStackTrace();
                                        }
                                    }
                                }
                            }
                        }
                    } catch (IOException e) {
                        // log.error("在扫描用户定义视图时从jar包获取文件出错");
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        return classes;
    }

    private static void findAndAddClassesInPackageByFile(String packageName,
                                                         String packagePath, final boolean recursive, Set<Class<?>> classes) {
        // 获取此包的目录 建立一个File
        File dir = new File(packagePath);
        // 如果不存在或者 也不是目录就直接返回
        if (!dir.exists() || !dir.isDirectory()) {
            // log.warn("用户定义包名 " + packageName + " 下没有任何文件");
            return;
        }
        // 如果存在 就获取包下的所有文件 包括目录
        File[] dirfiles = dir.listFiles(new FileFilter() {
            // 自定义过滤规则 如果可以循环(包含子目录) 或则是以.class结尾的文件(编译好的java类文件)
            public boolean accept(File file) {
                return (recursive && file.isDirectory())
                        || (file.getName().endsWith(".class"));
            }
        });
        // 循环所有文件
        for (File file : dirfiles) {
            // 如果是目录 则继续扫描
            if (file.isDirectory()) {
                findAndAddClassesInPackageByFile(packageName + "."
                                + file.getName(), file.getAbsolutePath(), recursive,
                        classes);
            } else {
                // 如果是java类文件 去掉后面的.class 只留下类名
                String className = file.getName().substring(0,
                        file.getName().length() - 6);
                try {
                    // 添加到集合中去
                    //classes.add(Class.forName(packageName + '.' + className));
                    //经过回复同学的提醒,这里用forName有一些不好,会触发static方法,没有使用classLoader的load干净
                    classes.add(Thread.currentThread().getContextClassLoader().loadClass(packageName + '.' + className));
                } catch (ClassNotFoundException e) {
                    // log.error("添加用户自定义视图类错误 找不到此类的.class文件");
                    e.printStackTrace();
                }
            }
        }
    }

}

3.3 扫描服务

将服务端抽象成RpcServer接口

public interface RpcServer {
    //启动连接
    void start();
	//注册服务
    <T> void publishService(Object service, Class<T> serviceClass);
}

AbstractRpcServer继承RpcServer接口,并实现扫描注册的功能:扫描启动包下所有带@RpcService的类,将其实例化并注册

/**
 * @Description 扫描服务并注册
 * @Author MSC419
 * @Date 2022/5/23 20:02
 * @Version 1.0
 */
@Slf4j
public abstract class AbstractRpcServer implements RpcServer{
    protected String host;
    protected int port;

    protected ServiceRegistry serviceRegistry;
    protected ServiceProvider serviceProvider;

    public void scanServices() {
        //获取服务端main方法的类名
        String mainClassName = ReflectUtil.getStackTrace();
        Class<?> startClass;
        try {
            startClass = Class.forName(mainClassName);
            if(!startClass.isAnnotationPresent(RpcScan.class)) {
                log.error("启动类缺少 @RpcScan 注解");
                throw new RpcException(RpcErrorMessageEnum.SERVICE_SCAN_PACKAGE_NOT_FOUND);
            }
        } catch (ClassNotFoundException e) {
            log.error("出现未知错误");
            throw new RpcException(RpcErrorMessageEnum.UNKNOWN_ERROR);
        }
        String basePackage = startClass.getAnnotation(RpcScan.class).value();
        //如果RpcScan没有指定value(即basePackage为空),那么把启动类的包名赋给它
        if("".equals(basePackage)) {
            //basePackage:com.wx.mscrpc.example.server
            basePackage = mainClassName.substring(0, mainClassName.lastIndexOf("."));
        }
        log.info("basePackage:{}",basePackage);
        //扫描basePackage及其子包下所有的类,并将其 Class 对象放入一个 Set 中返回。
        Set<Class<?>> classSet = ReflectUtil.getClasses(basePackage);
        for(Class<?> clazz : classSet) {
            //如果该类有注解@RpcService,说明是个服务类
            if(clazz.isAnnotationPresent(RpcService.class)) {
                String serviceName = clazz.getAnnotation(RpcService.class).name();
                Object obj;
                try {
                    //实例化该服务类
                    obj = clazz.newInstance();
                } catch (InstantiationException | IllegalAccessException e) {
                    log.error("创建 " + clazz + " 时有错误发生");
                    continue;
                }
                //注册服务
                if("".equals(serviceName)) {
                    Class<?>[] interfaces = clazz.getInterfaces();
                    for (Class<?> oneInterface: interfaces){
                        publishService(obj, oneInterface);
                    }
                } else {
                    publishService(obj, clazz);
                }
            }
        }
    }


    public <T> void publishService(Object service, Class<T> serviceClass) {
        serviceProvider.addServiceProvider(service);
        //注册服务
        serviceRegistry.registerService(serviceClass.getCanonicalName(), new InetSocketAddress(host, port));

    }
}

3.4 服务端开启自动注册

NettyServer构造方法后面加上scanServices();

public class NettyServer extends AbstractRpcServer {
	...
    public NettyServer(String host, int port) {
            this.host = host;
            this.port = port;
            serviceRegistry = new ZkServiceRegistry();
            serviceProvider = new ServiceProviderImpl();
            scanServices();
        }
    ...

在服务类HelloServiceImplUserServiceImpl加上注解@RpcService

3.5 测试

不用手动注册。服务端代码清爽好多!

@RpcScan
public class NettyServerMain {
    public static void main(String[] args) {
        NettyServer nettyServer = new NettyServer("127.0.0.1",9999);
        nettyServer.start();
    }
}

4.总结

4.1 学到的知识

1.行号附近的小图标

实现接口类方法:

image-20220522161350897

重载方法:

image-20220522161647031

2.查看类图:右键单击你要查看的类,选择Diagrams中的show Diagrams

3.多看几个同类型开源项目(视野要开阔),看看他们的异同点

4.{@link}注解的使用:将内容相互关联起来

5.抽象类的使用

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-06-01 15:28:43  更:2022-06-01 15:29:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 21:42:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码