?rabbitmq的一些自己的总结:
小结:
消费者消费消息需要确认应答,没有确认应答消息就不会被删除,那么消息发布不正常消息就会丢失,这样是有问题的
消息在正常情况下被消费并确认应答了,那么消息会被删除,但是有些情况如:消息在TTL内因为消费端的问题,没有
被及时消费消息就会被丢弃,消息被拒,消息会丢失,队列达到最大不能进入队列,这些消息会成为死信,对这些死信
处理到一个专门的队列就是死信队列,由消费者再消费就可保证消息不丢失
死信原因之一:消息被拒绝(basic.reject,basic.nack且参数requeue = false,这些都是非确认的应答)
死信原因之一:TTL过期,在生产端设置TTL最佳(消费端也可)
死信是从队列中迁移出的消息达到特定队列(死信队列)
区分一下:basic.ack basic.nack basic.reject
1.ack是确认应答,是告诉mq消费正正常消费了该消息,可以将消息从队列中删除了
2.nack是否定确认,这里对确认做一个解释:确认是一个行为,无论回复了ok还是不ok都是做了确认,nack是否定确认但是
消费该怎么处理取决于nack的第二个参数,如果为true则表示重新入队,如果为false表示入队,那么消息就被拒绝了,成了
死信
3.是拒绝了该消息
死信队列和延迟队列的关系:
死信队列中由于TTL过期会还成为死信的原因,但是如果将这种机制用到一些业务场景,就不能称为死信了,让这样的消息作为
延迟消息,存放在延迟队列中,应对特定的业务是非常合适的.
配置延迟消息的方式有两种:
1.消息设置过期时间: 消息设置过期,不会与队列产生关联,可以适应所有需要延迟操作的业务
2.队列设置过期时间: 不够灵活
rabbit自然情况下对于消息延迟处理是有缺陷的:
如果先投递delayed = 20s 的消息,再投递 delayed = 2s的消息,那么2s的消息会以第一个消息的延时判定
第二个消息的处理时间,这样显然不对,解决的方式是使用插件:rabbitmq_delayed_message_exchange
这样就可以将延时消息直接投递到延时队列了,具体的延时提前到了exchange而不再是队列或者消息自身来做延时了
在编码时需要自定义x-delayed-type类型的自定义交换机设置
args.put("x-delayed-type", "direct")
CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,args)
消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过
RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz
或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
发布确认:
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,
那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的.所以通过设置 mandatory 参数可以在当消息传递过
程中不可达目的地时将消息返回给生产者(回退消息回调:returnedMessage)
有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息
无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然
后触发报警,再来手动处理。
消息被交换机接收会触发确认发布回调,如果消息无法路由时配置回退消息回调可以将消息会退给生产者.如果消息无法被交换机确认
也会回退消息.那么在消息无法路由时会退给生产者这种方式改进以下,可以配置一个备份交换机,由备份交换机路由消息到备份队列
也避免了消息的重新发布:
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).alternate(BACKUP_EXCHANGE_NAME).build()
这样就给正常交换机配置了一个备份交换机,类型为fanout
幂等性:
我认为幂等性问题出现的原因是因为消费消息时的消息应答机制造成的,就是因为这个应答在消费者消费了该条消息,但是由于一些外部因素
如网络不稳定,导致mq没有收到ack确认应答,消息会重新投递,如果是扣款请求,就会导致重复扣款!
为了解决幂等性可以使用消息本身的id,uuid,时间戳这些具有全局唯一性的属性来做记录,或者redis的setnx命令,实现不重复消费
优先队列:
需要设置优先队列后,才可设置具有优先级的消息
搭建rabbit集群出现的问题:
配置由 node1,node2,node3(服务器hostname,代表不同主机) 集结成的集群cluster,集群就会变为一体,即原本的一个节点那样,
所以这时访问任何一个服务器都可以.现在在发布消息时只连接了node1,发布到node1的消息其它节点是没有的,一旦node1崩溃消息就会
丢失,所以配置镜像队列就可以将消息再备份到node2,如果node1真的宕机,就会备份到node2自身和node3,这就解决了消息不丢失的问题
但是这样并没有完全解决问题!因为消息发布时只会发布到node1,虽说node2,node3会备份成为备份队列,如果node1宕机了,消息该如何
发布,这是就需要一个可以担任负载均衡的中间件来解决问题了.
Haproxy+Keepalive 实现高可用负载均衡来解决上述的问题:
Haporxy的效果类似Nginx,Lvs来做负载均衡的,而keepalive是用来探测Haproxy之间是否存活的,一旦主Haproxy有问题,接下
来备机就会担任负载均衡的任务,这样就解决了原来node1瘫痪导致不能发布消息的问题.
集群中描述的以上问题只是在一个地区内的集群的问题,那么现在遇到的问题是:
有北京,杭州两个集群或者节点,发布消息时使用北京本地的mq来发布消息,这时即使开启publishConfirm也不会有延时上的问题,那么
假如这是要向位于杭州的集群或者节点发布消息且开启了publishConfirm这样就会面临较长的网络延时,就会造成这条发布线程性能地下
在一定程度上造成了阻塞.
就是在这种讲不同的业务部署在其它地区后造成的,解决方式:
1.使用Federation Exchange/Federation Queue
2.使用Shovel
Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即
source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作为
目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker 上。Shovel 可以翻译为"铲子",是
一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel 行为就像优秀的客户端应用程
序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。
?该部分是零散笔记
基于简单工作模式下rabbitmq为了保证消息的可靠性,在消费者未有效消费的情况下如果rabbit删除了
消息,就会导致消息的丢失,为了解决绝这个问题可以配置消息确认回调和取消回调处理,同时配置手动确认
再确认回调中最后添加应答
如果rabbit本身出了问题也会导致消息丢失(消息本身的安全性),所以对消息队列和消息要做持久化操作,
在消息生产端配置队列持久和发布的消息持久化(队列要持久化,队列中的消息也要持久化),但是如果消息传
送到队列上还没来得及持久化就宕机了,消息也会丢失,所以需要做发布确认
发布确认:生产者发布消息到队列上,rabbit做了保存,确认之后才能保证消息不丢失
完整的保证消息不丢失:
1.队列持久化
2.消息持久化
3.发布确认
发布确认模式:
1.单个确认
2.批量确认
3.异步批量确认 需要添加一个确认监听器ConfirmListener
小结:
发布消息需要发布确认(保证不丢失),消费完消息需要确认(才能删除)
在某些场景下,消息的消费能力不同所以要在消费端配置basicQos(),这是不公平的分发模式.预取值的效果
与之类似.basicQos()在设置为1时代表不公平分发,如果设置为其它如2,3...就代表预取值
使用ConcurrentSkipListMap存储发送的消息,在发布确认回调中删除已确认的,剩下的就是未确认的
以上one two three four包讲的是 简单模式,工作队列模式,没有交换机的参与,消费者之间是竞争关系
一个消息只能被消费一次,那么想要消息消费两次那就在交换机上绑定两个队列,存入相同的消息routing-key
可以相同,也可以不同.注:简单模式,工作队列模式下采用的是默认的交换机(AMQP default交换机)
发布/订阅模式(交换机开始参与)
交换机类型: direct(路由模式),topic,headers,fanout(发布/订阅模式), 无名交换机("")
交换机是负责接收消息的,通过关键字routing-key通过路由找到对应的队列queue将消息放入队列,队列
再将消息送给消费者,routing-key的作用就是为了区分不同的队列,精准存入指定队列
fanout:
交换机绑定多个无名队列(直接getQueue),没有routing-key,队列都会接收到交换机的消息
direct:
与fanout不同的地方在于指定了routing-key,如果routing-key相同就变成了fanout模式
如果不同就是direct模式
topics:
direct只能路由一个队列,topics可以解决这个问题
topics交换机可根据由规则的routing-key来路由到不同的队列
* 代表一个单词 # 代表多个单词 . 用来分隔单词
当一个队列绑定key是#,那么这个队列将接收所有数据,有点像fanout,如果绑定key当中没有 # 或 *
那么该队列绑定类型就是 direct. 注:是消费端绑定key中是否含有# *
所以topics包含了fanout和direct交换机的
死信队列/交换机:
成为死信:称为死信消息后由配置的死信交换机转发到死信队列
1.消息被拒绝 一定要开启手动应答,自动应答不存在消息被拒的情况
2.消息TTL过期
3.队列达到最大长度
TTL:TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间单位是
毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL
设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将
会被使用,有两种方式设置 TTL。
延迟队列,延迟队列是有序的,延迟队列存放的是需要指定处理时间的一种队列
延迟队列就是死信队列中的消息ttl过期
延迟: 这个延迟来自于消息的发布方设置了消息过期,在未过期期间消息存在于正常队列(无消费这消费),一旦TTL
到期就会进入死信队列,那么以死信队列为消息来源消费的消费者就和消息发布 --> 消息消费之间存在了消息延迟
这就是延迟
延迟队列使用场景:
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
在整合springboot之后
演示ttl延迟队列时出现:一个延迟20s的消息先发布,另一个延迟2s的消息紧接着发布就会导致2s的消息
同样会20s后才"死亡"(延迟后进入死信队列),这是一个缺陷
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消
息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,
如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
为了解决这个问题,使用rabbitmq_delayed_message_exchange 插件来解决.之前是在队列存放
一定时间过期后放入延迟队列,现在将延时的操作交给交换机来做,因为队列只会校验第一个进入的延迟
消息,并且按照第一个消息的过期时间来标准化后续即使很短延时的消息,所以导致短延时的消息不能及时
到延时队列
死信队列可以保证消息在过期,队列满,消息被拒时可以正确的处理消息,它可保证消息至少被消费一次或者未正确
处理的消息不会被丢弃(三大死信成因),这些都是死信队列的应用,只不过是在做延迟队列时有一定缺陷.
延迟队列适合特定的业务场景,如提交订单后未付款20min后执行任务...
Rabbitmq消息的可靠投递:
在极端情况下如:rabbit重启,导致消息投递失败,消息丢失,那么这就牵扯到发布确认的高级特性,
或者rabbit集群不可用时(极端情况)
交换机或者队列其中一个不在或者全部在时就会导致消息丢失,当交换机收到消息,就会从缓存中青春已收到的消息
所以应该当生产者发布消息给交换机时要进行确认机制处理,
(ack)
PRODUCT -------------> ROUTER --------------> QUEUE ---------------> CONSUMER
X X
|
|
|
CACHE <---------------
解决步骤:
1.解决交换机收不到消息怎么处理?
配置消息发布确认回调:配置publisher-confirm-type: correlated (simple,none)
simple其中有一种效果就是correlated另一种是rabbitTemplate调用waitForConfirm/OrDie
如果返回false会导致channel关闭,是同步回调(单个确认),需要配置
2.解决交换机路由不到队列的问题
配置publisher-returns: true # 交换机路由不出去就会回退消息给生产者
同时自定义的ConfirmCallback实现RabbitTemplate.ReturnCallback来回退消息
注意: 因为实现的是一个内部接口(RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnCallback),
这个实现类并不在RabbitTemplate内部,所以运行时是调用不到这个类的,要注入:
@PostConstruct
public void init(){
//注入
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
交换机或路由失败的情况都会导致消息丢失,所以配置了交换机确认回调和回退回调后,当消息发布未应答true则会将消息返还给
生产者(可配置定时任务再次发布消息)
小结:
(其一是配置:交换机应答,队列回退[不可路由])
在rabbit服务宕机或交换机和队列其中一个不可用时,消息就会发布失败,导致消息丢失,所谓配置确认回调返回回调,
消息就会回退,让生产者重新发布消息,能达到消息不丢失,这是一种解决发布安全的方式.
另一种方式就是配置一个备份交换机和它的备份队列发给消费者,可以备份也可以报警
在confirmController这个demo中同时配置了mandatory参数(交换机应答和队列回退)也配置了备份交换机,
如果两者同时开启,"备份交换机"的优先级更高(先起作用),不会回退消息
注:在交换机或队列其一或两者出现问题时都会导致消息丢失,如果配置了消息回退,则在不可路由时消息会被交换机退回
Rabbitmq的幂等性问题:
消费者的幂等性解决一般使用全局ID,或者写个唯一标识比如时间戳 或者UUID 或者消费者消费MQ中的消息也可利用MQ的该id来判断
或者可按照自己的规则生成一个全局唯一id,每次消费时用该id先判断该消息是否已消费过
在消费者消费了消息,返回ack应答时网络异常,导致生产方重新发布消息给消费者,消费者会重复消费.如付款后,返回ack应答过程中
异常就会重新扣款
业界的方案:
1.唯一id + 指纹码机制
2.Redis的原子性:执行setnx命令,天然的原子性
优先级队列(优先级:0-255):一般设置0-10
要使用优先队列,队列需要设置优先级,消息也要设置优先级,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才
去消费,因为,这样才有机会对消息进行排序
惰性队列
应用场景:消费者宕机导致或是因维护而关闭等而致使长时间内不能消费消息造成堆积,惰性队列就很有必要
一般情况消息保存在内存,而惰性队列保存在磁盘上,消息发布到队列上后会先保存到磁盘上,消费者消费需要读取到内存再消费
RabbitMQ集群搭建:
nmcli n on启动无线网
创建的账号: admin 密码: 123
备份问题: 镜像队列:保证节点的队列在另一个节点有备份,配置policy(策略)
发布问题: 如果发布时连接的节点宕机,那么生产者就不会正确发布消息,但是集群中其它的节点是正常的,就需要做高可用的负载均衡
可使用haProxy,Lvs,Nginx
理解: 生产端发布消息到haproxy(可配置多台,分主 备,备用机会探测主机是否存活),由haproxy将消息发布到rabbit集群
中的节点上(负载均衡)
数据同步问题: 不同节点间由于网络延迟要进行数据同步,使用Federation队列/交换机,也可使用Shovel来做不同区域的节点间数据同步
|