IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RocketMQ 消息投递解析—— 时序图、调用链、源码级解析 -> 正文阅读

[Java知识库]RocketMQ 消息投递解析—— 时序图、调用链、源码级解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2022年5月24日

🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ?留言 📝 都是我最大的动力!

消息投递模型

在前几篇文章里我曾经也画过消息投递的模型图,这里再来简单复习一下:
在这里插入图片描述

  • 消息生产者集群从注册中心获取到路由信息(负载均衡),然后将消息发送给Broker集群

  • 注册中心是无状态集群,即每一台服务器都不影响其他的服务器。Broker会同时向所有的注册中心服务器里发送注册信息

  • 注册中心存储的是TopicQueue、IP地址等信息,正常情况下每台机器存储的应该是相同的

  • Broker采用主从架构提供服务,主服务器负责写入操作,从服务器负责处理读请求



消息投递流程

消息发送的时序图如下图所示:
在这里插入图片描述
Producer首先要知道向哪个Broker发送消息,所以具体流程如下:

  1. Producer先从本地尝试获取路由信息
  2. 本地无缓存的路由信息时,从注册中心中获取路由信息,并缓存到本地
  3. 获取到的路由信息包含了Topic下的所有QueueProducer就可以采取负载均衡策略把消息发送到某个队列里
  4. Producer发送消息到Broker成功之后,服务器就会返回消息发送成功对象SendResult



消息投递方法链

下面以时序图的形式展示了从获取路由表到消息投递过程的整体方法调用链:

请添加图片描述
上图涉及到的核心API如下:

// 发送消息
DefaultMQProducer#send(Message msg);
// 发送消息,增加超时时间
DefaultMQProducer#send(Message msg, long timeout);
// 发送消息,增加发送消息的模式(异步/同步)
DefaultMQProducer#sendDefaultImpl(Message msg, CommunicationMode mode, long timeout);
// 查询消息发送的路由信息
DefaultMQProducerImpl#tryToFindTopicPublishInfo(String topic);
// 根据topic的名称更新注册中心的路由信息
MQClientInstance#updateTopicRouteInfoFromNameServer(String topic);
// 根据topic的名称更新注册中心的路由信息,并获取路由信息
MQClientInstance#updateTopicRouteInfoFromNameServer(String topic, Boolean isDefault, MQDefaultProducer mqDefaultProducer);
// 根据负载均衡算法,选择一个队列进行消息发送
DefaultMQProducerImpl#selectOneMessageQueue(TopicPublishInfo topic, String lastBrokerName);
// 发送消息
DefaultMQProducerImpl#sendKernelImpl(Message msg, MessageQueue queue);

接下来我们进行源码级分析,可以对照上图学习:

SendResult

如果消息发送成功,会返回一个SendResult对象:

/**
* 发送消息结果
*/
public class SendResult {
	/**
	* 发送消息结果状态
	*/
    private SendStatus sendStatus;
    /**
	* 消息的唯一key,由Client发送消息时生成
	*/
    private String msgId;
    /**
	* 消息队列
	*/
    private MessageQueue messageQueue;
    /**
	* 消息队列偏移量
	*/
    private long queueOffset;
    /**
	* 事务ID
	*/
    private String transactionId;
    /**
	* 下一条消息的偏移量
	*/
    private String offsetMsgId;
    /**
	* 区域ID
	*/
    private String regionId;
}

其中SendStatus是一个枚举值:

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}
  • SEND_OK:消息发送成功且存储同步成功
  • FLUSH_DISK_TIMEOUT:消息发送成功但存储失败
  • FLUSH_SLAVE_TIMEOUT:消息发送成功但slave节点超时
  • SLAVE_NOT_AVAILABLE:消息发送成功但slave节点不可用



消息投递源码解析

Producer发送消息

DefaultMQProducer发送消息类模型:
在这里插入图片描述

  • MQAdmin:MQ管理的基类
  • ClientConfig:客户端配置类
  • DefaultMQProducer:消息生产者

使用Producer发送消息,具体编码实现方式如下:

  1. 创建DefaultMQProducer,传入指定发送消息所在组
  2. 设置注册中心地址,Producer会从里面获取到Topic以及队列
  3. 发送消息

发送消息时必须指定Topic,消息标签,消息体

package com.wjw;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;

