IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 《RockerMQ源码分析》客户端是如何发送心跳到Broker的?含心跳包数据来源 -> 正文阅读

[Java知识库]《RockerMQ源码分析》客户端是如何发送心跳到Broker的?含心跳包数据来源

一、前言

RocketMQ所有的心跳机制:
1)Producer端:

  1. Producer与NameSrv随机建立长连接,定期从NameSrv获取topic路由信息;
  2. Producer与Broker的Master结点建立长连接,用于发送消息;
  3. 此外Producer与Master维持了一个心跳。

2)ConSumer端:

  1. Conumser与NamseSrv随机建立长连接,定期从NameSrv获取topic路由信息;
  2. Consumer与Broker的Master和Slave结点建立长连接,用于订阅消息;
  3. 此外Consumer与Master和slave维持一个心跳。

二、客户端发送心跳

(1)Producer和Consumer通过MQClientInstancesendHeartbeatToAllBrokerWithLock()方法实现发送心跳请求;

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            // 发送心跳包
            this.sendHeartbeatToAllBroker();
            // 上传类过滤器源码
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed. [{}]", this.clientId);
    }
}

我们看到sendHeartbeanToAllBrokerWithLock()方法中在sendHeartbeatToAllBroker()之前加了锁,这是因为点啥嘞?

1、RocketMQ对底层进行通信的MQClientInstance进行了复用,即在同一个jvm里的不同的Consumer下面使用的都是同一个MQClientInstance
2、既然是复用的,那么就可能存在并发,因此这里进行了上锁操作。
3、所以这里是为了防止心跳错乱

(2)另外在MQClientInstance启动时会启动会调用startScheduledTask()方法,开始一堆定时任务,其中包括:定期默认每30s发送心跳信息到Broker。

MQClientInstance类源码:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                .......
                /**
                 * 1.定时2min拉取最新的nameServer信息
                 * 2.默认定时30秒拉取最新的broker和topic路由信息(可配置)
                 * 3.默认定时30s向broker发送心跳包(可配置)
                 * 4.默认定时5s持久化consumer的offset(可配置)
                 * 5.定时1分钟,动态调整线程池线程数量
                 */
                this.startScheduledTask();
                .......
        }
    }
}

startScheduledTask()方法启动定时任务:

private void startScheduledTask() {
   this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

       @Override
       public void run() {
           try {
               // 清理下线的broker
               MQClientInstance.this.cleanOfflineBroker();
               // 向所有的broker发送心跳
               MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
           } catch (Exception e) {
               log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
           }
       }
   }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
}

我们来看看sendHeartbeatToAllBroker()做了什么?

1、MQClientInstance#sendHeartbeatToAllBroker()

1.准备心跳信息HeartbeatData,如果心跳信息为空,直接返回;
2. 遍历所有的Broker,尝试向所有的Broker发送心跳包

注意:根据客户端的类型(Producer、Consumer)不同,发送到的Broker对象会又差别。
1、如果启动的是生产者,那么心跳保证消费者的相关信息为空,这时只会向Broker的Mater节点发送心跳;因为RocketMQ中主要Master的Broker才能处理写请求。
2、如果启动的是消费者,则会向所有的Broker发送心跳。

private void sendHeartbeatToAllBroker() {
    // 心跳包--包装类,主要是Producer和Consumer相关信息
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    // 生产者和消费者数据都为空时
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
        return;
    }

    // broker列表不为空时
    // todo brokerAddrTable是什么时候初始化的?
    // 1)当topic的路由信息改变后,会往brokerAddrTable中添加数据
    if (!this.brokerAddrTable.isEmpty()) {
        // 统计发送心跳的次数
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        // 遍历broker列表
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            // 获取一个broker地址
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        // 消费数据为空 并且 broker不是Mater节点时,不发送心跳。
                        // 因为Producer只需要与Mater维护心跳即可
                        if (consumerEmpty) {
                            // broker不是mater节点
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }

                        try {
                            // 发送心跳
                            // todo MQClientAPIImpl是什么时候初始化的?
                            // 1)实例化MQClientInstance时初始化mQClientAPIImpl
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            if (this.isBrokerInNameServer(addr)) {
                                log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                            } else {
                                log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                        id, addr, e);
                            }
                        }
                    }
                }
            }
        }
    }
}

RocketMQ中客户端和服务端的通信通过Netty实现,这里我们的客户端是Consumer/Producer、服务端是Broker。

我们先看一下RocketMQ是如何准备心跳包数据的?

2、心跳包HeartBeatData

心跳包内容包括:客户端id、生产者信息、消费者信息;

一般情况下生产者信息和消费者信息是互斥的,producerDataSet和consumerDataSet会有一个为空。但如果一个应用既是生产者,也是消费者,那么这种情况下producerDataSet和consumerDataSet都不为空。

public class HeartbeatData extends RemotingSerializable {
    // consumer 客户端ID
    private String clientID;
    /**
     * 生产者信息
     * 1. groupName
     */
    private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
    /**
     * 消费者信息
     * 1. groupName
     * 2. 消费类型:push/pull
     * 3. 消息传播方式:集群还是广播
     * 4. 启动消费者时从哪开始消费
     * 5. 订阅信息:过滤消息相关标签、SQL规则。
     */
    private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
    ......
}

我们分别看一下生产者信息和消费者信息都包括什么?

1)生产者信息ProducerData

不能再简单了,就一个生产者组的名称。

public class ProducerData {
    /**
     * 生产组名称
     */
    private String groupName;
}

2)消费者信息ConsumerData

