消息丢失分两部分:
1.生产者发送消息到队列时,因外部原因如MQ突然宕机,导致新进来的数据还未保存到本地磁盘
2.消费者消费消息失败,例如多个消费者,其中某个消费者在处理消息时突然宕机,导致该消息未成功消费等。
解决这两种问题的方法:
防止生产者发送到队列时消息丢失,可采用发布确认模式
1.单个消息确认,能够保证消息不丢失,但吞吐量会下降
//批量发送消息的个数
public static final int MESSAGE_COUNT = 1000;
public static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
Long begin = System.currentTimeMillis();
//批量发布消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i+ "";
channel.basicPublish("",queueName,null,message.getBytes());
//单个消息马上进行确认
boolean flag = channel.waitForConfirms();//等待确认返回值
if(flag){
System.out.println("消息发送成功");
}
}
//结束时间
Long end = System.currentTimeMillis();
System.out.println("发布"+ MESSAGE_COUNT +"单个确认消息,耗时:"+(end - begin));
}
2.批量消息确认,不能保证消息不丢失,但吞吐量相比较单个确认要高
//批量发送消息的个数
public static final int MESSAGE_COUNT = 1000;
public static void publicMessageBatch() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明队列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
Long begin = System.currentTimeMillis();
//批量确认消息大小
int batch = 100;
//批量发布消息 批量消息确认
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i+ "";
channel.basicPublish("",queueName,null,message.getBytes());
//判断达到100调消息的时候,批量确认与粗
if(i%100 == 0){
//发布确认
channel.waitForConfirms();
System.out.println("消息发送成功");
}
}
//结束时间
Long end = System.currentTimeMillis();
System.out.println("发布"+ MESSAGE_COUNT +"批量发布确认消息,耗时:"+(end - begin));
}
3.异步消息确认,能够保证消息不丢失,吞吐量也高,但相对的复杂度比较高
//批量发送消息的个数
public static final int MESSAGE_COUNT = 1000;
public static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//并发队列存储所有要发送的消息
/**
* 线程安全有序的一个哈希表 适用于高并发的情况下
* 1.轻松的将序号于消息进行关联
* 2.轻松的批量删除条目 只要给到序号
* 3.支持高并发
*/
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
//开始时间
Long begin = System.currentTimeMillis();
//消息确认成功 回调函数式
ConfirmCallback ackCallBack = (deliveryTag,multiple) -> {
//2.删除掉已经确认的消息 剩下的就是未确认的消息
if(multiple){//判断是否为批量确认
ConcurrentNavigableMap<Long,String> confirmed = outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
}else {//单个确认
outstandingConfirms.remove(deliveryTag);
}
System.out.println("已确认的消息" + deliveryTag);
};
//消息确认失败 回调函数
/**
* 参数:
* 1.消息的标记
* 2.是否为批量确认
*/
ConfirmCallback nackCallBack = (deliveryTag,multiple) -> {
//3.大一一下未确认的消息
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息:" + message + "未确认消息序号" + deliveryTag);
};
//准备消息监听器 监听哪些消息成功,那些失败
/**
* 参数说明:
* 1.监听哪些消息成功
* 2.监听哪些消息失败
*/
channel.addConfirmListener(ackCallBack,nackCallBack);//确认监听器,异步通知
//批量发布消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息"+i;
channel.basicPublish("",queueName,null,message.getBytes("UTF-8"));
//1.此处记录下所有要发送的消息
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
}
//结束时间
Long end = System.currentTimeMillis();
System.out.println("发布"+ MESSAGE_COUNT +"异步发布确认消息,耗时:"+(end - begin));
}
防止消息在消费时丢失,可采用消息手动应答。消息在手动应答时是不丢失的,会重新放回到队列中重新消费
//消息再手动应答时是不丢失,放回到队列中重新消费
public class Consumer1 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息处理时间较短");
//消息回调接口
DeliverCallback deliverCallback = (consumerTag,message) -> {
//沉睡1秒
SleepUtils.sleep(1);
System.out.println("接收到的消息:" + new String(message.getBody(),"UTF-8"));
//手动应答代码
/** 参数:
* 1.消息的标记
* 2.是否批量应答:false 不批量应答信道中的消息 true:批量应答
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//接收消息采用手动应答
boolean autoAck = false;
//设置消息分发类型:0轮询分发 1不公平分发
//int prefetchCount = 1;
//设置预取值:当prefetchCount > 1的时候,为预取值
int prefetchCount = 2;
channel.basicQos(prefetchCount);
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag ->{
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
}));
}
}
|