IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 分布式事务-(3)柔性事务+最终一致性(RabbitMQ延时队列)篇 -> 正文阅读

[大数据]分布式事务-(3)柔性事务+最终一致性(RabbitMQ延时队列)篇

前面说过分布式事务的几种解决方案和和相应的问题,这里主要说一下
通过RabbitMQ的延时队列实现:柔性事务+最终一致性

在这里插入图片描述

一,常用解决方案

常用解决方案:
spring的schedule定时任务轮询数据库。	
	缺点:消耗系统内存 增加数据库的压力,存在较大的时间误差
	解决:rabbitmq的消息TTL(存活时间)和死信Exchange结合

二,TTL

消息的TTL:
	消息的存活时间。
	RabbitMQ可对消息和队列分别设置TTL。
	1)对队列设置:此队列中所有的消息过期时间相同,也可以对每一个单独的消息做单独的设置。
	超过了这个时间就认为这个消息死了,称为死信。
	2)如果队列设置了消息也设置了,会取小的,所以一个消息被路由到不同的队列中
		这个消息的死亡时间有可能不一样(不同队列的设置不同)

三,Dead Letter Exchanges(DLX)

一个消息在满足如下条件下会进入死信路由,这里是路由而不是队列,
一个路由可以有很多队列。(什么是死信)
	1)一个消息被Consumer拒收了,并且不会再次进入队列被其他消费者使用。
	2)消息的TTL到了,消息过期了。
	3)队列的长度限制满了,排在前面的消息会被丢弃或者扔到死信队列。
	
Dead Letter Exchange其实就是一种普通的exchange,和创建其他的exchange没有两样
	只是在某一个设置Dead Letter Exchange的队列中有消息过期,会自动触发消息的转发,发送到Dead Letter Exchange中去

四,延时队列实现

1,队列过期时间

在这里插入图片描述

	流程:
	生产者把消息按照指定的路由键发送给交换机--->
	交换机按照路由键把消息投递给对应的队列(该队列不能有任何消费者)(设置队列的过期时间,过期后按照哪个路由键扔给哪个交换机)
	-->交换机根据路由键投递给指定的队列-->
	消费者订阅这个队列(这个队列的消息都是过了过期时间的);
从而实现消息延迟消费的效果。

2,消息过期时间

在这里插入图片描述

流程:
生产者投递消息时指定消息的过期时间-->
队列接收消息(队列不能有消费者),等到消息过期后,把消息按照哪个路由键扔给哪个交换机-->
交换机根据路由键投递给指定的队列-->
消费者订阅这个队列。

注意:
消息过期时间:rabbitmq惰性检查。
			例如:第一个消息5分钟过期
				 第二个消息1分钟过期
                 第三个消息1秒钟过期
                 服务器的检查机制:先取出第一个消息看是否过期,
                 如果没过期就不会取第二个消息。按消息顺序,不会检查所有消息是否过期。

五,队列过期时间改进版

	多个队列根据不同的routing-key绑定同一个交换机,以下代码以队列过期为例
	设置队列过期时间:队列里的所有消息都是这个过期时间。

在这里插入图片描述


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MyMQConfig {


    /**
     * 测试消费者(监听正常队列)
     */
    @RabbitListener(queues = "order.release.order.queue")
    public void listener(OrderEntity entity, Channel channel, Message message)throws Exception{
        System.out.println(entity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }


    /**
     * 死信队列
     * @return
     */
    @Bean // 容器中的组件会自动创建(RabbitMQ没有的情况下)
    public Queue orderDelayQueue(){
        /**
         * String name,
         * boolean durable,
         * boolean exclusive,
         * boolean autoDelete,
         * Map<String, Object> arguments
         */
        Map<String, Object> arguments = new HashMap<>();
        // key为固定写法
        // 对于死信的消息绑定的交换机
        arguments.put("x-dead-letter-exchange","order-event-exchange");
        // 对于死信的消息的新的routing-key
        arguments.put("x-dead-letter-routing-key","order.release.order");
        // 过期时间 单位毫秒
        arguments.put("x-message-ttl",60000);

        return new Queue("order.delay.queue", true, false, false, arguments);

    }



    /**
     * 可监听队列-普通队列
     * @return
     */
    @Bean
    public Queue orderReleaseOrderQueue(){

        Queue queue = new Queue("order.release.order.queue", true, false, false);
        return queue;

    }


    /**
     * 交换机
     * 一个交换机对多个队列一般都用Topic类型的交换机
     * @return
     */
    @Bean
    public Exchange orderEventExchange(){
        // String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
        TopicExchange topicExchange = new TopicExchange("order-event-exchange", true, false);
        return topicExchange;
    }

    /**
     * 与死信队列绑定关系
     * @return
     */
    @Bean
    public Binding orderCreateOrderBinding(){
        /**
         * String destination,
         * DestinationType destinationType,
         * String exchange,
         * String routingKey,
         * 	Map<String, Object> arguments
         */
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",null);
    }


    /**
     * 与正常队列绑定关系
     * @return
     */
    @Bean
    public Binding orderReleaseOrderBinding(){
        /**
         * String destination,
         * DestinationType destinationType,
         * String exchange,
         * String routingKey,
         * 	Map<String, Object> arguments
         */
        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",null);
    }


}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-17 15:27:43  更:2021-08-17 15:28:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:11:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码