-
RabbitMq的特点 基于AMQP协议开发,AMQP协议的主要特点是面向消息,队列,路由,可靠性,安全性. 支持多种语言. -
AMQP核心概念 server:又称broker,接受客户端的连接,实现AMQP实体服务. connection:连接,应用与Broker的网络连接 Channel:信道,所有的操作都是在信道中进行的,一个客户端可以有多个信道.每个信道代表一个会话. Message:消息,服务器和应用之间传输的数据,由properties和body组成.properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body消息的具体内容. Virtual host:虚拟地址,用于进行逻辑隔离,一个Virtual host里面可以有若干个Exchange和queue,同一个Virtual Host里面不能有相同名称的Exchange或queue. Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列 Bingding;Exchange和Queue之间的虚拟连接,Binging中可以包含routing key Queue:队列,保存消息并将他们转发给消费者 -
Exchange类型 1.direct模式 消息中的路由键(routing key)如果和Binging key一致,交换器就将消息发送到对应的队列中,它是完全匹配、单播的模式 2.fanout 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的 3.topic topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词 -
怎么保证消息的不丢失 生产者到rabbitMq-server 1.网络故障 2.发送消息到exchange后,发下路由和queue没有绑定,消息会存在丢失情况. RabbitMQ-Server中存储的消息丢失 1.消息没有持久化 2.单节点或集群模式下没有镜像模式消息丢失 3.磁盘损坏导致消息丢失 4.机房被炸 RabbitMQ-Server到消费者消息丢失 1.消息获取后还没处理服务器挂了 -
消息可靠性的保障 生产者到RabbitMq-server 开启RabbitMq的事务机制,但是事务机制,会一直阻塞影响性能 一般采用confirm模式confirm模式有三种模式 1.串行confirm 2.批量confirm 3.异步confirm 异步模式需要自己多写一部分复杂的代码实现,异步监听类,监听server端的通知消息,异步的好处性能会大幅度提升,发送完毕之后,可以继续发送其他消息。 MQServer通知生产端ConfirmListener监听类:用户可以继承接口实现自己的实现类,处理消息确认机制, 对于发送消息到exchange后,发下路由和queue没有绑定,消息会存在丢失情况. 1.可以设置使用mandatory 设置true 2.利用备份交换机(alternate-exchange):实现没有路由到队列的消息 RabbitMQ-Server保证不丢失 设置消息持久化到磁盘。设置持久化有两个步骤: ①创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。 ②发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。 必须要同时开启这两个才可以。 而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rabbitmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。 消费者消息不丢失 1.开启手动ack机制 2.若遇到异常重试多次后还无法成功,将消息放到失败消息表中进行人工干预 -
消息消费的顺序行 1.将顺序消息发送到一个队列中 2.一个队列只对应一个消费者 3.手动ack 4.将拉取消息数量设为1 -
消费消息的幂等性 消费数据为了单纯的写入数据库,可以先根据主键查询数据是否已经存在,如果已经存在了就没必要插入了。或者直接插入也没问题,因为可以利用主键的唯一性来保证数据不会重复插入,重复插入只会报错,但不会出现脏数据。 消费数据只是为了缓存到redis当中,这种情况就是直接往redis中set value了,天然的幂等性。 针对复杂的业务情况,可以在生产消息的时候给每个消息加一个全局唯一ID,消费者消费消息时根据这个ID去redis当中查询之前是否消费过。如果没有消费过,就进行消费并将这个消息的ID写入到redis当中。如果已经消费过了,就无需再次消费了
|