为什么要用MQ
MQ有以下几个功能 1.流量消峰 ··对流量进行限制,比如系统最多能容纳一万访问,但是到了促销时间,突然来了两万订单系统当然处理不了。但是可以使用消息队列做缓冲,把一秒钟的访问分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单操作。 2.应用解耦 ··如果一个消费系统直接调用订单系统,库存系统,会员系统数据库,当其中有一个系统报错整个操作就会中断。如果使用消息队列,消费系统就先调用消息队列,订单系统,库存系统,会员系统的调用操作就由消息队列进行完成,当其中一个系统调用失败,消息队列就会再次调用直到成功。 3.异步处理 ··a调用b,a不需要等待b什么时候完成,a可以做自己的事情,b完成之后给MQ的msg发送信息,再由MQ的msg发送给A处理信息。
MQ的分类
ActiveMQ ··优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据 ··缺点:官方对ActiveMQ的维护越来越少,高吞吐量场景较小使用
Kafka ··优点:性能卓越,单机写入TPS在百万条每秒,最大的优点,就是吞吐量高,时效性ms级可用性非常高,kafka是分布式的,一条数据多个副本,少数机器宕机,不会丢失数据不会导致不可用,消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅仅被消费一次,有优秀的第三方,kafka web管理界面,kafka-Manager ;在日志领域比较成熟,被多家公司和多个开源项目使用,功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用 ··缺点:kafka单机超过64个队列分区,Load会发生明显的飙升现象,队列越多,load越高,发送消息响应时间长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试,支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新慢。
RocketMQ ··优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失,MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为消息堆积导致性能下降,源码是java我们可以自己阅读,定制自己公司的MQ ··缺点:支持的客户端语言不多,目前是java以及c++,其中c++不成熟,社区活跃度一般,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量的代码。
RabbitMQ ··优点:由于erlang语言的高并发特性,性能较好,吞吐量到万级,MQ功能比较完备,健壮稳定易用,跨平台,支持多种语言,如:Python,Ruby,.NET,Java等,支持ajax,文档齐全,开源提供的管理界面非常棒,用起来很好用,社区活跃度高。 ··缺点:商业版需要收费,学习成本较高
RabbitMQ四大核心概念
生产者: ··生产数据发送消息的程序是生产者 交换机: ··交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面他将消息推送到队列中,交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定的队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。 队列: ··队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但他们只能存储在队列中,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区,许多生产者可以发送到一个队列,许多消费者可以尝试从一个队列接收数据,这就是我们使用队列的方式。 消费者: ··消费者与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序,请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者,又可以是消费者。

