添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加application配置
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.addresses=localhost
#如果不设置默认为"/"
spring.rabbitmq.virtual-host=spring-cloud-alibaba-sample
添加配置类
/**
 * Declares the RabbitMQ topology used by the item messaging sample: a topic
 * exchange with its queue, an alternate ("spare") exchange with its queue for
 * messages that cannot be routed, and a dead-letter exchange with its queue.
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    /** Name of the main topic exchange. */
    public static final String ITEM_TOPIC_EXCHANGE = "item.topic.exchange";

    /** Name of the main queue. */
    public static final String ITEM_QUEUE = "item.queue";

    /** Name of the alternate (spare) exchange. */
    public static final String SPARE_EXCHANGE = "item.spare.exchange";

    /** Name of the queue bound to the alternate (spare) exchange. */
    public static final String SPARE_QUEUE = "item.spare.queue";

    /** Name of the dead-letter exchange. */
    public static final String DLX_EXCHANGE = "item.dlx.exchange";

    /** Name of the dead-letter queue. */
    public static final String DLX_QUEUE = "item.dlx.queue";

    /**
     * Routing key shared by the main queue's dead-letter configuration and the
     * dead-letter binding; both must use the same value for dead letters to
     * reach {@link #DLX_QUEUE}.
     */
    public static final String DLX_ROUTING_KEY = "dlx.routingKey";

    /**
     * Declares the durable main topic exchange.
     *
     * <p>The alternate exchange is referenced through {@link #SPARE_EXCHANGE} so
     * that it always names the exchange declared by {@link #spareExchange()}.
     *
     * @return the main topic exchange
     */
    @Bean
    public Exchange itemExchange() {
        return ExchangeBuilder
                .topicExchange(ITEM_TOPIC_EXCHANGE)
                .durable(true)
                .alternate(SPARE_EXCHANGE)
                .build();
    }

    /**
     * Declares the durable main queue, configured with {@link #DLX_EXCHANGE} as
     * its dead-letter exchange and {@link #DLX_ROUTING_KEY} as the dead-letter
     * routing key.
     *
     * @return the main queue
     */
    @Bean
    public Queue itemQueue() {
        return QueueBuilder
                .durable(ITEM_QUEUE)
                .deadLetterExchange(DLX_EXCHANGE)
                .deadLetterRoutingKey(DLX_ROUTING_KEY)
                .build();
    }

    /**
     * Binds the main queue to the main exchange with the pattern {@code item.#}.
     *
     * @param queue    the main queue
     * @param exchange the main topic exchange
     * @return the binding between the main queue and the main exchange
     */
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue, @Qualifier("itemExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }

    /**
     * Declares the alternate (spare) exchange as a fanout exchange.
     *
     * @return the spare exchange
     */
    @Bean
    public Exchange spareExchange() {
        return ExchangeBuilder.fanoutExchange(SPARE_EXCHANGE).build();
    }

    /**
     * Declares the durable queue that backs the spare exchange.
     *
     * @return the spare queue
     */
    @Bean
    public Queue spareQueue() {
        return QueueBuilder.durable(SPARE_QUEUE).build();
    }

    /**
     * Binds the spare queue to the spare exchange with an empty routing key.
     *
     * @param queue    the spare queue
     * @param exchange the spare exchange
     * @return the binding between the spare queue and the spare exchange
     */
    @Bean
    public Binding spareQueueExchange(@Qualifier("spareQueue") Queue queue, @Qualifier("spareExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }

    /**
     * Declares the durable direct exchange used for dead letters.
     *
     * @return the dead-letter exchange
     */
    @Bean
    public Exchange dlxExchange() {
        return ExchangeBuilder.directExchange(DLX_EXCHANGE).durable(true).build();
    }

    /**
     * Declares the durable dead-letter queue.
     *
     * @return the dead-letter queue
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(DLX_QUEUE).build();
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange using
     * {@link #DLX_ROUTING_KEY}.
     *
     * @param exchange the dead-letter exchange
     * @param queue    the dead-letter queue
     * @return the binding between the dead-letter queue and its exchange
     */
    @Bean
    public Binding dlxQueueExchange(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(DLX_ROUTING_KEY).noargs();
    }
}
上面的配置声明了一个交换器和一个队列,并为交换器设置了备份交换器,为队列设置了死信交换器和死信队列。
发送消息
/**
 * REST endpoint that publishes plain-text messages to
 * {@link RabbitMQConfig#ITEM_TOPIC_EXCHANGE}.
 */
@Api(tags = "发送消息")
@RestController
@RequestMapping("/message")
@Slf4j
public class MessageController {

    /** Value passed to the message's expiration property (RabbitMQ reads it as a TTL in milliseconds). */
    private static final String MESSAGE_EXPIRATION = "2000";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code msg} as a persistent, plain-text message with the given
     * routing key.
     *
     * @param routingKey routing key used when publishing to the topic exchange
     * @param msg        message text; must not be {@code null}
     * @return a success result once the message has been handed to the template
     */
    @ApiOperation(value = "发送消息")
    @GetMapping("/sendMessage")
    public AjaxResult sendMessage(String routingKey, String msg) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration(MESSAGE_EXPIRATION);
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        // Encode explicitly as UTF-8 so the payload does not depend on the
        // JVM's platform default charset.
        byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message(body, messageProperties);

        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, routingKey, message);
        return AjaxResult.success("消息发送成功");
    }
}
消费消息
/**
 * Consumes messages from {@link RabbitMQConfig#ITEM_QUEUE} and logs the body.
 *
 * @param message the delivered message
 * @param channel the channel the message was delivered on (unused here)
 * @throws Exception if the simulated processing delay fails
 */
