RabbitMQ简单使用看这里,包括使用这个方案之前mq的配置环境
RabbitMQ
? ? ? ?
死信
Dead Letter exchanges (死信路由)
》一个消息在满足如下条件下,会进死信路由,注意是路由不是队列,一个路由可以对应多个队列。
.一个消息被消费者拒收了,并且reject方法的参数里requeue是false,也就是说不会被再次放到队列里,被其他消费者使用。
(basic.reject/basic.nack) requeue=false
.上面消息的TTL到了,消息过期了。
.队列的长度限制满了,排在前面的消息会被丢弃或者扔到死信路由上
》Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没什么区别,只是在某一个设置Dead Letter Exchange的队列 中有消息过期,会自动触发消息的转发,发送到Dead Letter Exchange中去。
》我们即可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,可以实现一个延迟队列。
延迟队列
实现定时任务
场景:比如未付款,超过一定时间后,系统自动取消订单并释放占有物品,也就是解库存。
解决方案springDe schedule 定时任务轮询数据库。
缺点:增加系统消耗,增加数据库压力,存在较大时间差
解决:Rabbitmq的消息TTL和死信队列EXchange结合
?说明
首先 p 是一个消息发送方,发送消息到一个交换机 X,并指定了路由键,
X 在通过路由键将消息发送给指定的队列,这个队列有些设置:
就是设置在队列里消息的过期时间,以及指定了过了超时时间就不要丢弃,将消息转给指定的交换机,并且指定队列,来路由。
被指定的 delay excahnge交换机就通过路由来将消息投递给路由成功的队列。
然后消费端 c 就去监听 这个存了过期消息的队列 test queue
当然上面是给队列设置过期时间,还有需要路由的路由键和交换机。
我们也可以不给队列设置,而是给消息设置。 但是,不建议,会有问题。
因为Rabbit的是惰性检查。
细化一下
但是上面的过程还可以细化,就是省去一个交换机,因为我们的消息都要交换机的路由键来路由到不同的队列,那我们就使两个路由键来指定不同的队列就可以了
如图:
?我们只需要一个交换机就可以实现,让他们有不同的路由键去路由不同的队列。
上代码
import com.rabbitmq.client.Channel;
import com.sz.rabbitmq.entity.Student;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.HashMap;
@Configuration
public class MQConfig {
//我们直接在这里来个监听
@RabbitListener(queues = {"order.release.queue"})
public void test(Channel channel, Student student, Message message) throws IOException {
System.out.println(student);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
//这个队列是不需要被监听的
@Bean
public Queue orderCreateQueue(){
//设置队列的属性
HashMap<String, Object> hashMap = new HashMap<>();
hashMap.put("x-dead-letter-exchange","order.exchange"); //需要被路由的交换机
hashMap.put("x-dead-letter-routing-key","key.release.queue"); //路由键
hashMap.put("x-message-ttl",60000); //单位是毫秒
Queue queue = new Queue("order.create.queue",true,false,false,hashMap);
return queue;
}
//这个队列需要被监听
@Bean
public Queue orderReleaseQueue(){
Queue queue = new Queue("order.release.queue",true,false,false);
return queue;
}
//创建交换机
@Bean
public Exchange orderExchange(){
DirectExchange directExchange = new DirectExchange("order.exchange",true,false);
return directExchange;
}
//交换机绑定队列
@Bean
public Binding orderCreateBinding(){
Binding binding = new Binding("order.create.queue",Binding.DestinationType.QUEUE,"order.exchange","key.create.queue",null);
return binding;
}
//交换机绑定队列
@Bean
public Binding orderReleaseBinding(){
Binding binding = new Binding("order.release.queue",Binding.DestinationType.QUEUE,"order.exchange","key.release.queue",null);
return binding;
}
}
注意:设置路由交换机和路由键的队列他们的值需要写对了,
写在config里面是方便程序一启动就帮我们创建对应的交换机和队列并绑定路由关系。
启动后我们也可以通过RabbitMq的后台页面去查看
安装mq的时候默认是15672端口。
然后我们再去投递一个消息,然后此时不会我们监听的队列是没有消息的,只有当到了设置的时间,才会将消息转发到我们监听的队列中。我们才能监听到,到此,就已经完成了。
这种方案我们一般用于订单取消和订单超时来释放库存。
当然我们的订单超时也可以用这种方式来判断是否超时。
通过这种延迟队列的方式来解库存的话,我们还需要去创建工作单,里面存有订单的信息,还需要工作详情单,去记录工作单所对应的商品已经库存信息,方便后面的释放库存
但是就会产生一个问题就是,当我们的订单创建信息发到mq时,由于网络的波动等其他原因,或者没有及时的给超时的订单来标记状态,由于我们的锁库存是跟在创建订单后面的任务,那么此时监听解锁库存的队列,来判断订单的状态,发现是待支付状态,也就是说这个订单没有及时的取消,那么此时库存就表示订单还没有到期,那么库存也不会在释放。
解决:
因此,我们的订单在解锁的时候,解锁成功了还要发个消息给锁库存的队列,让它来解锁库存
这相当于是一个主动补偿
那么延迟队列库存解锁可以理解为一个反向补偿。
|