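Preface
Note: Hi everyone, I'm 妈妈的好大儿. You can reach the author at QQ: 3302254385 or WeChat: yxc3302254385; happy to make friends! Writing takes effort, so a like, favorite, and share would be much appreciated.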
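Introduction
This post walks through the commonly used RabbitMQ patterns with real code alongside screenshots. They are demonstrated one by one in the order listed below, so take what you need. The complete configuration files are at the end of the article.
- Simple mode
- Work mode
- Publish/subscribe mode
- Direct routing mode
- Topic mode
- Sending timed (TTL) messages
- TTL queues and dead-letter queues
- Reliable production and consumption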
Code
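1. Simple mode
Simple mode is the most basic messaging pattern. It involves one producer, one consumer, and one queue: the producer sends messages to the queue, and the consumer takes messages from the queue and processes them.
Definition: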
public static final String DEFAULT_QUEUE1_TO_TEST ="defaultQueue1ToTest";
Create the queue:
@Bean(name = RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST)
public Queue createQueue1Test(){
return new Queue(RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST,true,false,false,null);
}
Send a default message:
@GetMapping("/sendDefalutMsgToQueue")
public String sendDefalutMsgToQueue(){
rabbitTemplate.convertAndSend(RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST,"simple message");
return "true";
}
Consumer:
@Component
@RabbitListener(queues = RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST)
public class MsgDefaultConsumer1 {
@RabbitHandler
public void getMsg(String msg){
System.out.println("Received message from defaultQueue1ToTest--->"+msg);
}
}
Result:
Every message sent is received: send N times, receive N messages.
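2. Work mode
Work mode sends messages to multiple competing consumers. It involves one producer, two consumers, and one queue. Both consumers listen on the same queue; while one consumer is busy with a time-consuming task, the idle consumer takes the next message from the queue and processes it.
In short: one queue, multiple consumers, and each message is consumed by exactly one consumer.
Definition: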
public static final String DEFAULT_QUEUE1_TO_TEST ="defaultQueue1ToTest";
Create the queue:
@Bean(name = RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST)
public Queue createQueue1Test(){
return new Queue(RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST,true,false,false,null);
}
Send work messages:
@GetMapping("/sendWorkMsgToQueue")
public String sendWorkMsgToQueue(){
for (int i = 0; i <20 ; i++) {
rabbitTemplate.convertAndSend(RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST,"work mode message");
}
return "true";
}
Consumers:
@Component
@RabbitListener(queues = RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST)
public class MsgDefaultConsumer1 {
@RabbitHandler
public void getMsg(String msg){
System.out.println("Received message from defaultQueue1ToTest--->"+msg);
}
}
-------------------------------------------------------------------------------------------
@Component
public class MsgWorkConsumer2 {
@RabbitListener(queues = RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST)
public void getMsg(String msg){
System.out.println("Consumer2--->Received message from defaultQueue1ToTest--->"+msg);
}
}
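(1) Round-robin dispatch (the broker's default): messages are handed to the consumers in turn, so each gets roughly the same number regardless of how fast it is.
(2) Fair dispatch: limit prefetch (for example to 1) and acknowledge manually, so a consumer only receives the next message after finishing the previous one. The more capable consumer then handles more messages, at the cost of lower throughput.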
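The difference between the two dispatch strategies can be illustrated without a broker. The sketch below is a plain-Java simulation, not Spring AMQP code: `DispatchSimulation`, `roundRobin`, and `fairDispatch` are hypothetical names, and the per-message processing costs are made-up numbers chosen only to show the effect.

```java
import java.util.Arrays;

/** Toy simulation of two ways a broker can hand messages to competing consumers. */
final class DispatchSimulation {

    /** Round-robin: message i goes to consumer i % n, regardless of how busy it is. */
    static int[] roundRobin(int messages, int consumers) {
        int[] handled = new int[consumers];
        for (int i = 0; i < messages; i++) {
            handled[i % consumers]++;
        }
        return handled;
    }

    /**
     * Fair dispatch (prefetch = 1): each message goes to the consumer that
     * becomes free first. costPerMessage[c] is the assumed time consumer c
     * needs to process one message.
     */
    static int[] fairDispatch(int messages, int[] costPerMessage) {
        int n = costPerMessage.length;
        int[] handled = new int[n];
        long[] freeAt = new long[n]; // simulated time at which each consumer is idle again
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += costPerMessage[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // 20 messages and 2 consumers, as in the endpoint above
        System.out.println("round-robin: " + Arrays.toString(roundRobin(20, 2)));
        // Consumer 0 is assumed to be three times faster than consumer 1
        System.out.println("fair:        " + Arrays.toString(fairDispatch(20, new int[] {1, 3})));
    }
}
```

In this simulation round-robin splits the 20 messages evenly, while fair dispatch lets the faster consumer take the larger share.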
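3. Publish/subscribe mode
A message is sent through an exchange to both subscribed queues. Once a message reaches the exchange, every queue bound to that exchange (in effect, subscribed to it) receives a copy.
Definition: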
public static final String PUBLISH_QUEUE1_TO_TEST="publish_Queue1ToTest";
public static final String PUBLISH_QUEUE2_TO_TEST="publish_Queue2ToTest";
public static final String FANOUT_EXCHANGE= "fanoutExchange";
Create the queues and the exchange, and bind them:
@Bean(name = RabbitMQBeanName.PUBLISH_QUEUE1_TO_TEST)
public Queue createPublishQueue1Test(){
return new Queue(RabbitMQBeanName.PUBLISH_QUEUE1_TO_TEST,true,false,false,null);
}
@Bean(name = RabbitMQBeanName.PUBLISH_QUEUE2_TO_TEST)
public Queue createPublishQueue2Test(){
return new Queue(RabbitMQBeanName.PUBLISH_QUEUE2_TO_TEST,true,false,false,null);
}
@Bean(name = RabbitMQBeanName.FANOUT_EXCHANGE)
public FanoutExchange fanoutExchange() {
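// Use the shared constant so the declared exchange name matches the one used when sending
return new FanoutExchange(RabbitMQBeanName.FANOUT_EXCHANGE,true,false,null);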
}
@Bean
public Binding fanoutBinding1(@Qualifier(RabbitMQBeanName.FANOUT_EXCHANGE) FanoutExchange fanout, @Qualifier(RabbitMQBeanName.PUBLISH_QUEUE1_TO_TEST)Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanout);
}
@Bean
public Binding fanoutBinding2(@Qualifier(RabbitMQBeanName.FANOUT_EXCHANGE) FanoutExchange fanout, @Qualifier(RabbitMQBeanName.PUBLISH_QUEUE2_TO_TEST)Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanout);
}
Send the message:
@GetMapping("/sendFanoutMsgToQueue")
public String sendFanoutMsgToQueue(){
rabbitTemplate.convertAndSend(RabbitMQBeanName.FANOUT_EXCHANGE,"","publish/subscribe mode message");
return "true";
}
Consumers:
@Component
public class MsgPublishConsumer3 {
@RabbitListener(queues = RabbitMQBeanName.PUBLISH_QUEUE1_TO_TEST)
public void consumer1(String msg){
System.out.println("Consumer1--->Received message from PUBLISH_QUEUE1_TO_TEST--->"+msg);
}
@RabbitListener(queues = RabbitMQBeanName.PUBLISH_QUEUE2_TO_TEST)
public void consumer2(String msg){
System.out.println("Consumer2--->Received message from PUBLISH_QUEUE2_TO_TEST--->"+msg);
}
}
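To make the fanout behaviour concrete, here is a small broker-free sketch in plain Java. `FanoutSketch` and its methods are hypothetical names used for illustration only; a real fanout exchange lives inside RabbitMQ, and this model only mirrors the "every bound queue gets a copy" rule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy fanout exchange: every bound queue gets its own copy of each message. */
final class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** The routing key is accepted but ignored, as with a fanout exchange. */
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("publish_Queue1ToTest");
        exchange.bind("publish_Queue2ToTest");
        exchange.publish("", "publish/subscribe mode message");
        System.out.println(exchange.messagesIn("publish_Queue1ToTest"));
        System.out.println(exchange.messagesIn("publish_Queue2ToTest"));
    }
}
```

This is why the send call above can pass an empty string as the routing key: with a fanout exchange the key plays no part in routing.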
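4. Direct routing mode
A message is sent to the exchange, the exchange routes it to the matching queue based on the routing key, and a consumer listening on that queue processes it.
Definition: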
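// The original listing repeated the fanout constants here; the values below are assumed, following the naming of the other constants in this post
public static final String DIRECT_QUEUE1_TO_TEST="directQueue1ToTest";
public static final String DIRECT_QUEUE2_TO_TEST="directQueue2ToTest";
public static final String DIRECT_EXCHANGE_TO_TEST="directExchangeToTest";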
Create the queues and the exchange, and bind them:
@Bean(name = RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST)
public DirectExchange createDirectExchangeTest(){
return new DirectExchange(RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST,true,false,null);
}
@Bean(name = RabbitMQBeanName.DIRECT_QUEUE1_TO_TEST)
public Queue createDirectQueue1Test(){
return new Queue(RabbitMQBeanName.DIRECT_QUEUE1_TO_TEST,true,false,false,null);
}
@Bean(name = RabbitMQBeanName.DIRECT_QUEUE2_TO_TEST)
public Queue createDirectQueue2Test(){
return new Queue(RabbitMQBeanName.DIRECT_QUEUE2_TO_TEST,true,false,false,null);
}
@Bean
public Binding directBinding1(@Qualifier(RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST) DirectExchange direct, @Qualifier(RabbitMQBeanName.DIRECT_QUEUE1_TO_TEST) Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(direct).with("sb");
}
@Bean
public Binding directBinding2(@Qualifier(RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST) DirectExchange direct, @Qualifier(RabbitMQBeanName.DIRECT_QUEUE2_TO_TEST) Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(direct).with("wc");
}
Send the message:
@GetMapping("/sendDirectMsgToQueue")
public String sendDirectMsgToQueue(){
rabbitTemplate.convertAndSend(RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST,"sb","routing mode message");
return "true";
}
Consume the message:
@Component
public class MsgDirectConsumer4 {
@RabbitListener(queues = RabbitMQBeanName.DIRECT_QUEUE1_TO_TEST)
public void consumer1(String msg){
System.out.println("Consumer1--->Received message from DIRECT_QUEUE1_TO_TEST--->"+msg);
}
@RabbitListener(queues = RabbitMQBeanName.DIRECT_QUEUE2_TO_TEST)
public void consumer2(String msg){
System.out.println("Consumer2--->Received message from DIRECT_QUEUE2_TO_TEST--->"+msg);
}
}
Result:
After sending twice, only queue 1's routing key ("sb") matches, so queue 1 receives 2 messages and queue 2 receives none.
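The exact-match rule of a direct exchange can be sketched in a few lines of plain Java. This is an illustrative model only: `DirectSketch`, `bind`, `publish`, and `count` are hypothetical names, and the queue names are shortened placeholders.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy direct exchange: a message reaches only queues bound with an identical key. */
final class DirectSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    void publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
    }

    int count(String queueName) {
        List<String> messages = queues.get(queueName);
        return messages == null ? 0 : messages.size();
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("directQueue1", "sb");
        exchange.bind("directQueue2", "wc");
        // Two sends with routing key "sb", as in the result described above
        exchange.publish("sb", "routing mode message");
        exchange.publish("sb", "routing mode message");
        System.out.println("queue1=" + exchange.count("directQueue1")
                + " queue2=" + exchange.count("directQueue2"));
    }
}
```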
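5. Topic mode
An upgraded version of routing mode that adds wildcards. The routingKey given when sending is matched against the routing key (binding key) each queue was bound with; on a match, the message is delivered to that queue.
About the routingKey
A routing key is specified when a queue is bound to the exchange. In a topic binding, words are separated by dots, * matches exactly one word, and # matches zero or more words.
Definition: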
public static final String TOPIC_QUEUE1_TO_TEST ="topicQueue1ToTest";
public static final String TOPIC_QUEUE2_TO_TEST ="topicQueue2ToTest";
public static final String TOPIC_EXCHANGE_TO_TEST="topicExchangeToTest";
Create the exchange and queues, and bind them:
@Bean(name = RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST)
public TopicExchange createTopicExchangeTest(){
return new TopicExchange(RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST,true,false,null);
}
@Bean(name = RabbitMQBeanName.TOPIC_QUEUE1_TO_TEST)
public Queue createTopicQueue1Test(){
return new Queue(RabbitMQBeanName.TOPIC_QUEUE1_TO_TEST,true,false,false,null);
}
@Bean(name = RabbitMQBeanName.TOPIC_QUEUE2_TO_TEST)
public Queue createTopicQueue2Test(){
return new Queue(RabbitMQBeanName.TOPIC_QUEUE2_TO_TEST,true,false,false,null);
}
@Bean
public Binding topicBinding1(@Qualifier(RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST) TopicExchange topic, @Qualifier(RabbitMQBeanName.TOPIC_QUEUE1_TO_TEST) Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topic).with("#.wo.shi.sb.#");
}
@Bean
public Binding topicBinding2(@Qualifier(RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST) TopicExchange topic, @Qualifier(RabbitMQBeanName.TOPIC_QUEUE2_TO_TEST) Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topic).with("*.wo.shi.sb.*");
}
Send the messages:
@GetMapping("/sendTopicMsgToQueue")
public String sendTopicMsgToQueue(){
rabbitTemplate.convertAndSend(RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST,"aa.wo.shi.sb.bb","topic mode message 1");
rabbitTemplate.convertAndSend(RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST,"aa.wo.shi.sb.bb.cc","topic mode message 2");
return "true";
}
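Consume the messages:
@Component
public class MsgTopicConsumer5 {
@RabbitListener(queues = RabbitMQBeanName.TOPIC_QUEUE1_TO_TEST)
public void consumer1(String msg){
System.out.println("Consumer1--->Received message from TOPIC_QUEUE1_TO_TEST--->"+msg);
}
@RabbitListener(queues = RabbitMQBeanName.TOPIC_QUEUE2_TO_TEST)
public void consumer2(String msg){
System.out.println("Consumer2--->Received message from TOPIC_QUEUE2_TO_TEST--->"+msg);
}
}
Result:
After calling the endpoint twice (two messages per call):
Queue 1 receives 4 messages, because "#.wo.shi.sb.#" matches both routing keys.
Queue 2 receives 2 messages, because "*.wo.shi.sb.*" matches only "aa.wo.shi.sb.bb".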
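The counts above follow from the topic wildcard rules (`*` is exactly one word, `#` is zero or more words). The following is a minimal stand-alone matcher written for illustration; `TopicMatcher` is a hypothetical helper, not the algorithm RabbitMQ itself uses internally.

```java
/** Minimal matcher for topic-exchange binding patterns (illustrative only). */
final class TopicMatcher {

    /** Returns true if the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int skip = j; skip <= k.length; skip++) {
                if (match(p, i + 1, k, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"aa.wo.shi.sb.bb", "aa.wo.shi.sb.bb.cc"};
        for (String key : keys) {
            System.out.println(key
                    + " | #.wo.shi.sb.# -> " + matches("#.wo.shi.sb.#", key)
                    + " | *.wo.shi.sb.* -> " + matches("*.wo.shi.sb.*", key));
        }
    }
}
```

The second key has two words after `sb`, which `#` accepts but a single trailing `*` does not.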
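6. Sending timed (TTL) messages
If a message with a TTL is not consumed before it expires, it is removed from the queue. The queues used here have no dead-letter exchange configured, so the expired message is simply discarded instead of going to a dead-letter queue.
Definition and bindings: this example reuses the fanout exchange and queues from section 3.
Send the message: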
@GetMapping("/sendTTLMsgToQueue")
public String sendTTLMsgToQueue(){
String msg ="timed message";
MessagePostProcessor messagePostProcessor= new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// Per-message TTL in milliseconds, passed as a string (10 seconds here)
message.getMessageProperties().setExpiration("10000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitMQBeanName.FANOUT_EXCHANGE,"",msg,messagePostProcessor);
return "true";
}
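Consumer: (none is running for this test, so the message is left to expire)
Result:
The message stays in the queue for 10 seconds and, not having been consumed, is then discarded.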
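7. TTL queues and dead-letter queues
Messages are placed in a TTL queue, where each message has an expiration time. If the queue is configured with a dead-letter exchange, an expired message is forwarded to the dead-letter queue according to its routing key.
Definition: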
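The expiry-then-dead-letter flow can be modelled in plain Java with an explicit clock, which keeps the example deterministic. This is a sketch under stated assumptions: `TtlQueueSketch` is a hypothetical class, time is passed in as a number rather than read from the system, and a `null` dead-letter list stands for a queue without a dead-letter exchange.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy queue with a per-queue TTL and an optional dead-letter target. */
final class TtlQueueSketch {
    private static final class Entry {
        final String body;
        final long expiresAt;

        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final long ttlMillis;
    private final List<String> deadLetters; // null means no dead-letter exchange is configured
    private final Deque<Entry> entries = new ArrayDeque<>();

    TtlQueueSketch(long ttlMillis, List<String> deadLetters) {
        this.ttlMillis = ttlMillis;
        this.deadLetters = deadLetters;
    }

    void publish(String body, long nowMillis) {
        entries.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    /** Expires messages from the head of the queue and returns how many expired. */
    int expire(long nowMillis) {
        int expired = 0;
        while (!entries.isEmpty() && entries.peekFirst().expiresAt <= nowMillis) {
            Entry entry = entries.removeFirst();
            if (deadLetters != null) {
                deadLetters.add(entry.body); // forwarded to the dead-letter queue
            }
            // with no dead-letter target the message is simply dropped
            expired++;
        }
        return expired;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        List<String> deadLetterQueue = new ArrayList<>();
        TtlQueueSketch ttlQueue = new TtlQueueSketch(5000, deadLetterQueue);
        ttlQueue.publish("TTL queue message", 0);
        ttlQueue.expire(4999);
        System.out.println("before expiry: " + deadLetterQueue);
        ttlQueue.expire(5000);
        System.out.println("after expiry:  " + deadLetterQueue);
    }
}
```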
public static final String TTL_QUEUE_TO_TEST ="ttlQueueToTest";
public static final String DEAD_LETTER_EXCHANGE="deadLetterExchange";
public static final String DEAD_LETTER_QUEUE_TO_TEST="deadLetterQueueToTest";
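Create the exchange and queues, and bind them:
@Bean(name = RabbitMQBeanName.TTL_QUEUE_TO_TEST)
public Queue createTTLQueueTest(){
Map<String, Object> arguments = new HashMap<>(4);
// Queue-level TTL in milliseconds: every message in this queue expires after 5 seconds
arguments.put("x-message-ttl",5000);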
arguments.put("x-dead-letter-exchange", RabbitMQBeanName.DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key", "dead");
return new Queue(RabbitMQBeanName.TTL_QUEUE_TO_TEST,true,false,false,arguments);
}
@Bean(RabbitMQBeanName.DEAD_LETTER_EXCHANGE)
public DirectExchange deadLetterExchange(){
return new DirectExchange(RabbitMQBeanName.DEAD_LETTER_EXCHANGE,true,false,null);
}
@Bean(RabbitMQBeanName.DEAD_LETTER_QUEUE_TO_TEST)
public Queue createDeadLetterQueue1Test(){
return new Queue(RabbitMQBeanName.DEAD_LETTER_QUEUE_TO_TEST,true,false,false,null);
}
@Bean
public Binding deadLetterBindingB(@Qualifier(RabbitMQBeanName.DEAD_LETTER_QUEUE_TO_TEST) Queue queue,
@Qualifier(RabbitMQBeanName.DEAD_LETTER_EXCHANGE) DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dead");
}
Send the message:
@GetMapping("/sendMsgToTTLQueue")
public String sendMsgToTTLQueue(){
rabbitTemplate.convertAndSend(RabbitMQBeanName.TTL_QUEUE_TO_TEST,"TTL queue message");
return "true";
}
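Consume the message:
@Component
public class MsgTTLQueueDeadQueueConsumer6 {
@RabbitListener(queues = RabbitMQBeanName.DEAD_LETTER_QUEUE_TO_TEST)
public void getMsg(String msg){
System.out.println("Consumer1--->Received message from DEAD_LETTER_QUEUE_TO_TEST--->"+msg);
}
}
Result:
Nothing consumes ttlQueueToTest, so after about 5 seconds the message expires there and the listener on deadLetterQueueToTest receives it.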
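8. Reliable production and reliable consumption
Reliability here is considered at two levels: first, whether the message was delivered to the exchange; second, whether the message was consumed successfully.
Configure the RabbitTemplate: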
@Bean("diyRabbitTemplate")
public RabbitTemplate initAckRabbitTemplate(@Autowired CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(converter());
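rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// correlationData is null when the message was sent without one
String msgId = correlationData != null ? correlationData.getId() : null;
log.info("Message delivered to the exchange--->"+msgId);
} else {
log.info("Message failed to reach the exchange, {}, cause: {}", correlationData, cause);
}
});
// Invoked only when a mandatory message cannot be routed to any queue.
// Note: newer Spring AMQP versions (2.3+) deprecate setReturnCallback in favour of setReturnsCallback.
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("Message could not be routed from the exchange to a queue: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});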
return rabbitTemplate;
}
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
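The two callbacks answer different questions: the confirm callback reports whether the exchange accepted the message, and the return callback reports a message that reached the exchange but matched no queue. The plain-Java model below illustrates that split. It is a simplified sketch: `ConfirmReturnSketch` and its nested interfaces are hypothetical stand-ins, not the Spring AMQP types, and a real broker reports a missing exchange through a channel error rather than a direct method call.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of publisher confirms (exchange reached?) and returns (queue reached?). */
final class ConfirmReturnSketch {
    interface ConfirmCallback {
        void confirm(String correlationId, boolean ack, String cause);
    }

    interface ReturnCallback {
        void returned(String message, String exchange, String routingKey);
    }

    // exchange name -> routing keys that have at least one bound queue
    private final Map<String, Set<String>> exchanges = new HashMap<>();
    private final ConfirmCallback confirmCallback;
    private final ReturnCallback returnCallback;

    ConfirmReturnSketch(ConfirmCallback confirmCallback, ReturnCallback returnCallback) {
        this.confirmCallback = confirmCallback;
        this.returnCallback = returnCallback;
    }

    void declare(String exchange, String boundRoutingKey) {
        exchanges.computeIfAbsent(exchange, k -> new HashSet<>()).add(boundRoutingKey);
    }

    void publish(String exchange, String routingKey, String message, String correlationId) {
        Set<String> boundKeys = exchanges.get(exchange);
        if (boundKeys == null) {
            // The exchange does not exist: negative confirm
            confirmCallback.confirm(correlationId, false, "no exchange '" + exchange + "'");
            return;
        }
        if (!boundKeys.contains(routingKey)) {
            // Mandatory message that cannot be routed: handed back to the publisher first
            returnCallback.returned(message, exchange, routingKey);
        }
        // The exchange accepted the message, so the confirm is positive either way
        confirmCallback.confirm(correlationId, true, null);
    }

    public static void main(String[] args) {
        ConfirmReturnSketch broker = new ConfirmReturnSketch(
                (id, ack, cause) -> System.out.println("confirm id=" + id + " ack=" + ack + " cause=" + cause),
                (msg, exchange, key) -> System.out.println("returned key=" + key + " msg=" + msg));
        broker.declare("directExchange", "sb");
        broker.publish("directExchange", "sb", "routable", "id-1");
        broker.publish("directExchange", "nope", "unroutable", "id-2");
        broker.publish("missingExchange", "sb", "lost", "id-3");
    }
}
```

Note that the unroutable message still gets a positive confirm, which is why both callbacks are needed to detect every failure on the producer side.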
Consumer configuration:
publisher-returns: true
publisher-confirm-type: correlated
listener:
  simple:
    acknowledge-mode: MANUAL
    prefetch: 100
    retry:
      enabled: true
      max-attempts: 2
      initial-interval: 5000ms
      max-interval: 20000ms
      multiplier: 1
Producer configuration:
publisher-returns: true
publisher-confirm-type: correlated
listener:
  simple:
    acknowledge-mode: MANUAL
Producer:
@GetMapping("/sendAckMsg")
public String sendAckMsg(){
String msgId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(msgId);
diyRabbitTemplate.convertAndSend(RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST,"sb","ack message-->"+msgId,correlationData);
return "true";
}
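Consumer:
@RabbitListener(queues = RabbitMQBeanName.DIRECT_QUEUE1_TO_TEST)
public void consumer1(Message msg, Channel channel) throws IOException {
MessageProperties properties = msg.getMessageProperties();
// The delivery tag is what channel.basicAck / channel.basicNack expect
long tag = properties.getDeliveryTag();
System.out.println("Consumer1--->Received message from DIRECT_QUEUE1_TO_TEST--->"+msg.toString());
// Thrown deliberately to demonstrate the retry behaviour from the consumer configuration
throw new RuntimeException();
}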
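Notes:
- Sending messages reliably (producer side)
  - First, configure reliable production (publisher confirms and returns).
  - Second, if messages should end up in a dead-letter queue, configure the dead-letter exchange and queue.
  - Third, attach a CorrelationData when sending: CorrelationData correlationData = new CorrelationData(msgId); the unique msgId identifies the message when it is confirmed or rejected.
- Acknowledging messages (consumer side)
  - First, configure reliable consumption: the retry count and the retry interval.
  - Second, acknowledge or reject each message:
    - Acknowledge receipt: channel.basicAck(tag, true);
    - Reject and requeue: channel.basicNack(tag, false, true); the message goes back to the queue and is redelivered over and over (the number of redeliveries is unrelated to the retry configuration) until, in this setup, it finally ends up in the dead-letter queue.
    - Reject without requeue: channel.basicNack(tag, false, false); the message goes to the dead-letter queue.
    - Throwing an exception triggers retries according to the retry settings in the configuration file.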
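The three outcomes listed above (ack, nack with requeue, nack without requeue) can be traced with a small in-memory model. This is a sketch under stated assumptions: `ManualAckSketch` is hypothetical, the "retry up to maxAttempts, then dead-letter" policy is one possible design rather than what Spring's retry interceptor does internally, and the comments only point at the analogous `channel` calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of manual acknowledgement: ack, nack with requeue, nack to dead letter. */
final class ManualAckSketch {
    final Deque<String> queue = new ArrayDeque<>();
    final List<String> acked = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    /**
     * Delivers the head message to the handler up to maxAttempts times.
     * Success: ack. Failure with attempts left: nack and requeue.
     * Failure on the last attempt: nack without requeue (dead-lettered).
     */
    void consumeOne(Predicate<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String message = queue.pollFirst();
            if (message == null) {
                return; // nothing to consume
            }
            if (handler.test(message)) {
                acked.add(message);          // analogous to channel.basicAck(tag, false)
                return;
            }
            if (attempt < maxAttempts) {
                queue.addFirst(message);     // analogous to channel.basicNack(tag, false, true)
            } else {
                deadLetters.add(message);    // analogous to channel.basicNack(tag, false, false)
            }
        }
    }

    public static void main(String[] args) {
        ManualAckSketch sketch = new ManualAckSketch();
        sketch.queue.addLast("good message");
        sketch.queue.addLast("bad message");
        Predicate<String> handler = message -> message.startsWith("good");
        sketch.consumeOne(handler, 2);
        sketch.consumeOne(handler, 2);
        System.out.println("acked: " + sketch.acked);
        System.out.println("dead letters: " + sketch.deadLetters);
    }
}
```

Bounding the number of attempts before rejecting without requeue avoids the endless redelivery loop described for `basicNack(tag, false, true)`.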
Complete configuration files
Configuration class
package com.cc.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class RabbitMQConfiguration {
@Bean("defaultRabbitTemplate")
public RabbitTemplate initRabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean("diyRabbitTemplate")
public RabbitTemplate initAckRabbitTemplate(@Autowired CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(converter());
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// correlationData is null when the message was sent without one
String msgId = correlationData != null ? correlationData.getId() : null;
log.info("Message delivered to the exchange--->"+msgId);
} else {
log.info("Message failed to reach the exchange, {}, cause: {}", correlationData, cause);
}
});
// Invoked only when a mandatory message cannot be routed to any queue
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("Message could not be routed from the exchange to a queue: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
return rabbitTemplate;
}
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean(name = RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST)
public Queue createQueue1Test(){
return new Queue(RabbitMQBeanName.DEFAULT_QUEUE1_TO_TEST,true,false,false,null);
}
@Bean(name = RabbitMQBeanName.PUBLISH_QUEUE1_TO_TEST)
public Queue createPublishQueue1Test(){
return new Queue(RabbitMQBeanName.PUBLISH_QUEUE1_TO_TEST,true,false,false,null);
}
@Bean(name = RabbitMQBeanName.PUBLISH_QUEUE2_TO_TEST)
public Queue createPublishQueue2Test(){
return new Queue(RabbitMQBeanName.PUBLISH_QUEUE2_TO_TEST,true,false,false,null);
}
@Bean(name = RabbitMQBeanName.FANOUT_EXCHANGE)
public FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitMQBeanName.FANOUT_EXCHANGE,true,false,null);
}
@Bean
public Binding fanoutBinding1(@Qualifier(RabbitMQBeanName.FANOUT_EXCHANGE) FanoutExchange fanout, @Qualifier(RabbitMQBeanName.PUBLISH_QUEUE1_TO_TEST)Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanout);
}
@Bean
public Binding fanoutBinding2(@Qualifier(RabbitMQBeanName.FANOUT_EXCHANGE) FanoutExchange fanout, @Qualifier(RabbitMQBeanName.PUBLISH_QUEUE2_TO_TEST)Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanout);
}
@Bean(name = RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST)
public DirectExchange createDirectExchangeTest(){
return new DirectExchange(RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST,true,false,null);
}
@Bean(name = RabbitMQBeanName.DIRECT_QUEUE1_TO_TEST)
public Queue createDirectQueue1Test(){
Map<String, Object> arguments = new HashMap<>(4);
arguments.put("x-message-ttl",5000);
arguments.put("x-dead-letter-exchange", RabbitMQBeanName.DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key", "dead");
return new Queue(RabbitMQBeanName.DIRECT_QUEUE1_TO_TEST,true,false,false,arguments);
}
@Bean(name = RabbitMQBeanName.DIRECT_QUEUE2_TO_TEST)
public Queue createDirectQueue2Test(){
return new Queue(RabbitMQBeanName.DIRECT_QUEUE2_TO_TEST,true,false,false,null);
}
@Bean
public Binding directBinding1(@Qualifier(RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST) DirectExchange direct, @Qualifier(RabbitMQBeanName.DIRECT_QUEUE1_TO_TEST) Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(direct).with("sb");
}
@Bean
public Binding directBinding2(@Qualifier(RabbitMQBeanName.DIRECT_EXCHANGE_TO_TEST) DirectExchange direct, @Qualifier(RabbitMQBeanName.DIRECT_QUEUE2_TO_TEST) Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(direct).with("wc");
}
@Bean(name = RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST)
public TopicExchange createTopicExchangeTest(){
return new TopicExchange(RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST,true,false,null);
}
@Bean(name = RabbitMQBeanName.TOPIC_QUEUE1_TO_TEST)
public Queue createTopicQueue1Test(){
return new Queue(RabbitMQBeanName.TOPIC_QUEUE1_TO_TEST,true,false,false,null);
}
@Bean(name = RabbitMQBeanName.TOPIC_QUEUE2_TO_TEST)
public Queue createTopicQueue2Test(){
return new Queue(RabbitMQBeanName.TOPIC_QUEUE2_TO_TEST,true,false,false,null);
}
@Bean
public Binding topicBinding1(@Qualifier(RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST) TopicExchange topic, @Qualifier(RabbitMQBeanName.TOPIC_QUEUE1_TO_TEST) Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topic).with("#.wo.shi.sb.#");
}
@Bean
public Binding topicBinding2(@Qualifier(RabbitMQBeanName.TOPIC_EXCHANGE_TO_TEST) TopicExchange topic, @Qualifier(RabbitMQBeanName.TOPIC_QUEUE2_TO_TEST) Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topic).with("*.wo.shi.sb.*");
}
@Bean(name = RabbitMQBeanName.TTL_QUEUE_TO_TEST)
public Queue createTTLQueueTest(){
Map<String, Object> arguments = new HashMap<>(4);
arguments.put("x-message-ttl",5000);
arguments.put("x-dead-letter-exchange", RabbitMQBeanName.DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key", "dead");
return new Queue(RabbitMQBeanName.TTL_QUEUE_TO_TEST,true,false,false,arguments);
}
@Bean(RabbitMQBeanName.DEAD_LETTER_EXCHANGE)
public DirectExchange deadLetterExchange(){
return new DirectExchange(RabbitMQBeanName.DEAD_LETTER_EXCHANGE,true,false,null);
}
@Bean(RabbitMQBeanName.DEAD_LETTER_QUEUE_TO_TEST)
public Queue createDeadLetterQueue1Test(){
return new Queue(RabbitMQBeanName.DEAD_LETTER_QUEUE_TO_TEST,true,false,false,null);
}
@Bean
public Binding deadLetterBindingB(@Qualifier(RabbitMQBeanName.DEAD_LETTER_QUEUE_TO_TEST) Queue queue,
@Qualifier(RabbitMQBeanName.DEAD_LETTER_EXCHANGE) DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dead");
}
}
Producer configuration
spring:
  rabbitmq:
    host: xxxx
    port: xxx
    username: xxx
    password: xxx
    virtual-host: crm
    publisher-returns: true
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: MANUAL
Consumer configuration
server:
  port: 9090
spring:
  rabbitmq:
    host: xxxx
    port: xxxx
    username: xxxx
    password: xxxx
    virtual-host: scrm
    publisher-returns: true
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: MANUAL
        prefetch: 100
        retry:
          enabled: true
          max-attempts: 2
          initial-interval: 5000ms
          max-interval: 20000ms
          multiplier: 1