IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> rabbitMQ死信队列实战 -> 正文阅读

[大数据]rabbitMQ死信队列实战

理解下死信,就是永远都不能被消费到的信息。死信队列其实就是为了防止消息未消费成功丢失的场景。通过死信队列存储这类消息,然后再对死信队列的消息做处理操作。场景比如订单支付挂起。

maven配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>SpringbootTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringbootTest</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

定义两个消费者,一个01消费正常队列,一个02消费死信队列。

public class DeadConsumer01 {
    private static final String DEAD_EXCHANGE = "dead-1exchange";
    private static final String NORMAL_EXCHANGE = "normal-1exchange";
    private static final String DEAD_QUEUE = "dead-1queue";
    private static final String NORMAL_QUEUE = "normal-1queue";
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = MqChanneUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //声明正常队列-需绑定死信交换机
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置与之绑定的死信交换机
        paramMap.put("x-dead-letter-routing-key","dead");//设置死信队列的绑定key
        paramMap.put("x-max-length",4);//设置队列消息最大数
        paramMap.put("x-message-ttl",10000);//设置队列中消息过期时间
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,paramMap);
        //绑定
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
        DeliverCallback deliverCallback = (comsumeTag, delivery)->{
            String message = new String(delivery.getBody(), "UTF-8");
            if (message.equals("消息3")){
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);
            }else {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                System.out.println("consumer01已经收到消息" + message);
            }
        };
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,comsumeTag->{
        });
    }
public class DeadConsumer02 {
    private static final String DEAD_QUEUE = "dead-1queue";
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = MqChanneUtils.getChannel();

        DeliverCallback deliverCallback = (comsumeTag, delivery)->{
            String message = new String(delivery.getBody(),"UTF-8");
            System.out.println("consumer02 死信队列已经收到消息"+message);
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,comsumeTag->{
        });
    }
}

定义一个生产者

?

public class Producter01 {
    private static final String NORMAL_EXCHANGE = "normal-1exchange";

    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = MqChanneUtils.getChannel();
     /*   AMQP.BasicProperties properties = new
                AMQP.BasicProperties().builder().expiration("10000").build();*/
        for (int i =0; i < 5;i++) {
            String message = "消息";
            channel.basicPublish(NORMAL_EXCHANGE, "normal", false, null,(message+i).getBytes(StandardCharsets.UTF_8));
        }
    }
}

?

?原理就是根据消费者01的定义的一些参数来控制是否进入死信队列。如它的消息在10s内没被消费,这就可以用作订单支付超时的场景。如它的消息长度超过最大长度限制,这可以控制高并发下的抢单场景。如消费者这边对于某些特殊消息要做拒绝处理,但又需要留痕到数据库。都可以用死信队列在做逻辑。? 其实死信队列跟普通队列一致,就是你定义一个普通队列来解决死信的问题也是OK的,只不过MQ里头把它作为一个参数。定义这个更加使用灵活。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:09:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 0:09:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码