1.AMQP
AMQP(Advanced Message Queuing Protocol)AMQP,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
2.配置环境
引入依赖:
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
在application.yml中添加RabbitMQ配置
spring:
rabbitmq:
host: 192.168.136.160
port: 5672
username: lepeng
password: lepeng
virtual-host: lepeng
3.SimpleQueue
简单模型,一个生产者,一个队列,一个消费者 消息发送:
@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpPublusherTest {
@Autowired
private RabbitTemplate template;
@Test
public void testSendBaseMessage() {
String queueName = "simple.queue";
String message = "宝 你今天输液了嘛";
template.convertAndSend(queueName,message);
}
}
发送完成后,可以去RabbitMQ中simple.queue里查看发送的消息<注意先不要启动消费者,消费者如果接收则查看不到了> 消息接收: 消费者需要保持一个开启状态接收信息,所以需要配置监听消息队列的监听器,注意监听器需要与启动类放在同一目录
@Component
public class MessageListener {
@RabbitListener(queues = "simple.queue")
public void recBaseMessage1(String message) {
System.out.println("获取基础消息:"+message);
}
}
启动消费者类即可!!!
4.WorkQueue
循环模拟大量信息:
@Test
public void testSendWorkMessage() {
for (int i = 0; i < 10; i++) {
String queueName = "simple.queue";
String message = "宝 你今天输液了嘛";
template.convertAndSend(queueName,message);
}
}
配置两个消费者,也就是监听器
@RabbitListener(queues = "simple.queue")
public void recBaseMessage1(String message) {
System.out.println("消费者1获取基础消息:"+message);
}
@RabbitListener(queues = "simple.queue")
public void recBaseMessage2(String message) {
System.out.println("消费者2获取基础消息:"+message);
}
结果: 但这样消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。 我们可以配置一个能者多劳机制。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
5.发布/订阅
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- C:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
6.Fanout(广播)
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
广播模式主要实现了消息能发送给多个用户,因为队列中消息一次只能发送一个人,所以需要使用交换机去选择队列,发送给不同用户。
发送消息
@Test
public void testFanoutMessage() {
String exchange = "lepengExchange";
String message = "输的什么液、想你的夜";
template.convertAndSend(exchange,"",message);
}
接收消息
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "fanout.queue1"
),
exchange = @Exchange(
name = "lepengExchange",
type = ExchangeTypes.FANOUT
)
)
)
public void recFanoutMessage(String message) throws InterruptedException {
System.out.println("消费1-获取基础消息:"+message);
Thread.sleep(200);
}
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(
name = "fanout.quque2"
),
exchange = @Exchange(
name = "lepengExchange",
type = ExchangeTypes.FANOUT
)
)
)
public void recFanoutMessage2(String message) throws InterruptedException {
System.out.println("消费2-获取基础消息:"+message);
Thread.sleep(200);
}
7.Direct
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey (路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey 。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
发送消息
@Test
public void testDirectMessage() {
String exchange = "exchange-direct";
String routingKey = "fan" ;
String message = "你看这牢饭又香又甜";
template.convertAndSend(exchange,routingKey,message);
}
接收消息
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "direct.queue1"
),
exchange = @Exchange(
name = "exchange-direct",
type = ExchangeTypes.DIRECT
),
key = {"fan"}
)
)
public void recDirectMessage(String message) throws InterruptedException {
System.out.println("消费1-获取基础消息:"+message);
Thread.sleep(200);
}
8.Topic
Topic 类型的Exchange 与Direct 相比,都是可以根据RoutingKey 把消息路由到不同的队列。只不过Topic 类型Exchange 可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
# :匹配一个或多个词
* :匹配不多不少恰好1个词
举例:
item.# :能够匹配item.spu.insert 或者 item.spu
item.* :只能匹配item.spu 解释:
- 红色Queue:绑定的是
usa.# ,因此凡是以 usa. 开头的routing key 都会被匹配到 - 黄色Queue:绑定的是
#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
发送消息:
@Test
public void testTopicMessage() {
String exchange = "exchange-topic";
String routingKey = "china.news";
String message = "吴签刑拘";
template.convertAndSend(exchange,routingKey,message);
}
接收消息:
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "topic-queue1"
),
exchange = @Exchange(
name = "exchange-topic",
type = ExchangeTypes.TOPIC
),
key = "*.news"
)
)
public void recTopicMessage(String message) throws InterruptedException {
System.out.println("消费1:"+message);
Thread.sleep(200);
}
注意:将消费者停止,去RabbitMQ的查看数据,可以看到我们传入的消息 经过base64解码我们可以看到 出现这种原因是因为:
Spring会把你发送的消息序列化,后进行一个base64的加密后放到队列当中。
默认情况下Spring采用的序列化方式是JDK序列化。JDK序列化存在下列问题:
我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
9.配置配置JSON转换器
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
并在启动类配置json转换的bean
@Beanpublic MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter();}
测试: 监听器中修改参数为Map测试
10.源码
自行下载:https://gitee.com/le-peng/rabbit-mq-demo
|