🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2022年5月24日
🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ?留言 📝 都是我最大的动力!
消息投递模型
在前几篇文章里我曾经也画过消息投递的模型图,这里再来简单复习一下:
-
消息生产者集群从注册中心获取到路由信息(负载均衡),然后将消息发送给Broker 集群 -
注册中心是无状态集群,即每一台服务器都不影响其他的服务器。Broker 会同时向所有的注册中心服务器里发送注册信息 -
注册中心存储的是Topic 、Queue 、IP地址等信息,正常情况下每台机器存储的应该是相同的 -
Broker采用主从架构提供服务,主服务器负责写入操作,从服务器负责处理读请求
消息投递流程
消息发送的时序图如下图所示: Producer 首先要知道向哪个Broker 发送消息,所以具体流程如下:
Producer 先从本地尝试获取路由信息- 本地无缓存的路由信息时,从注册中心中获取路由信息,并缓存到本地
- 获取到的路由信息包含了
Topic 下的所有Queue ,Producer 就可以采取负载均衡策略把消息发送到某个队列里 Producer 发送消息到Broker 成功之后,服务器就会返回消息发送成功对象SendResult
消息投递方法链
下面以时序图的形式展示了从获取路由表到消息投递过程的整体方法调用链:
上图涉及到的核心API如下:
DefaultMQProducer#send(Message msg);
DefaultMQProducer#send(Message msg, long timeout);
DefaultMQProducer#sendDefaultImpl(Message msg, CommunicationMode mode, long timeout);
DefaultMQProducerImpl#tryToFindTopicPublishInfo(String topic);
MQClientInstance#updateTopicRouteInfoFromNameServer(String topic);
MQClientInstance#updateTopicRouteInfoFromNameServer(String topic, Boolean isDefault, MQDefaultProducer mqDefaultProducer);
DefaultMQProducerImpl#selectOneMessageQueue(TopicPublishInfo topic, String lastBrokerName);
DefaultMQProducerImpl#sendKernelImpl(Message msg, MessageQueue queue);
接下来我们进行源码级分析,可以对照上图学习:
SendResult
如果消息发送成功,会返回一个SendResult 对象:
public class SendResult {
private SendStatus sendStatus;
private String msgId;
private MessageQueue messageQueue;
private long queueOffset;
private String transactionId;
private String offsetMsgId;
private String regionId;
}
其中SendStatus 是一个枚举值:
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
- SEND_OK:消息发送成功且存储同步成功
- FLUSH_DISK_TIMEOUT:消息发送成功但存储失败
- FLUSH_SLAVE_TIMEOUT:消息发送成功但slave节点超时
- SLAVE_NOT_AVAILABLE:消息发送成功但slave节点不可用
消息投递源码解析
Producer发送消息
DefaultMQProducer 发送消息类模型:
MQAdmin :MQ管理的基类ClientConfig :客户端配置类DefaultMQProducer :消息生产者
使用Producer 发送消息,具体编码实现方式如下:
- 创建
DefaultMQProducer ,传入指定发送消息所在组 - 设置注册中心地址,
Producer 会从里面获取到Topic 以及队列 - 发送消息
发送消息时必须指定Topic ,消息标签,消息体
package com.wjw;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
public class MQProducerA {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group-A");
producer.setNamesrvAddr("localhost");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic-A", "tagA", ("Hello MQ " + i)
.getBytes(RemotingHelper.DEFAULT_CHARSET));
message.setDelayTimeLevel(6);
SendResult result = producer.send(message);
System.out.println("发送消息结果:" + result);
}
producer.shutdown();
}
}
DefaultMQProducer
发送消息的producer.send() 方法调用的是DefaultMQPrducer 里的send 方法:
这里又调用了defaultMQProducerImpl.send(msg) :
public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
}
defaultMQProducerImpl
使用defaultMQProducerImpl 的send 方法发送消息,这里的调用多传了一个超时时间参数,当producer 没有指定时,取默认值3000ms:
public class DefaultMQProducerImpl implements MQProducerInner {
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
}
sendDefaultImpl
上面调用的sendDefaultImpl 方法需要做下面几件事:
- 获取消息路由信息,包含Topic下的队列和IP信息
- 选择要发送到的消息队列,这个过程会采用负载均衡策略选择一个队列进行消息存储
- 发送消息(
sendKernelImpl )并返回结果
核心逻辑我已经标注在下面的代码片段里,非核心代码已省略
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
continue;
} catch (MQClientException e) {
continue;
} catch (MQBrokerException e) {
} catch (InterruptedException e) {
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NoNameServerException);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NotFoundTopicException);
}
}
sendKernelImpl
其实看函数名就能看出来,这是发送消息的核心方法。
- 根据
brokername 从本地缓存表brokerAddrTable 中获取Broker服务器的IP地址,如果无法从本地获取到Broker的地址,则去请求注册中心获取; - Broker会开启两个端口对外服务,如果开启VIP通道,则VIP端口号是原始端口号 - 2
- 构造
RequestHeader 请求头 - 根据同步策略发送消息,
ONEWAY 表示单向消息无需返回结果,发送失败会抛异常
核心逻辑我已经标注在下面的代码片段里,非核心代码已省略
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
} catch (MQBrokerException e) {
} catch (InterruptedException e) {
} finally {
msg.setBody(prevBody);
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
该方法调用了sendMessage 执行真正的发送逻辑:
sendMessage
- 构建消息发送的请求对象
sendMessageRequestHeader - 使用
RemotingCommand 创建请求指令并设置参数 - 发起远程调用请求,实现消息发送
- 消息发送模式为
ONEWAY 时,消息只会单向发送一次 - 消息发送模式为
ASYNC 时,如果消息发送失败,会根据重试次数重发消息 - 消息发送模式为
SYNC 时,直接发送消息,不重试
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = null;
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
request.setBody(msg.getBody());
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
default:
assert false;
break;
}
return null;
}
这里调用了remotingClient 客户端远程调用Broker 服务发送消息
|