@RabbitListener(queues = RabbitMQConfig.ITEM_QUEUE)
public void receive(Message message, Channel channel) throws Exception {
    // Simulated processing time.
    ThreadUtils.sleep(1000);
    // Decode explicitly as UTF-8 instead of the platform default charset.
    String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    log.info("消费者消费了消息:{}", msg);
}
消息生产者回调
交换器不存在
添加application配置
spring.rabbitmq.publisher-confirm-type=correlated
给RabbitTemplate添加confirmCallBack
@Slf4j
@Configuration
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback, CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void run(String... args) throws Exception {
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送成功");
} else {
log.info("消息发送失败:{}", cause);
}
}
}
路由失败
添加备份交换器
消息路由失败会将消息发送到备份交换器
给RabbitTemplate添加returnCallback方法
给application添加配置
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true
添加returnCallback方法
/**
 * Registers itself as the {@link RabbitTemplate} return callback once the
 * application has started, so returned (unroutable) messages are reported.
 */
@Slf4j
@Configuration
public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback, CommandLineRunner {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Installs this instance as the template's return callback.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Invoked when a published message is returned by the broker.
     *
     * @param message    the returned message
     * @param replyCode  broker reply code
     * @param replyText  broker reply text
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Record the return so a routing failure is never dropped silently.
        log.warn("Message returned: replyCode={}, replyText={}, exchange={}, routingKey={}",
                replyCode, replyText, exchange, routingKey);
        // Business handling for routing failures goes here.
    }
}
消费者手动确认
全局设置手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
单个监听上设置ackMode
@RabbitListener(queues = RabbitMQConfig.ITEM_QUEUE,ackMode = "MANUAL")
单个的优先级要高于全局
/**
 * Consumes messages from {@link RabbitMQConfig#ITEM_QUEUE} with manual
 * acknowledgement: the delivery is acked after successful processing and
 * nacked without requeue if processing fails.
 *
 * @param message the delivered message
 * @param channel the channel used to ack or nack the delivery
 * @throws Exception if processing fails (after the delivery has been nacked)
 *                   or if the ack/nack itself fails
 */
// Manual ack mode is set on the listener itself because this method acks
// explicitly; it must not depend on the global listener setting.
@RabbitListener(queues = RabbitMQConfig.ITEM_QUEUE, ackMode = "MANUAL")
public void receive(Message message, Channel channel) throws Exception {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        // Simulated processing time.
        ThreadUtils.sleep(1000);
        // Decode explicitly as UTF-8 instead of the platform default charset.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("消费者消费了消息:{}", msg);
    } catch (Exception e) {
        // Reject without requeue so the delivery is not left unacknowledged.
        channel.basicNack(deliveryTag, false, false);
        throw e;
    }
    // Acknowledge only this delivery (multiple = false). Kept outside the try
    // block so a failed ack is not followed by a nack for the same tag.
    channel.basicAck(deliveryTag, false);
}
消息确认与拒绝消息说明
|