ConsumerData消费者信息包括:

  1. groupName
  2. 消费类型:push/pull
  3. 消息传播方式:集群还是广播
  4. 启动消费者时从哪开始消费
  5. 订阅信息SubscriptionData:过滤消息相关标签、SQL规则等。
public class ConsumerData {
    /**
     * 消费者名称
     */
    private String groupName;
    /**
     * 消费类型:push/pull
     */
    private ConsumeType consumeType;
    /**
     * 消息传播方式:广播 / 集群消费
     */
    private MessageModel messageModel;
    /**
     * 从哪开始消费:从一开始偏移量、从最后偏移量、按时间戳消费
     */
    private ConsumeFromWhere consumeFromWhere;
    /**
     * 订阅数据:过滤消息相关标签、SQL规则等
     */
    private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
}

在我们日常写代码时,这些属性很常见、经常会配置到。SubscriptionData是我们的消费者订阅信息,
其内容如下:

public class SubscriptionData implements Comparable<SubscriptionData> {
    // 表示订阅该topic下所有类型消息
    public final static String SUB_ALL = "*";
    // 是否开启类过滤模式,默认不开启
    private boolean classFilterMode = false;
    // 订阅的topic
    private String topic;
    // 订阅表达式
    private String subString;
    // 如果是tag过滤模式,这里是tag列表
    private Set<String> tagsSet = new HashSet<String>();
    // 如果是tag过滤模式,这里是tag对应的hashCode列表
    private Set<Integer> codeSet = new HashSet<Integer>();
    private long subVersion = System.currentTimeMillis();
    // 表达式类型,有TAG和SQL两种,默认是Tag
    private String expressionType = ExpressionType.TAG;

    @JSONField(serialize = false)
    // 如果开启了类过滤模式,这里存放过滤类java代码
    private String filterClassSource;
}

这里面,我们平时最常用到的是topicsubString

// topic , 过滤器 * 表示根据SQL不过滤、TAG-A || TAG-B表示根据TAG过滤
consumer.subscribe("saint-study-topic", "TAG-A || TAG-B");

创建订阅信息的时候,subString会被分割成TAG-A 、TAG-B,然后保存至tagsSet集合里,tag的hashcode会保存到codeSet集合里。

那么心跳包数据是如何组装的?我接着来看。

3、MQClientInstance#prepareHeartbeatData()

private HeartbeatData prepareHeartbeatData() {
    HeartbeatData heartbeatData = new HeartbeatData();

    // clientID
    heartbeatData.setClientID(this.clientId);

    // Consumer
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            ConsumerData consumerData = new ConsumerData();
            consumerData.setGroupName(impl.groupName());
            consumerData.setConsumeType(impl.consumeType());
            consumerData.setMessageModel(impl.messageModel());
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
            consumerData.setUnitMode(impl.isUnitMode());

            heartbeatData.getConsumerDataSet().add(consumerData);
        }
    }

    // Producer
    for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            ProducerData producerData = new ProducerData();
            producerData.setGroupName(entry.getKey());

            heartbeatData.getProducerDataSet().add(producerData);
        }
    }

    return heartbeatData;
}

1)以准备Consumer的心跳信息来看:

其遍历MQClientInstance的属性consumerTable.entrySet(),获取到MQConsumerInner信息,然后将其填充到consumerData中。

DefaultMQPushConsumerImplement#start()方法–消费者启动时,会调用MQClientInstance#registerConsumer()方法,将消费者信息(含订阅信息)填充consumerTable

在这里插入图片描述
在这里插入图片描述
咦,这里只是把DefaultMQPushConsumerImpl作为MQConsumerInner传入到了MQClientInstance#registerConsumer,订阅信息在哪里可以看到撒?

往心跳包HeartbeatData的consumerData属性中填充的是MQConsumerInner#subscriptions()方法的返回值

那我们就看一下DefaultMQPushConsumerImplement#subscriptions()方法:
在这里插入图片描述
原来是取的负载均衡服务RebalanceImpl中的subscriptionInner属性,那RebalanceImplsubscriptionInner属性又是怎么填充的?

既然是订阅信息,会不会和我们的subscribe()订阅操作有关呢?

我们看一下DefaultMQPushConsumerImpl#subscribe()方法:
在这里插入图片描述
果然是这样填充的消费者订阅者信息。
下面是组装SubscriptionData订阅信息的代码:

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
    String subString) throws Exception {
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);

    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        subscriptionData.getTagsSet().add(trimString);
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            throw new Exception("subString split error");
        }
    }

    return subscriptionData;
}

2)以准备Producer的心跳信息来看:

这里和Consumer的心跳信息来源类似。

其遍历MQClientInstance的属性producerTable.entrySet(),获取到MQProducerInner信息,然后将其填充到producerData中。

DefaultMQProducerImpl#start()方法 生产者启动时,会调用MQClientInstance#registerProducer()方法,将生产者信息填充到producerTable中。

在这里插入图片描述
在这里插入图片描述

我们接着看一下MQClientInstance#sendHearbeat()是如何发送心跳的?

4、MQClientInstance#sendHearbeat()

1、封装请求,包括:请求编码为HEART_BEAT、编程语言为Java、心跳包HeartBeatData;
2、调用远程服务类NettyRemotingClient通过Netty发送心跳。

在这里插入图片描述

@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 发送心跳前,执行的钩子函数
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // 发送心跳
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // 发送心跳后,执行的钩子函数
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

对于RocketMQ是如何通过Netty通信的,不是本文的重点,后续专文分析。

在这里插入图片描述

三、总结

以上所有分析相关的源码注释请见GitHub中的release-4.8.0分支:https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0
在这里插入图片描述

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-11-24 07:48:53  更:2021-11-24 07:50:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 1:48:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码