前面说过分布式事务的几种解决方案和和相应的问题,这里主要说一下 通过RabbitMQ的延时队列实现:柔性事务+最终一致性
一,常用解决方案
常用解决方案:
spring的schedule定时任务轮询数据库。
缺点:消耗系统内存 增加数据库的压力,存在较大的时间误差
解决:rabbitmq的消息TTL(存活时间)和死信Exchange结合
二,TTL
消息的TTL:
消息的存活时间。
RabbitMQ可对消息和队列分别设置TTL。
1)对队列设置:此队列中所有的消息过期时间相同,也可以对每一个单独的消息做单独的设置。
超过了这个时间就认为这个消息死了,称为死信。
2)如果队列设置了消息也设置了,会取小的,所以一个消息被路由到不同的队列中
这个消息的死亡时间有可能不一样(不同队列的设置不同)
三,Dead Letter Exchanges(DLX)
一个消息在满足如下条件下会进入死信路由,这里是路由而不是队列,
一个路由可以有很多队列。(什么是死信)
1)一个消息被Consumer拒收了,并且不会再次进入队列被其他消费者使用。
2)消息的TTL到了,消息过期了。
3)队列的长度限制满了,排在前面的消息会被丢弃或者扔到死信队列。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他的exchange没有两样
只是在某一个设置Dead Letter Exchange的队列中有消息过期,会自动触发消息的转发,发送到Dead Letter Exchange中去
四,延时队列实现
1,队列过期时间
流程:
生产者把消息按照指定的路由键发送给交换机--->
交换机按照路由键把消息投递给对应的队列(该队列不能有任何消费者)(设置队列的过期时间,过期后按照哪个路由键扔给哪个交换机)
-->交换机根据路由键投递给指定的队列-->
消费者订阅这个队列(这个队列的消息都是过了过期时间的);
从而实现消息延迟消费的效果。
2,消息过期时间
流程:
生产者投递消息时指定消息的过期时间-->
队列接收消息(队列不能有消费者),等到消息过期后,把消息按照哪个路由键扔给哪个交换机-->
交换机根据路由键投递给指定的队列-->
消费者订阅这个队列。
注意:
消息过期时间:rabbitmq惰性检查。
例如:第一个消息5分钟过期
第二个消息1分钟过期
第三个消息1秒钟过期
服务器的检查机制:先取出第一个消息看是否过期,
如果没过期就不会取第二个消息。按消息顺序,不会检查所有消息是否过期。
五,队列过期时间改进版
多个队列根据不同的routing-key绑定同一个交换机,以下代码以队列过期为例
设置队列过期时间:队列里的所有消息都是这个过期时间。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MyMQConfig {
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity entity, Channel channel, Message message)throws Exception{
System.out.println(entity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
@Bean
public Queue orderDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","order-event-exchange");
arguments.put("x-dead-letter-routing-key","order.release.order");
arguments.put("x-message-ttl",60000);
return new Queue("order.delay.queue", true, false, false, arguments);
}
@Bean
public Queue orderReleaseOrderQueue(){
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
@Bean
public Exchange orderEventExchange(){
TopicExchange topicExchange = new TopicExchange("order-event-exchange", true, false);
return topicExchange;
}
@Bean
public Binding orderCreateOrderBinding(){
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",null);
}
@Bean
public Binding orderReleaseOrderBinding(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",null);
}
}
|