springboot+rockermq 实现简单的消息发送与接收
普通消息的发送方式有3种:单向发送、同步发送和异步发送 。
下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收
一、创建Springboot项目,添加rockermq 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
二、配置rocketmq
server:
port: 8083
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: group1
send-message-timeout: 3000
compress-message-body-threshold: 4096
max-message-size: 4194304
retry-times-when-send-failed: 3
retry-next-server: true
retry-times-when-send-async-failed: 3
三、新建一个 controller 来做消息发送:
package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RocketMQCOntroller {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/send")
public void send(){
rocketMQTemplate.convertAndSend("test-topic","test-message");
}
@RequestMapping("/testSyncSend")
public void testSyncSend(){
SendResult sendResult = rocketMQTemplate.syncSend("test-topic","同步消息测试");
System.out.println(sendResult);
}
@RequestMapping("/testASyncSend")
public void testASyncSend(){
rocketMQTemplate.asyncSend("test-topic", "异步消息测试", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息发送异常");
throwable.printStackTrace();
}
});
}
@RequestMapping("/testOneWay")
public void testOneWay(){
rocketMQTemplate.sendOneWay("test-topic","单向消息测试");
}
}
SpringBoot给我们提供了RocketMQTemplate模板类,我们利用这个类可以以多种形式发送消息。 发送方法指定Topic主题test-topic。
四、新建消息消费者监听RocketMQConsumerListener,监听消息,消费消息
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(consumerGroup = "test",topic = "test-topic")
public class RocketMQConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费消息:"+s);
}
}
消费者类要实现RocketMQListener 接口,以及动态指定消息类型String。
类上要加上@RocketMQMessageListener注解 ,指定topic主题test-topic,以及消费者组test
简单的消息发送与接收搭建完毕!
五、启动服务,测试消息消费
测试同步消息: 测试异步消息: 测试单向消息:
测试OK,成功消费!
|