IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Rocketmq消费消息原理 -> 正文阅读

[Java知识库]Rocketmq消费消息原理

????????在消息系统中,消费者消费消息有拉和推消息两种实现方式,拉消息是消费者主动向消息服务器发送拉消息请求,消息服务器将消息返回给消费者,而推消息是消息服务器主动向消费者推送消息的形式,这两种消息消费实现各有各的优势和劣势。

??????? Rocketmq中采用长轮训的机制来实现消息消费功能。长轮训模式兼顾了拉和推消息的优势。

从整体看下org.apache.rocketmq.client.consumer.DefaultMQPushConsumer消费者实现类在消费客户端的启动流程。

消费者实例启动大致流程:

1、订阅topic信息

2、初始化基础资源,负载均衡器,offset存储器,集群消息存broker,广播消息存本地

3、2/min获取nameserver,30/stopic路由信息(队列信息,broker信息

4、30/s发送心跳包(topic,offset)到broker

5、5/s持久化offset

6、20/s 重新负载均衡计算,广播模式清除无效的队列,新增新的消息队列,集群模式除了更新最新的消息队列,还要根据cid分配最新的消息队列。?


消费者拉取消息源码说明

消费者启动过程中和负载均衡之后,会将需要拉取消息的队列组织成任务信息,加入本地缓存?

1、根据topic找到队列信息

2、根据过滤规则,构建SubscriptionData?

3、向broker发起请求查询消息,构造请求头?

PullMessageRequestHeader?requestHeader?=?new?PullMessageRequestHeader(); // 消费者组           requestHeader.setConsumerGroup(this.consumerGroup);            requestHeader.setTopic(mq.getTopic()); //需要消费的消息队列id           requestHeader.setQueueId(mq.getQueueId()); //消费开始的偏移量            requestHeader.setQueueOffset(offset); //最大消息条数            requestHeader.setMaxMsgNums(maxNums);            requestHeader.setSysFlag(sysFlagInner); //已经消费成功的偏移量            requestHeader.setCommitOffset(commitOffset);//长轮训参数,毫秒,broker暂停多少毫秒再返回            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);  //tag等订阅规则            requestHeader.setSubscription(subExpression); //订阅版本            requestHeader.setSubVersion(subVersion);?//订阅类型????????????requestHeader.setExpressionType(expressionType);//构造请求体RemotingCommand?request?=?RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,?requestHeader);

4、返回结果接收?

broker拉取消息处理源码

broker处理入口在

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest()方法。

以前的方式查找消息

public?SelectMappedBufferResult?selectMappedBuffer(int?pos)?{        int readPosition = getReadPosition();        if (pos < readPosition && pos >= 0) {            if (this.hold()) {                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();                byteBuffer.position(pos);                int size = readPosition - pos;                //操作byteBuffer                ByteBuffer byteBufferNew = byteBuffer.slice();                byteBufferNew.limit(size);                return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);            }????????}        return null;????}

rocketmq接入了raft协议后的实现

????public?SelectMappedBufferResult?getMessage(final?long?offset,?final?int?size)?{        if (offset < dividedCommitlogOffset) {            return super.getMessage(offset, size);        }????????int?mappedFileSize?=?this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();       //返回mmap,结合write方法实现零拷贝。        MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);        if (mappedFile != null) {            int pos = (int) (offset % mappedFileSize);            return  convertSbr(mappedFile.selectMappedBuffer(pos, size));        }        return null;    }

计算下次偏移量,并判断下次是否需要从slave节点读消息

//下次拉取消息的新偏移量nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//计算下次是否要从slave读取数据// 物理实际偏移量-已经拉取的偏移量=剩余还有多少没有读取的数据long diff = maxOffsetPy - maxPhyOffsetPulling;//物理内存*40%=物理内存的百分之40long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));//如果还要读取的数据大于物理内存百分之40,则需要重slave读取,判断消费慢。 getResult.setSuggestPullingFromSlave(diff > memory);

零拷贝技术体现:消息内容写到socket缓冲区

????public?long?transferTo(WritableByteChannel?target,?long?position)?throws?IOException?{        if (this.byteBufferHeader.hasRemaining()) {            transferred += target.write(this.byteBufferHeader);            return transferred;        } else {            List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();            for (ByteBuffer bb : messageBufferList) {                if (bb.hasRemaining()) {//调用java.nio.channels.WritableByteChannel#write                                   transferred += target.write(bb);                    return transferred;                }            }????????}        return 0;    }

长轮训实现原理?

如果没查找到消息,则进入长轮训逻辑判断?

case?ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) {//使用客户端的轮训时间                  long pollingTimeMills = suspendTimeoutMillisLong;    //如果broker未开启长轮训开关,使用短轮训时间    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();????}    String topic = requestHeader.getTopic();    long offset = requestHeader.getQueueOffset();    int queueId = requestHeader.getQueueId();    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);        //停止当前请求,实际是将当前请求放入到队列ArrayList,等待线程池调度    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);    response = null;    break;}

当前请求加入等待队列?

public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {        String key = this.buildKey(topic, queueId);        ManyPullRequest mpr = this.pullRequestTable.get(key);        if (null == mpr) {            mpr = new ManyPullRequest();            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);            if (prev != null) {                mpr = prev;            }        }?        mpr.addPullRequest(pullRequest);    }

通过CountDownLatch进行wait,LockSupport.wait(time) ,使当前线程进入等待状态

public void run() {        log.info("{} service started", this.getServiceName());        while (!this.isStopped()) {            try {            //park,需要写入新消息数据到缓冲区后进行唤醒,或者时间到了再缓存                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {                    this.waitForRunning(5 * 1000);                } else {                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());????????????????}                long beginLockTimestamp = this.systemClock.now();                this.checkHoldRequest();                long costTime = this.systemClock.now() - beginLockTimestamp;                if (costTime > 5 * 1000) {                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);                }            } catch (Throwable e) {                log.warn(this.getServiceName() + " service has exception. ", e);            }????????}        log.info("{} service end", this.getServiceName());    }

消费消息整体流程图

主要包括客户端和broker两边的处理:

1、客户端获取broker,queueId,offset,topic

2、向broker发送请求读消息

3、broker服务端参数校验

4、通过offset和通过ByteBuffer或者mmap技术读取消息

5、结合channel.write将消息写入socket缓冲区

6、如果没读取到消息,判断是否需要长轮训或短轮训,不立即返回客户端,兼顾轮训和push的优势。

7、通过LockSupport.park将暂停当前线程,将Request加入队列

8、如果写消息线程唤醒了这个线程,通过线程池异步执行Request并返回,不开启长轮训。

为什么Rocketmq采用长轮训拉取技术??

拉取方式的弊端:循环拉取的间隔不好设定,间隔太短,处于忙等状态,浪费资源,空拉取,间隔太长,消息不能及时处理

推送消息机制:即服务端有数据之后立马推送消息给客户端,需要客户端和服务器建立长连接,实时性很高,对客户端来说也简单,接收处理消息即可;缺点就是服务端不知道客户端处理消息的能力,可能会导致数据积压,同时也增加了服务端的工作量,影响服务端的性能;

基于长轮训的拉取模式:RocketMQ使用了长轮询的方式,兼顾了push和pull两种模式的优点,如果broker队列中没有消息,服务端将自旋3次,阻塞客户端连接,将客户端请求记录下来,直到有数据或者超时时间过了才返回请求。?


?

总结:

Rocketmq分布式消息队列? 就是一个 分布式数据库,天然支持分库分表 读写分离

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-09-11 18:40:35  更:2021-09-11 18:42:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 17:20:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码