前言
延迟消息对于RocketMQ来说实现这个操作挺简单的,不用像RabbitMQ那样需要设置死信队列,简化了客户端操作,但是对于RocketMQ开源版(也就是RocketMQ)的是不支持自定义延迟时间的,只能使用RocketMQ提供的1-18个延迟级别,但是阿里云ONS是可以设置自定义时间的,收费版的还是牛逼!
延迟消息实现原理
首先生产者发送一条消息费Broker,那么发送一条延迟消息是需要设置延迟等级的,然后这条消息会被写入到CommitLog中,那么写入到CommitLog中后会分发到相应的Queue中,那么在投递给Queue时,会先判断这条消息是否携带了延迟等级,如果没有设置延迟等级,那么直接进入5的流程,正常投递给对应的Topic下的Queue中,如果有的话那么就会修改Topic为SCHEDULE_TOPIC前缀,然后根据延迟等级,在ConsumerQueue目录中SCHEDULE_TOPIC_XXX主题下创建对应的QueueID目录与ConsumerQueue文件,(如果没有SCHEDULE_TOPIC_XXX那么就自动创建),然后修改消息索引单元中的Message Tag HashCode部分原本存放的消息的Tag的Hash值,现修改为消息的投递时间,投递时间是指该消息被从新修改为原Topic后再次被写入到commitLog中的时间,(投递时间=消息存储时间+延迟等级时间),消息存储时间指的是消息被发送到Broker时的时间戳,然后将消息索引写入到SCHEDULE_TOPIC_XXX主题下对应的ConsumerQueue中
投递延迟消息
Broker内部有一个延迟消息的服务类,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息投递时间,将延迟消息投递到目标Topic中,不过,在投递之前会从CommitLog中将原来写入的消息再次读出,并将其原来的延迟等级设置为0,即原消息变为一条不延迟的普通消息,然后再次投递到目标Topic中
ScheduleMessageService在Broker启动时,会创建并启动一个定时器Timer,用于执行相应的定时任务,系统会根据延迟等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟等级消息的消息投递,每个TimerTask都会检测相应Queue队列的第一条消息是否到期,若第一条消息未到期,则后面所有消息更不会到期,(消息是按投递时间排序的),若第一条消息到期了,则该消息投递到目标Topic,即消费该消息
将消息从新写入commitLog
延迟消息服务类ScheduleMessageService将延迟消息再次发送给commitLog,并再次形成新的消息索引条目,分发到对应的Queue中
这其实就是一次普通消息发送,只不过这次消息的Producer是延迟消息服务类ScheduleMessageService
就会投递到Topic为SCHEDULE_TOPIC开头的Queue中去,那么这时发送的消息就不会再有延迟了,会直接写入到目标Topic下的消费者队列中去的,那么消费者就订阅消费的
|