035-云E办_RabbitMQ_RabbitMQ消息的事务机制
前言: RabbitMQ消息的事务机制(了解)
在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?
RabbitMQ为我们提供了两种方式: 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案; 通过将channel设置成confirm模式来实现;
RabbitMQ中与事务机制有关的方法有三个: txSelect() , txCommit() 以及 txRollback(),txSelect() 用于将当前channel设置成transaction模式,txCommit() 用于提交事务,txRollback() 用于回滚事务,在通过txSelect() 开启事务之后,我们便可以发布消息给broker代理服务器了,如果 txCommit() 提交成功了,则消息一定到达了broker了,如果在 txCommit() 执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过 txRollback() 回滚事务。
总结:事务会降低性能
事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式
一、confirm确认模式
通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,此时处理AMQP协议层面能够实现消息事物控制外,我们还有第二种方式即:Confirm模式。
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack(未确定)消息。 在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。
注意:两种事物控制形式不能同时开启!
1、实现生产者confirm 机制有三种方式:
-
普通confirm模式-串行-同步:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm确认。实际上是一种串行confirm了。 -
批量confirm模式-串行-同步:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端confirm。 -
异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个 方法。
2、异步confirm
异步:你发消息,我等待确认,然而,在我等待时,我还可以发送消息。确认了消息返回,返回的时候生产者有回掉的方法,能收到确认消息。
异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener() 【该方法用来监听返回的消息】回调方法只包含deliveryTag 【当前Chanel发出的消息序号】,我们需要自己为每一个Channel维护一个 unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck 方法【确认的一个处理】, unconfirm 集合删掉相应的一条 (multiple=false)或多条 (multiple=true) 记录。从程序运行效率上看,这个 unconfirm集合最好采用有序集合SortedSet存储结构(有序集合)。实际上waitForConfirms() 方法也是通过SortedSet维护消息序号的。
3、代码:
package com.xxxx.async.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "confirm_async";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
Connection connection = null;
Channel channel = null;
try {
final SortedSet<Long>
confirmSet=Collections.synchronizedSortedSet(new TreeSet<Long>());
connection = factory.newConnection();
channel = connection.createChannel();
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("handleAck--success-->multiple" + deliveryTag);
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
System.out.println("handleAck--success-->single" + deliveryTag);
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple)
throws IOException {
if (multiple) {
System.out.println("handleNack--failed-->multiple-->" +
deliveryTag);
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
System.out.println("handleNack--failed-->single" +
deliveryTag);
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
String message = "Hello World!";
Long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null,
message.getBytes("utf-8"));
confirmSet.add(seqNo);
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
try {
if (null != channel && channel.isOpen())
{ channel.close();}
if (null != connection && connection.isOpen())
{ connection.close();}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package com.xxxx.async.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv {
private final static String QUEUE_NAME = "confirm_async";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit pressCTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag
-> {
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
|