安装erlang rabbitMQ 打开 web插件 添加用户
点击这里
Work Queues (工作队列)
工作队列的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程的时候,这些工作线程将一起处理这些任务。 
消息应答 为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,消息应答就是:消费者收到信息并处理该信息之后,告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了。 消息应答有两种:自动应答,手动应答
··自动应答:消息发送成功立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
··手动应答:手动应答有三个方法 1.Channel.basicAck(用于肯定确认)
2.Channel.basicNack(用于否定确认)
3.Channel.basicReject(用于否定确认) 与Channel.basicNack相比少一个参数:Multiple
Multiple(是否批量处理) Multiple的true和false代表不同意思
true:代表批量应答channel上未应答的消息,比如channel上有传送的tag的消息,5,6,7,8,当前tag是8那么此时5-8的这些还未应答的消息都会被确认收到消息应答
false:同false相比,只会应答tag=8的消息5,6,7这三个消息依然不会被确认收到消息应答。
消息自动重新入队 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或者TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将其重新排队,如果此时其他消费者可以处理,它将很快将其分发给拎一个消费者,这样,即使某个消费者偶尔死亡,也可以确认不会丢失任何消息。  RabbitMQ持久化 默认情况下RabbitMQ退出或者由于某种原因崩溃时,它忽视队列和消息,除非告知他不要这样做,确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。  消息持久化  不公平分发 最开始我们学习到RabbitMQ分发消息采用的轮询分发,但是在某种场合下这种策略并不是很好,比方说两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度很慢,很慢,这个时候我们还是采用轮询分发的话就会让处理速度快的消费者很大一部分时间处于空闲,而处理状态慢的消费者一直在干活,这种分配方式在这种情况下其实不太好,但是RabbitMQ并不知道这种情况它依然很公平的分发。  预取值 手动设定消息队列分发给消费者的比例,如:消费者1设定是5,消费者2设定是2,如果有七个消息要从消息队列中发送,消费者1就能获取5个,消费者2就能获取2个  发布确认原理 首先发布确认需要有两个条件,要求队列必须持久化,要求消息必须持久化,有以上两个条件消息也不一定保证不丢失,因为发布消息到消息队列中还没到磁盘上就有可能宕机,所以我们还需要消息确认,生产者发送消息到消息队列,队列确认将消息保存到磁盘上之后告诉消息生产者消息发送成功才算完成操作。
public class ConfirmMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
ConfirmMessage.publishMessageAsync();
}
public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
channel.basicPublish("",queueName,null,(i+"").getBytes());
boolean b = channel.waitForConfirms();
if (b){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end - begin)+"ms");
}
public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
int batchSize = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
channel.basicPublish("",queueName,null,(i+"").getBytes());
if (i%batchSize == 0){
System.out.println("第"+i+"条");
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时"+(end - begin)+"ms");
}
public static void publishMessageAsync() throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback ackCallback = ( deliveryTag, multiple) ->{
System.out.println("确认的消息"+deliveryTag);
if (multiple){
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =
outstandingConfirms.headMap(deliveryTag);
longStringConcurrentNavigableMap.clear();
}else {
outstandingConfirms.remove(deliveryTag);
}
};
ConfirmCallback nackCallback = ( deliveryTag, multiple) ->{
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息"+deliveryTag+"消息内容是:"+message);
};
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = ""+i;
channel.basicPublish("",queueName,null,message.getBytes());
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"异步确认消息,耗时"+(end - begin)+"ms");
}
}
交换机 此前消息生产者发布消息只能由一个消息队列分发到一个消费者消费一次,但是如果我们需要一个消息能够被消费多次,就需要用到我们的交换机。运行步骤:消息生产者发布消息到RabbitMQ的交换机,由交换机去绑定多个消息队列,一个消息队列发布给一个消费者,这样就可以一个消息多次消费。 交换机类型: ··直接(direct) ··主题(topic) ··标题(headers) ··扇出(fanout) Fanout 发布订阅模式 它是将接收到的所有消息广播到它知道的所有队列中
消息生产者
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("生产者发出消息:"+message);
}
}
}
消费者1
public class ReceiveLogs01 {
public static final String exhange_name = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(exhange_name,"fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,exhange_name,"");
System.out.println("等待接收消息,把接收到的消息打印在屏幕上");
DeliverCallback deliverCallback = ( consumerTag, message)->{
System.out.println("控制台打印接收到的消息:"+new String(message.getBody()));
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
}
}
消费者2
public class ReceiveLogs02 {
public static final String exhange_name = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(exhange_name,"fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,exhange_name,"");
System.out.println("等待接收消息,把接收到的消息打印在屏幕上");
DeliverCallback deliverCallback = ( consumerTag, message)->{
System.out.println("控制台打印接收到的消息:"+new String(message.getBody()));
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
}
}
直接交换机(Direct exchange) 上面我们使用的扇出交换机,相当于广播模式,讲消息广播给所有消费者,而这种交换机并不能给我们带来很大的灵活性,他只能无意识的进行广播。这里我们使用直接交换机来进行替换,这种类型的工作方式是,消息只到它绑定的routingKey队列仲去。
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("console",false,false,false,null);
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
DeliverCallback deliverCallback = (consumerTag ,message)->{
System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+new String(message.getBody()));
};
channel.basicConsume("console",true,deliverCallback,consumerTag->{});
}
}
public class ReceiveLogsDirect02 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("disk",false,false,false,null);
channel.queueBind("disk",EXCHANGE_NAME,"error");
DeliverCallback deliverCallback = (consumerTag ,message)->{
System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+new String(message.getBody()));
};
channel.basicConsume("disk",true,deliverCallback,consumerTag->{});
}
}
消息生产方直接找到direct_logs交换机下的对应RoutingKey,这里与队列名称没有关系
public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes());
System.out.println("生产者发出消息:"+message);
}
}
}
主题交换机(Topic) 尽管使用direct交换机改进了我们的系统,但是它仍然存在局限性,比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想info.base的消息,那这个时候direct就办不到了,这个时候只能使用topic类型
要求: 发送类型是topic交换机的消息routing_key不能随便写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:stock.sud.nyse , nyse.vmw 这种类型的,当然这个单词列表最多不能超过255个字节; *(星号)可以替代一个单词 #(井号)可以替代零个或多个单词
topic可以替代扇出交换机和直接交换机
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
HashMap<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "队列被Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
for (Map.Entry<String, String> stringStringEntry : bindingKeyMap.entrySet()) {
String key = stringStringEntry.getKey();
String value = stringStringEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,key,null,value.getBytes("UTF-8"));
System.out.println("生产者发出消息的内容:"+value);
}
}
}
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME ="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName = "Q1";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
System.out.println("接收队列:"+queueName+"绑定键:"+message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
}
}
public class ReceiveLogsTopic02 {
public static final String EXCHANGE_NAME ="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName = "Q2";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
System.out.println("接收队列:"+queueName+"绑定键:"+message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
}
}
死信队列 某些时候由于特殊原因导致queue中的某些信息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有死信队列。 应用场景:为了保证订单业务的消息数据不丢失,需要用到RabbitMQ的死信队列机制,当消息消费发生异常的时候,将消息投入死信队列中,还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。  消息生产者
public class Producer {
public static final String NORMAL_EXCHANGE_NAME= "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info"+i;
channel.basicPublish(NORMAL_EXCHANGE_NAME,"zhangsan",null,message.getBytes());
}
}
}
消息消费者1 声明死信队列 死信交换机,普通队列 普通交换机
public class Consumer01 {
public static final String NORMAL_EXCHANGE_NAME= "normal_exchange";
public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE_NAME,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"lisi");
System.out.println("等待接收消息。。。");
DeliverCallback deliverCallback = (consumerTag,message)->{
String msg = new String(message.getBody());
if (msg.equals("info5")){
System.out.println("拒绝的消息是:"+msg);
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else {
System.out.println("consumer01接收的消息是"+new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
channel.basicConsume(NORMAL_QUEUE,false, deliverCallback,consumerTag->{});
}
}
消费者2消费死信队列消息
public class Consumer02 {
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("consumer02接收的消息是"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(DEAD_QUEUE,true, deliverCallback,consumerTag->{});
}
}
延迟队列 延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中国的元素是希望在指定时间到了以后或者之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。 使用场景例如: 1.订单在十分钟之内未支付则自动取消 2.新创建的店铺,如果在十天内没有上传过商品,则发生消息提醒 3.用户注册成功后,如果三天内没有登陆则进行短信提醒 4.用户发起退款,如果三天之内没有得到处理则通知相关运营人员 5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
队列TTL配置  这里开始用的是springBoot绑定设置两个交换机三个队列
@Configuration
public class TtlQuquqConfig {
public static final String X_EXCHANGE = "X";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String DEAD_LETTER_QUEUE = "QD";
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA(){
HashMap<String, Object> arguments = new HashMap<>(3);
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-rounting-key","YD");
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
@Bean("queueB")
public Queue queueB(){
HashMap<String, Object> arguments = new HashMap<>(3);
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-rounting-key","YD");
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
注意:使用死信队列做延迟有一个缺陷,就是死信队列是需要排队的,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个不会优点得到执行。
RabbitMQ插件实现延迟队列 在官网上下载 https://www.rabbitmq.com/community-plugins.html,下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。 进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins rabbitmq-plugins enable rabbitmq_delayed_message_exchange
发布确认springboot版本 在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢?  当我们消息生产者发现消息给交换机失败,应该要吧消息放到缓存中使用定时任务再次发送。  配置文件
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
配置类
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,@Qualifier("confirmQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);
}
}
callBack类
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id =correlationData != null ? correlationData.getId() : "";
if (ack){
log.info("交换机已经收到ID为:{}的消息",id);
}else {
log.info("交换机还未收到ID为:{}的消息,原因:{}",id,cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息{}被交换机{}退回,退回原因:{},路由Key:{}",
new String(returnedMessage.getMessage().getBody()),returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
}
}
消息生产者
@RestController
@Slf4j
@RequestMapping("/confirm")
public class ProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
log.info("发送消息内容:{}",message);
correlationData = new CorrelationData("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData);
log.info("发送消息内容:{}",message);
}
}
消息消费者
@Component
@Slf4j
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message){
log.info("接收到的队列confirm.queue消息:"+new String(message.getBody()));
}
}
备份交换机 
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
public static final String BACKUP_QUEUE_NAME = "backup_queue";
public static final String WARNING_QUEUE_NAME = "warning_queue";
@Bean
public DirectExchange confirmExchange() {
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
}
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);
}
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
@Bean
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
@Bean
public Binding backupQueueBindingBackupExchange(@Qualifier("backupExchange") FanoutExchange fanoutExchange, @Qualifier("backupQueue") Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding warrningQueueBindingBackupExchange(@Qualifier("backupExchange") FanoutExchange fanoutExchange, @Qualifier("warningQueue") Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
幂等性 用户对于同一操作发现的一次请求或多次请求的结果是一致的,不会因为多次点击而产生副作用。举个最简单例子,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成两条,在以前的单应用系统中,我们只需要把数据放入事务中即可,但是响应客户端的时候也有可能出现网络中断或者异常等等。 优先级队列 在我们系统中有一个订单催付的场景,我们客户在天猫下订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内没有完成支付,那么就会给用户推送一条短信提醒。看似很简单一个功能,但是天猫商家对我们来说,肯定要区分大客户和小客户,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理所应当他们的订单优先得到处理。

如何添加   惰性队列 正常情况下:消息是保存到内存中的 惰性队列:消息是保存在磁盘中  搭建集群  镜像队列 当有多个队列进行集群的时候,其中一个服务队列存储着消息如果挂了,别的服务是不知道有这个消息。其他节点是不知道有这个队列消息。 
高可用负载均衡 我们连接rabbitMQ是根据IP地址连接,如果在集群模式下,我们正在连接的ip地址的节点如果关闭,我们的服务是不知道的,依旧连接旧的IP地址节点。  这里解决方案可以使用Haproxy实现负载均衡,使用Keepalived实现双机
|