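Installing ActiveMQ
Download ActiveMQ from the official website; this guide uses apache-activemq-5.16.4-bin.zip as an example. Extract the archive, go to the \bin\win64 directory, and double-click activemq.bat to start the broker.
Open a browser and go to localhost:8161, then log in with the username and password (admin/admin). The ActiveMQ management console appears.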
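If the console does not load, it can help to check from code whether the broker's ports accept connections at all. The following is a minimal sketch using only the JDK; the class name BrokerPortCheck and the 1-second timeout are assumptions made for illustration, while ports 8161 and 61616 are the ones used in this guide.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 8161 is the web console port; 61616 is the port in the broker URL.
        System.out.println("console 8161 open: " + isPortOpen("localhost", 8161, 1000));
        System.out.println("broker 61616 open: " + isPortOpen("localhost", 61616, 1000));
    }
}
```

Both lines should report true once activemq.bat has finished starting; a false result only says the port is not reachable, not why.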
Adding the ActiveMQ dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
ActiveMQ configuration
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=false
spring.activemq.packages.trust-all=true
spring.activemq.user=admin
spring.activemq.password=admin
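To make the structure of the broker URL concrete, here is a small JDK-only sketch that reads the property from text in the same format and splits it into scheme, host, and port. The class name BrokerUrlParser is hypothetical, and this is not how Spring Boot itself binds the property; it only illustrates what the value contains.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

public class BrokerUrlParser {
    /** Parses properties text and returns spring.activemq.broker-url as a URI. */
    public static URI brokerUri(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return URI.create(props.getProperty("spring.activemq.broker-url"));
    }

    public static void main(String[] args) {
        URI uri = brokerUri("spring.activemq.broker-url=tcp://localhost:61616\n");
        // Prints the transport scheme, host, and port of the broker.
        System.out.println(uri.getScheme() + " " + uri.getHost() + " " + uri.getPort());
    }
}
```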
Sending and receiving messages
Queue (point-to-point)
1. Define the destination (Queue) object
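// javax.jms matches Spring Boot 2.x; Spring Boot 3 uses jakarta.jms instead.
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ActiveMQConfig {
    // Exposes the queue destination as a bean so it can be injected.
    @Bean
    Queue queue() {
        return new ActiveMQQueue("my-queue");
    }
}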
2. Queue message sender
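import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class JmsController {
    @Autowired
    JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    Queue queue;

    // Sends the "msg" request parameter to the my-queue destination.
    @RequestMapping("/queue/sendMsg")
    public void send(String msg) {
        jmsMessagingTemplate.convertAndSend(queue, msg);
    }
}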
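To trigger the sender, call the endpoint with a msg query parameter. The sketch below builds that URL and sends it with the JDK's HttpClient (Java 11+). It assumes the application runs on Spring Boot's default port 8080, which this guide does not state; the class name SendMsgClient is also made up for illustration.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SendMsgClient {
    // Assumption: the Spring Boot app listens on its default port 8080.
    static final String BASE_URL = "http://localhost:8080";

    /** Builds the request URI, URL-encoding the message for the msg query parameter. */
    public static URI buildUri(String path, String msg) {
        String encoded = URLEncoder.encode(msg, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + path + "?msg=" + encoded);
    }

    /** Sends a GET request to the endpoint and returns the HTTP status code. */
    public static int send(String path, String msg) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildUri(path, msg)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
    }

    public static void main(String[] args) throws Exception {
        // Requires the application and the broker to be running.
        System.out.println(send("/queue/sendMsg", "hello Queue!"));
    }
}
```

Opening the same URL in a browser works just as well; the client is only convenient when sending many messages in a loop.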
3. Queue message receiver
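import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class QueueListener {
    // Both listeners subscribe to the same queue and compete for its messages.
    @JmsListener(destination = "my-queue")
    public void consumer1(String message) {
        System.out.println("consumer1:message = " + message);
    }

    @JmsListener(destination = "my-queue")
    public void consumer2(String message) {
        System.out.println("consumer2:message = " + message);
    }
}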
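Two consumers are defined here. Queue mode is point-to-point: each message is delivered to only one consumer, and multiple consumers receive messages in turn (round-robin).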
consumer1:message = hello Queue!
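The round-robin behaviour can be pictured with a toy in-memory model. This is not ActiveMQ code, and the real broker's dispatch also depends on factors such as prefetch settings and consumer speed, so which consumer gets a given message is not guaranteed; the class RoundRobinQueueDemo is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinQueueDemo {
    /** Assigns each message to exactly one consumer, cycling through the consumers in order. */
    public static List<String> dispatch(List<String> consumers, List<String> messages) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            // Message i goes to consumer (i mod number of consumers).
            String consumer = consumers.get(i % consumers.size());
            log.add(consumer + ":message = " + messages.get(i));
        }
        return log;
    }

    public static void main(String[] args) {
        dispatch(List.of("consumer1", "consumer2"), List.of("m1", "m2", "m3"))
                .forEach(System.out::println);
    }
}
```

In this model no message is ever seen by both consumers, which is the defining property of a queue.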
Topic (publish-subscribe)
1. Define the destination (Topic) object
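import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ActiveMQConfig {
    // Exposes the topic destination as a bean so it can be injected.
    @Bean
    Topic topic() {
        return new ActiveMQTopic("my-topic");
    }
}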
2. Topic message sender
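import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// In a real project this method would sit in the same JmsController as send(...).
@RestController
public class JmsController {
    @Autowired
    JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    Topic topic;

    // Publishes the "msg" request parameter to the my-topic destination.
    @RequestMapping("/topic/sendMsg")
    public void sendTopic(String msg) {
        jmsMessagingTemplate.convertAndSend(topic, msg);
    }
}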
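3. Topic message receiver: this needs a custom topic message listener
Problem
When Spring Boot integrates ActiveMQ, listeners can handle only one messaging mode by default, and that default is Queue messages.
# Enable publish-subscribe mode; once enabled, only Topic messages are handled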
spring.jms.pub-sub-domain=true
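Define a custom Topic message listener so that Queue and Topic messages can be handled at the same time
Custom Topic message listener (a listener container factory)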
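import javax.jms.DeliveryMode;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class ActiveMQConfig {
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    // Built from the broker URL only; spring.activemq.user/password and
    // packages.trust-all are not applied to this hand-made factory.
    @Bean
    public ActiveMQConnectionFactory activeMqConnectionFactory() {
        return new ActiveMQConnectionFactory(brokerUrl);
    }

    @Bean(name = "jmsTemplate")
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMqConnectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(activeMqConnectionFactory);
        // Only used when explicit QoS is enabled; PERSISTENT is also the JMS default.
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        return jmsTemplate;
    }

    // Listener container factory for topics. Because it is not named
    // "jmsListenerContainerFactory", Spring Boot still auto-configures the
    // default factory, which keeps handling queue listeners.
    @Bean(name = "jmsTopicListener")
    public JmsListenerContainerFactory<?> jmsTopicListener(ActiveMQConnectionFactory activeMqConnectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMqConnectionFactory);
        factory.setPubSubDomain(true);      // subscribe to topics instead of queues
        factory.setRecoveryInterval(1000L); // retry the connection every second
        return factory;
    }

    @Bean
    Queue queue() {
        return new ActiveMQQueue("my-queue");
    }

    @Bean
    Topic topic() {
        return new ActiveMQTopic("my-topic");
    }
}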
Topic message receiver: each listener must specify the topic message listener
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicListener {
    // containerFactory selects the pub-sub factory instead of the default queue one.
    @JmsListener(destination = "my-topic", containerFactory = "jmsTopicListener")
    public void consumer1(String message) {
        System.out.println("consumer1:message = " + message);
    }

    @JmsListener(destination = "my-topic", containerFactory = "jmsTopicListener")
    public void consumer2(String message) {
        System.out.println("consumer2:message = " + message);
    }
}
Two consumers are defined here. Topic mode is publish-subscribe, so every consumer receives each message.
consumer1:message = hello Topic!
consumer2:message = hello Topic!
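The fan-out shown in this output can be modelled with a few lines of plain Java. Again this is a toy sketch rather than ActiveMQ code, and the class TopicFanOutDemo is hypothetical; it only illustrates that a published message is copied to every active subscriber.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicFanOutDemo {
    /** Delivers one copy of the message to every subscriber. */
    public static List<String> publish(List<String> subscribers, String message) {
        List<String> log = new ArrayList<>();
        for (String subscriber : subscribers) {
            log.add(subscriber + ":message = " + message);
        }
        return log;
    }

    public static void main(String[] args) {
        publish(List.of("consumer1", "consumer2"), "hello Topic!")
                .forEach(System.out::println);
    }
}
```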