依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
配置
# Producer settings
# Name server address in host:port form.
rocketmq.name-server=localhost:9876
# Producer group name.
rocketmq.producer.group=my-group1
# Send timeout. NOTE(review): presumably milliseconds (300000 = 5 minutes) - confirm against the starter docs.
rocketmq.producer.sendMessageTimeout=300000
# Consumer settings
# Same key and value as the name-server entry above, so it is left commented out here.
#rocketmq.name-server=localhost:9876
消费者一
/**
 * Listener bound to topic {@code my-topic} under consumer group {@code string_consumer}.
 *
 * <p>Each received payload is delivered as a {@link String} and echoed to standard output.
 */
@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "string_consumer")
public class StringConsumeListener implements RocketMQListener<String> {

    /**
     * Prints the received message body to standard output.
     *
     * @param message the message body handed over by the listener container
     */
    @Override
    public void onMessage(String message) {
        final String line = "------- StringConsumer received: " + message;
        System.out.println(line);
    }
}
消费者二
/**
 * Listener bound to topic {@code my-topic-delay} under consumer group
 * {@code string_consumer_delay}.
 *
 * <p>Each received payload is delivered as a {@link String} and echoed to standard output.
 */
@Component
@RocketMQMessageListener(topic = "my-topic-delay", consumerGroup = "string_consumer_delay")
public class StringConsumeListenerDelay implements RocketMQListener<String> {

    /**
     * Prints the received message body to standard output.
     *
     * @param s the message body handed over by the listener container
     */
    @Override
    public void onMessage(String s) {
        final String line = "------- StringConsumer received: " + s;
        System.out.println(line);
    }
}
测试
/**
 * Sends two delayed messages to the {@code my-topic-delay} topic.
 *
 * <p>A producer for group {@code my-group2} is created, started, used for two synchronous
 * sends and then shut down. The first message is sent with delay level 4 (30 seconds) and
 * the second with delay level 5 (1 minute), matching the text of each message body.
 *
 * <p>NOTE(review): a new producer is started and stopped on every request; consider reusing
 * a shared producer if this endpoint is called frequently.
 *
 * @return {@code "success"} when both messages were sent, {@code "fail"} if any step threw
 */
@RequestMapping("/send2")
public String testSend2( ) {
    DefaultMQProducer producer = null;
    try {
        // Delay levels, counted from 1:
        // messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        producer = new DefaultMQProducer("my-group2");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        // Encode explicitly as UTF-8 so the non-ASCII payload does not depend on the
        // platform default charset.
        Message msg1 = new Message(
                "my-topic-delay",
                "66666666666 30 秒".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        msg1.setDelayTimeLevel(4); // level 4 = 30 seconds

        Message msg2 = new Message(
                "my-topic-delay",
                "66666666666 一分钟 秒".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        msg2.setDelayTimeLevel(5); // level 5 = 1 minute, as the message body announces

        SendResult sendResult1 = producer.send(msg1);
        SendResult sendResult2 = producer.send(msg2);
        System.out.println("Product1-同步发送-Product信息=" + sendResult1);
        System.out.println("Product2-同步发送-Product信息=" + sendResult2);
        return "success";
    } catch (Exception e) {
        e.printStackTrace();
        return "fail";
    } finally {
        // Release the producer on the failure path as well, not only after a successful send.
        if (producer != null) {
            producer.shutdown();
        }
    }
}
|