IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 消息中间件---RabbitMQ---基础与原生api的使用 -> 正文阅读

[大数据]消息中间件---RabbitMQ---基础与原生api的使用

AMQP概论

AMQP

??AMQP是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递信息,并不受客户端/中间件不同产品,不同的开发语言等条件限制。目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务。主要实现有RabbitMQ。

包括的要素

生产者、消费者、消息

??生产者: 消息的创建者,发送到rabbitmq;
??消费者:连接到rabbitmq,订阅到队列上,消费消息,持续订阅(basicConsumer)和单条订阅(basicGet).
??消息:包含有效载荷和标签,有效载荷指要传输的数据,标签描述了有效载荷,并且rabbitmq用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁。

信道

??概念:信道是生产消费者与rabbit通信的渠道,生产者publish或是消费者subscribe一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,什么意思呢?就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用。
??疑问:为什么不建立多个TCP连接呢?原因是rabbit保证性能,系统为每个线程开辟一个TCP是非常消耗性能,每秒成百上千的建立销毁TCP会严重消耗系统。所以rabbitmq选择建立多个信道(建立在tcp的虚拟连接)连接到rabbit上。

交换机、队列、绑定、路由键

??队列通过路由键(routing key,某种确定的规则)绑定到交换器,生产者将消息发布到交换器,交换器根据绑定的路由键将消息路由到特定队列,然后由订阅这个队列的消费者进行接收。

在这里插入图片描述

常见问题

?? 如果消息达到无人订阅的队列怎么办? 消息会一直在队列中等待,RabbitMQ默认队列是无限长度的。
?? 多个消息订阅到同一队列怎么办? 消息以循环的方式发送给消费者,每个消息只会发送给一个消费者。
?? 消息路由到了不存在的队列怎么办? 一般情况下,凉拌,RabbitMQ会忽略,当这个消息不存在,也就是这消息丢了。

消息确认

??消费者收到的每一条消息都必须进行确认(自动确认和自行确认)。
??消费者在声明队列时,可以指定autoAck参数,当autoAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。
??采用消息确认机制后,只要令autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。
??当autoAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
??RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

交换器类型

??共有四种direct,fanout,topic,headers,其种headers(几乎和direct一样)不实用,可以忽略。

Direct

??路由键完全匹配,消息被投递到对应的队列,每个amqp的实现都必须有一个direct交换器,包含一个空白字符串名称的默认交换器。声明一个队列时,会自动绑定到默认交换器,并且以队列名称作为路由键:channel->basic_public($msg,’ ’,’queue-name’)

Fanout

??消息广播到绑定的队列。

Topic

