1、 MQ模型
1.1 简单队列模型
简单队列模型
publisher -----> queue ----> consumer
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处理队列中的消息
1.2 发布/订阅模型
publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给exchange(交换机)
exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
exchange种类:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,将消息交给符合routingKey的队列
- Topic:主题(使用通配符),将消息交给符合routing pattern(路由模式)的队列
consumer:消费者,订阅队列,消费消息
queue:队列,接收消息,缓存消息
2、 SpringAMQP的使用
SpringAMQP地址:https://spring.io/projects/spring-amqp
2.1 Basic Queue 简单队列模型
- 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置MQ地址
spring:
rabbitmq:
host: 192.168.10.128
port: 5672
virtual-host: /
username: root
password: root
- 实体类与Controller类
@Data
public class MqMessage {
private String message;
private String queueName;
private String exchange;
private String routingKey;
}
@PostMapping("/send")
public String sendMsgService(@RequestBody MqMessage msg) {
return msgService.sendMsg(msg);
}
- impl 消息发送
@Service
public class MsgServiceImpl implements MsgService {
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public String sendMsg(MqMessage msg) {
try {
rabbitTemplate.convertAndSend(msg.getRoutingKey(), msg.getMessage());
return "Success";
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
}
}
}
- 调用
调用成功后可在rabbitmq的控制台看到刚发送到MQ的消息: - 消息消费(配置文件内容与publisher一样)
@Configuration
public class ConsumerListen {
@RabbitListener(queues = "my.queue")
public void listenSimple(String msg) throws InterruptedException {
System.out.println("接收到的消息:" + msg);
}
@RabbitListener(queuesToDeclare = @Queue("myQueue"))
public void myQueue(String msg) throws InterruptedException {
System.out.println("myQueue接收到的消息:" + msg);
}
}
另一种写法(未去验证):
@Component
@RabbitListener(queues = "my.queue")
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("my.queue消费者收到消息 : " + testMessage.toString());
}
}
2.2 Work Queue模型
让多个消费者绑定到一个队列,共同消费队列中的消息。 场景:当消息处理比较耗时,生产速度远大于消耗速度,就可以使用此模型
...
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(msg.getRoutingKey(), msg.getMessage() + " " + i);
}
消费者:(多个消费者加监听simple.queue队列)
消费者1
@RabbitListener(queues = "my.queue")
public void listenSimple(String msg) throws InterruptedException {
System.out.println("1接收到的消息:" + msg);
Thread.sleep(1000);
}
@RabbitListener(queues = "my.queue")
public void listenSimple2(String msg) throws InterruptedException {
System.out.println("2接收到的消息:" + msg);
}
上面会有一个问题:消费者2很快就处理了25条信息,但是消费者1还在慢慢处理自己的消息。 出现的原因:消息是平均分配给每个消费者,没有考虑到消费者的处理能力 解决的办法:能者多劳 修改consumer的application.yml配置文件中添加配置
spring:
rabbitmq:
listener:
simple:
prefetch: 1
2.3 发布/订阅模型
2.3.1 Fanout模式(广播模式)
消息发送流程
- 可以有多个队列
- 每个队列绑定到exchange 生产者生产消息,发送到交换机上
- 交换机把消息发送到绑定的所有队列
- 订阅队列的消费者,消费消息
1、声明队列和交换机(publish)
@Component
public class RabbitQueueConfig {
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("amqp.fanout");
}
}
2、把队列绑定到交换机上
@Bean
public Binding bindingQueue1(){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding bindingQueue2(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
3、发送消息
@Override
public String fanoutPublisher(MqMessage msg) {
try {
rabbitTemplate.convertAndSend(msg.getExchange(), "", msg.getMessage());
return "Success";
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
}
}
4、消费消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("fanout.queue1接收到的消息:" + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("fanout.queue2接收到的消息:" + msg);
}
2.3.2 direct模式(定向模式)
在fanout模式中,一条消息会被发布到所有订阅的队列,在某些场景下,我们希望不同的消息被不同的队列消费,就可以使用direct模式了
direct模式:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey (路由key) - publisher 向 Exchange发送消息时,也必须指定消息的
RoutingKey 。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key 进行判断,只有队列的Routingkey 与消息的Routing key 完全一致,才会接收到消息
1、使用注解的方式,声明队列和交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "amqp.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String msg){
log.info("direct.queue1接收到的消息:{}",msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "amqp.direct",type = ExchangeTypes.DIRECT),
key = {"yellow","blue"}
))
public void listenDirectQueue2(String msg){
log.info("direct.queue2接收到的消息:{}",msg);
}
2、声明发布者发送消息
@Override
public String directPublisher(MqMessage msg) {
try {
rabbitTemplate.convertAndSend(msg.getExchange(), "red", "red key " + msg.getMessage());
rabbitTemplate.convertAndSend(msg.getExchange(), "yellow", "yellow key " + msg.getMessage());
rabbitTemplate.convertAndSend(msg.getExchange(), "blue", "blue key " + msg.getMessage());
return "Success";
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
}
}
direct交换机和fanout交换机的差异:
- fanout交换机将消息路由给每个与其绑定的队列
- direct交换机根据routingkey判断路由给那个队列
- 如果多个队列具有相同的routingkey,则与fanout功能相似
2.3.3 Topic模式(主题模式)
Topic 类型的Exchange 与Direct 相比,都是可以根据RoutingKey 把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成 ,多个单词之间以”.”分割,例如: item.insert 通配符规则: # :匹配一个或多个词 * :匹配不多不少恰好1个词 1、使用注解的方式,声明队列和交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "amqp.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
log.info("topic.queue1接收到的消息:{}",msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "amqp.topic",type = ExchangeTypes.TOPIC),
key = "*.new"
))
public void listenTopicQueue2(String msg){
log.info("topic.queue2接收到的消息:{}",msg);
}
2、声明发布者发布消息
@Override
public String topicPublisher(MqMessage msg) {
rabbitTemplate.convertAndSend(msg.getExchange(), "china.new", "china.new ," + msg.getMessage());
rabbitTemplate.convertAndSend(msg.getExchange(), "china.new.now", "china.new.now ," + msg.getMessage());
rabbitTemplate.convertAndSend(msg.getExchange(), "topic.new", "topic.new ," + msg.getMessage());
return "Success";
}
消息转换器
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在的问题:数据体积过大、有安全漏洞、可读性差 解决问题:使用json转换器 配置json转换器 在publisher和consumer中引入依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
在启动类配置转换bean
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
3、在使用SpringAMQP中存在的问题
- 消息可靠性问题
如何确保发送的消息至少被消费一次 - 延迟消息问题
如何实现消息的延迟投递 - 高可用问题
如何避免单点的MQ故障导致的不可用问题 - 消息堆积
如何解决数百万消息堆积,无法及时消费问题
3.1 消息可靠性
#消息发送的流程 publisher—>exchange–>queue–>consumer 其中每一步都可能导致消息丢失,常见的丢失原因有:
- 发送时丢失
- 生产者发送消息为送达exchange
- 消息到达exchange没有到queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后没有消费就宕机了
解决方案:
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
3.1.1 生产者确认机制
这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
- publisher-return,发送者回执
- 消息投递到交换机,但是没有路由到队列,返回ack及失败的原因
1、 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、 在publisher和consumer服务中添加application.yml配置文件
spring:
rabbitmq:
host: 192.168.10.128
port: 5672
virtual-host: /
username: root
password: root
publisher-confirm-type: correlated
publisher-returns : true
template:
mandatory: true
publish-confirm-type :开启publisher-confirm,这里支持两种类型:
simple : 同步等待confirm结果直到超时correlated :异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback publish-returns :开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbacktemplate.mandatory :定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
3、 定义return回调(ReturnCallback) 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback((message, replyCode, replyTest, exchange, routingKey)
-> log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode,replyTest,exchange,routingKey,message));
}
}
4、定义ConfirmCallback ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
@org.junit.Test
public void testSendMessage(){
String message = "hello, spring amqp! ".concat(LocalDateTime.now().toString());
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(confirm -> {
if(confirm.isAck()){
log.debug("消息发送成功ack,ID{}",correlationData.getId());
}else{
log.info("消息发送失败-nack,ID{}",correlationData.getId());
}
}
, throwable -> log.error("消息发送失败-连接mq异常,ID{}",correlationData.getId()));
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE","DIRECT_ROUTING_KEY",message,correlationData);
}
测试的时候一直报312,NO_ROUTE错误,经过几番验证,后面重新写了个交换机和队列就解决了,好像一个队列只能被一个交换机绑定!(查看网上说报312都是延迟消息发送,配置参数mandatory要改成false,但咱这里不是延迟消息) 所以先在消费者上创建后并启动:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3"),
exchange = @Exchange(name = "DIRECT_EXCHANGE", type = ExchangeTypes.DIRECT),
key = {"DIRECT_ROUTING_KEY"}
))
public void listenDirectQueue(String msg) {
log.info("confim.ack接收到的消息:{}", msg);
}
3.1.2 消费者确认机制
RabbitMQ是阅后即焚机制,确认消息被消费者消费后会立刻删除。
而RabbitMQ也是可以通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
修改application.yml配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none
本地失败重试机制
我们可以利用Spring的retry机制 ,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改application.yml配置文件
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 1000ms
multiplier: 1
max-attempts: 3
stateless: true
失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1、在consumer服务中定义处理失败消息啊的队列和交换机
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2、定义RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
3.1.3 消息持久化
生产者可以确定的将消息投递到mq中,但是消息发送到mq以后,突然宕机,导致消息丢失,因此想要确保消息在mq中,就需要开启消息持久化机制。
交换机持久化
@Bean
public DirectExchange simpleExchange(){
return new DirectExchange("simple.direct", true, false);
}
由SpringAMQP声明的交换机都是持久化的
队列持久化
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
}
@Bean
public Queue simpleQueue(){
return new Queue("fanout.queue2",true,false,false);
}
由SpringAMQP声明的队列都是持久化的
消息持久化
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode
Message message = MessageBuilder
.withBody("hello, spring amqp!".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
SpringAMQP发出的任何消息都是持久化的
3.1.4 延迟消息问题
死信交换机
什么是死信: 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
死信交换机(Dead Letter Exchange,DLX)如果这个包含死信的队列配置了dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中 1、在consumer服务中 定义死信交换机、死信队列
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue")
.deadLetterExchange("dead.direct")
.deadLetterRoutingKey("simple")
.build();
}
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dead.direct",true,false);
}
@Bean
public Queue dlQueue(){
return new Queue("dead.queue",true);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
# 消费者确认模式: auto
acknowledge-mode: auto
特征: 当消息不能被消费时,会重新入队,再次投递给消费者进行被消费
default-requeue-rejected: false # 拒绝消息重新入队,如果队列绑定了死信交换机则消息会投递到死信交换机并路由到死信队列
# 本地重试:
当本地重试次数耗尽时,如果当前队列没有绑定死信交换机或错误队列,则消息丢弃
如果提供了错误队列,则消息投递到错误队列
如果队列绑定了死信交换机,则消息以死信的形式存放到死信队列
TTL
(time to live 消息存活时间) 一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:
- 消息所在的队列设置了超时时间
- 消息本身设置了超时时间
队列设置超时时间
1、声明 死信交换机、死信队列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.direct"),
key = "ttl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
2、声明一个队列,指定TTL
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue")
.ttl(10000)
.deadLetterExchange("dl.ttl.direct")
.deadLetterRoutingKey("ttl")
.build();
}
3、声明交换机,将ttl.queue队列关联上
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
4、发送消息
@Test
public void testTTLQueue() {
String message = "hello, ttl queue";
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("发送消息成功");
}
消息设置超时时间
@Test
public void testSendTTLMessage() throws InterruptedException {
Message msg = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
rabbitTemplate.convertAndSend("ttl.direct","ttl", msg);
log.info("发送消息成功...");
}
}
延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。 延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
安装DelayExchange插件https://www.rabbitmq.com/community-plugins.html
安装DelayExchange
1、下载插件 2、我们是基于docker安装的rabbitmq,所以要把下载的文件挂载到数据卷中
// 查看数据卷的位置
docker volume inspect mq-plugins
3、把下载的文件放在数据卷中
/var/lib/docker/volumes/mq-plugins/_data
4、安装插件 进入容器
docker exec -it mq bash
开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安装完成
使用DelayExchange
1、声明DelayExchange交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.direct",delayed = "true"),
key = "delay"
))
public void listenDelayedQueue(String msg){
log.info("接收到delay.queue的消息:{}",msg);
}
2、发送消息 发送消息时,一定要携带x-delay属性,指定延迟时间
@Test
public void testDelayMsg(){
Message message = MessageBuilder
.withBody("hello delay message".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay",1000)
.build();
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
log.info("发送消息成功");
}
3.3 消息堆积问题
消息堆积问题: 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃 解决消息堆积问题的思路:
- 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
- 扩大队列容积,提高堆积上限
惰性队列
特征:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
设置惰性队列 1、使用命令行设置lazy-queue
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
- rabbitmqctl :RabbitMQ的命令行工具 set_policy :添加一个策略
- Lazy :策略名称,可以自定义
- “^lazy-queue$” :用正则表达式匹配队列的名字
- ‘{“queue-mode”:“lazy”}’ :设置队列模式为lazy模式
- –apply-to queues:策略的作用对象,是所有的队列
2、基于@Bean声明lazy-queue
@Bean
public Queue lazyQueue(){
return QueueBuilder.durable("lazy-queue")
.lazy()
.build();
}
3、基于@RabbitListener声明lazy-queue
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy-queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到消息:{}",msg);
}
}
惰性队列的优点
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
惰性队列的缺点
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO
3.4 高可用问题
搭建集群
集群的分类:
- 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
- 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
…… 普通集群 镜像集群 仲裁队列 集群扩容 增加仲裁队列副本
Exchange的4种类型:direct、fanout、topic、headers RabbitMQ的常见队列模型,simple模式、work模式、fanout模式、direct模式、topic模式、headers模式、RPC simple模式就是上文中的Basic模式
|