由于日常开发中遇到几次使用延时消息的场景,而且目前业务中使用到的消息中间件有rabbitmq和kafka,对延时消息的支持都不太理想。 其中
- rabbitmq 延时消息是通过 设置队列ttl+死信exchange实现
- 缺点嘛:每次都得设置两个队列,一个用来实现延时,过期后经死信exchange转到对应的业务队列提供消费。
- 另:rabbitmq有提供延时插件,但缺点较多,如:1. 启动插件要么重启,要么引入一个新的集群;2. 不支持高可用,延时消息发送前只存储在当前broker节点的内部数据库Mnesia中,不会被镜像复制;3. 延时不可靠,存在消息数量较大或使用很久后延迟不准确;4:不支持大规模消息,同3;5:只支持ram节点(Mnesia数据库存在磁盘中)
- kafka 延时消息通过在消费端判断消息是否达到消费时间,决定是否进行消费实现。未达到延时时间则暂停消费。
- 缺点:针对单个topic固定的延时时间。 需要额外在消费端进行开发(虽然有同事写了现成的注解就是。) (实际上这种在消费端控制延时的方式大部分消息队列都能做到)
无论是rabbitmq的死信还是kafka消费端控制,基本上都是每个topic只能使用固定的延时时间。但现实中,也存在一些同一个业务场景使用不同延时时间的消息的场景:
- 考试结束后强制交卷。不同的考试规定的时间是不一样的,不可能每个考试都创建一个新的队列。
- 一些异常的重试操作。执行某个操作失败后,需要多次不同等级的延时重试。(虽说这个用一个本地线程也可以,但是在同一台机器上延时重试,仍然存在较大可能失败。所以比较关键场景可以使用延时消息,分发到其他机器上执行。)
于是调研了一下其他的消息中间件,发现rocketmq貌似是支持延时消息的,虽然也只有固定的18个延时等级,但相比rabbitmq和kafka固定的延时消息,要好很多了。于是开始学习探究rocketmq延时消息的实现。
正文开始
先了解一下rocketmq
rocketmq 的架构
整个架构如图,简单描述一下:
- nameServer 提供注册中心的服务,负责broker的管理,以及topic路由信息的管理。
- brokerServer 则主要负责消息的存储、投递和查询及高可用。
- Producer 连接nameServer获取到broker信息后,发送信息到对应的broker。
- Consumer 同样先连接 nameServer,查询topic路由信息,然后连接broker消费消息。
消息的存储
如图,rocketmq 的所有消息都存储在 commitlog 中,然后ConsumerQueue作为逻辑消费队列,维护一个topic消息的索引,记录topic内消息在commitlog中的一些信息。其中 ConsumeQueue的存储单元为 8字节的offset+4字节的size+8字节的tags hashcode, 对于延时消息,最后8字节则用于存储消息计划投递时间。
然后关于rocketmq的延时消息的使用
rocketmq 只支持固定18个等级的延时消息: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
发送延时消息只需要 setDelayTimeLevel 就可以,(连个延时等级相关的常量都没有。。。)
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
然后关于延时消息的实现
先看下流程图
然后文字+代码介绍
-
org.apache.rocketmq.store.CommitLog#putMessage 通过查看putMessage 源码可以得知,rocketmq在最终将message存入commitlog时,会先判断是否延时消息,如果延时消息则替换topic为SCHEDULE_TOPIC_XXXX ,并将原topic存入message.properties,然后根据延时level存入指定的queue。(18个延时等级分别对应18个queue的id)。 ...
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
...
-
org.apache.rocketmq.store.schedule.ScheduleMessageService#start 延时发送的代码,启动1s后,对已创建的delayQueue启动一个投递延时消息的任务,然后根据offset批量拉取对应的消息,判断是否到达投递时间,未到达则使用timer延时对应时长后启动下一次投递任务;到达投递时间则恢复原始topic和queueid并调用writeMessageStore.putMessage(msgInner) 将消息再次投递到commitlog中,然后投递下一个消息。 public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
-
org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup 延时任务处理逻辑 先获取当前延时等级的ConsumeQueue逻辑消费队列。然后根据传入的offset获取消息offset,tagsCode(延时消息存的是计划投递时间)。然后判断消息是否到期,到期则根据offset从commitLog中取出消息内容,并将其投递到原始topic中;如果未到期则再次在timer中添加一个延时任务(延时时间为计划投递的时间)。然后继续处理下一条记录。 public void executeOnTimeup() {
ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
....
} catch (Exception e) {
.....
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
}
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
总结
rocketmq 先将不同延时等级的消息存入内部对应延时队列中,然后不断的从延时队列中拉取消息判断是否到期,然后进行投递到对应的topic中。
通过固定延时等级的方式,同一个队列中的消息都是相同的延时等级,不需要对消息进行排序,只需要按顺序拉取消息判断是否可以投递就行了。但也限制了延时时间。
另外,因为只要延时消息存入延时队列中,就会写入commitlog文件中,然后rocketmq的高可用(同步复制或异步复制)就会将消息复制到slave中,从而保证延时消息的可靠性。
虽然rocketmq不支持任意延时时间,但相比于rabbitmq的死信消息,仍然提供了18个延时等级,基本也能覆盖很多场景了。
另外:后面又看到了去哪了开源的qmq,貌似支持任意延迟时间,感觉也可以学习一波。
|