IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Rabbitmq入门4 -> 正文阅读

[大数据]Rabbitmq入门4

4.1? channel.basicPublish中的?mandatory 和?immediate 详解

mandatory :为true,消息无法发送到队列时,会将消息返回给客户端;为false,会丢弃消息。

判断消息是否路由成功:

通过channel.addReturnListener加returnKListener监听器

channel.basicPublish(EXCHANGE_NAME , "", true, MessageProperties.PERSISTENT_TEXT_PLAIN , "mandatory test".getBytes());
        channel.addReturnListener(new ReturnListener() {
        public void handleReturn(int replyCode , String replyText , String exchange , String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
            String message = new String(body);
            System.out.println( "Basic.Return 返回的结果是: "+message );
        }

AMQP指令:Basic.Return

immediate :为true时,如果投递到的队列没有消费者,会给客户端,已经废弃,imrnediate 参数会影响镜像队列的性能增加了代码复杂性,建议采用 TTL或DLX 的方法替代

备份交换器:与普通交换器基本没有区别,最好是fanout类型的,如果是direct或其他类型,消息可能丢失

Map<String, Object> args = new HashMap<String , Object>(); 
args.put("a1ternate-exchange" , "myAe"); 
channe1.exchangeDec1are( "norma1Exchange" , "direct" , true , fa1se , args); 
channe1 . exchangeDec1are( "myAe " , "fanout" , true , fa1se , nu11) ; 
channe1 .queueDec1are( "norma1Queue " , true , fa1se , fa1se , nu11); 
channe1 .queueB nd( norma1Queue "norma1Exchange" , " norma1Key"); 
channe1 .queueDec1are( "unroutedQueue " , true , fa1se , fa1se , nu11);
备份交换器,以下几种特殊情况:

1.备份交换器不存在或未绑定任何队列或没有匹配的队列,不会报错,消息丢失

2.备份交换器跟mandatory 同时存在,mandatory 失效

4.2 过期时间 TTL

TTL: Time to live , 过期时间,rabbitmq 可以给队列和消息设置TTL

1.设置消息的TTL:

(1)、给队列设置统一过期时间

调用channel.queueDelclare 方法,在参数中加入x-message-ttl 参数实现

Map<String, Object> argss = new HashMap<String , Object>(); 
argss.put("x-message-ttl " , 6000); 
channel.queueDeclare(queueName , durable , exclusive , autoDelete , argss) ; 

(2)、给消息单独设置过期时间

通过channel.basicPublish,设置expiration属性参数,单位毫秒

AMQP .BasicProperties.Builder builder = new AMQP.BasicProperties . Builder(); 
builder deliveryMode(2); 持久化消息
builder expiration( 60000 );/ 设置 TTL=60000ms
AMQP.BasicProperties properties = builder. build() ; 
channel.basicPublish(exchangeName , routingKey, mandatory, properties, 
"ttlTestMessage".getBytes()); 

如果两种都设置,会取较小的值;

不设置TTL,表示不会过期;设置为0,除非立刻将消息投递给消费者,否则会立刻丢失,可以部分替代imrnediate 参数,imrnediate 参数还可以将消息返回给客户端,可以用死信队列来实现

两种过期时间的区别:给队列设置只要检查头是否过期即可,可以立刻移除;给消息单独设置,在被消费时才会被移除,因为如果全部移除需要扫描整个队列

2. 设置队列的TTL

Map<String , Object> args =口ew HashMap<String, Object>{) ; 
args.put( "x-expires" , 1800000); 
channel.queueDeclare("myqueue " , false , false , false , args) ; 

通过channel.queueDleclare 的参数x-expires 参数可以控制队列删除前未使用的时间;

未使用:队列上没消费者,没有被重新声明,过期时间内未调用basic.get(单条消费消息)

Rabbitmq会确保过期时间到达后将队列删除,但是不保证及时,如果mq重启,持久化的队列过期时间会刷新

过期时间不可以设置为0,单位为毫秒

4.3 死信队列 DLX Dead-Letter-Exchange,死信交换器

消息变成死信的情况:

1.消息被拒绝

2.消息过期

3.队列达到最大长度

简单来说:就是消息在队列中失效了,会被路由到这个队列绑定的死信交换器,再路由到绑定的队列里。这个队列也叫死信队列

channel.exchangeDeclare("dlx_exchange " , "direct "); // 创建 DLX: dlx_exchange 
Map<String, Object> args = new HashMap<String, Object>(); 
args.put("x-dead-letter-exchange" , " dlx_exchange "); 
args.put("x-dead-letter-routing-key" , "dlx-routing-key");
//为队列 myqueue 添加 DLX
channel.queueDeclare("myqueue" , false , false , false , args);

4.4? 延迟队列 ,就是定时执行的

Rabbitmq本身不支持,但是可以通过DLX跟TTL实现

思路:消息设置过期时间,然后绑定死信交换器,绑定死信队列,过期的消息会投递到死信队列然后直接消费,

4.5 优先级队列

Map<String, Object> args =new HashMap<String, Object>() ; 
args.put( "x-rnax-priority" , 10) ; 
channel.queueDeclare( "queue","priority" , true , fa1se , false , args) ;

优先级最低是0,最高为队列设置的最大优先级,优先级高的优先消费

4.6 RPC实现 Remote Procedure Call, 即远程过程调用

RPC 的处 理流程如下:
(1)当客户端启动时,创建一个匿名的回调队列(名称由 RabbitMQ 自动创建
(2) 客户端为 RPC 请求设置2个属性:?? replyTo? 用来告知 RPC 服务端回复请求时的目的
队列,即回调队列; correlationld 用来标记一个请求。
(3)请求被发送到 rpc_ queue 列中。
( 4) RPC 服务端监听 rpc_queue 队列中的请求,当请求到来时,服务端会处理并且把带有
结果的消息发送给客户端。接收的队列就是 replyTo 设定的 回调队列。

(5)客户端监昕回调队列 当有消息时 检查 correlationld 属性,如果与请求匹配,那就是结果了。

4.7 持久化

RabbitMq的持久化包含三个部分:

交换器持久化:声明交换器时设置durable

队列持久化:声明队列时设置durable

消息持久化:将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为2 即可实现消息的持久化

4.8 生产者确认

RabbitMQ 对这个问题,提供了两种解决方式:

1. 通过事务机制实现

//事务正常提交
channel.txSelect();//开启事务
channel.basicPublish(EXCHANGE NAME , ROUTING KEY , 
MessageProperties.PERSISTENT TEXT_PLAIN, 
"transaction messages".getBytes()); 
channel.txCommit(); //提交事务


//事务回滚
try{
channel.txSelect() ; 
channel.basicPublish(exchange,routeringKey,
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); 
int result = 1 / 0 ; 
channel.txCommit() ; 
} catch(Exception e) {
e.printStackTrace(); 
channel . txRollback();
}

事务确实能够解决消息确认的问题,但是对mq的性能消耗很大

2.?通过发送方确认(publisher confirm)机制实现

三种方式:

同步:

也是串行,比事务好一点,

事务指令步骤:basic.publish 和 basic.commit/basic.commit-ok(basic-rollback/basic-rollback-ok)

同步通知指令步骤:basic.publish 和basic.ack

注意:事务和通知机制不能共存,

try { 
channel.confirmSelect() // 将信道置为 publisher confirm 模式
//之后正常发送消息
channel . basicPublish( "exchange" , " routingKey" , null, 
"publisher confirm test ".getBytes()); 
if (!channel.waitForConfirms()) { 
System.out.println( " send message failed" ) ; 
// do something else.. 
} catch (InterruptedException e){
e.printStackTrace() ; 
}

批量同步:

try { 
channel.confirmSelect(); 
int MsgCount = 0; 
while (true) { 
channel . basicPublish("exchange" , "routingKey" , 
null , "batch confirm test ". getBytes()); 
//将发送出去 的消息存入缓存中,缓存可以是
//一个 ArrayList 或者 BlockingQueue 之类的
if (++MsgCount >= BATCH COUNT) { 
MsgCount = 0; 
try { 
 if(channel.waitForConfirms()) { 
/1 将缓存中的消息清空
//将缓存中的消息重新发送
} catch (InterruptedException e) { 
e.printStackTrace(); 
//将缓存中 的消息重新发送
} catch (IOException e) { 
e.printStackTrace() ;
}

异步:在channel中提供的addConfirmListener方法添加一个ConfirmListener回调接口

channel.confirmSelect() ; 
channel.addConfirmListener(new ConfirmListener () {

public void handleAck(long deliveryTag , boolean multiple)
throws IOException ( 
System.out . println( "Nack, SeqNo : " + deliveryTag 
+ ", multiple : " + multiple) ; 
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear( ); 
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack (long deliveryTag, boolean multiple) 
throws IOException { 
if (multiple) {
confirmSet.headSet(deliveryTag - 1 ).clear(); 
} e1se {
confirmSet.remove(deliveryTag); 
//注意这里需要添加处理消息重发的场景
}
});
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish (ConfirmConfig.exchangeName , ConfimConfig.routingKey
MessageProperties.PERSISTENT_TEXT_PLAIN, 
ConfirmConfig.msg_10B.getBytes()); 
confirmSet.add(nextSeqNo) ;
}

事务机制和普通confirm的方式吐吞量很低,但是编程方式简单,不需要在客户端维护状态(这里 指的是维护 deliveryTag 及缓存未确认的消息)。批量 confmn 方式的问题在于遇到 RabbitMQ 服务端返回 Basic.Nack 需要重发批量消息而导致的性能降低。异步 confinn 方式编程模型最 为复杂,而且和批量 confmn 方式 样需要在客户端维护状态。

4.9 消费端要点

消息分发:

多个消费者监听一个消息队列,是轮询的消费消息,这种方式不好,可以使用channel.basicQos设置消费者最大未确认的数量,设置0,表示没有上限

注意要点:
Basic.Qos 的使用对于拉模式的消费方式无效.
channel.basicQos最多有三个参数:
prefetchCount:最多多少个未消费的消息,为0是无线大
prefetchSize:表示消费者所能接收未确认消息的总体大小的上限,单位B,为0是无线大
global: 为true ,设置的是整个信道,例如设置最大未消费消息数量,是所有消费者可以保留的数量,false是每个消费者可以保留的数量;false设置的是消费者,最好不要,默认为false

消息顺序性:

无法保证有序,除非是非常局限的情况下,如果要有序,需要业务方进一步处理

弃用QueueingConsumer;

4.10 消息传输保障

一般消息中间键的消息传输保障分为三个层级:

At Most Once:最多一次。消息可能会丢失,但绝不会重复传输

At Last Once:最少一次。消息绝不会丢失,但可能会重复传输。

Exactly Once:最少一次。消息绝不会丢失,但可能会重复传输。

其中"最少一 次"投递实现需要考虑以下内容:

1.生产者开启事务或者publisher confirm ,保证消息可靠传输到mq

2.保证消息路由到队列,或者保存到备份服务器,不会丢失

3.消息和队列进行持久化,保证宕机不会丢失

4.消费者设置AutoAck 为false,手动确认,避免消息不必要丢失

"最多一 次"的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这
样很难确保消息不会丢失
"恰好一 次"是 RabbitMQ 目前无法保障的。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-15 11:51:47  更:2021-10-15 11:52:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 0:50:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码