????????在消息系统中,消费者消费消息有拉和推消息两种实现方式,拉消息是消费者主动向消息服务器发送拉消息请求,消息服务器将消息返回给消费者,而推消息是消息服务器主动向消费者推送消息的形式,这两种消息消费实现各有各的优势和劣势。
??????? Rocketmq中采用长轮训的机制来实现消息消费功能。长轮训模式兼顾了拉和推消息的优势。
从整体看下org.apache.rocketmq.client.consumer.DefaultMQPushConsumer消费者实现类在消费客户端的启动流程。
消费者实例启动大致流程:
1、订阅topic信息
2、初始化基础资源,负载均衡器,offset存储器,集群消息存broker,广播消息存本地
3、2/min获取nameserver,30/stopic路由信息(队列信息,broker信息
4、30/s发送心跳包(topic,offset)到broker
5、5/s持久化offset
6、20/s 重新负载均衡计算,广播模式清除无效的队列,新增新的消息队列,集群模式除了更新最新的消息队列,还要根据cid分配最新的消息队列。?
消费者拉取消息源码说明
消费者启动过程中和负载均衡之后,会将需要拉取消息的队列组织成任务信息,加入本地缓存?
1、根据topic找到队列信息
2、根据过滤规则,构建SubscriptionData?
3、向broker发起请求查询消息,构造请求头?
PullMessageRequestHeader?requestHeader?=?new?PullMessageRequestHeader(); // 消费者组 requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); //需要消费的消息队列id requestHeader.setQueueId(mq.getQueueId()); //消费开始的偏移量 requestHeader.setQueueOffset(offset); //最大消息条数 requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); //已经消费成功的偏移量 requestHeader.setCommitOffset(commitOffset); //长轮训参数,毫秒,broker暂停多少毫秒再返回 requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); //tag等订阅规则 requestHeader.setSubscription(subExpression); //订阅版本 requestHeader.setSubVersion(subVersion); ?//订阅类型??????????? ?requestHeader.setExpressionType(expressionType); //构造请求体 RemotingCommand?request?=?RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,?requestHeader);
4、返回结果接收?
broker拉取消息处理源码
broker处理入口在
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest()方法。
以前的方式查找消息
public?SelectMappedBufferResult?selectMappedBuffer(int?pos)?{ int readPosition = getReadPosition(); if (pos < readPosition && pos >= 0) { if (this.hold()) { ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(pos); int size = readPosition - pos; //操作byteBuffer ByteBuffer byteBufferNew = byteBuffer.slice(); byteBufferNew.limit(size); return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } ????????} return null; ????}
rocketmq接入了raft协议后的实现
????public?SelectMappedBufferResult?getMessage(final?long?offset,?final?int?size)?{ if (offset < dividedCommitlogOffset) { return super.getMessage(offset, size); } ????????int?mappedFileSize?=?this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); //返回mmap,结合write方法实现零拷贝。 MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return convertSbr(mappedFile.selectMappedBuffer(pos, size)); } return null; }
计算下次偏移量,并判断下次是否需要从slave节点读消息
//下次拉取消息的新偏移量 nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); //计算下次是否要从slave读取数据 // 物理实际偏移量-已经拉取的偏移量=剩余还有多少没有读取的数据 long diff = maxOffsetPy - maxPhyOffsetPulling; //物理内存*40%=物理内存的百分之40 long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); //如果还要读取的数据大于物理内存百分之40,则需要重slave读取,判断消费慢。 getResult.setSuggestPullingFromSlave(diff > memory);
零拷贝技术体现:消息内容写到socket缓冲区
????public?long?transferTo(WritableByteChannel?target,?long?position)?throws?IOException?{ if (this.byteBufferHeader.hasRemaining()) { transferred += target.write(this.byteBufferHeader); return transferred; } else { List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList(); for (ByteBuffer bb : messageBufferList) { if (bb.hasRemaining()) { //调用java.nio.channels.WritableByteChannel#write transferred += target.write(bb); return transferred; } } ????????} return 0; }
长轮训实现原理?
如果没查找到消息,则进入长轮训逻辑判断?
case?ResponseCode.PULL_NOT_FOUND: if (brokerAllowSuspend && hasSuspendFlag) { //使用客户端的轮训时间 long pollingTimeMills = suspendTimeoutMillisLong; //如果broker未开启长轮训开关,使用短轮训时间 if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); ????} String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); //停止当前请求,实际是将当前请求放入到队列ArrayList,等待线程池调度 this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }
当前请求加入等待队列?
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (null == mpr) { mpr = new ManyPullRequest(); ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); if (prev != null) { mpr = prev; } } ? mpr.addPullRequest(pullRequest); }
通过CountDownLatch进行wait,LockSupport.wait(time) ,使当前线程进入等待状态
public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { //park,需要写入新消息数据到缓冲区后进行唤醒,或者时间到了再缓存 if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); ????????????????} long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } ????????} log.info("{} service end", this.getServiceName()); }
消费消息整体流程图
主要包括客户端和broker两边的处理:
1、客户端获取broker,queueId,offset,topic
2、向broker发送请求读消息
3、broker服务端参数校验
4、通过offset和通过ByteBuffer或者mmap技术读取消息
5、结合channel.write将消息写入socket缓冲区
6、如果没读取到消息,判断是否需要长轮训或短轮训,不立即返回客户端,兼顾轮训和push的优势。
7、通过LockSupport.park将暂停当前线程,将Request加入队列
8、如果写消息线程唤醒了这个线程,通过线程池异步执行Request并返回,不开启长轮训。
为什么Rocketmq采用长轮训拉取技术??
拉取方式的弊端:循环拉取的间隔不好设定,间隔太短,处于忙等状态,浪费资源,空拉取,间隔太长,消息不能及时处理
推送消息机制:即服务端有数据之后立马推送消息给客户端,需要客户端和服务器建立长连接,实时性很高,对客户端来说也简单,接收处理消息即可;缺点就是服务端不知道客户端处理消息的能力,可能会导致数据积压,同时也增加了服务端的工作量,影响服务端的性能;
基于长轮训的拉取模式:RocketMQ使用了长轮询的方式,兼顾了push和pull两种模式的优点,如果broker队列中没有消息,服务端将自旋3次,阻塞客户端连接,将客户端请求记录下来,直到有数据或者超时时间过了才返回请求。?
?
总结:
Rocketmq分布式消息队列? 就是一个 分布式数据库,天然支持分库分表 读写分离
|