RabbitMQ高级
过期时间TTL
概述
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
- 第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列,消费者将无法再收到该消息。
设置队列TTL
@Configuration
public class RabbitmqConfig {
@Bean
public DirectExchange directExchange(){
return new DirectExchange("direct-order-exchange",true,false);
}
@Bean
public Queue smsQueue(){
HashMap<String, Object> hashMap = new HashMap<>();
hashMap.put("x-message-ttl",5000);
return new Queue("sms.direct.queue",true,false,false,hashMap);
}
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
}

设置消息TTL
public void makeOrder(String userId,String productid,int num){
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功");
String fanoutExchange = "fanout-order-exchange";
String routingKey = "";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(fanoutExchange,routingKey,orderId,messagePostProcessor);
}
消息确认机制
死信队列
概述
DLX,全称为Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列。消息变成死信,可能是由于以下的原因:
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。 要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可。
设置死信队列
首先准备一个队列
@Configuration
public class DeadExchangeConfig {
@Bean
public DirectExchange dead(){
return new DirectExchange("dead-order-exchange",true,false);
}
@Bean
public Queue deadQueue(){
return new Queue("sms.dead.queue",true);
}
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(deadQueue()).to(dead()).with("sms");
}
}
将队列绑定到有TTL的队列中去
@Configuration
public class RabbitmqConfig {
@Bean
public DirectExchange directExchange(){
return new DirectExchange("ttl-order-exchange",true,false);
}
@Bean
public Queue smsQueue(){
HashMap<String, Object> hashMap = new HashMap<>();
hashMap.put("x-message-ttl",5000);
hashMap.put("x-dead-letter-exchange","dead-order-exchange");
hashMap.put("x-dead-letter-routing-key","sms");
return new Queue("sms.ttl.queue",true,false,false,hashMap);
}
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
}
注意点: 1.绑定的队列如果在RabbitMQ中已经存在,那么我们新增的配置是不会覆盖生效的,而是会一直报错 2.最好的方式是直接更换队列名,让它新建一个,而不是直接把原来的删除重新创建,因为这种操作在实际的生产环境中是十分危险的
分布式事务
概述
分布式事务指事务的操作位于不同的节点上,需要保证事务的AICD特性。例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务。 
基于MQ的分布式事务整体设计思路
 
可靠生产

可靠消费

内存磁盘监控
概述
RabbitMq图形化界面给我们提供了内存磁盘监控  内存的默认值是设备的0.4也就是40% 一旦占有率超过了分配的最大内存大小就会爆红,所有RabbitMQ服务会被挂起,不再接收生产者生产的消息  
命令方式调整内存大小
rabbitmqctl set_vm_memory_high_watermark 0.4
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
采用命令的方式,RabbitMQ重启之后会恢复到0.4
配置文件方式 rabbitmq.conf
当前配置文件:/etc/rabbitmq/rabbitmq.conf
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark.absolute = 2GB
|