一、前言
RocketMQ所有的心跳机制: 1)Producer端:
- Producer与NameSrv随机建立长连接,定期从NameSrv获取topic路由信息;
- Producer与Broker的Master结点建立长连接,用于发送消息;
- 此外Producer与Master维持了一个心跳。
2)ConSumer端:
- Conumser与NamseSrv随机建立长连接,定期从NameSrv获取topic路由信息;
- Consumer与Broker的Master和Slave结点建立长连接,用于订阅消息;
- 此外Consumer与Master和slave维持一个心跳。
二、客户端发送心跳
(1)Producer和Consumer通过MQClientInstance 的sendHeartbeatToAllBrokerWithLock() 方法实现发送心跳请求;
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
}
我们看到sendHeartbeanToAllBrokerWithLock() 方法中在sendHeartbeatToAllBroker() 之前加了锁,这是因为点啥嘞?
1、RocketMQ对底层进行通信的MQClientInstance 进行了复用,即在同一个jvm 里的不同的Consumer下面使用的都是同一个MQClientInstance 。 2、既然是复用的,那么就可能存在并发,因此这里进行了上锁操作。 3、所以这里是为了防止心跳错乱 。
(2)另外在MQClientInstance启动时会启动会调用startScheduledTask()方法,开始一堆定时任务,其中包括:定期默认每30s发送心跳信息到Broker。
MQClientInstance类源码:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
.......
this.startScheduledTask();
.......
}
}
}
startScheduledTask() 方法启动定时任务:
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
}
我们来看看sendHeartbeatToAllBroker()做了什么?
1、MQClientInstance#sendHeartbeatToAllBroker()
1.准备心跳信息HeartbeatData ,如果心跳信息为空,直接返回; 2. 遍历所有的Broker,尝试向所有的Broker发送心跳包 ;
注意:根据客户端的类型(Producer、Consumer)不同,发送到的Broker对象会又差别。 1、如果启动的是生产者 ,那么心跳保证消费者的相关信息为空,这时只会向Broker的Mater节点发送心跳 ;因为RocketMQ中主要Master的Broker才能处理写请求。 2、如果启动的是消费者,则会向所有的Broker发送心跳。
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return;
}
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}
try {
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
}
}
}
}
}
}
RocketMQ中客户端和服务端的通信通过Netty实现,这里我们的客户端是Consumer/Producer、服务端是Broker。
我们先看一下RocketMQ是如何准备心跳包数据的?
2、心跳包HeartBeatData
心跳包内容包括:客户端id、生产者信息、消费者信息;
一般情况下生产者信息和消费者信息是互斥的,producerDataSet和consumerDataSet会有一个为空。但如果一个应用既是生产者,也是消费者,那么这种情况下producerDataSet和consumerDataSet都不为空。
public class HeartbeatData extends RemotingSerializable {
private String clientID;
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
......
}
我们分别看一下生产者信息和消费者信息都包括什么?
1)生产者信息ProducerData
不能再简单了,就一个生产者组的名称。
public class ProducerData {
private String groupName;
}
2)消费者信息ConsumerData
ConsumerData消费者信息包括:
groupName 消费类型 :push/pull消息传播方式 :集群还是广播启动消费者时从哪开始消费 订阅信息SubscriptionData :过滤消息相关标签、SQL规则等。
public class ConsumerData {
private String groupName;
private ConsumeType consumeType;
private MessageModel messageModel;
private ConsumeFromWhere consumeFromWhere;
private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
}
在我们日常写代码时,这些属性很常见、经常会配置到。SubscriptionData 是我们的消费者订阅信息, 其内容如下:
public class SubscriptionData implements Comparable<SubscriptionData> {
public final static String SUB_ALL = "*";
private boolean classFilterMode = false;
private String topic;
private String subString;
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis();
private String expressionType = ExpressionType.TAG;
@JSONField(serialize = false)
private String filterClassSource;
}
这里面,我们平时最常用到的是topic 、subString 。
consumer.subscribe("saint-study-topic", "TAG-A || TAG-B");
创建订阅信息 的时候,subString 会被分割 成TAG-A 、TAG-B,然后保存至tagsSet 集合里,tag的hashcode 会保存到codeSet 集合里。
那么心跳包数据是如何组装的?我接着来看。
3、MQClientInstance#prepareHeartbeatData()
private HeartbeatData prepareHeartbeatData() {
HeartbeatData heartbeatData = new HeartbeatData();
heartbeatData.setClientID(this.clientId);
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
ConsumerData consumerData = new ConsumerData();
consumerData.setGroupName(impl.groupName());
consumerData.setConsumeType(impl.consumeType());
consumerData.setMessageModel(impl.messageModel());
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());
heartbeatData.getConsumerDataSet().add(consumerData);
}
}
for (Map.Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}
1)以准备Consumer的心跳信息来看:
其遍历MQClientInstance 的属性consumerTable.entrySet() ,获取到MQConsumerInner 信息,然后将其填充到consumerData中。
DefaultMQPushConsumerImplement#start()方法–消费者启动 时,会调用MQClientInstance#registerConsumer() 方法,将消费者信息(含订阅信息)填充 到consumerTable 。
咦,这里只是把DefaultMQPushConsumerImpl作为MQConsumerInner传入到了MQClientInstance#registerConsumer,订阅信息在哪里 可以看到撒?
往心跳包HeartbeatData的consumerData属性中填充的是MQConsumerInner#subscriptions() 方法的返回值 。
那我们就看一下DefaultMQPushConsumerImplement#subscriptions() 方法: 原来是取的负载均衡服务RebalanceImpl 中的subscriptionInner 属性,那RebalanceImpl 的subscriptionInner 属性又是怎么填充的?
既然是订阅信息,会不会和我们的subscribe()订阅操作有关呢?
我们看一下DefaultMQPushConsumerImpl#subscribe() 方法: 果然是这样填充的消费者订阅者信息。 下面是组装SubscriptionData 订阅信息的代码:
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
2)以准备Producer的心跳信息来看:
这里和Consumer的心跳信息来源类似。
其遍历MQClientInstance 的属性producerTable.entrySet() ,获取到MQProducerInner 信息,然后将其填充到producerData中。
DefaultMQProducerImpl#start()方法 生产者启动 时,会调用MQClientInstance#registerProducer() 方法,将生产者信息填充到producerTable 中。
我们接着看一下MQClientInstance#sendHearbeat() 是如何发送心跳的?
4、MQClientInstance#sendHearbeat()
1、封装请求,包括:请求编码为HEART_BEAT、编程语言为Java、心跳包HeartBeatData; 2、调用远程服务类NettyRemotingClient 通过Netty发送心跳。
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
对于RocketMQ是如何通过Netty通信的,不是本文的重点,后续专文分析。
三、总结
以上所有分析相关的源码注释请见GitHub中的release-4.8.0分支:https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0
|