1. 代码优化
在这讲正式之前,咱们先把之前的生产者、消费者代码,简单优化下吧
看之前的代码:生产者、消费者的连接和关闭都是一样的代码,那么,我们就可以将相同的代码提取出来,放在一个工具类里面,然后在相应的地方用工具类进行替换。
提取一个工具类 RabbitMqUtil,用来处理连接、关闭功能:
public class RabbitMqUtil {
private RabbitMqUtil() {}
public static Connection getConnection(String name) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection conn = connectionFactory.newConnection(name);
return conn;
}
public static void close(Connection conn, Channel channel) throws Exception{
if (null != channel && channel.isOpen()) {
channel.close();
}
if (null != conn && conn.isOpen()) {
conn.close();
}
}
}
生产者代码修改为:
public class Producer {
public static void main(String[] args) {
Connection connection = null;
try {
connection = RabbitMqUtil.getConnection("生产者");
} catch (Exception e) {
System.out.println("获取连接时,出现异常");
}
Channel channel = null;
try {
channel = connection.createChannel();
String queueName = "code_simple_queue1";
channel.queueDeclare(queueName, false, false, false, null);
String message = "Hello RabbitMQ";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送完成~~~发送的消息为:" + message);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
RabbitMqUtil.close(connection, channel);
} catch (Exception e) {
System.out.println("关闭时,出现异常");
}
}
}
}
消费者代码为:
public class Consumer {
public static void main(String[] args) {
Connection connection = null;
try {
connection = RabbitMqUtil.getConnection("消费者");
} catch (Exception e) {
System.out.println("获取连接异常");
}
Channel channel = null;
try {
channel = connection.createChannel();
String queueName = "code_simple_queue1";
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
}
};
channel.basicConsume(queueName, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
RabbitMqUtil.close(connection, channel);
} catch (Exception e) {
System.out.println("关闭时,出现异常");
}
}
}
}
咱们再来回顾下 消费者中的 channel.basicConsume() 方法,它有三个参数,分别为:
- queueName:队列名称
- autoAck:设置是否自动确认
- callback:设置消费者的回调函数,用来处理 RabbitMQ 推送过来的消息
以上的示例中,咱们是将自动确认参数设置为 true ,那么,这就会导致一个现象:消息一旦被消费者接收,队列中的消息就会被删除。
问题:RabbitMQ 怎么知道消息被消费者接收了呢?如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?总之:如果消费消息失败了,RabbitMQ 无从得知,这样消息就丢失了!
为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制 ACK:当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。
不过这种回执 ACK 分两种情况:
- 自动 ACK:autoAck 设置为 true。RabbitMQ 会自动把发送出去的消息置为确认,然后从内存或磁盘中删除,而不管消费者是否真正地消费了这些消息
- 手动 ACK:autoAck 设置为 false。RabbitMQ 会等待消费者显示地回复确认信号后才从内存或磁盘中移去消息
采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程挂掉后消息丢失的问题。因为,RabbitMQ 会一直等待持有消息,直到消费者显示调用 Basic.Ack 命令为止。
并且,当 autoAck 参数设置为 false 后,对于 RabbitMQ 而言,队列中的消息分成了两部分:一是等待投递给消费者的消息;二是已经投递给消费者,但还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者。
对于以上的两种回执 ACK,如何选择哪一种,这得看消息的重要性了(建议选择手动确认):
- 如果消息不太重要,丢失也没有影响,那么自动 ACK 会比较方便
- 如果消息非常重要,不容丢失。那么,最好在消费完成后手动 ACK,否则接收消息后就自动 ACK,RabbitMQ 就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
2. 手动 ACK
将自动 ACK 修改为手动 ACK,只需要修改消费者的代码:
public class Consumer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqUtil.getConnection("消费者");
final Channel channel = connection.createChannel();
String queueName = "code_simple_queue1";
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
channel.basicAck(deliveryTag, false);
}
};
channel.basicConsume(queueName, false, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
RabbitMqUtil.close(connection, channel);
}
}
主要作出一下两点变化:
channel.basicConsume() 方法的第二个参数修改为 false- 在消费者中,必须得调用
channel.basicAck() 方法,进行手动确认,
修改后,发现上述的消费者依旧可以消费消息
3. 自动 ACK 带来的问题
问题:那么,自动 ACK 会带来什么问题呢?接下来我们通过实例进行演示一下
我们修改下自动 ACK 的消费者代码:在 handleDelivery() 方法中手动添加异常代码,让其抛出异常,这样,就中断了正在执行的 handleDelivery() 方法:
channel = connection.createChannel();
String queueName = "code_simple_queue1";
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
int result = 1 / 0;
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
}
};
channel.basicConsume(queueName, true, consumer);
上述的 handleDelivery() 方法确实会抛出异常,但它又被 ConsumerDispatcher#handleDelivery() 方法捕获并进行处理了:
public void handleDelivery(final Consumer delegate, final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
this.executeUnlessShuttingDown(new Runnable() {
public void run() {
try {
delegate.handleDelivery(consumerTag, envelope, properties, body);
} catch (Throwable var2) {
ConsumerDispatcher.this.connection.getExceptionHandler().handleConsumerException(ConsumerDispatcher.this.channel, var2, delegate, consumerTag, "handleDelivery");
}
}
});
}
然后,运行生产者,去生产消息,这时,得注意消息的总数。再运行消费者程序,发现它会抛出异常并被处理了,然而,异常后面的代码都没有被执行,但刚才生产的消息却被消费了。实际上消费者还没有去真正地消费消息。
但将这行异常代码添加到手动 ACK 中的代码中去,发现这条消息并没有被消费!!
|