前言:Publisher Confirms是RabbitMQ实现可靠发布的扩展方式。当在通道上启用发布者确认时,如果客户端发布的消息被代理异步确认,表示它们已经在服务器端得到处理。在本教程中,我们将使用publisher confirms来确保已发布的消息已经安全地到达代理。我们将介绍几种使用publisher confirms的策略,并解释它们的优缺点
RabitMQ安装 如何安装: https://blog.csdn.net/Beijing_L/article/details/119042261
启动Channel的发布确认功能
发布确认是RabbitMQ 对AMQP0.9.1协议的扩展,因此这个功能并不是默认开启的。可以通过confirmSelect方法开启这个功能,建立管道后可以开启这个功能,并不需要针对每一个消息都做处理。 例如:
Channel ch = connection.createChannel();
ch.confirmSelect();
方法签名
/**
* Enables publisher acknowledgements on this channel. (是否为管道开启发布确认功能)
* @see com.rabbitmq.client.AMQP.Confirm.Select
* @throws java.io.IOException if an error is encountered
*/
Confirm.SelectOk confirmSelect() throws IOException;
策略1:单独消息发布确认
让我们从最简单的方法开始了解发布确认,参考下面的的代码
注意下面的代码一些JDK1.7的特性
- int 值5_000等同于5000. JDK1.7中增加了新特性可以在int中数值间增加下划线以提高可读性
- try语句(try-with-resource) ,try块退出的时候会自动调用close方法关闭资源
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();//开发发布确认
long start = System.nanoTime();
//循环发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
//等待确认,其中5_000是JDK1.7特性
ch.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
创建Channel后 先使用confirmSelect开启发布确认功能,使用waitForConfirmsOrDie 方法等待确认,消息一旦被RabbitMQ代理确认,该方法就会返回, 如果消息超时间内没有被确认,或者消息被否定(nack-ed)(否定表示:RabbitMQ代理由于某种原因无法处理).这个方法就会抛出异常,出现异常以后一般是记录错误消息或者重新发送。
/** Wait until all messages published since the last call have been either ack'd or nack'd by the broker; or until timeout elapses.
* 上一次消息发布后等待,直到发布的所有消息都被代理确认或否定;或者直到超时
* If the timeout expires a TimeoutException is thrown.
* 超时抛出TimeoutException 异常
* If any of the messages were nack'd, waitForConfirmsOrDie will throw an IOException.
* 消息被拒绝会抛出IOException异常
*When called on a non-Confirm channel, it will throw an IllegalStateException.
* 方法调用的时候channel没有开启发布确认,抛出IllegalStateException异常
* @throws java.lang.IllegalStateException
*/
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
这项技术非常简单, 但也有一个主要的缺点: 它显著降低了发布速度。因为waitForConfirmsOrDie 执行后会一直等待直到消息被确认,它是一个同步的Helper,等待消息时会阻止所有后续消息发布。这种方法的吞吐量不会超过几百条消息/秒。但对于某些应用来说已经足够了
策略2:批量消息发布确认
改进策略1的例子,提供一个批量发送消息的方式,批量发勇消息等待整批都发送完成后确认
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
等待一批消息发送然后全部被确认大大提高了吞吐量,这样无需等待单个消息的确认(对于远程RabbitMQ节点,最多可等待20-30次)。缺点是:如果发送失败, 我们不知道发生了什么错误,所以我们可能需要在内存中保留一整批来记录一些有意义的信息或者重新发布整批消息。这个解决方案仍然是同步的,所以它依然会阻止了后续消息的发布处理
策略3:异步发布确认
代理异步确认发布的消息,只需要在客户端注册一个回调,通过回调得到这些确认的通知,这里有两个回调方法,第一个方法为完成确认的方法,第二哥方法为RabbitMQ代理拒绝的方法,一般是丢失。简单代码如下
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});
?先看addConfirmListener接口签名,它的参数有两个回调接口ConfirmCallback?组成
/**
* Add a lambda-based {@link ConfirmListener}.
* @see ConfirmListener
* @see ConfirmCallback
* @param ackCallback callback on ack
* @param nackCallback call on nack (negative ack)
* @return the listener that wraps the callbacks
*/
ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
?看下回调接口的定义: ConfirmCallback 回调有2个参数,第一个是序号deliveryTag,第2个是一个布尔值
public interface ConfirmCallback {
void handle(long deliveryTag, boolean multiple) throws IOException;
}
- deliveryTag 参数:也就是监听中的sequenceNumber,序列号标识确认或否定消息的号码,序列号可以通过?Channel#getNextPublishSeqNo()生成
- multiple参数:是一个布尔值,如果为假,则只确认/否定一条消息,如果为真,则确认/否定所有序列号较低或相等的消息
序列的简单用法是将其放到MAP里, KEY=序列号, VALUE=消息,发送消息后同时将序列号和消息存入MAP,当消息发送确认后通过序列将其从MAP中移除, 最后都移除的时候表示消息都发送成功,
使用MAP的方式是比较简单清晰的,它将序列号和消息直接关联,比较容易清除给定序列ID的条目。 它支持并发访问,因为confirm回调是在客户端库拥有的线程中调用的,应该与发布线程保持不同。除了使用复杂的映射实现之外,还有其他方法来跟踪未完成的确认,比如使用简单的并发哈希映射和一个变量来跟踪发布序列的下限,但是它们通常更复杂,不属于教程。
总而言之,异步处理发布者确认通常需要以下步骤:
- 提供一种将发布序列号与消息相关联的方法。可以是MAP也客户是其他
- 在通道上注册一个确认侦听器,以便在发布者确认/否定确认到达时得到通知,从而执行适当的操作,如记录或重新发布否定确认消息。序列号到消息的关联机制在此步骤中可能还需要一些清理。
- 发布消息前跟踪发布序列号。
完整的参考代码如下
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();//开启发布确认
//定义Map建立序列和消息关系
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
//定义成功回调函数
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
//multiple=false之前的消息都移除
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
//multiple=false只清除自身所以将成功的消息雄map中移除
outstandingConfirms.remove(sequenceNumber);
}
};
//addConfirmListener方法有2个参数,第一个是成功确认,这个回调上面已经定义,第二个参数是代理否决即确认失败
ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
//生成序列并将消息和序列关系保存到MAP里
outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
ch.basicPublish("", queue, null, body.getBytes());
}
if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
}
long end = System.nanoTime();
System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
文章总结
应用程序中,确保发布的消息到达RabbitMQ代理是非常重要的。Publisher确认是RabbitMQ的一项功能,有助于满足这一要求。Publisherconfirms本质上是异步的,但也可以同步处理它们。没有明确的方法来实现publisher confirms,这通常是由于应用程序和整个系统中的限制。典型的技术有:
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,等待确认同步进行一批:简单,吞吐量合理,但出问题时很难推理。
- 异步处理:最好的性能和资源的使用,在出现错误的情况下很好的控制,但是可以正确实现。
通过上面的例子比较性能,参考结果如下
Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms
上一篇:RabbitMQ教程远程过程调用RPC
|