??通过使用“*”和“#”,使来自不同源头的消息到达同一个队列,”.”将路由键分为了几个标识符,“*”匹配1个,“#”匹配一个或多个。例如日志处理:
??假设有交换器log-exchange,
??日志级别有error,info,warning,
??应用模块有user,order,email,
??服务器有 A、B、C、D
??路由键的规则为 服务器+“.”+日志级别+“.”+应用模块名,如:A. info .email。
??1、要关注A服务器发送的所有应用错误的消息,怎么做?
声明队列名称为“a-app-error-queue”并绑定到交换器上:channel. queueBind (‘a-app-error-queue’,’logs-change’,’A.error.*’)
??2、关注B服务器发送的的所有日志,怎么办?
声明队列名称为“b-all-queue”并绑定到交换器上:channel. queueBind (b-all-queue’,’logs-change’,’ B.#’)或channel. queueBind (b-all-queue’,’logs-change’,’ B.*.*’)
??3、关注所有服务器发送的email的所有日志,怎么办?
声明队列名称为“email-all-queue”并绑定到交换器上:channel. queueBind (email -all-queue’,’logs-change’,’ *.*.emal’)
??4、想要接收所有日志:channel->queue_bind(‘all-log’,’logs-change’,’#’)

虚拟主机

??虚拟消息服务器,vhost,本质上就是一个mini版的mq服务器,有自己的队列、交换器和绑定,最重要的,自己的权限机制。Vhost提供了逻辑上的分离,可以将众多客户端进行区分,又可以避免队列和交换器的命名冲突。Vhost必须在连接时指定,rabbitmq包含缺省vhost:“/”,通过缺省用户和口令guest进行访问。
??rabbitmq里创建用户,必须要被指派给至少一个vhost,并且只能访问被指派内的队列、交换器和绑定。Vhost必须通过rabbitmq的管理控制工具创建。

原生Java客户端进行消息通信

Direct

代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/exchange/direct

Fanout

代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/exchange/fanout

Topic

代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/exchange/topic

消息发布时的权衡在这里插入图片描述

失败确认

??在发送信息时设置mandatory标志,告诉RabbitMQ,如果消息不可路由,应该将消息返回给发送者,并通知失败。可以这样认为,开启mandatory是开启故障检测模式。
??注意:它只会让RabbitMQ向你通知失败,而不会通知成功。如果消息正确路由到队列,泽发布者不会收到任何通知。带来的问题是无法确保发布信息一定是成功的,因为通知失败的信息可能会丢失。
在这里插入图片描述
channel.addConfirmListener则用来监听RabbitMQ发回的信息。
代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/mandatory

监听器的小甜点

在信道关闭和连接关闭时,还有两个监听器可以使用。
在这里插入图片描述

事务

??事务的实现主要是对信道(channel)的设置,主要的方法有三个:
1,channel.txSelect()声明启动事务模式
2,channel.txComment()提交事务
3,channel.txRollback()回滚事务
在发送信息之前,需要声明channel为事务模式,提交或者回滚事务即可。
开启事务后,客服端和rabbitmq之间的通讯交互流程:
1,客服端发送给服务Tx.select(开启事务模式)
2,服务端返回Tx.select-Ok(开启事务模式OK)
3,推送信息
4,客户端发送给事务提交Tx.Commit
5,服务端返回Tx.Commit-Ok
??以上就完成了事务的交互流程,如果其中任意一个环节出现问题,就会抛出ioException异常,这样用户就可以拦截异常进行事务回滚,或决定要不要重复发信息。
??那么,既然已经有事务了,为何还要使用发送方确认模式呢,原因是因为事务的性能是非常差的。根据相关资料,事务会降低2~10倍的性能。(非必要不要使用事务,太影响性能了)

发送方确认模式

??基于事务的性能问题,RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性能影响几乎可以忽略不计。
??原理:生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的信息都将会被指派一个唯一的ID(从1开始),由这个ID在在生产者和RabbitMQ之间进行信息的确认。
??不可路由的消息,当交换机发现,消息不能路由大搜任何队列,会进行确认操作,表示收到了消息。如果发送方设置了mandatory模式,则会先调用addReturnListener监听器。
在这里插入图片描述

??可路由的消息,要等到消息被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。
在这里插入图片描述
??confirm模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以用在回调方法中处理该nack消息并决定下一步的处理。

confirm的三种实现

??方式一:channel.waritForConfirms()普通发送方确认模式:消息到达交换器,就会返回TRUE。
??方式二:channel.waitForConfirmsOrDie()批量确认模式:使用同步方式等所有的消息发送之后才会执行后面代码,只有一个消息未到达交换器就会抛出IOException异常。
??方式三:channel.addConfirmListener()异步监听发送方确认模式。
??如何使用:代码https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/producerconfirm

备用交换机

??在第一个声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由信息,那么消息将被路由到这个新的备用交换器。
??如果发布消息时同时设置了mandatory会发生什么?如果主交换器无法路由消息,RabbitMQ并不会通知发布者,因为,向备用交换器发送消息,表示消息已经被路由了。注意,新的备用交换器就是普通的交换器,并没有任何特殊的地方。
??使用备用交换器,向往常一样,声明Queue和备用交换器,把Queue绑定到备用交换器上。然后在声明主交换器时,通过交换器的参数,alternate-exchange,,将备用交换器设置给主交换器。
建议备用交换器设置为faout类型,Queue绑定时的路由键设置为“#”。
??如何使用,代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/backupexchange

消息的消费

消息的获得方式

拉取Get

??属于一种轮询模型,发送一次get请求,获得一个消息。如果此时RabbitMQ中没有消息,或获得一个表示空的回复。总的来说,这种方式性能比较差,很明显,每获得一条消息,都要和RabbitMQ进行网络通信发出请求。而且对RabbitMQ来说,RabbitMQ无法进行任何优化,因为它永远不知道应用程序何时会发出请求。对我们实现者来说,要在一个循环里,不断去服务器get消息。
代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/getmessage

推送Consume

??属于一种推送模型。注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者。这种模式很常见。
代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/exchange/direct

消息的应答

??消费者收到的每一条消息都必须进行确认。消息确认后,RabbitMQ才会从队列删除这条消息,RabbitMQ不会为未确认的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

自动确认

??消费者在声明队列时,可以指定autoAck参数,当autoAck=true时,一旦消费者接收到了消息,就视为自动确认了消息。如果消费者在处理消息的过程中,出了错,就没有什么办法重新处理这条消息,所以我们很多时候,需要在消息处理成功后,再确认消息,这就需要手动确认。

自行手动确认

??当autoAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。
??采用消息确认机制后,只要令autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。
??当autoAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/ackfalse

Qos预取模式

??在确认消息被接收之前,消费者可以预先要求接收一定数量的信息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息之前奔溃,则所有未确认的消息将被重新发送给其他消费者。所以这里存在着一定程度上的可靠性风险。
??这种机制一方面可以实现限速(将消息暂存到RabbitMQ内存中)的作用,一方面可以保证消息确认质量(比如确认了但是处理有异常的情况)。
??注意:消费确认模式必须是非自动ACK机制(这个是使用baseQos的前提条件,否则会Qos不生效),然后设置basicQos的值;另外,还可以基于consume和channel的粒度进行设置(global)。
代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/qos

??basicQos方法参数详细解释:
??prefetchSize:最多传输的内容的大小的限制,0为不限制,但据说prefetchSize参数,rabbitmq没有实现。
??prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
??global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别。
??如果同时设置channel和消费者,会怎么样?AMQP规范没有解释如果使用不同的全局值多次调用basic.qos会发生什么。 RabbitMQ将此解释为意味着两个预取限制应该彼此独立地强制执行; 消费者只有在未达到未确认消息限制时才会收到新消息。
??channel.basicQos(10, false); // Per consumer limit
??channel.basicQos(15, true); // Per channel limit
??channel.basicConsume(“my-queue1”, false, consumer1);
??channel.basicConsume(“my-queue2”, false, batchAckConsumer);
??也就是说,整个通道加起来最多允许15条未确认的消息,每个消费者则最多有10条消息。

消费者中的事务

??使用方法和生产者一致。
??假设消费者模式中使用了事务,并且在消息确认之后进行了事务回滚,会是什么样的结果?
结果分为两种情况:
??1. autoAck=false手动应对的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但RabbitMQ对消息的确认会等事务的返回结果,再做最终决定是确认消息还是重新放回队列,如果你手动确认之后,又回滚了事务,那么以事务回滚为准,此条消息会重新放回队列。
??2. autoAck=true如果自动确认为true的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了。

可靠性和性能的权衡

??

消息的拒接

Reject 和 Nack

?? 消息确认可以让RabbitMQ知道消费者已经接受并处理完消息。但是如果消息本身或者消息的处理过程出现问题怎么办?需要一种机制,通知RabbitMQ,这个消息,我无法处理,请让别的消费者处理。这里就有两种机制,Reject 和 Nack。
??Reject在拒接消息时,可以使用requeue标识,告诉RabbitMQ是否需要重新发送给别的消费者。不重新发送,一般这个消息就会被RabbitMQ丢弃。Reject一次只能拒接一条消息。requeue参数设置为true时,消息发生了重新投递,false的时候则会丢弃。
??Nack则可以一次性拒接多个消息。这是RabbitMQ对AMQP规范的一个拓展。
代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/rejectmsg

死信交换器DLX

?? RabbitMQ对AMQP规范的一个扩展。被投递消息被拒绝后的一个可选行为,往往用在对问题消息的诊断上。
?? 消息变成死信一般是以下几种情况:
?? ? 消息被拒绝,并且设置 requeue 参数为 false
?? ? 消息过期
?? ? 队列达到最大长度
?? 死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作。在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的参数是x-dead-letter-exchange。
在这里插入图片描述

代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/dlx
??通过运行程序可以看到,生产者产生了3条消息,分别是error,info,warn,两个消费者WillMakeDlxConsumer和WillMakeWarnDlxConsumer都拒绝了两条消息,而投送到死信队列后,可以发现根据投送死信时的路由键,不同的消费者有些可以接受到消息,有些则不行。

和备用交换器的区别

??1、备用交换器是主交换器无法路由消息,那么消息将被路由到这个新的备用交换器,而死信交换器则是接收过期或者被拒绝的消息。
??2、备用交换器是在声明主交换器时发生联系,而死信交换器则声明队列时发生联系。

控制队列

临时队列

自动删除队列

??自动删除队列和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除。
??自动删除队列只需要在声明队列时,设置属性auto-delete标识为true即可。系统声明的随机队列,缺省就是自动删除的。
在这里插入图片描述

单消费者队列

??普通队列允许的消费者没有限制,多个消费者绑定到多个队列时,RabbitMQ会采用轮询进行投递。如果需要消费者独占队列,在队列创建的时候,设定属性exclusive为true。
在这里插入图片描述

自动过期队列

??指队列在超过一定时间没使用,会被从RabbitMQ中被删除。什么是没使用?
??一定时间内没有Get操作发生
??没有Consumer连接在队列上
??特别的:就算一直有消息进入队列,也不算队列在被使用。
??通过声明队列时,设定x-expires参数即可,单位毫秒。
在这里插入图片描述

永久队列

队列的持久性

??持久化队列和非持久化队列的区别是,持久化队列会被保存在磁盘中,固定并持久的存储,当Rabbit服务重启后,该队列会保持原来的状态在RabbitMQ中被管理,而非持久化队列不会被保存在磁盘中,Rabbit服务重启后队列就会消失。
??非持久化比持久化的优势就是,由于非持久化不需要保存在磁盘中,所以使用速度就比持久化队列快。即是非持久化的性能要高于持久化。而持久化的优点就是会一直存在,不会随服务的重启或服务器的宕机而消失。
??在声明队列时,将属性durable设置为“false”,则该队列为非持久化队列,设置成“true”时,该队列就为持久化队列。
在这里插入图片描述

队列级别消息过期

??就是为每个队列设置消息的超时时间。只要给队列设置x-message-ttl 参数,就设定了该队列所有消息的存活时间,时间单位是毫秒。如果声明队列时指定了死信交换器,则过期消息会成为死信消息。
在这里插入图片描述

队列保留参数列表

在这里插入图片描述

消息的属性

在这里插入图片描述
??在发送消息时,我们还可以对消息的属性做更细微的控制,比如构建Request-Response模式。
代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/setmsg

消息存活时间

??当队列消息的TTL 和消息TTL都被设置,时间短的TTL设置生效。如果将一个过期消息发送给RabbitMQ,该消息不会路由到任何队列,而是直接丢弃。
??为消息设置TTL有一个问题:RabbitMQ只对处于队头的消息判断是否过期(即不会扫描队列),所以,很可能队列中已存在死消息,但是队列并不知情。这会影响队列统计数据的正确性,妨碍队列及时释放资源。

消息的持久化

??默认情况下,队列和交换器在服务器重启后都会消失,消息当然也是。将队列和交换器的durable属性设为true,缺省为false,但是消息要持久化还不够,还需要将消息在发布前,将投递模式设置为2。消息要持久化,必须要有持久化的队列、交换器和投递模式都为2。
代码:https://gitee.com/bruceyuan10/rabbitmqstudy/tree/master/native/src/main/java/cn/enjoyedu/msgdurable

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-29 09:09:56  更:2021-08-29 09:26:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:59:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码