1. 前言
在消息中间件中,消费者对于消费成功的消息,一般是需要返回ACK给Broker的,它的目的是让Broker知道消息已经被成功消费,不必再投递给其它消费者重试了。在RocketMQ中,这一过程的具体实现为「上报消费位点」,RocketMQ没有办法针对单个消息返回ACK,Consumer只能上报MessageQueue已经消费的消息偏移量。 ?
Consumer实例启动时,同一Group下所有的实例都会进行「重平衡」操作,给自身重新分配MessageQueue,默认的分配策略就是「平均分配」,这意味着,同一个MessageQueue只会被一个Consumer实例消费,即同一个MessageQueue只会有一个Consumer上报消费位点,不存在冲突问题。但是,Consumer实例是多线程并发消费消息的,请大家思考这样一个问题:**线程A和线程B并发消费同一个队列,拉取到的消息为M1、M2,消息对应的Offset为1、2,线程B成功消费了M2,但是线程A还未消费M1,此时Consumer该如何上报消费位点?**直接上报2的话,M1还没消费,一旦消费失败,M1存在丢失的可能,RocketMQ是如何做的呢? ?
2. OffsetStore
阅读RocketMQ的源码就会发现,OffsetStore是RocketMQ用来管理Consumer消费位点的接口,通过接口定义我们就能知道它具备哪些功能。
public interface OffsetStore {
void load() throws MQClientException;
void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
long readOffset(final MessageQueue mq, final ReadOffsetType type);
void persistAll(final Set<MessageQueue> mqs);
void persist(final MessageQueue mq);
void removeOffset(MessageQueue mq);
Map<MessageQueue, Long> cloneOffsetTable(String topic);
void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
}
核心关注的方法有三个,本文会重点分析:
方法 | 说明 |
---|
load | 从磁盘加载消费位点,针对广播模式 | updateOffset | 更新队列消费位点 | persistAll | 持久化消费位点 |
RocketMQ消息消费模式有两种:集群消费和广播消费,对应的实现类分别是RemoteBrokerOffsetStore和LocalFileOffsetStore。从名字也能看得出来,集群模式下消费进度由Broker管理,广播模式下消费进度由客户端自己管理。
2.1 LocalFileOffsetStore
广播模式下,消息需要被所有的Consumer实例消费,因此每个实例的消费进度是不一样的,所以消费进度会交给Consumer客户端自己管理,消费进度会以Json的方式持久化到磁盘文件,默认的路径是home/.rocketmq_offsets/${ClientId}/${groupName}/offsets.json ,Consumer启动时会从本地磁盘文件加载,对应的方法是readLocalOffset() 。
private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
String content = null;
try {
content = MixAll.file2String(this.storePath);
} catch (IOException e) {
log.warn("Load local offset store file exception", e);
}
if (null == content || content.length() == 0) {
return this.readLocalOffsetBak();
} else {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
offsetSerializeWrapper =
OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
} catch (Exception e) {
log.warn("readLocalOffset Exception, and try to correct", e);
return this.readLocalOffsetBak();
}
return offsetSerializeWrapper;
}
}
从磁盘读取Json文件,然后将队列的消费进度放到Map容器中。
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null
&& offsetSerializeWrapper.getOffsetTable() != null) {
offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
}
当Consumer成功消费消息后,会根据消息的Offset去更新消费位点。广播模式下,只需要修改Map容器里的值即可。
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
updateOffset 只是修改了内存中的数据,Consumer会启动定时任务,5秒钟进行一次持久化。广播模式下,持久化就是将Map容器中的数据以Json的方式写入磁盘文件,代码就不贴了。
2.2 RemoteBrokerOffsetStore
集群模式下,消息只需要被其中一个Consumer实例消费即可,相同ConsumerGroup消费进度是一致的,所以消费进度会交给Broker管理。 ?
由于消费进度是由Broker负责管理的,所有RemoteBrokerOffsetStore实现类的load() 方法是空的,什么也不做。 ?
当Consumer成功消费消息后,同样会调用updateOffset 方法更新队列消费位点,代码同上。此时只是将消费位点写入内存里的Map容器,并不会立即上报给Broker。因为消息的消费是一个很频繁的动作,如果每次消费都上报位点,对Consumer和Broker的压力都是很大的。 ?
如果updateOffset 后没有调用persistAll 方法服务就挂了,就会导致Broker认为那些已经消费的消息还没有被消费,从而造成消息的重复投递,不过没关系,RocketMQ本身是允许消息的重复投递的,它的服务质量是「至少一次,允许多次」。 ?
Consumer启动的定时任务会5秒进行一次持久化,对于集群模式而言,所谓的“持久化”就是上报队列的消费位点给Broker。在persistAll 方法中,Consumer会遍历offsetTable,按个将MessageQueue的消费位点进行上报。上报位点的方法是updateConsumeOffsetToBroker() 。上报消费位点需要和Broker交互,涉及到网络请求,默认使用「单向请求」,不关心上报是否成功,即使不成功也没关心,大不了消息重复消费嘛。
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
3. 上报流程
PullMessageService拉取到消息后会提交消费请求到ConsumeMessageService,唤醒消息消费线程开始消费消息,再根据消息消费状态去处理消费结果。消费成功就好处理了,直接更新消费位点即可。对于消费失败的情况,在广播模式下,消费失败的消息会记录日志然后直接丢弃;在集群模式下,消费失败的消息会回传给Broker,等待重新投递。
处理消费结果的方法是processConsumeResult ,ProcessQueue对象缓存了Consumer拉取到的消息,它底层采用TreeMap存储,Key是消息Offset,Value是消息。TreeMap用的是红黑树结构,它的Key是有序的,从小到大排序。消费成功的消息会从TreeMap中删除,Consumer每次上报的消费位点永远是TreeMap的最小的Key,即最小的消息Offset。这就解答了开篇提的问题,线程B虽然消费了M2,但是M1还没消费,TreeMap中的firstKey仍然是1,这时是不会上报消费位点2的,只有M1也被消费了,才会随着2一起上报。
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:// 广播模式下,直接丢掉
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:// 集群模式下,回传给Broker
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
消费失败且回传给Broker失败的消息,不会从ProcessQueue中删除,也就不会上报消费位点。
4. 总结
RocketMQ通过OffsetStore接口来管理消费进度,广播模式下由Consumer客户端自己管理,集群模式下由Broker负责管理。Consumer消费消息后仅仅会更新内存中的消费位点,由定时任务负责5秒进行一次持久化。广播模式的持久化是将数据写入磁盘文件,集群模式的持久化是将消费位点上报给Broker。 ?
Consumer底层采用TreeMap来存储拉取到的消息,Key是消息Offset、Value是消息本身,所以它是根据消息的Offset从小到达排列的,上报消费位点时,上报的总是firstKey,即最小Offset。并发消费时,即使Offset较大的消息先被消费了也没关系,Offset并不会被直接上报。这带来了另一个问题,存在漏报的可能,会导致消息重复消费,所以Consumer对于较敏感的数据,需要做好消费的幂等性。
|