消息中间件MQ(Message Queue)
简介 ??消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。 ??消息队列中间件是分布式系统重要组件,主要解决应用解耦、异步消息、流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。 数据库: mysql orcale ???? redis ??????jdbc?????? jedis
MQ : activeMq rabbitMq kafka zeroMq rocketMq
???amqp ? jms
常见的的消息中间件MQ
1)ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
(2)RabbitMQ AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
(3)ZeroMQ 史上最快的消息队列系统
(4)Kafka Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。
(5)RocketMQ 阿里巴巴
应用场景
(1)异步处理
??场景说明:用户注册后,需要发注册邮件和注册短信。 ??传统的做法有两种 1.串行的方式;2.并行方式 ?? 1、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。 ??2、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。 ??假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。 ??因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100) ??小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢? ??引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下: ??按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。
(2)应用解耦
上面的应用也体现了解耦合
(3)流量削峰
??流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。 ??通过加入消息队列完成如下功能: ??a、可以控制活动的人数 ??b、可以缓解短时间内高流量压垮应用 ??用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。
RabbitMQ
简介 RabbitMQ是基于AMQP和Erlang语言开发的一款消息管理系统。
下载和安装
下载 官网下载地址:http://www.rabbitmq.com/download.html 官方教程:http://www.rabbitmq.com/getstarted.html ??**先安装otp_win64_22.0.exe (Erlang的环境)**双击运行,下一步下一步即可,然后在安装rabbitmq-server-3.7.16 安装启动 进入到rabbitMq的安装目录中,进入sbin文件夹中执行命令 举例: ??在电脑的服务中确保rabbitMq的服务是一个正常运行的状态 第一步、启动RabbbitMQ
rabbitmqctl start_app
可以在浏览器上访问 http://localhost:15672/ ,如果没有正常显示,就执行第二步
第二步、开启web控制台的访问(最好执行一下)
rabbitmq-plugins enable rabbitmq_management
第三步、然后就可访问了 http://127.0.0.1:15672/ 第四步、但是需要输入用户名和密码 使用以下命令查看能使用的用户名和密码
rabbitmqctl list_users
目前里面有一个默认的用户名是guest,密码也是guest
还有两个命令: 关闭 rbbitmq: rabbitmqctl stop_app 还原: rabbitmqctl reset
注意:rabbitMq在window中使用的时候,机器用户名称不能是中文否则安装肯定出问题 如果安装失败应该如何解决: 1、重装系统 2、将RabbitMQ安装到linux虚拟机中 (自己操作) 3、使用别人安装好的RabbitMQ服务
管理界面
添加用户
创建Virtual Hosts(虚拟主机) 并且分配权限
AMQP和JMS
??通过java语言实现MQ的有两种主流方式:AMQP、JMS。 ??两者间的区别和联系:
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型 点对点、发布订阅;而AMQP的消息模型更加丰富7种 工作中用5种
5种消息模型
??RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此也就剩下5种。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
(1)基本消息模型
官方说明文档 ??RabbitMQ是一个消息的代理者(Message Broker):它接收消息并且传递消息。 你可以认为它是一个邮局:当你投递邮件到一个邮箱,你很肯定邮递员会终究会将邮件递交给你的收件人。与此类似,RabbitMQ 可以是一个邮箱、邮局、同时还有邮递员。 不同之处在于:RabbitMQ不是传递纸质邮件,而是二进制的数据。 ??在上图模型中有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者 连接工具类:
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/as");
factory.setUsername("/as");
factory.setPassword("as");
Connection connection = factory.newConnection();
return connection;
}
}
生产者发送消息:
public class Send {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
web控制台查看消息 ?&emsp:进入队列页面,可以看到新建了一个 队列:simple_queue ??点击队列名称,进入详情页,可以查看消息: 消费者收取信息
public class Recv {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
控制台输出信息: 这个时候,进入到队列去查看消息已经没了:
消费者的消息确认机制 (防止消息丢失)
??通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。 ??那么问题来了:RabbitMQ怎么知道消息被接收了呢? ??这就要通过消息确认机制(Acknowlege)来实现了。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
- 自动ACK:消息一旦被接收,消费者自动发送ACK
- 手动ACK:消息接收后,不会发送ACK,需要手动调用
需要看消息的重要性选择自动还是手动: - 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
- 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
上边的消费者代码都是自动ACK的,如果要手动ACK,需要改动我们的代码:
public class Recv2 {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
注意到最后一行代码:
// 监听队列,第二个参数false,手动进行ACK channel.basicConsume(QUEUE_NAME, false, consumer);
如果第二个参数为true,则会选择自动进行ACK;如果为false,则需要手动ACK。方法声明:
(2)work消息模型
??基本模型中,一个生产者,一个消费者,生产的消息直接被消费者消费。比较简单。 Work queues,也被称为(Task queues),任务模型。 ??当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。 角色:
- P:生产者:任务的发布者
- C1:消费者,领取任务并且完成任务,假设完成速度较慢
- C2:消费者2:领取任务并完成任务,假设完成速度快
生产者 与基本模型中的几乎一样:
public class Send {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
String message = "task .. " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 2);
}
channel.close();
connection.close();
}
}
消费者1 消费者2 与消费者1基本类似,就是没有设置消费时间。这里模拟的是消费者2比消费者1快 两个消费者一同启动,生产者发送50条消息,消费者1消费25条,消费者2消费类另外25条: 能者多劳 ??刚才的实现有问题吗?
- 消费者1比消费者2的效率要低,一次任务的耗时较长
- 然而两人最终消费的消息数量是一样的
- 消费者2大量时间处于空闲状态,消费者1一直忙碌
??现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。 ??怎么实现呢? ??我们可以修改设置,让消费者同一时间只接收一条消息,这样处理完成之前,就不会接收更多消息,就可以让处理快的人,接收更多消息 :
结果:
(3)订阅模式
在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
(3.1)订阅模式-Fanout(广播)
Fanout,也叫做广播。 在广播模式下,消息发送流程是这样的:
- 1) 可以有多个消费者
- 2) 每个消费者有自己的queue(队列)
- 3) 每个队列都要绑定到Exchange(交换机)
- 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 5) 交换机把消息发送给绑定过的所有队列
- 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
生产者 两个变化:
- 1) 声明Exchange,不再声明Queue
- 2) 发送消息到Exchange,不再发送到Queue
public class Send {
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Hello everyone";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1
public class Recv {
private final static String QUEUE_NAME = "fanout_exchange_queue_1";
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
要注意代码中:队列需要和交换机绑定 消费者2
public class Recv2 {
private final static String QUEUE_NAME = "fanout_exchange_queue_2";
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
结果,生产者发送1条消息给到交换机,交换机发送到其绑定的队列,消费者从不同的队列中取得相同消息:
(3.2)订阅模型-Direct(定向)
??在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey (路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey 。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息。
流程图: 图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
生产者 此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete
public class Send {
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message = "商品新增了, id = 1001";
channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
System.out.println(" [商品服务:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1 我们此处假设消费者1只接收两种类型的消息:更新商品和删除商品。
public class Recv {
private final static String QUEUE_NAME = "direct_exchange_queue_1";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者2 我们此处假设消费者2接收所有类型的消息:新增商品,更新商品和删除商品。
public class Recv2 {
private final static String QUEUE_NAME = "direct_exchange_queue_2";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
我们分别测试增、删改的RountingKey,结果为:
(3.3)订阅模型-Topic(通配符)
??Topic类型的 Exchange与 Direct相比,都是可以根据 RoutingKey把消息路由到不同的队列。只不过 Topic类型 Exchange可以让队列在绑定 Routing key` 的时候使用通配符!
??Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则: # :匹配一个或多个词 * :匹配不多不少恰好1个词
举例: item.# :能够匹配item.spu.insert 或者 item.spu item.* :只能匹配item.spu
图示:
解释:
- 红色Queue:绑定的是
usa.# ,因此凡是以 usa. 开头的routing key 都会被匹配到 - 黄色Queue:绑定的是
#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
生产者 ??使用topic类型的Exchange,发送消息的routing key有3种: item.isnert 、item.update 、item.delete :
public class Send {
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message = "新增商品 : id = 1001";
channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
System.out.println(" [商品服务:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1
public class Recv {
private final static String QUEUE_NAME = "topic_exchange_queue_1";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者2
public class Recv2 {
private final static String QUEUE_NAME = "topic_exchange_queue_2";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
持久化
如何避免数据丢失? 1) 消费者的ACK机制。可以防止消费者丢失消息。 2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。
所以我们需要将消息持久化到硬盘,以防服务宕机。 要将消息持久化,前提是:队列、Exchange都持久化 交换机持久化 队列持久化 消息持久化
Spring AMQP
Spring有很多不同的项目,其中就有对AMQP的支持:
生产者
第一步:创建一个项目
第二步:添加依赖
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.3</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
</dependencies>
第三步:在项目的resources文件夹下添加一个spring的配置文件,文件的名称:applicationContext-p.xml
<context:component-scan base-package="cn.it.rabbitmq.spring"/>
<rabbit:connection-factory id="connectionFactory"
virtual-host="/as"
host="127.0.0.1"
port="5672" username="as"
password="as" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" />
<rabbit:direct-exchange name="spring.test.exchange">
<rabbit:bindings>
<rabbit:binding queue="spring.test.queue" key="user.insert" />
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
exchange="spring.test.exchange"
message-converter="jsonMessageConverter"/>
</beans>
第四步:开发一个发送方的类,有一个发送的方法
@Component
public class MqSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMessage(){
amqpTemplate.convertAndSend("user.insert","spring整合RabbitMQ消息");
System.out.println("发送成功........");
}
}
第五步:测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-p.xml")
public class MqSendDemo {
@Autowired
private MqSender mqSender;
@Test
public void test(){
mqSender.sendMessage();
}
}
消费者
第一步:创建一个项目 第二步:添加依赖 和生产者依赖一模一样。
第三步:直接在项目的resources文件夹下 添加spring配置文件,文件的名 applicationContext-c.xml
<rabbit:connection-factory id="connectionFactory"
virtual-host="/as"
host="127.0.0.1"
port="5672"
username="as"
password="as" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" />
<bean id="testMqListener" class="cn.it.rabbitmq.spring.MqListener" />
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener ref="testMqListener" queues="spring.test.queue" />
</rabbit:listener-container>
</beans>
第四步:创建一个消费类,这个类一定要实现一个监听器接口
public class MqListener implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println(message.getBody());
String ms = new String(message.getBody(), "UTF-8");
System.out.println(ms);
} catch (Exception e) {
e.printStackTrace();
}
}
}
第五步:测试
public class MqConsumerTest {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext app = new ClassPathXmlApplicationContext("classpath:applicationContext-c.xml");
app.start();
System.in.read();
}
}
|