IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 5、RocketMQ 源码解析之 命名服务启动 -> 正文阅读

[大数据]5、RocketMQ 源码解析之 命名服务启动

在 RocketMQ 当中,消息发送方以及消息接收方都是配置命名服务(Name Server)的地址。通过命名服务解耦合了消息发送者以及消息接收方,不同于 Kafka 直接连接 Broker 地址。命名服务的主要功能包含:Broker 管理以及消息的路由管理。具有如下:

  • Broker 管理,NameServer接受来自 Broker 集群的注册,并提供心跳机制来检查Broker是否活着
  • 路由管理,每个NameServer将保存关于Broker集群的整个路由信息和供客户端查询的 queue (队列) 信息。

源码分析基于 RocketMQ - 4.9.3

1、NameServer 启动整体流程

从之前 3、RocketMQ 源码解析之 源代码环境搭建 这篇文章中我可以看到命名服务的启动类是:NamesrvStartup。而 命名服务最重要的类其实是 NamesrvController,它控制着命名服务的整个流程。命名服务启动其实就是调用NamesrvController的三个方法的过程:

  • NamesrvController#<init>:通过有参构建器传入 NamesrvConfig 以及 NettyServerConfig 这两个类,创建NamesrvController对象。
  • NamesrvController#initialize:调用该方法对 NamesrvController对象进行属性初始化。
  • NamesrvController#start: 调用该方法完成对 NameServer 的启动,启动 NettyRemotingServer 暴露 Netty 服务端的 Socket 服务。

下面我们就来详细的分析一下这三个过程。

2、NamesrvController 对象构建

NamesrvController.init

    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

在 NamesrvController 对象构建里面最核心的还是创建 RouteInfoManager 对象,这个对象里面保存了就完成了 Broker 管理 以及 消息的路由管理

下面我们来看一下这个对象里面的核心字段:

  • HashMap<String/* topic */, List<QueueData>> topicQueueTable :消息主题以及对应的 队列数据映射,在 RocketMQ 当中,一个消息主题可以发送不到同的 Queue 当中,达到负载均衡的目的。
  • HashMap<String/* brokerName */, BrokerData> brokerAddrTable :broker 名称以及 Broker 数据的映射信息。
  • HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable:集群名称以及对应的 Broker 名称列表
  • HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable:broker 地址对应 Broker 通道(Channel) 的对应信息。
  • HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable:broker 地址以及消息的各种过滤机制。

这里包含了 Broker 对应的所有元数据信息。Broker 每过一段时间(默认 30 秒)就会向 NameServer 发送心跳,告诉 NameServer 我当前还是存活的。并且如果心跳超过 2 分钟没有发送就会把这个 Broker 从上面的 brokerLiveTable 列表当中移除。

RouteInfoManager#scanNotActiveBroker

    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

上面就是 Broker 存活检测逻辑。

3、NamesrvController 初始化

下面我们来看一下 的初始化逻辑。

NamesrvController#initialize

    public boolean initialize() {

        this.kvConfigManager.load();

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }

上面的逻辑其实挺简单的,主要的逻辑如下:

  • kvConfigManager KV 类型的配置管理器从配置文件中加载配置
  • 创建 Netty 远程服务 NettyRemotingServer 暴露命名服务 Socket 通信
  • 创建一个远程线程池 remotingExecutor 用于异步处理客户端的 Socket 连接请求,默认没 8 个工作线程。
  • registerProcessor():向 NettyRemotingServer 远程服务端注册请求处理器(DefaultRequestProcessor,这个处理器我们稍后在分析),并且使用上一步创建的 remotingExecutor进行异步处理
  • 使用单个线程池调用 RouteInfoManager#scanNotActiveBroker 扫描并删除 2 分钟没有心跳的 Broker 以及 KVConfigManager#printAllPeriodically 打印kvConfigManager KV 类型的配置管理器里面的配置值。
  • 如果服务器开启了 SSL 加密传输就启动 FileWatchService 定时扫描文件更新 SSL

DefaultRequestProcessor 其实就是整个 NameServer 处理 Socket 请求的类.其实最终就是操作 KVConfigManager 这个配置类或者 RouteInfoManager 这个类。

4、NamesrvController 启动

NamesrvController 的启动就相对比较简单了。

    public void start() throws Exception {
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

它的只包括了两个步骤:

  • NettyRemotingServer#start,调用 Netty 的服务端 ServerBootstrap 进行服务端启动暴露 Socket 服务,默认是 9876;
  • 启动 FileWatchService 服务,当 SSL 有更新时,就更新 SSL 配置。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-08 14:05:51  更:2022-01-08 14:07:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 3:39:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码