public class MQProducerA {
    public static void main(String[] args) throws Exception {
        // 创建消息生产者,指定组
        DefaultMQProducer producer = new DefaultMQProducer("group-A");
        // 设置注册中心地址
        producer.setNamesrvAddr("localhost");

        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息对象
            Message message = new Message("topic-A", "tagA", ("Hello MQ " + i)
                    .getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 设置消息延时级别
            message.setDelayTimeLevel(6);
            // 发送消息
            SendResult result = producer.send(message);
            System.out.println("发送消息结果:" + result);
        }
        producer.shutdown();
    }
}

DefaultMQProducer

发送消息的producer.send()方法调用的是DefaultMQPrducer里的send方法:

在这里插入图片描述
在这里插入图片描述
这里又调用了defaultMQProducerImpl.send(msg)

public class DefaultMQProducer extends ClientConfig implements MQProducer {

	// ...
	
	@Override
    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }
    
	// ...
	
}

defaultMQProducerImpl

使用defaultMQProducerImplsend方法发送消息,这里的调用多传了一个超时时间参数,当producer没有指定时,取默认值3000ms:

public class DefaultMQProducerImpl implements MQProducerInner {

	// ...

	public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		// 发送消息,指定消息发送的超时时间
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }

	public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		// 发送消息,指定消息发送类型:同步 or 异步,超时时间
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

	// ...

}

在这里插入图片描述

sendDefaultImpl

上面调用的sendDefaultImpl方法需要做下面几件事:

  1. 获取消息路由信息,包含Topic下的队列和IP信息
  2. 选择要发送到的消息队列,这个过程会采用负载均衡策略选择一个队列进行消息存储
  3. 发送消息(sendKernelImpl)并返回结果

核心逻辑我已经标注在下面的代码片段里,非核心代码已省略

public class DefaultMQProducerImpl implements MQProducerInner {

	// ...

	/**
     * 发送消息
     *
     * @param msg                   消息
     * @param communicationMode     通信模式
     * @param sendCallback          发送回调
     * @param timeout               请求超时时间
     * @return
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    private SendResult sendDefaultImpl(//
                                       Message msg, //
                                       final CommunicationMode communicationMode, //
                                       final SendCallback sendCallback, //
                                       final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 确保MQ服务正在运行
        this.makeSureStateOK();
        // 检查消息、Topic、消息体是否为空且满足系统要求
        Validators.checkMessage(msg, this.defaultMQProducer);

        // ...

        // 获取Broker的路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 根据消息是否是同步的,确定总的发送时间
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            // 循环调用发送消息方法,直到成功或超时
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 选择消息要发送到的队列,默认策略是轮流发送,当发送失败时,按顺序发送到下一个Broker的MessageQueue
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        // 发送消息核心方法
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        // 更新Broker的可用性信息,当发送时间超时时会有30s的不可用时长。只有开启了延迟容错机制才生效
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                // 同步没有发送成功 且 配置了存储异常重新发送时,重试
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                // 返回发送结果
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        /** (省略)异常处理逻辑 **/
                        continue;
                    } catch (MQClientException e) {
                        /** (省略)异常处理逻辑 **/
                        continue;
                    } catch (MQBrokerException e) {
                        /** (省略)异常处理逻辑 **/
                    } catch (InterruptedException e) {
                        /** (省略)异常处理逻辑 **/
                    }
                } else {
                    break;
                }
            } // end of for

            // 返回发送结果
            if (sendResult != null) {
                return sendResult;
            }

            /** (省略)异常处理逻辑 **/
        }

        // 获取不到注册中心的地址信息则抛出异常
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                    "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NoNameServerException);
        }

        // Topic为空则抛出异常
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                null).setResponseCode(ClientErrorCode.NotFoundTopicException);
    }

	// ...

}

sendKernelImpl

其实看函数名就能看出来,这是发送消息的核心方法。

  1. 根据brokername从本地缓存表brokerAddrTable中获取Broker服务器的IP地址,如果无法从本地获取到Broker的地址,则去请求注册中心获取;
  2. Broker会开启两个端口对外服务,如果开启VIP通道,则VIP端口号是原始端口号 - 2
  3. 构造RequestHeader请求头
  4. 根据同步策略发送消息,ONEWAY表示单向消息无需返回结果,发送失败会抛异常

核心逻辑我已经标注在下面的代码片段里,非核心代码已省略

