<!-- Spring Boot AMQP starter dependency. No <version> is given here; presumably it is managed by the Spring Boot parent/BOM (confirm against the project's pom). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
#设置rabbitmq的地址
spring.rabbitmq.host=172.16.70.163
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
#开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要消息没有正确抵达队列,就以异步方式优先回调ReturnCallback(mandatory=true时,无法路由的消息会被退回给发送端而不是直接丢弃)
spring.rabbitmq.template.mandatory=true
#设置消费端接收到消息后的确认模式:手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
package com.xdtmall.order.config;
import com.rabbitmq.client.Channel;
import com.xdtmall.order.entity.OrderEntity;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * RabbitMQ topology for the delayed "release order" flow, plus the listener that consumes it.
 *
 * <p>Messages published to {@code order-event-exchange} with routing key
 * {@code order.create.order} land in {@code order.delay.queue}. That queue is declared with a
 * 60000 ms message TTL and dead-letters to the same exchange using routing key
 * {@code order.release.order}, which is bound to {@code order.release.order.queue}.
 */
@Configuration
public class MyMQConfig {

    private static final String EVENT_EXCHANGE = "order-event-exchange";
    private static final String DELAY_QUEUE = "order.delay.queue";
    private static final String RELEASE_QUEUE = "order.release.order.queue";
    private static final String CREATE_ROUTING_KEY = "order.create.order";
    private static final String RELEASE_ROUTING_KEY = "order.release.order";

    /**
     * Consumes messages from the release queue, prints the order number and acknowledges
     * the delivery manually.
     *
     * @param entity  the converted message payload
     * @param channel the channel the message was delivered on, used for the ack
     * @param message the raw message; its delivery tag identifies what is acknowledged
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = RELEASE_QUEUE)
    public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
        System.out.println("收到过期的信息,准备关闭订单" + entity.getOrderSn());
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // multiple = false: acknowledge only this single delivery.
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Declares the delay queue: durable, non-exclusive, not auto-deleted, with a per-queue
     * message TTL and dead-letter routing back into the event exchange.
     *
     * @return the delay queue definition
     */
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-message-ttl", 60000);
        queueArgs.put("x-dead-letter-exchange", EVENT_EXCHANGE);
        queueArgs.put("x-dead-letter-routing-key", RELEASE_ROUTING_KEY);
        return new Queue(DELAY_QUEUE, true, false, false, queueArgs);
    }

    /**
     * Declares the release queue: durable, non-exclusive, not auto-deleted, no extra arguments.
     *
     * @return the release queue definition
     */
    @Bean
    public Queue orderReleaseOrderQueue() {
        return new Queue(RELEASE_QUEUE, true, false, false);
    }

    /**
     * Declares the durable, non-auto-deleted topic exchange used by both bindings.
     *
     * @return the event exchange definition
     */
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange(EVENT_EXCHANGE, true, false);
    }

    /**
     * Binds the delay queue to the event exchange under the "create order" routing key.
     *
     * @return the binding definition
     */
    @Bean
    public Binding orderCreateOrderBinding() {
        return new Binding(
                DELAY_QUEUE,
                Binding.DestinationType.QUEUE,
                EVENT_EXCHANGE,
                CREATE_ROUTING_KEY,
                null);
    }

    /**
     * Binds the release queue to the event exchange under the "release order" routing key.
     *
     * @return the binding definition
     */
    @Bean
    public Binding orderReleaseOrderQueueBinding() {
        return new Binding(
                RELEASE_QUEUE,
                Binding.DestinationType.QUEUE,
                EVENT_EXCHANGE,
                RELEASE_ROUTING_KEY,
                null);
    }
}
import com.xdtmall.order.entity.OrderEntity;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Date;
import java.util.UUID;
/**
 * Test endpoint that publishes a freshly created order to the order event exchange.
 */
@Controller
public class testController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Builds an {@code OrderEntity} with a random UUID as order number and the current time
     * as modify time, then sends it to {@code order-event-exchange} with routing key
     * {@code order.create.order}.
     *
     * @return the literal string {@code "ok"} once the send call has returned
     */
    @ResponseBody
    @GetMapping("/test/createOrder")
    public String createOrderTest() {
        String orderSn = UUID.randomUUID().toString();

        OrderEntity order = new OrderEntity();
        order.setOrderSn(orderSn);
        order.setModifyTime(new Date());

        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order);
        return "ok";
    }
}
RabbitMQ常用队列属性有以下几种: 标签 Arguments 含义 缩写 Message TTL x-message-ttl 队列中所有消息的过期时间 TTL Auto expire x-expires 队列生存期(毫秒)内没有被使用就会自动删除 Exp Max length x-max-length 队列的最大条数 Lim Max length bytes x-max-length-bytes 队列消息的最大字节数,超过的话丢弃队列头部的消息 Lim B Dead letter exchange x-dead-letter-exchange 死信交换机 DLX Dead letter routing key x-dead-letter-routing-key 死信路由键 DLK Maximum priority x-max-priority 队列支持优先级(值为0-255),优先级越大越优先 Pri 点击标签就可以自动添加参数
在RabbitMQ中想要使用优先级特性需要的版本为3.5+
没有指定优先级的消息会将优先级以0对待。 对于超过优先级队列所定最大优先级的消息,优先级以最大优先级对待。 注意:如果希望服务一启动就在RabbitMQ中创建上述队列、交换机和绑定,项目中必须有RabbitMQ的消息监听(例如@RabbitListener);否则Spring Boot启动时不会主动连接RabbitMQ,也就不会自动创建它们,要等到第一次建立连接(例如首次发送消息)时才会创建。
|