1.什么是RabbitMQ
1.1 MQ(Message Queue)消息队列
- 消息队列中间件,是分布式系统中的重要组件
- 主要解决,异步处理,应用解耦,流量削峰等问题
- 从而实现高性能,高可用,可伸缩和最终一致性的架构
- 使用较多的消息队列产品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka等
1.1.1 异步处理
- 用户注册后,需要发送验证邮箱和手机验证码;
- 将注册信息写入数据库,发送验证邮件,发送手机,三个步骤全部完成后,返回给客户端
1.1.2 应用解耦
- ?场景:订单系统需要通知库存系统
- 如果库存系统异常,则订单调用库存失败,导致下单失败
? ? ? ?原因:订单系统和库存系统耦合度太高
- ?订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户,下单成功;
- 库存系统:订阅下单的消息,获取下单信息,库存系统根据下单信息,再进行库存操作;
- 假如:下单的时候,库存系统不能正常运行,也不会影响下单,因为下单后,订单系统写入消息队 列就不再关心其他的后续操作了,实现了订单系统和库存系统的应用解耦;
- 所以说,消息队列是典型的:生产者消费者模型
- 生产者不断的向消息队列中生产消息,消费者不断的从队列中获取消息
- 因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的入侵,这样就 实现了生产者和消费者的解耦
1.1.3 流量削峰
- 抢购,秒杀等业务,针对高并发的场景
- 因为流量过大,暴增会导致应用挂掉,为解决这个问题,在前端加入消息队列
- ?用户的请求,服务器接收后,首先写入消息队列,如果超过队列的长度,就抛弃,甩一个秒杀结束 的页面!
- 说白了,秒杀成功的就是进入队列的用户;
1.2 背景知识介绍
1.2.1 AMQP高级消息队列协议
- 即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议
- 协议:数据在传输的过程中必须要遵守的规则
- 基于此协议的客户端可以与消息中间件传递消息
- 并不受产品、开发语言等条件的限制
1.2.2 JMS
- Java Message Server,Java消息服务应用程序接口, 一种规范,和JDBC担任的角色类似
- 是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消 息,进行异步通信
1.2.3 二者的联系
- JMS是定义了统一接口,统一消息操作;AMQP通过协议统一数据交互格式
- JMS必须是java语言;AMQP只是协议,与语言无关
1.2.4 Erlang语言
- Erlang(['?:l??])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CSLab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境
- 最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此非常适合构建分布 式,实时软并行计算系统
- Erlang运行时环境是一个虚拟机,有点像Java的虚拟机,这样代码一经编译,同样可以随处运行
1.3 为什么选择RabbitMQ
- 我们开篇说消息队列产品那么多,为什么偏偏选择RabbitMQ呢?
- 先看命名:兔子行动非常迅速而且繁殖起来也非常疯狂,所以就把Rabbit用作这个分布式软件的 命名(就是这么简单)
- Erlang开发,AMQP的最佳搭档,安装部署简单,上手门槛低
- 企业级消息队列,经过大量实践考验的高可靠,大量成功的应用案例,例如阿里、网易等一线大厂 都有使用
- 有强大的WEB管理页面
- 强大的社区支持,为技术进步提供动力
- 支持消息持久化、支持消息确认机制、灵活的任务分发机制等,支持功能非常丰富
- 集群扩展很容易,并且可以通过增加节点实现成倍的性能提升
- 总结:如果你希望使用一个可靠性高、功能强大、易于管理的消息队列系统那么就选择 RabbitMQ,如果你想用一个性能高,但偶尔丢点数据不是很在乎可以使用kafka或者zeroMQ
- kafka和zeroMQ的性能爆表,绝对可以压RabbitMQ一头!
1.4 RabbitMQ各组件功能
Broker:消息队列服务器实体
Virtual Host:虚拟主机
- 标识一批交换机、消息队列和相关对象,形成的整体
- 虚拟主机是共享相同的身份认证和加密环境的独立服务器域
- 每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权 限机制
- vhost是AMQP概念的基础,RabbitMQ默认的vhost是 /,必须在链接时指定
Exchange:交换器(路由)
- 用来接收生产者发送的消息并将这些消息路由给服务器中的队列
Queue:消息队列
- 用来保存消息直到发送给消费者。
- 它是消息的容器,也是消息的终点。
- 一个消息可投入一个或多个队列。
- 消息一直在队列里面,等待消费者连接到这个队列将其取走。
Banding:绑定,用于消息队列和交换机之间的关联。
Channel:通道(信道)
- 多路复用连接中的一条独立的双向数据流通道。
- 信道是建立在真实的TCP连接内的 虚拟链接
- AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信 道完成的
- 因为对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概 念,用来复用TCP连接。
Connection:网络连接,比如一个TCP连接。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Message:消息
- 消息是不具名的,它是由消息头和消息体组成。
- 消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由 键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。
2.怎么用RabbitMQ
- 想要安装RabbitMQ,必须先安装erlang语言环境,类似安装tomcat,必须先安装JDK
- 查看匹配的版本:https://www.rabbitmq.com/which-erlang.html
?2.1 RabbitMQ安装启动
erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
socat 下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ 下载:https://www.rabbitmq.com/install-rpm.html#downloads
2.1.1 安装
[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
[root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
[root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
2.1.2 启动后台管理插件
[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management
2.1.3 启动RabbitMQ
[root@localhost opt]# systemctl start rabbitmq-server.service
[root@localhost opt]# systemctl status rabbitmq-server.service
[root@localhost opt]# systemctl restart rabbitmq-server.service
[root@localhost opt]# systemctl stop rabbitmq-server.service
2.1.4 查看进程
[root@localhost opt]# ps -ef | grep rabbitmq
2.1.5 测试
1. 关闭防火墙: systemctl stop firewalld
2. 浏览器输入:http://ip:15672
3. 默认帐号密码:guest,guest用户默认不允许远程连接
? ? ? ? 1.创建账号
[root@localhost opt]# rabbitmqctl add_user panghl panghl
????????2. 设置用户角色
[root@localhost opt]# rabbitmqctl set_user_tags panghl administrator
????????3. 设置用户权限
[root@localhost opt]# rabbitmqctl set_permissions -p "/" panghl ".*" ".*" ".*"
????????4. 查看当前用户和角色
[root@localhost opt]# rabbitmqctl list_users
????????5. 修改用户的密码
[root@localhost opt]# rabbitmqctl change_password panghl panghl
管理界面介绍
overview:概览
connections:查看链接情况
channels:信道(通道)情况
Exchanges:交换机(路由)情况,默认4类7个
Queues:消息队列情况
Admin:管理员列表
端口:
- 5672:RabbitMQ提供给编程语言客户端链接的端口
- 15672:RabbitMQ管理界面的端口
- 25672:RabbitMQ集群的端口
2.2 RabbitMQ快速入门
2.2.1 依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
2.2.2 日志依赖log4j(可选项)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
2.2.2 创建连接
先创建好虚拟主机
package util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author panghl
* @Date 2021/8/8 12:26
* @Description 专门与RabbitMQ获得连接
**/
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.在工厂对象中设置MQ的连接信息(ip,port,vhost,username,password)
factory.setHost("192.168.40.100");
factory.setPort(5672);
factory.setVirtualHost("/lagou");
factory.setUsername("panghl");
factory.setPassword("panghl");
//3.通过工厂获得与MQ的连接
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = getConnection();
System.out.println("connection:->"+connection);
//amqp://panghl@192.168.40.100:5672//lagou
connection.close();
}
}
2.3 RabbitMQ模式
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此我们只学习前5种
在线手册:https://www.rabbitmq.com/getstarted.html
5种消息模型,大体分为两类:
????????1和2属于点对点
????????3、4、5属于发布订阅模式(一对多)
点对点模式:P2P(point to point)模式包含三个角色:
????????消息队列(queue),发送者(sender),接收者(receiver)
????????每个消息发送到一个特定的队列中,接收者从中获得消息
????????队列中保留这些消息,直到他们被消费或超时
????????特点:
????????????????1. 每个消息只有一个消费者,一旦消费,消息就不在队列中了
????????????????2. 发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会?影?响 消息发送到队列中(我给你发微信,不管你看不看手机,反正我发完了)
????????????????3. 接收者成功接收消息之后需向对象应答成功(确认)
? ? ? ? ? ? ? ? 4.?如果希望发送的每个消息都会被成功处理,那需要P2P
发布订阅模式:publish(Pub)/subscribe(Sub)
- pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者 (subcriber)
- 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
- 特点:
1.每个消息可以有多个订阅者 2.发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅 后,才能消费发布者的消息 3.为了消费消息,订阅者必须保持运行状态;类似于,看电视直播。 - ?如果希望发送的消息被多个消费者处理,可采用本模式
2.3.1 简单模式
RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!
类似邮局,处理信件的应该是收件人而不是邮局!
?2.3.1.1 生产者P
/**
* @Author panghl
* @Date 2021/8/10 21:32
* @Description 简单模式-生产者P
**/
public class MessageSender {
public static void main(String[] args) throws IOException, TimeoutException {
String msg = "panghl: Hello Java Rab";
//1.获得连接
Connection connection = ConnectionUtil.getConnection();
//2.在连接中创建通道(信道)
Channel channel = connection.createChannel();
//3.创建消息队列(1,2,3,4,5)
/*
* 参数1:队列的名称
* 参数2:队列中的数据是否持久化
* 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
* 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还保存数据)
* 参数5:队列参数(没有参数为null)
*/
channel.queueDeclare("queue1",false,false,false,null);
//4.向指定的队列发送消息
/*
* 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""
* 参数2: 目标队列的名称
* 参数3:设置消息的属性(没有属性则为null)
* 参数4:消息的内容 (只接受字节数组)
*/
channel.basicPublish("","queue1",null,msg.getBytes());
//5.释放资源
channel.close();
connection.close();
}
}
2.3.1.2 消费者C
/**
* @Author panghl
* @Date 2021/8/10 21:32
* @Description 简单模式-接收者P
**/
public class MessageRecer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.获得连接
Connection connection = ConnectionUtil.getConnection();
//2.获得通道(信道)
Channel channel = connection.createChannel();
//3.从信道中获得消息(1,2,3,4,5)
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override //交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的消息
String s = new String(body);
System.out.println("接收 = " + s);
//todo.... 业务代码
//手动确认(收件人信息,是否同时确认多个消息)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//4.监听队列 true:自动消息确认 自动ACK
channel.basicConsume("queue1", false, defaultConsumer);
}
}
启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0
2.3.1.3 消息确认机制ACK
- 通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除
- RabbitMQ如何得知消息被消费者接收?
1、如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是RabbitMQ无从得 知,这样消息就丢失了 2、因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告 知消息已经被接收 3、ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种 传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200 就是告诉我们服务器执行成功 4、整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执 5、不过这种回执ACK分为两种情况: 自动ACK:消息接收后,消费者立刻自动发送ACK(快递放在快递柜) 手动ACK:消息接收后,不会发送ACK,需要手动调用(快递必须本人签收) 6、两种情况如何选择,需要看消息的重要性: 如果消息不太重要,丢失也没有影响,自动ACK会比较方便 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把 消息从队列中删除,如果此时消费者抛异常宕机,那么消息就永久丢失了 ? - 修改手动消息确认
// false:手动消息确认
channel.basicConsume("queue1", false, consumer); - 结果如下:
- 解决问题
? /**
* @Author panghl
* @Date 2021/8/10 21:32
* @Description 简单模式-接收者P
**/
public class MessageRecer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.获得连接
Connection connection = ConnectionUtil.getConnection();
//2.获得通道(信道)
Channel channel = connection.createChannel();
//3.从信道中获得消息(1,2,3,4,5)
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override //交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的消息
String s = new String(body);
System.out.println("接收 = " + s);
//todo.... 业务代码
//手动确认(收件人信息,是否同时确认多个消息)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//4.监听队列 true:自动消息确认 自动ACK
channel.basicConsume("queue1", false, defaultConsumer);
}
}
?2.3.2 工作队列模式
- 之前我们学习的简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能 力有限,就会产生消息在队列中堆积(生活中的滞销)
? - 一个烧烤师傅,一次烤50支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理?
- 多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共 享,但其中某一个消息只会被一个消费者获取(100支肉串20个人吃,但是其中的某支肉串只能被 一个人吃)
2.3.2.1 生产者P
/**
* @Author panghl
* @Date 2021/8/10 22:06
* @Description 工作队列--发送者
**/
public class Sender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue", false, false, false, null);
for (int i = 1; i <= 100; i++) {
String msg = "羊肉串--->" + i;
channel.basicPublish("", "test_work_queue", null, msg.getBytes());
System.out.println("新鲜出炉:--》" + msg);
}
channel.close();
connection.close();
}
}
2.3.2.2 消费者1
/**
* @Author panghl
* @Date 2021/8/10 22:10
* @Description 工作队列-消费者
**/
public class Recer {
static int i = 1;
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue", false, false, false, null);
//queueDeclare() 此方法有双重作用,如果队列不存在,再创建;如果队列存在,则获取
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【顾客1】吃掉"+s+"!总共吃【"+i+++"】串!");
//模拟网络延迟
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("test_work_queue",false,defaultConsumer);
}
}
2.3.2.3 消费者2
/**
* @Author panghl
* @Date 2021/8/10 22:10
* @Description 工作队列-消费者
**/
public class Recer2 {
static int i = 1;
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue", false, false, false, null);
//queueDeclare() 此方法有双重作用,如果队列不存在,再创建;如果队列存在,则获取
channel.basicQos(2);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【顾客2】吃掉"+s+"!总共吃【"+i+++"】串!");
//模拟网络延迟
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("test_work_queue",false,defaultConsumer);
}
}
- 先运行2个消费者,排队等候消费(取餐),再运行生产者开始生产消息(烤肉串)
- 虽然两个消费者的消费速度不一致(线程休眠时间),但是消费的数量却是一致的,各消费50个 消息
? ? ? ?例如:工作中,A同学编码速率高,B同学编码速率低,两个人同时开发一个项目,A10天完 成,B30天完成,A完成自己的编码部分,就无所事事了,等着B完成就可以了,这样是不可 以的,应该遵循“能者多劳”;效率高的多干点,效率低的少干点 - 看下面官网是如何给出解决思路的:
?
公平的分配
您可能已经注意到分派仍然不能完全按照我们的要求工作。例如,如果有两个员工,当所有 奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都 不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。
????????这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它 只是盲目地将每条第n个消息分派给第n个消费者。
????????为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉 RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并 确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的 worker。
// 声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队
channel.queueDeclare("test_work_queue",false,false,false,null);
// 可以理解为:快递一个一个送,送完一个再送下一个,速度快的送件就多
channel.basicQos(1);
能者多劳必须要配合手动的ACK机制才生效
2.3.2.4 面试题:避免消息堆积?
1. workqueue,多个消费者监听同一个队列
2. 接收到消息后,通过线程池,异步消费
2.3.3 发布订阅模式
Publish/Subscribe
????????In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".
????????To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them. ????????In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen. ????????Essentially, published log messages are going to be broadcast to all the receivers.
发布-订阅
????????在上一篇教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都被准确地交 付给一个工作者。在这一部分中,我们将做一些完全不同的事情——将消息传递给多个消费者。 此模式称为“发布/订阅”。
????????为了演示这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将 发送日志消息,第二个将接收和打印它们。
????????在我们的日志系统中,接收程序的每一个正在运行的副本都将获得消息。这样我们就可以运行 一个接收器并将日志指向磁盘;与此同时,我们可以运行另一个接收器并在屏幕上看到日志。
????????基本上,发布的日志消息将广播到所有接收方。
生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视 频通知
- ?上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)
- P生产者发送信息给X路由,X将信息转发给绑定X的队列
- ?X队列将信息通过信道发送给消费者,从而进行消费
- 整个过程,必须先创建路由
1、路由在生产者程序中创建 2、因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没 有队列,路由并不知道将信息发送给谁 3、运行程序的顺序: 1. MessageSender 2. MessageReceiver1和MessageReceiver2 3. MessageSender
2.3.3.1 生产者
/**
* @Author panghl
* @Date 2021/8/10 22:37
* @Description 发布订阅-生产者
**/
public class Sender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明路由(路由名,路由类型)
// fanout: 不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到该路由绑定的所有队列上)
channel.exchangeDeclare("test_exchange_fanout", "fanout");
String msg = "hello,panghl!!";
// 不要写队列的名称,此时你并不知道 谁会绑定路由
channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes());
System.out.println("生产者:=》" + msg);
channel.close();
connection.close();
}
}
2.3.3.2 消费者1
/**
* @Author panghl
* @Date 2021/8/10 22:43
* @Description 发布订阅-消费者1
**/
public class Recer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null);
//绑定路由(队列名,路由名, routing key)
channel.queueBind("test_exchange_fanout_queue_1","test_exchange_fanout","");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【粉丝1】==>"+s);
}
};
channel.basicConsume("test_exchange_fanout_queue_1",true,consumer);
}
}
?2.3.3.3 消费者2
/**
* @Author panghl
* @Date 2021/8/10 22:43
* @Description 发布订阅-消费者2
**/
public class Recer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_fanout_queue_2",false,false,false,null);
//绑定路由(队列名,路由名,)
channel.queueBind("test_exchange_fanout_queue_2","test_exchange_fanout","");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【粉丝2】==>"+s);
}
};
channel.basicConsume("test_exchange_fanout_queue_2",true,consumer);
}
}
2.3.4 路由模式
?2.3.4.1 生产者
/**
* @Author panghl
* @Date 2021/8/10 23:19
* @Description 路由模式-定向分发-生产者
**/
public class Sender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明路由(路由名,路由类型)
channel.exchangeDeclare("test_exchange_direct", "direct");
// String msg = "用户注册,[userid=1]";
String msg = "用户查询,[userid=1]";
// channel.basicPublish("test_exchange_direct", "insert", null, msg.getBytes());
channel.basicPublish("test_exchange_direct", "select", null, msg.getBytes());
System.out.println("【用户系统】:"+msg);
channel.close();
connection.close();
}
}
2.3.4.2 消费者1
/**
* @Author panghl
* @Date 2021/8/10 23:22
* @Description 路由模式-定向分发-消费者1
**/
public class Recer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_exchange_direct_queue_2",false,false,false,null);
//绑定路由
channel.queueBind("test_exchange_direct_queue_2","test_exchange_direct","select");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("[消费者1]="+s);
}
};
channel.basicConsume("test_exchange_direct_queue_2",true,consumer);
}
}
2.3.4.3 消费者2
/**
* @Author panghl
* @Date 2021/8/10 23:22
* @Description 路由模式-定向分发-消费者2
**/
public class Recer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_exchange_direct_queue_1",false,false,false,null);
//绑定路由(如果路由键的类型是 添加、删除、修改的话,绑定到这个队列上)
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","insert");
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","update");
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","delete");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("[消费者1]="+s);
}
};
channel.basicConsume("test_exchange_direct_queue_1",true,consumer);
}
}
1. 记住运行程序的顺序,先运行一次sender(创建路由器),
2. 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定
3. 再次运行sender,发出消息
2.3.5 通配符模式
?和路由模式90%是一样的。
唯独的区别就是路由键支持模糊匹配
匹配符号
????????*:只能匹配一个词(正好一个词,多一个不行,少一个也不行)
????????#:匹配0个或更多个词
看一下官网案例:
????????Q1绑定了路由键 *.orange.*? ? ? ? ?Q2绑定了路由键 *.*.rabbit 和 lazy.#
????????下面生产者的消息会被发送给哪个队列?
quick.orange.rabbit? ? ? ? ? ? ? ? ? ? ?# Q1 Q2
lazy.orange.elephant? ? ? ? ? ? ? ? ? # Q1 Q2
quick.orange.fox? ? ? ? ? ? ? ? ? # Q1
lazy.brown.fox? ? ? ? ? ? ? ? ? ? ? # Q2
lazy.pink.rabbit? ? ? ? ? ? ? ? ? ? # Q2
quick.brown.fox ? ? ? ? ? ? ? ? ?# 无
orange ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 无
quick.orange.male.rabbit? ?# 无
2.3.5.1 生产者
/**
* @Author panghl
* @Date 2021/8/10 23:32
* @Description 通配符模式-生产者
**/
public class Sender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//topic : 模糊匹配的定向分发
channel.exchangeDeclare("test_exchange_topic","topic",true);
String msg = "hello,msg - topic";
// channel.basicPublish("test_exchange_topic","user.insert",null,msg.getBytes());
channel.basicPublish("test_exchange_topic","product.down", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
System.out.println("[消息中心-->]"+msg);
channel.close();
connection.close();
}
}
2.3.5.2 消费者1
/**
* @Author panghl
* @Date 2021/8/10 23:41
* @Description 通配符模式-消费者
**/
public class Recer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列 true 队列持久化
channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);
//绑定路由(队列名,路由名,路由key) 绑定用户相关的消息
channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】==>"+s);
}
};
channel.basicConsume("test_exchange_topic_queue_1",true,consumer);
}
}
2.3.5.3 消费者2
/**
* @Author panghl
* @Date 2021/8/10 23:41
* @Description 通配符模式-消费者
**/
public class Recer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_exchange_topic_queue_2",true,false,false,null);
//绑定路由(队列名,路由名,路由key) 绑定商品和订单相关的消息
channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","product.#");
channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","order.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者2】==>"+s);
}
};
channel.basicConsume("test_exchange_topic_queue_2",true,consumer);
}
}
2.4 持久化
2.4.1 生产者
/**
* @Author panghl
* @Date 2021/8/10 23:32
* @Description 通配符模式-生产者
**/
public class Sender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//topic : 模糊匹配的定向分发
channel.exchangeDeclare("test_exchange_topic","topic",true);
String msg = "hello,msg - topic";
// channel.basicPublish("test_exchange_topic","user.insert",null,msg.getBytes());
channel.basicPublish("test_exchange_topic","product.down", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
System.out.println("[消息中心-->]"+msg);
channel.close();
connection.close();
}
}
2.4.2 消费者
/**
* @Author panghl
* @Date 2021/8/10 23:41
* @Description 通配符模式-消费者
**/
public class Recer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列 true 队列持久化
channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);
//绑定路由(队列名,路由名,路由key) 绑定用户相关的消息
channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】==>"+s);
}
};
channel.basicConsume("test_exchange_topic_queue_1",true,consumer);
}
}
2.4 Spring整合RabbitMQ
- 五种消息模型,在企业中应用最广泛的就是最后一种:定向匹配topic
- Spring AMQP 是基于 Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象 层,提供基于消息驱动的 POJO的消息监听等,简化了我们对于RabbitMQ相关程序的开发。
2.4.1 生产端工程
1、依赖
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
2、spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 1.配置连接 -->
<rabbit:connection-factory
id="connectionFactory"
host="192.168.40.100"
port="5672"
username="panghl"
password="panghl"
virtual-host="/lagou"
/>
<!-- 2.配置队列 -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3.配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删
除队列与交换机,发送消息等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4.配置topic类型exchange;队列绑定到交换机 -->
<rabbit:topic-exchange name="spring_topic_exchange">
<rabbit:bindings>
<rabbit:binding queue="test_spring_queue_1" pattern="msg.#"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 5. 配置消息对象json转换类 -->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/
>
<!-- 6. 配置RabbitTemplate(消息生产者) -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spring_topic_exchange"
message-converter="jsonMessageConverter"
/>
</beans>
3、发消息
/**
* @Author panghl
* @Date 2021/8/11 21:15
* @Description 生产者
**/
public class Sender {
public static void main(String[] args) {
//1.创建spring容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
//2.从容器中获得rabbit模板对象
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
//3.发消息
Map<String, String> map = new HashMap<>();
map.put("name", "erguo2");
map.put("email", "phl0425@qq2.com");
// rabbitTemplate.convertAndSend("lalalal","msg.user",map);
// for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("msg.user", map);
System.out.println("消息已经发布。。。");
// }
context.close();
}
}
2.4.2 消费端工程
1、依赖与生产者一致
2、spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 1. 配置连接 -->
<rabbit:connection-factory
id="connectionFactory"
host="192.168.40.100"
port="5672"
username="panghl"
password="panghl"
virtual-host="/lagou"
/>
<!-- 2. 配置队列 -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3.配置rabbitAdmin -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4.springIOC注解扫描包-->
<context:component-scan base-package="listener"/>
<!-- 5.配置监听 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="consumerListener" queuenames="test_spring_queue_1" />
</rabbit:listener-container>
</beans>
消费者
????????MessageListener接口用于spring容器接收到消息后处理消息
????????如果需要使用自己定义的类型 来实现 处理消息时,必须实现该接口,并重写onMessage()方 法
????????当spring容器接收消息后,会自动交由onMessage进行处理
@Component
public class ConsumerListener implements MessageListener {
// jackson提供序列化和反序列中使用最多的类,用来转换json的
private static final ObjectMapper MAPPER = new ObjectMapper();
public void onMessage(Message message) {
try {
// 将message对象转换成json
JsonNode jsonNode = MAPPER.readTree(message.getBody());
String name = jsonNode.get("name").asText();
String email = jsonNode.get("email").asText();
System.out.println("从队列中获取:【"+name+"的邮箱是:"+email+"】");
} catch (Exception e){
e.printStackTrace();
}
}
}
启动项目
/**
* @Author panghl
* @Date 2021/8/11 21:43
* @Description TODO
**/
public class TestRunner {
public static void main(String[] args) throws IOException {
//1.创建spring容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
//让程序一直运行
System.in.read();
}
}
2.5 消息成功确认机制
在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
2.5.1 事务机制
- AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式
- 并利用信道 的三个方法来实现以事务方式 发送消息,若发送失败,通过异常处理回滚事务,确保 消息成功投递
channel.txSelect(): 开启事务
channel.txCommit() :提交事务
channel.txRollback() :回滚事务
- Spring已经对上面三个方法进行了封装,所以我们只能使用原始的代码演示
2.5.1.1 生产者
/**
* @Author panghl
* @Date 2021/8/10 23:32
* @Description 通配符模式-生产者
**/
public class Sender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//topic : 模糊匹配的定向分发
channel.exchangeDeclare("test_transaction_exchange_topic", "topic");
channel.txSelect();
String msg = "商品降价";
try {
channel.basicPublish("test_transaction_exchange_topic", "product.price", null, "商品1降价".getBytes());
// System.out.println(1/0);
channel.basicPublish("test_transaction_exchange_topic", "product.price", null, "商品2降价".getBytes());
System.out.println("[生产者-->消息已发送]");
channel.txCommit();
} catch (Exception e) {
System.out.println("[生产者-->消息已全部撤销]");
channel.txRollback();
}finally {
channel.close();
connection.close();
}
}
}
2.5.1.2 消费者
/**
* @Author panghl
* @Date 2021/8/10 23:41
* @Description 通配符模式-消费者
**/
public class Recer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列 true 队列持久化
channel.queueDeclare("test_tx_exchange_topic_queue_1",false,false,false,null);
//绑定路由(队列名,路由名,路由key) 绑定用户相关的消息
channel.queueBind("test_tx_exchange_topic_queue_1","test_transaction_exchange_topic","product.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者】==>"+s);
}
};
channel.basicConsume("test_tx_exchange_topic_queue_1",true,consumer);
}
}
2.5.2 Confirm发布确认机制
- RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是 采用事务会大大降低消息的吞吐量
- 我本机SSD硬盘测试结果10w条消息未开启事务,大约8s发送完毕;而开启了事务后,需要将 近310s,差了30多倍。
- 接着翻阅官网,发现官网中已标注
- ?那么有没有更加高效的解决方式呢?答案就是采用Confirm模式。
- 事务效率为什么会这么低呢?试想一下:10条消息,前9条成功,如果第10条失败,那么9条消息 要全部撤销回滚。太太太浪费
- 而confirm模式则采用补发第10条的措施来完成10条消息的送达
2.5.2.1 在spring中应用
spring-rabbitmq-producer.xml
<!--1.配置连接 ,启动生产者确认机制,publisher-confirms="true"-->
<rabbit:connection-factory id="connectionFactory"
host="192.168.40.100"
port="5672"
username="panghl" password="panghl"
virtual-host="/lagou"
publisher-confirms="true"/>
//省略....
<!--6.配置rabbitmq的模板,添加确认回调处理类:confirm-callback="msgSendConfirmCallback"-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
exchange="spring_topic_exchange"
message-converter="jsonMessageConverter"
confirm-callback="messageConfirm"/>
<!--7.确认机制的处理类-->
<bean id="messageConfirm" class="confirm.MessageConfirm"/>
消息确认处理类
/**
* @Author panghl
* @Date 2021/8/11 22:14
* @Description 确认机制,消息确认处理
**/
public class MessageConfirm implements RabbitTemplate.ConfirmCallback {
/**
*
* @param correlationData 消息相关的数据对象(封装了消息的唯一id)
* @param b 消息是否确认成功
* @param s 异常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
System.out.println("消息确认成功!correlationData-->"+correlationData+"s->"+s);
}else {
System.out.println("消息确认失败! correlationData-->"+correlationData+"s->"+s);
//如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用补发
//1.采用递归(限制递归的次数)
//2.redis+定时任务(jdk的timer,或者定时任务框架)
}
}
}
log4j.properties
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rabbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l
%m%n
log4j.rootLogger=debug, stdout,file
发送消息
/**
* @Author panghl
* @Date 2021/8/11 21:15
* @Description 生产者
**/
public class Sender {
public static void main(String[] args) {
//1.创建spring容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
//2.从容器中获得rabbit模板对象
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
//3.发消息
Map<String, String> map = new HashMap<>();
map.put("name", "erguo2");
map.put("email", "phl0425@qq2.com");
// 第一个参数是路由名称,
// 不写,则使用spring容器中创建的路由
// 乱写一个,因为路由名错误导致报错,则进入消息确认失败流程
rabbitTemplate.convertAndSend("lalalal","msg.user",map);
context.close();
}
}
2.6 消费端限流
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("msg.user", map);
System.out.println("消息已经发布。。。");
}
<!--5.配置监听-->
<!-- prefetch="3" 一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于
N 个消息,一旦有 N 个消息还没有ack,则该 consumer 将阻塞,直到消息被ack-->
<!-- acknowledge-mode: manual 手动确认-->
<rabbit:listener-container
connection-factory="connectionFactory"
prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
</rabbit:listener-container>
package listener;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @Author panghl
* @Date 2021/8/11 21:33
* @Description 消费者监听队列 -
* AbstractAdaptableMessageListener用于在spring容器接收到消息后用于处理消息的抽象
* 基类
**/
@Component
public class ConsumerListener extends AbstractAdaptableMessageListener implements MessageListener {
// jackson提供序列化和反序列中使用最多的类,用来转换json的
private static final ObjectMapper MAPPER = new ObjectMapper();
// @Override
// public void onMessage(Message message) {
// System.out.println("收到消息了.....");
// String body = new String(message.getBody());
// //将message对象转换成json
// try {
// JsonNode jsonNode = MAPPER.readTree(body);
// String name = jsonNode.get("name").asText();
// String email = jsonNode.get("email").asText();
// System.out.println("从队列中获取【"+name+"的邮箱是"+email+"】");
//
// System.out.println("休息三秒然后再接收消息");
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("收到消息了.....");
String body = new String(message.getBody());
//将message对象转换成json
try {
// JsonNode jsonNode = MAPPER.readTree(body);
// String name = jsonNode.get("name").asText();
// String email = jsonNode.get("email").asText();
// System.out.println("从队列中获取【" + name + "的邮箱是" + email + "】");
System.out.println("body-->"+body);
long msgId =
message.getMessageProperties().getDeliveryTag();
//确认收到(参数1,参数2)
/*
参数1:RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递
增的正整数,delivery_tag 的范围仅限于 Channel
参数2:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以
一次性确认 msgId 小于等于传入值的所有消息
*/
channel.basicAck(msgId, true);
TimeUnit.SECONDS.sleep(3);
System.out.println("休息三秒然后再接收消息");
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.7 过期时间TTL
- Time To Live:生存时间、还能活多久,单位毫秒
- 在这个周期内,消息可以被消费者正常消费,超过这个时间,则自动删除(其实是被称为dead message并投入到死信队列,无法消费该消息)
- RabbitMQ可以对消息和队列设置TTL
1、通过队列设置,队列中所有消息都有相同的过期时间 2、对消息单独设置,每条消息的TTL可以不同(更颗粒化)
2.7.1 设置队列TTL
spring-rabbitmq-producer.xml
<!--2.重新配置一个队列,同时,对队列中的消息设置过期时间-->
<rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value-type="long" value="5000"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
2.7.1 设置消息TTL
- 设置某条消息的ttl,只需要在创建发送消息时指定即可
<!--2.配置队列-->
<rabbit:queue name="test_spring_queue_ttl_2">
/**
* @Author panghl
* @Date 2021/8/11 21:15
* @Description 生产者
**/
public class Sender2 {
public static void main(String[] args) throws IOException {
//1.创建spring容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
//2.从容器中获得rabbit模板对象
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
//3.发消息
Map<String, String> map = new HashMap<>();
map.put("name", "erguo2");
map.put("email", "phl0425@qq2.com");
//创建消息的配置对象
MessageProperties properties = new MessageProperties();
//设置过期时间3s
properties.setExpiration("3000");
//创建消息
ByteArrayOutputStream os = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(os);
oos.writeObject(map);
Message message = new Message(os.toByteArray(),properties);
rabbitTemplate.convertAndSend("msg.user", message);
System.out.println("消息已经发布。。。");
context.close();
}
}
- 如果同时设置了queue和message的TTL值,则二者中较小的才会起作用
2.8 死信队列
1、DLX(Dead Letter Exchanges)死信交换机/死信邮箱,当消息在队列中由于某些原因没有被及时 消费而变成死信(dead message)后,这些消息就会被分发到DLX交换机中,而绑定DLX交换机 的队列,称之为:“死信队列”
2、消息没有被及时消费的原因:
????????消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
????????消息超时未消费
????????达到最大队列长度
?spring-rabbitmq-producer-dlx.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--1.配置连接 ,启动生产者确认机制,publisher-confirms="true" 192.168.40.66 keepalived集群后的 虚拟ip-->
<rabbit:connection-factory id="connectionFactory"
host="192.168.40.100"
port="5672"
username="panghl" password="panghl"
virtual-host="/lagou"
publisher-confirms="true"/>
<!--3.配置rabbitAdmin:主要用于在java代码中队队列的管理,用来创建,绑定,删除队列与交换机、发送消息-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--6.配置rabbitmq的模板,添加确认回调处理类:confirm-callback="msgSendConfirmCallback"-->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="my_exchange"/>
<!--
############################################################################
##########################################-->
<!--死信队列-->
<rabbit:queue name="dlx_queue"/>
<!--定向死信交换机-->
<rabbit:direct-exchange name="dlx_exchange">
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="dlx_queue" />
<rabbit:binding key="dlx_max" queue="dlx_queue" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!--声明定向的测试消息的交换机-->
<rabbit:direct-exchange name="my_exchange">
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="test_ttl_queue"></rabbit:binding>
<rabbit:binding key="dlx_max" queue="test_max_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--声明 测试过期的消息队列-->
<rabbit:queue name="test_ttl_queue">
<rabbit:queue-arguments>
<!--设置队列的过期时间TTL-->
<entry key="x-message-ttl" value-type="long" value="20000"/>
<!--消息超时 投递给死信交换机-->
<entry key="x-dead-letter-exchange" value="dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--声明 测试超过长度的消息队列-->
<rabbit:queue name="test_max_queue">
<rabbit:queue-arguments>
<!--设置队列的额定长度(本队列最多装两个消息)-->
<entry key="x-max-length" value-type="long" value="2" />
<!--消息如果超出长度 投递给死信交换机-->
<entry key="x-dead-letter-exchange" value="dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
</beans>
3、发消息进行测试
/**
* @Author panghl
* @Date 2021/8/11 21:15
* @Description 生产者-死信
**/
public class SenderDLX {
public static void main(String[] args) throws IOException {
//1.创建spring容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
//2.从容器中获得rabbit模板对象
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
//3.发消息
// rabbitTemplate.convertAndSend("dlx_ttl", "超时,关闭订单".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "测试长度1".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "测试长度2".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "测试长度3".getBytes());
System.out.println("消息已经发布。。。");
context.close();
}
}
2.9 延迟队列
- 延迟队列:TTL + 死信队列的合体
- 死信队列只是一种特殊的队列,里面的消息仍然可以消费
- 在电商开发部分中,都会涉及到延时关闭订单,此时延迟队列正好可以解决这个问题
2.9.1 生产者
沿用上面死信队列案例的超时测试,超时时间改为订单关闭时间即可
2.9.2 消费者
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!--1.配置连接-->
<rabbit:connection-factory id="connectionFactory"
host="192.168.40.100"
port="5672"
username="panghl" password="panghl"
virtual-host="/lagou"/>
<!--2.配置队列-->
<rabbit:queue name="test_spring_queue_ttl" />
<!--3.配置rabbitAdmin:主要用于在java代码中队队列的管理,用来创建,绑定,删除队列与交换机、发送消息-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--4.注解扫描包(SpringIOC)-->
<context:component-scan base-package="listener"/>
<!--5.配置监听-->
<!-- prefetch="3" 一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于
N 个消息,一旦有 N 个消息还没有ack,则该 consumer 将阻塞,直到消息被ack-->
<!-- acknowledge-mode: manual 手动确认-->
<rabbit:listener-container
connection-factory="connectionFactory"
prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="test_spring_queue_ttl"/>
</rabbit:listener-container>
<!-- 监听死信队列 -->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3"
acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="dlx_queue" />
</rabbit:listener-container>
</beans>
3、集群搭建
由于篇幅过长,请移步:https://blog.csdn.net/qq_45441466/article/details/119699104
|