RabbitMQ的消息应答以及不公平分发
RabbitMQ的消息应答机制
什么是RabbitMQ的消息应答
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了
RabbitMQ的自动应答机制
在消息发送后就被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。上一篇文章的简单程序就是自动确认
channel.basicConsume(QUERE_NAME,false,deliverCallback,cancelCallback);
RabbitMQ的手动应答机制
手动应答的几种方法
Channel.basicAck 用于肯定确认,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了Channel.basicNack 用于否定确认Channel.basicReject 用于否定确认,与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了
手动应答举例
- 通过设置自动应答为false以及设置手动应答的确认方法,来开启手动应答
public class Worker01 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xxx.xxx.xxx.xxx");
factory.setUsername("username");
factory.setPassword("password");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
Boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
- 上述channel.basicAck() 第二个参数批量不批量的解释:
- true 代表批量应答 channel 上未应答的消息 比如说 channel 上有传送tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答
- false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
RabbitMQ的不公平分发
上一篇文章中介绍了RabbitMQ的轮训分发,但是这种机制在某种策略下显得很不公平,比如当有多个消费者时,有一些消费者处理消息很快,有一些消费者处理消息很慢,这样处理快的消费者就在处理完的消息的时候一直处于空闲状态,而处理慢的那些消费者一直在干活,这种分配方式在这种情况下其实就不太好,所以此处介绍RabbitMQ的不公平分发。
代码示例
- 通过设置
channel.basicQos(1); 来设置分发模式为不公平分发,下面我们对上篇的代码进行改造,让接收到消息进行不同时间的沉睡(此处应先关闭自动应答channel.basicConsume(QUERE_NAME,false,deliverCallback,cancelCallback); ,通过手动应答时间的长短来模拟不同的处理消息的速度,以下为示例代码,work02与01仅仅是沉睡的时间不同,此处只写出work01 - 生产者代码work01
public class Worker01 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
=========与上述相同生成信道===========
System.out.println("C1等待接收消息处理,时间较短。。");
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
Thread.sleep(1000 * 1);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
System.out.println("接收到的消息:" + new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
channel.basicQos(1);
Boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
- 生产者代码,生产者也需要设置
channel.confirmSelect(); 来开启发布确认
public class Task02 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
======与上述相同的生成信道=========
channel.confirmSelect();
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME, null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}
}
}
实验结果
RabbitMQ的持久化
- 队列的持久化
RabbitMQ如果重启的话,之前创建的队列都会消失,如果要队列实现持久化,需要在声明队列的时候就把durable参数设置为持久化。
Boolean durable = true ;
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
- 消息的持久化
在推送消息的生产者上添加MessageProperties.PERSISTENT_TEXT_PLAIN 这个属性,来使消息保存到磁盘,不写默认保存到内存
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
尚硅谷B站RabbitMQ教程:尚硅谷RabbitMQ
|