一、延迟消息
对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。
二、延迟消息的使用场景
? 延迟消息的使用场景很多,一种比较常见的场景就是在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉(取消订单释放库存),这样就不需要使用定时任务的方式去处理了 。
三、Rocket的延迟消息 使用限制级别
RocketMQ 支持定时的延迟消息,但是不支持任意时间精度,仅支持特定的 level(共18级),例如定时 5s, 10s, 1m 等 。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。
支持的level如下:level=0 级表示不延时,level=1 表示 延时1s,level=2 表示 延时5s
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
四、延迟消息 的整合实现
4.1创建Springboot项目,添加rockermq 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
4.2配置rocketmq
server:
port: 8083
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: group1
send-message-timeout: 3000
compress-message-body-threshold: 4096
max-message-size: 4194304
retry-times-when-send-failed: 3
retry-next-server: true
retry-times-when-send-async-failed: 3
4.3 生产者 发送延时消息
package com.example.springbootrocketdemo.controller;
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
@RestController
public class RocketMQDelayCOntroller {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/testDelaySend")
public void testDelaySend(){
Map<String,Object> orderMap = new HashMap<>();
orderMap.put("orderNumber","1357890");
orderMap.put("createTime", LocalDateTime.now());
rocketMQTemplate.syncSend("test-topic-delay", MessageBuilder.withPayload(JSONObject.toJSONString(orderMap)).build(),3000,3);
}
}
4.4 消费者 监听延时消息,消费消息
package com.example.springbootrocketdemo.config;
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
@RocketMQMessageListener(consumerGroup = "test-delay",topic = "test-topic-delay")
public class RocketMQDelayConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
Map<String,Object> orderMap = JSONObject.parseObject(s,Map.class);
String orderNumber = String.valueOf(orderMap.get("orderNumber"));
String createTime = String.valueOf(orderMap.get("createTime"));
System.out.println("consumer 延时消息消费 orderNumber:"+orderNumber+",createTime:"+createTime);
}
}
消费者类要实现RocketMQListener 接口,以及动态指定消息类型 String。
类上要加上@RocketMQMessageListener注解,指定topic主题test-topic-delay,以及消费者组test-delay
延时消息发送与接收搭建完毕!
4.5启动服务,测试延时消息
测试OK,成功消费!
|