IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【Nacos源码篇(五)】Subscriber源码分析 -> 正文阅读

[大数据]【Nacos源码篇(五)】Subscriber源码分析


这是事件通知里处理事件的落脚点,也就是事件执行的落地

先贴下接口Subscriber

public abstract class Subscriber<T extends Event> {
    /**
     * 事件执行的落地
     */
    public abstract void onEvent(T event);

    /**
     * 获取当前订阅者订阅的事件的类型
     */
    public abstract Class<? extends Event> subscribeType();
    
    /**
     * 其子类可以来决定是同步还是异步
     */
    public Executor executor() {
        return null;
    }
    
    /**
     * 是否忽略过期事件。
     */
    public boolean ignoreExpireEvent() {
        return false;
    }
}

从接口描述上,就能清晰的了解到这个接口的主流程

SmartSubscriber

Subscriber有一个SmartSubscriber的扩展子类,是用来针对多事件订阅的List<Class<? extends Event>> subscribeTypes() 该方法返回的是这个订阅者所需要订阅的事件列表

----->代码片段1
public abstract class SmartSubscriber extends Subscriber {
    /**
     * 实现该方法,用来返回多订阅的指定集合
     */
    public abstract List<Class<? extends Event>> subscribeTypes();
    
    // ...
}

----->代码片段2
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    @Override
    public List<Class<? extends Event>> subscribeTypes() {
        List<Class<? extends Event>> result = new LinkedList<>();
        result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);
        result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);
        result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);
        result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);
        result.add(ClientEvent.ClientDisconnectEvent.class);
        return result;
    }
}

----->代码片段3 
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
        if (consumer instanceof SmartSubscriber) {
            for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
                if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
                    INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
                } else {
                    // For case, producer: defaultPublisher -> consumer: subscriber.
                    addSubscriber(consumer, subscribeType, factory);
                }
            }
            return;
        }
    // ........
}

这里贴了三段代码
??代码片段一,就是SmartSubscriber的抽象List<Class<? extends Event>> subscribeTypes()
??代码片段二,拿某一个该接口的实现举例子,返回了一个List,里面包含了该订阅者订阅的事件类型集合,那么我就可以这样理解,这个接口的返回值所对应的事件,就是该订阅者订阅的事件,通过代码片段三NotifyCenter#registerSubscriber注册
??代码片段三,又回到了NotifyCenter,订阅者注册的时候,首先是先判定了当前订阅者是监听的是否是多事件的,其事件注册的具体细节可以参考NotifyCenter的源码分析

subscribeType

这个接口和上面说的SmartSubscribersubscribeTypes()接口没有多大的区别,只是这个返回的是一个事件,上面的返回的是多个事件

    @Override
    public Class<? extends Event> subscribeType() {
        return Xxxxxxxx.class;
    }

executor

executor()上一篇的EventPublisher在选择通过异步方式还是同步方式的时候就是调用的这里来判定的,我们可以看到这个的默认实现是null

   @Override
    public void notifySubscriber(Subscriber subscriber, Event event) {
        final Runnable job = () -> subscriber.onEvent(event);
        // 判断当前事件是异步还是同步
        final Executor executor = subscriber.executor();
        if (executor != null) {
            // 异步
            executor.execute(job);
        } else {
            try {
                // 同步
                job.run();
            } catch (Throwable e) {
               // ..
            }
        }
    }

?? 所以订阅者如果没有实际的实现executor这个接口,那么事件发布者就是走的同步方式,这里简单的回顾了一下上一篇的事件发布者

ignoreExpireEvent

这个接口的话,可以看下上游的调用处

    @Override
    public void receiveEvent(Event event) {
		// ..
        for (Subscriber subscriber : subscribers) {
            // 是否忽略过期事件
            if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
                continue;
            }
            // Notify single subscriber for slow event.
            notifySubscriber(subscriber, event);
        }
    }

?? 这段代码是事件发布者里的,是在准备发布广播给订阅者(notifySubscriber)之前,判断的是否需要忽略过期的事件的
?? 订阅者们可以自己去判断是否订阅过期事件,当然这个接口给的默认实现是false

onEvent

最重要的接口最后说
其实也没啥好说的,就随便拿一个实现lou一眼就行

@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            // 处理客户端断开
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            // 处理客户端操作
            handleClientOperation((ClientOperationEvent) event);
        }
    }
}

● 这里拿的ClientServiceIndexesManager这个类的onEvent方法来举例子,上面也贴出过这个类了(SmartSubscriber接口的描述) 这里接收的Event参数通过判断其实际的类型,来走这个事件所对应的不同的逻辑
Event是事件的抽象类,每个不同的事件都往其自身塞入了其事件所携带的信息,这个抽象类在下一篇描述

Nacos源码篇

语雀版文档

Nacos源码注释

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-29 23:09:15  更:2022-01-29 23:11:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:28:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码