public class DefaultMQProducerImpl implements MQProducerInner {

	// ...

	/**
     * 发送消息核心方法,返回发送结果
     *
     * @param msg               消息
     * @param mq                消息队列
     * @param communicationMode 通信模式
     * @param sendCallback      发送回调
     * @param topicPublishInfo  Topic信息
     * @param timeout           超时时间
     * @return
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    private SendResult sendKernelImpl(final Message msg, //
                                      final MessageQueue mq, //
                                      final CommunicationMode communicationMode, //
                                      final SendCallback sendCallback, //
                                      final TopicPublishInfo topicPublishInfo, //
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 根据broker name查询Broker的地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        // 本地缓存为空则从注册中心查询Broker的地址
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            // 是否使用VIP channel
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {

                // ......省略部分逻辑

                // 构造请求头
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // 设置producer的group
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                // 设置topic名称
                requestHeader.setTopic(msg.getTopic());
                // 设置默认topic
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                // 设置topic queue的数量
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                // 设置queue的id
                requestHeader.setQueueId(mq.getQueueId());
                // 设置系统标记
                requestHeader.setSysFlag(sysFlag);
                // 设置消息创建时间
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                // 设置消息属性
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 设置被消费过的次数
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                // 如果topic是"%RETRY%"表示消息重发
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                // 根据消息发送的不同模式发送消息
                switch (communicationMode) {
                    case ASYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                                brokerAddr, // 1
                                mq.getBrokerName(), // 2
                                msg, // 3
                                requestHeader, // 4
                                timeout, // 5
                                communicationMode, // 6
                                sendCallback, // 7
                                topicPublishInfo, // 8
                                this.mQClientFactory, // 9
                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                                context, //
                                this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                                brokerAddr, // 1
                                mq.getBrokerName(), // 2
                                msg, // 3
                                requestHeader, // 4
                                timeout, // 5
                                communicationMode, // 6
                                context,//
                                this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                /** 异常处理逻辑 **/
            } catch (MQBrokerException e) {
                /** 异常处理逻辑 **/
            } catch (InterruptedException e) {
                /** 异常处理逻辑 **/
            } finally {
                msg.setBody(prevBody);
            }
        }
        // broker不存在,抛异常
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

	// ...

}

该方法调用了sendMessage执行真正的发送逻辑:
在这里插入图片描述


sendMessage

  1. 构建消息发送的请求对象sendMessageRequestHeader
  2. 使用RemotingCommand创建请求指令并设置参数
  3. 发起远程调用请求,实现消息发送
  • 消息发送模式为ONEWAY时,消息只会单向发送一次
  • 消息发送模式为ASYNC时,如果消息发送失败,会根据重试次数重发消息
  • 消息发送模式为SYNC时,直接发送消息,不重试
/**
     * 发送消息,返回发送结果
     *
     * @param addr                      Broker地址
     * @param brokerName
     * @param msg                       消息
     * @param requestHeader             请求头
     * @param timeoutMillis             超时时间
     * @param communicationMode         通信模式
     * @param sendCallback              发送回调
     * @param topicPublishInfo          Topic信息
     * @param instance                  Client实例
     * @param retryTimesWhenSendFailed  最大重试次数
     * @param context                   发送消息context
     * @param producer
     * @return
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    public SendResult sendMessage(//
                         final String addr, // 1
                         final String brokerName, // 2
                         final Message msg, // 3
                         final SendMessageRequestHeader requestHeader, // 4
                         final long timeoutMillis, // 5
                         final CommunicationMode communicationMode, // 6
                         final SendCallback sendCallback, // 7
                         final TopicPublishInfo topicPublishInfo, // 8
                         final MQClientInstance instance, // 9
                         final int retryTimesWhenSendFailed, // 10
                         final SendMessageContext context, // 11
                         final DefaultMQProducerImpl producer // 12
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // 创建请求,如果将sendSmartMsg设为true,可以将请求keey压缩,加快序列化
        RemotingCommand request = null;
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                // 基于Netty快速通信框架,发送消息给broker
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
            default:
                assert false;
                break;
        }

        return null;
    }

这里调用了remotingClient客户端远程调用Broker服务发送消息
在这里插入图片描述

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-05-25 11:32:41  更:2022-05-25 11:33:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 20:03:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码