一、依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、代码
1、死信队列
1.1、TTL过期
1.1.1、RabbitMQ配置类
@Configuration
public class RabbitMQConfig11 {
public static final String COMMON_EXCHANGE_NAME = "common_exchange11";
public static final String DEAD_EXCHANGE_NAME = "dead_exchange11";
public static final String COMMON_QUEUE_NAME = "common_queue11";
public static final String DEAD_QUEUE_NAME = "dead_queue11";
@Bean
public Exchange commonExchange11() {
return new DirectExchange(COMMON_EXCHANGE_NAME);
}
@Bean
public Exchange deadExchange11() {
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
@Bean
public Queue commonQueue11() {
return QueueBuilder.durable(COMMON_QUEUE_NAME).deadLetterExchange(DEAD_EXCHANGE_NAME).deadLetterRoutingKey(DEAD_QUEUE_NAME).ttl(10000).build();
}
@Bean
public Queue deadQueue11() {
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
@Bean
public Binding commonBinding11(@Qualifier("commonQueue11") Queue commonQueue, @Qualifier("commonExchange11") Exchange commonExchange) {
return BindingBuilder.bind(commonQueue).to(commonExchange).with(COMMON_QUEUE_NAME).noargs();
}
@Bean
public Binding deadBinding11(@Qualifier("deadQueue11") Queue deadQueue, @Qualifier("deadExchange11") Exchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_QUEUE_NAME).noargs();
}
}
1.1.2、生产者
@RestController
public class Provider11 {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg11")
public String sendMsg() {
rabbitTemplate.convertAndSend(RabbitMQConfig11.COMMON_EXCHANGE_NAME, RabbitMQConfig11.COMMON_QUEUE_NAME, "测试消息");
return "发送成功";
}
}
1.1.3、消费者
@Component
public class Consumer11 {
@RabbitListener(queues = {RabbitMQConfig11.DEAD_QUEUE_NAME})
public void receiveMsg(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消息内容:" + msg);
}
}
1.2、消息数量超过队列最大长度
1.2.1、RabbitMQ配置类
@Configuration
public class RabbitMQConfig12 {
public static final String COMMON_EXCHANGE_NAME = "common_exchange12";
public static final String DEAD_EXCHANGE_NAME = "dead_exchange12";
public static final String COMMON_QUEUE_NAME = "common_queue12";
public static final String DEAD_QUEUE_NAME = "dead_queue12";
@Bean
public Exchange commonExchange12() {
return new DirectExchange(COMMON_EXCHANGE_NAME);
}
@Bean
public Exchange deadExchange12() {
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
@Bean
public Queue commonQueue12() {
return QueueBuilder.durable(COMMON_QUEUE_NAME).deadLetterExchange(DEAD_EXCHANGE_NAME).deadLetterRoutingKey(DEAD_QUEUE_NAME).maxLength(6).build();
}
@Bean
public Queue deadQueue12() {
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
@Bean
public Binding commonBinding12(@Qualifier("commonQueue12") Queue commonQueue, @Qualifier("commonExchange12") Exchange commonExchange) {
return BindingBuilder.bind(commonQueue).to(commonExchange).with(COMMON_QUEUE_NAME).noargs();
}
@Bean
public Binding deadBinding12(@Qualifier("deadQueue12") Queue deadQueue, @Qualifier("deadExchange12") Exchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_QUEUE_NAME).noargs();
}
}
1.2.2、生产者
@RestController
public class Provider12 {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg12")
public String sendMsg() {
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend(RabbitMQConfig12.COMMON_EXCHANGE_NAME, RabbitMQConfig12.COMMON_QUEUE_NAME, "测试消息" + i);
}
return "发送成功";
}
}
1.2.3、消费者
@Component
public class Consumer12 {
@RabbitListener(queues = {RabbitMQConfig12.DEAD_QUEUE_NAME})
public void receiveMsg2(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("死信队列接收到的消息内容:" + msg);
}
}
1.3、消费者拒绝接收消息,并拒绝将消息重新放回队列
1.3.1、RabbitMQ配置类
@Configuration
public class RabbitMQConfig13 {
public static final String COMMON_EXCHANGE_NAME = "common_exchange13";
public static final String DEAD_EXCHANGE_NAME = "dead_exchange13";
public static final String COMMON_QUEUE_NAME = "common_queue13";
public static final String DEAD_QUEUE_NAME = "dead_queue13";
@Bean
public Exchange commonExchange13() {
return new DirectExchange(COMMON_EXCHANGE_NAME);
}
@Bean
public Exchange deadExchange13() {
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
@Bean
public Queue commonQueue13() {
return QueueBuilder.durable(COMMON_QUEUE_NAME).deadLetterExchange(DEAD_EXCHANGE_NAME).deadLetterRoutingKey(DEAD_QUEUE_NAME).build();
}
@Bean
public Queue deadQueue13() {
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
@Bean
public Binding commonBinding13(@Qualifier("commonQueue13") Queue commonQueue, @Qualifier("commonExchange13") Exchange commonExchange) {
return BindingBuilder.bind(commonQueue).to(commonExchange).with(COMMON_QUEUE_NAME).noargs();
}
@Bean
public Binding deadBinding13(@Qualifier("deadQueue13") Queue deadQueue, @Qualifier("deadExchange13") Exchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_QUEUE_NAME).noargs();
}
}
1.3.2、生产者
@RestController
public class Provider13 {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg13")
public String sendMsg() {
rabbitTemplate.convertAndSend(RabbitMQConfig13.COMMON_EXCHANGE_NAME, RabbitMQConfig13.COMMON_QUEUE_NAME, "测试消息");
return "发送成功";
}
}
1.3.3、消费者
@Component
public class Consumer13 {
@RabbitListener(queues = {RabbitMQConfig13.COMMON_QUEUE_NAME})
public void receiveMsg(Message message, Channel channel) throws IOException {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = {RabbitMQConfig13.DEAD_QUEUE_NAME})
public void receiveMsg2(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("死信队列接收到的消息内容:" + msg);
}
}
2、优先级队列
2.1、RabbitMQ配置类
@Configuration
public class RabbitMQConfig5 {
public static final String COMMON_EXCHANGE_NAME = "common_exchange5";
public static final String COMMON_QUEUE_NAME = "common_queue5";
@Bean
public Exchange commonExchange5() {
return new DirectExchange(COMMON_EXCHANGE_NAME);
}
@Bean
public Queue commonQueue5() {
return QueueBuilder.durable(COMMON_QUEUE_NAME).withArgument("x-max-priority", 10).build();
}
@Bean
public Binding commonBinding5(@Qualifier("commonQueue5") Queue commonQueue, @Qualifier("commonExchange5") Exchange commonExchange) {
return BindingBuilder.bind(commonQueue).to(commonExchange).with(COMMON_QUEUE_NAME).noargs();
}
}
2.2、生产者
@RestController
public class Provider5 {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg5")
public String sendMsg() {
for (int i = 1; i <= 10; i++) {
if (i == 5) {
rabbitTemplate.convertAndSend(RabbitMQConfig5.COMMON_EXCHANGE_NAME, RabbitMQConfig5.COMMON_QUEUE_NAME, "测试消息" + i, msg -> {
msg.getMessageProperties().setPriority(5);
return msg;
});
} else {
rabbitTemplate.convertAndSend(RabbitMQConfig5.COMMON_EXCHANGE_NAME, RabbitMQConfig5.COMMON_QUEUE_NAME, "测试消息" + i);
}
}
return "发送成功";
}
}
2.3、消费者
@Component
public class Consumer5 {
@RabbitListener(queues = {RabbitMQConfig5.COMMON_QUEUE_NAME})
public void receiveMsg(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消息内容:" + msg);
}
}
3、自定义延迟交换机
3.1、RabbitMQ配置类
@Configuration
public class RabbitMQConfig2 {
public static final String DELAY_EXCHANGE_NAME = "delay_exchange2";
public static final String COMMON_QUEUE_NAME = "common_queue2";
@Bean
public Exchange delayExchange2() {
Map<String, Object> arguments = new HashMap<>(1);
arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
@Bean
public Queue commonQueue2() {
return QueueBuilder.durable(COMMON_QUEUE_NAME).build();
}
@Bean
public Binding commonBinding2(@Qualifier("commonQueue2") Queue commonQueue, @Qualifier("delayExchange2") Exchange delayExchange) {
return BindingBuilder.bind(commonQueue).to(delayExchange).with(COMMON_QUEUE_NAME).noargs();
}
}
3.2、生产者
@RestController
public class Provider2 {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg2")
public String sendMsg() {
rabbitTemplate.convertAndSend(RabbitMQConfig2.DELAY_EXCHANGE_NAME, RabbitMQConfig2.COMMON_QUEUE_NAME, "我是延时时间是10s的消息", msg -> {
msg.getMessageProperties().setDelay(10000);
return msg;
});
rabbitTemplate.convertAndSend(RabbitMQConfig2.DELAY_EXCHANGE_NAME, RabbitMQConfig2.COMMON_QUEUE_NAME, "我是延时时间是2s的消息", msg -> {
msg.getMessageProperties().setDelay(2000);
return msg;
});
return "发送成功";
}
}
3.3、消费者
@Component
public class Consumer2 {
@RabbitListener(queues = {RabbitMQConfig2.COMMON_QUEUE_NAME})
public void receiveMsg(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消息内容:" + msg);
}
}
4、消息无法抵达交换机或者无法抵达队列的情况
4.1、application.properties配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
4.2、RabbitMQ配置类
@Configuration
public class RabbitMQConfig3 {
public static final String COMMON_EXCHANGE_NAME = "common_exchange3";
public static final String COMMON_QUEUE_NAME = "common_queue3";
@Bean
public Exchange commonExchange3() {
return ExchangeBuilder.directExchange(COMMON_EXCHANGE_NAME).build();
}
@Bean
public Queue commonQueue3() {
return QueueBuilder.durable(COMMON_QUEUE_NAME).build();
}
@Bean
public Binding commonBinding3(@Qualifier("commonQueue3") Queue commonQueue, @Qualifier("commonExchange3") Exchange commonExchange) {
return BindingBuilder.bind(commonQueue).to(commonExchange).with(COMMON_QUEUE_NAME).noargs();
}
}
4.3、RabbitMQ回调类
@Component
public class RabbitmqCallback3 implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
try {
String id = correlationData.getId();
System.out.printf("交换机接收消息反馈,其中id:%s,接收成功:%b,失败原因:%s\n", id, ack, cause);
} catch (Exception e) {
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.printf("交换机发送消息到队列失败,其中消息内容:%s,回复码:%d,回复内容:%s,交换机:%s,路由:%s\n", new String(message.getBody(), StandardCharsets.UTF_8), replyCode, replyText, exchange, routingKey);
}
}
4.4、生产者
@RestController
public class Provider3 {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg3")
public String sendMsg() {
CorrelationData correlationData1 = new CorrelationData();
correlationData1.setId("1");
rabbitTemplate.convertAndSend(RabbitMQConfig3.COMMON_EXCHANGE_NAME, RabbitMQConfig3.COMMON_QUEUE_NAME, "测试消息1", correlationData1);
CorrelationData correlationData2 = new CorrelationData();
correlationData2.setId("2");
rabbitTemplate.convertAndSend(RabbitMQConfig3.COMMON_EXCHANGE_NAME + "1", RabbitMQConfig3.COMMON_QUEUE_NAME, "测试消息2", correlationData2);
CorrelationData correlationData3 = new CorrelationData();
correlationData3.setId("3");
rabbitTemplate.convertAndSend(RabbitMQConfig3.COMMON_EXCHANGE_NAME, RabbitMQConfig3.COMMON_QUEUE_NAME + "1", "测试消息3", correlationData3);
return "发送成功";
}
}
4.5、消费者
@Component
public class Consumer3 {
@RabbitListener(queues = {RabbitMQConfig3.COMMON_QUEUE_NAME})
public void receiveMsg(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消息内容:" + msg);
}
}
5、备用交换机
5.1、RabbitMQ配置类
@Configuration
public class RabbitMQConfig4 {
public static final String COMMON_EXCHANGE_NAME = "common_exchange4";
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange4";
public static final String COMMON_QUEUE_NAME = "common_queue4";
public static final String BACKUP_QUEUE_NAME = "backup_queue4";
@Bean
public Exchange commonExchange4() {
return ExchangeBuilder.directExchange(COMMON_EXCHANGE_NAME).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
}
@Bean
public FanoutExchange backupExchange4() {
return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE_NAME).build();
}
@Bean
public Queue commonQueue4() {
return QueueBuilder.durable(COMMON_QUEUE_NAME).build();
}
@Bean
public Queue backupQueue4() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean
public Binding commonBinding4(@Qualifier("commonQueue4") Queue commonQueue, @Qualifier("commonExchange4") Exchange commonExchange) {
return BindingBuilder.bind(commonQueue).to(commonExchange).with(COMMON_QUEUE_NAME).noargs();
}
@Bean
public Binding backupBinding4(@Qualifier("backupQueue4") Queue backupQueue, @Qualifier("backupExchange4") FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
}
5.2、生产者
@RestController
public class Provider4 {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg4")
public String sendMsg() {
rabbitTemplate.convertAndSend(RabbitMQConfig4.COMMON_EXCHANGE_NAME, RabbitMQConfig4.COMMON_QUEUE_NAME, "测试消息1");
rabbitTemplate.convertAndSend(RabbitMQConfig4.COMMON_EXCHANGE_NAME, RabbitMQConfig4.COMMON_QUEUE_NAME + "1", "测试消息2");
return "发送成功";
}
}
5.3、消费者
@Component
public class Consumer4 {
@RabbitListener(queues = {RabbitMQConfig4.COMMON_QUEUE_NAME})
public void receiveMsg1(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("普通队列接收到的消息内容:" + msg);
}
@RabbitListener(queues = {RabbitMQConfig4.BACKUP_QUEUE_NAME})
public void receiveMsg2(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("备用队列接收到的消息内容:" + msg);
}
}
6、说明
大多数用法说明已经写在了RabbitMQ学习文档(进阶篇(Demo使用Spring编写))中,这里主要说一下发布确认,以及发布确认和备用交换机的优先级次序,其中发布确认分为消息无法抵达交换机的发布确认,以及消息无法抵达队列的发布确认
(1)消息无法抵达交换机的发布确认
如果想开启消息无法抵达交换机的发布确认,那就需要在application.properties中配置:
spring.rabbitmq.publisher-confirm-type=correlated
然后让回调类实现RabbitTemplate.ConfirmCallback ,并且重写confirm 方法,其中confirm 方法中的代码如下:
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
try {
String id = correlationData.getId();
System.out.printf("交换机接收消息反馈,其中id:%s,接收成功:%b,失败原因:%s\n", id, ack, cause);
} catch (Exception e) {
}
}
其中在消息发送的时候需要添加CorrelationData 对象,其中上面的CorrelationData correlationData 就是我们在生产者发送消息的时候添加的该对象,生产者中添加CorrelationData 对象的代码如下:
CorrelationData correlationData1 = new CorrelationData();
correlationData1.setId("1");
rabbitTemplate.convertAndSend(RabbitMQConfig3.COMMON_EXCHANGE_NAME, RabbitMQConfig3.COMMON_QUEUE_NAME, "测试消息1", correlationData1);
(2)消息无法抵达队列的发布确认
如果想开启消息无法抵达队列的发布确认,那就需要在application.properties中配置:
spring.rabbitmq.publisher-returns=true
然后让回调类实现RabbitTemplate.ReturnCallback ,并且重写returnedMessage 方法,其中returnedMessage 方法中的代码如下:
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.printf("交换机发送消息到队列失败,其中消息内容:%s,回复码:%d,回复内容:%s,交换机:%s,路由:%s\n", new String(message.getBody(), StandardCharsets.UTF_8), replyCode, replyText, exchange, routingKey);
}
(3)备用交换机
如果消息无法抵达队列的发布确认 和备用交换机 同时存在,然后消息无法从交换机发送到队列,那么备用交换机优先级更高,所以它会生效,那么消息无法抵达队列的发布确认 将不会生效
|