IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ】RabbitMQ 学习教程(三)消息确认机制 ACK -> 正文阅读

[大数据]RabbitMQ】RabbitMQ 学习教程(三)消息确认机制 ACK

1. 代码优化

在这讲正式之前,咱们先把之前的生产者、消费者代码,简单优化下吧

看之前的代码:生产者、消费者的连接和关闭都是一样的代码,那么,我们就可以将相同的代码提取出来,放在一个工具类里面,然后在相应的地方用工具类进行替换。

提取一个工具类 RabbitMqUtil,用来处理连接、关闭功能:

public class RabbitMqUtil {

    // 私有构造
    private RabbitMqUtil() {}

    // 获取连接
    public static Connection getConnection(String name) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection conn = connectionFactory.newConnection(name);
        return conn;
    }

    // 关闭信道、连接
    public static void close(Connection conn, Channel channel) throws Exception{
        if (null != channel && channel.isOpen()) {
            channel.close();
        }
        if (null != conn && conn.isOpen()) {
            conn.close();
        }
    }
}

生产者代码修改为:

public class Producer {

    public static void main(String[] args) {
        // 1. 获取连接
        Connection connection = null;
        try {
            connection = RabbitMqUtil.getConnection("生产者");
        } catch (Exception e) {
            System.out.println("获取连接时,出现异常");
        }

        Channel channel = null;
        try {
            // 2. 通过连接获取通道 Channel
            channel = connection.createChannel();
            String queueName = "code_simple_queue1";
            // 3. 通过通道创建声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 4. 准备消息内容
            String message = "Hello RabbitMQ";
            // 5. 发送消息给队列 Queue
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("消息发送完成~~~发送的消息为:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                RabbitMqUtil.close(connection, channel);
            } catch (Exception e) {
                System.out.println("关闭时,出现异常");
            }
        }
    }
}

消费者代码为:

public class Consumer {

    public static void main(String[] args) {

        Connection connection = null;
        try {
            connection = RabbitMqUtil.getConnection("消费者");
        } catch (Exception e) {
            System.out.println("获取连接异常");
        }

        Channel channel = null;
        try {
            channel = connection.createChannel();
            String queueName = "code_simple_queue1";

            // 定义消费者
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 交换机
                    String exchange = envelope.getExchange();
                    // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    // body 消息体
                    String msg = new String(body,"utf-8");
                    System.out.println("收到消息:" + msg);
                }
            };
            // 监听队列。自动确认
            channel.basicConsume(queueName, true, consumer);

            System.out.println("开始接收消息~~~");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                RabbitMqUtil.close(connection, channel);
            } catch (Exception e) {
                System.out.println("关闭时,出现异常");
            }
        }
    }
}

咱们再来回顾下 消费者中的 channel.basicConsume() 方法,它有三个参数,分别为:

  1. queueName:队列名称
  2. autoAck:设置是否自动确认
  3. callback:设置消费者的回调函数,用来处理 RabbitMQ 推送过来的消息

以上的示例中,咱们是将自动确认参数设置为 true,那么,这就会导致一个现象:消息一旦被消费者接收,队列中的消息就会被删除。

问题:RabbitMQ 怎么知道消息被消费者接收了呢?如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?总之:如果消费消息失败了,RabbitMQ 无从得知,这样消息就丢失了!

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制 ACK:当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。

不过这种回执 ACK 分两种情况:

  • 自动 ACK:autoAck 设置为 true。RabbitMQ 会自动把发送出去的消息置为确认,然后从内存或磁盘中删除,而不管消费者是否真正地消费了这些消息
  • 手动 ACK:autoAck 设置为 false。RabbitMQ 会等待消费者显示地回复确认信号后才从内存或磁盘中移去消息

采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程挂掉后消息丢失的问题。因为,RabbitMQ 会一直等待持有消息,直到消费者显示调用 Basic.Ack 命令为止。

并且,当 autoAck 参数设置为 false 后,对于 RabbitMQ 而言,队列中的消息分成了两部分:一是等待投递给消费者的消息;二是已经投递给消费者,但还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者。

对于以上的两种回执 ACK,如何选择哪一种,这得看消息的重要性了(建议选择手动确认):

  1. 如果消息不太重要,丢失也没有影响,那么自动 ACK 会比较方便
  2. 如果消息非常重要,不容丢失。那么,最好在消费完成后手动 ACK,否则接收消息后就自动 ACK,RabbitMQ 就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

2. 手动 ACK

将自动 ACK 修改为手动 ACK,只需要修改消费者的代码:

public class Consumer {

    public static void main(String[] args) throws Exception{
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");

        final Channel channel = connection.createChannel();
        String queueName = "code_simple_queue1";

        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 消息体
                String msg = new String(body,"utf-8");
                System.out.println("收到消息:" + msg);
                /**
                 * @param1:deliveryTag:用来标识消息的id
                 * @param2:multiple:是否批量。true:将一次性 ACK 所有小于 deliveryTag 的消息
                 */
                // 手动确认
                channel.basicAck(deliveryTag, false);
            }
        };

        // 监听队列  手动 ACK
        channel.basicConsume(queueName, false, consumer);

        System.out.println("开始接收消息~~~");
        System.in.read();

        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}

主要作出一下两点变化:

  1. channel.basicConsume() 方法的第二个参数修改为 false
  2. 在消费者中,必须得调用 channel.basicAck() 方法,进行手动确认,

修改后,发现上述的消费者依旧可以消费消息

3. 自动 ACK 带来的问题

问题:那么,自动 ACK 会带来什么问题呢?接下来我们通过实例进行演示一下

我们修改下自动 ACK 的消费者代码:在 handleDelivery() 方法中手动添加异常代码,让其抛出异常,这样,就中断了正在执行的 handleDelivery() 方法:

channel = connection.createChannel();
String queueName = "code_simple_queue1";

// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		// 抛出异常,后续代码都不会执行
	    int result = 1 / 0;
	    
	    // 交换机
	    String exchange = envelope.getExchange();
	    // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
	    long deliveryTag = envelope.getDeliveryTag();
	    // body 消息体
	    String msg = new String(body,"utf-8");
	    System.out.println("收到消息:" + msg);
	}
};
// 监听队列
channel.basicConsume(queueName, true, consumer);

上述的 handleDelivery() 方法确实会抛出异常,但它又被 ConsumerDispatcher#handleDelivery() 方法捕获并进行处理了:

public void handleDelivery(final Consumer delegate, final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
    this.executeUnlessShuttingDown(new Runnable() {
        public void run() {
            try {
                delegate.handleDelivery(consumerTag, envelope, properties, body);
            } catch (Throwable var2) {
                ConsumerDispatcher.this.connection.getExceptionHandler().handleConsumerException(ConsumerDispatcher.this.channel, var2, delegate, consumerTag, "handleDelivery");
            }

        }
    });
}

然后,运行生产者,去生产消息,这时,得注意消息的总数。再运行消费者程序,发现它会抛出异常并被处理了,然而,异常后面的代码都没有被执行,但刚才生产的消息却被消费了。实际上消费者还没有去真正地消费消息。

但将这行异常代码添加到手动 ACK 中的代码中去,发现这条消息并没有被消费!!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-01 12:00:29  更:2021-09-01 12:00:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:50:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码