本来应该昨天更新的,但是使用浏览器访问服务器的RabbitMQ的web端口时chrome显示不是私密链接不让登录Edge也是相同问题,百度找了很多还是无法解决,原因是服务器没有安装SSL证书,直接使用ip访问。
1. 持久化
当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息。为了保证消息不丢失需要将队列和消息都标记为持久化。
1.1 实现持久化
- 队列持久化:在创建队列时将
channel.queueDeclare(); 第二个参数改为true。 - 消息持久化:在使用信道发送消息时
channel.basicPublish(); 将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化消息。
public class Producer3 {
private static final String LONG_QUEUE = "long_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(LONG_QUEUE,true,false,false,null);
Scanner scanner = new Scanner(System.in);
int i = 0;
while (scanner.hasNext()){
i++;
String msg = scanner.next() + i;
channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息:'" + msg + "'成功");
}
}
}
但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。
1.2 不公平分发
轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。
在消费者处修改channel.basicQos(1); 表示开启不公平分发
public class Consumer2 {
private static final String LONG_QUEUE = "long_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
Thread.sleep(30000);
System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
channel.basicQos(1);
channel.basicConsume(LONG_QUEUE,false,deliverCallback,
consumerTag -> {
System.out.println(consumerTag + "消费者取消消费");
});
}
}
1.3 测试不公平分发
测试目的:是否能实现能者多劳。 测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。
先启动生产者创建队列,再分别启动两个消费者。
生产者按照顺序发四条消息: 睡眠时间短的线程A接收到了三条消息 而睡眠时间长的线程B只接收到的第二条消息: 因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。 实验成功!
1.4 预取值
消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。
这里的预期值就值得是上述方法channel.basicQos(); 里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。
1.4.1 代码测试
测试方法:
- 新建两个不同的消费者分别给定预期值5个2。
- 给睡眠时间长的指定为5,时间短的指定为2。
- 假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。
代码根据上述代码修改预期值即可。
2. 发布确认
发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。
需要注意的是需要开启队列持久化才能使用确认发布。 开启方法:channel.confirmSelect();
2.1 单个确认发布
是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。
public class SoloProducer {
private static final int MESSAGE_COUNT = 100;
private static final String QUEUE_NAME = "confirm_solo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.confirmSelect();
long beginTime = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = ""+i;
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
boolean flag = channel.waitForConfirms();
if (flag){
System.out.println("发送消息:" + i);
}
}
long endTime = System.currentTimeMillis();
System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); }
}
2.2 批量确认发布
一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。
public class BatchProducer {
private static final int MESSAGE_COUNT = 100;
private static final String QUEUE_NAME = "confirm_batch";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.confirmSelect();
int batchSize = MESSAGE_COUNT / 10;
long beginTime = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = ""+i;
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
if (i % batchSize == 0){
if (channel.waitForConfirms()){
System.out.println("发送消息:" + i);
}
}
}
long endTime = System.currentTimeMillis();
System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
}
}
显然效率要比单个确认发布的高很多。
2.3 异步确认发布
在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。
public class AsyncProducer {
private static final int MESSAGE_COUNT = 100;
private static final String QUEUE_NAME = "confirm_async";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.confirmSelect();
long beginTime = System.currentTimeMillis();
ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
System.out.println("确认成功消息:" + deliveryTab);
};
ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
System.out.println("未确认的消息:" + deliveryTab);
};
channel.addConfirmListener(ackCallback,nackCallback);
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "" + i;
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
}
long endTime = System.currentTimeMillis();
System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
}
}
2.4 处理未确认的消息
最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。
例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks与发布线程之间进行消息的传递。
处理方式:
- 记录要发送的全部消息;
- 在发布成功确认处删除;
- 打印未确认的消息。
使用一个哈希表存储消息,它的优点:
- 可以将需要和消息进行关联;
- 轻松批量删除条目;
- 支持高并发。
ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
public class AsyncProducerRemember {
private static final int MESSAGE_COUNT = 100;
private static final String QUEUE_NAME = "confirm_async_remember";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.confirmSelect();
ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
long beginTime = System.currentTimeMillis();
ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
if (multiple){
ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
confirmMap.clear();
}else {
map.remove(deliveryTab);
}
System.out.println("确认成功消息:" + deliveryTab);
};
ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
};
channel.addConfirmListener(ackCallback,nackCallback);
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "" + i;
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
map.put(channel.getNextPublishSeqNo(),msg);
}
long endTime = System.currentTimeMillis();
System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
}
}
2.5 总结
显然来说,异步处理除了在编码处有些麻烦,在处理时间效率和可用性上都是比单处理和批处理好很多。
|