IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> MQ相关整理 -> 正文阅读

[大数据]MQ相关整理

1.为什么使用mq?优缺点?

出处:微信公众号(java面试题精选)

主要说以前的项目里哪里用到了mq,为什么用。

首先说一下mq的作用也就是优点:

1.系统解耦,A调用D系统的接口,直接将D系统的接口调用写在代码里,显然两个系统有严重的耦合。D系统的任何变动,A都得修改。消息队列就是向一个主题发送消息之后,D系统自己消费去,与A系统完全解耦开。

在这里插入图片描述

2.异步调用:

场景二,还是ABCD四个系统,A系统收到一个请求,需要在自己本地写库,还需要往BCD三个系统写库,A系统自己写本地库需要3ms,往其他系统写库相对较慢,B系统200ms ,C系统350ms,D系统400ms,这样算起来,整个功能从请求到响应的时间为3ms+200ms+350ms+400ms=953ms,接近一秒,对于用户来说,点个按钮要等这么长时间,基本是无法接受的

在这里插入图片描述

如果用了MQ,用户发送请求到A系统耗时3ms,A系统发送三条消息到MQ,假如耗时5ms,用户从发送请求到相应3ms+5ms=8ms,仅用了8ms,用户的体验非常好。

在这里插入图片描述

3.流量削峰

场景三,这次举个实例吧,也是近期发生的,我们都知道 ,2020年爆发的这场新冠病毒,导致各大线上商城APP里面的口罩被抢购一空,在这种情况下,JD商城开启了一场每晚八点的抢购3Q口罩的活动,每天下午三点进行预约,晚上八点抢购,从JD商城刚上线这个活动,我连续抢了近一个周,也算是见证了一个百万并发量系统从出现问题到完善的一个过程,最初第一天,我抢购的时候,一百多万预约,到八点抢购估计也能有百万的并发量,可是第一天,到八点我抢的时候,由于并发量太高,直接把JD服务器弄崩了,直接报了异常,可能JD在上线这个活动的时候也没能够想到会有那么高的并发,打了一个猝不及防,但是这只是在前一两天出现报异常的情况,后面却没有再出现异常信息,到后来再抢购只是响应的时间变得很慢,但是JD系统并没有崩溃,这种情况下一般就是用了MQ(或者之前用了MQ,这次换了个吞吐量级别更高的MQ),也正是利用了MQ的三大好处之一——削峰。

如果使用了MQ,每秒百万个请求写入MQ,因为JD系统每秒能处理1W+的请求,JD系统处理完然后再去MQ里面,再拉取1W+的请求处理,每次不要超过自己能处理的最大请求量就ok,这样下来,等到八点高峰期的时候,系统也不会挂掉,但是近一个小时内,系统处理请求的速度是肯定赶不上用户的并发请求的,所以都会积压在MQ中,甚至可能积压千万条,但是高峰期过后,每秒只会有一千多的并发请求进入MQ,但是JD系统还是会以每秒1W+的速度处理请求,所以高峰期一过,JD系统会很快消化掉积压在MQ的请求,在用户那边可能也就是等的时间长一点,但是绝对不会让系统挂掉。

接下来说说mq的缺点:

1.系统可用性降低:即面临的风险变高,本来是直接的系统间调用,引入了mq,万一mq挂掉了,那么系统也就挂掉了。

2.系统的复杂度提高:需要考虑消息的重复消费,消息丢失,保证消息传递的顺序等问题

3.数据不一致的问题:A系统处理完再传递给MQ就直接返回成功了,用户以为你这个请求成功了,但是,如果在BCD的系统里,BC两个系统写库成功,D系统写库失败了怎么办,这样就导致数据不一致了。

最后说说项目中用到的场景:

之前实习做得项目是一个多语言翻译的中台,当时我负责开发了机器翻译的功能,里面有一个需要机器翻译完的生成报表问题,用户可以查询日期范围内的报表,并且也可以下载报表。下载功能用到了mq,主要是因为想让用户体验更加的好,点击下载报表之后,前端页面就之后有响应,页面上会有一个

在这里插入图片描述

存储好了变成这样:
在这里插入图片描述

点击蓝色字体会进行下载,这样的设计用户体验更加的好。(逻辑:有个record表记录操作记录和状态,filestorge记录存储在s3的位置(即服务器上的位置),一开始用户点击下载,回先生成record记录其状态是正在处理,然后发消息给上传文件的job,上传完成之后更改reord状态为上传完成,并且在filestorge中记录在服务器的位置。)

2.kafuka,ActiveMQ,RabbitMQ,RocketMQ等主流MQ的区别?

吞吐量:吞吐量是指对网络、设备、端口、虚电路或其他设施,单位时间内成功地传送数据的数量(以比特、字节、分组等测量)。 这里指的是每秒能处理的消息数量。

ActiveMQ,没经过大规模吞吐量场景的验证(单机能达到万级),社区也不是很活跃。用的比较少。

RabbitMQ是一个由 Erlang 语言开发的 AMQP 的开源实现。,高吞吐(单机达到万级,通过集群方式拓展可以达到10W/s的吞吐速率),高堆积(支持topic下消费者较长时间离线,消息堆积量大),能够快速持久化。主从架构实现高可用性。

RocketMQ,阿里开发,topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降(单机10W/s)。源码是java,可以自己阅读源码,定制自己公司的MQ。主从架构实现高可用性。

kafka,单机吞吐量:十万级。分布式架构实现高可用性,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用。

在大数据领域的实时计算以及日志采集被大规模使用。

3.RabbitMQ:出处

3.1基本概念:

RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
在这里插入图片描述

Message:
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

Publisher:

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange:

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Binding:

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

Queue:

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Connection:

网络连接,比如一个TCP连接。

Channel:

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer:

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Broker:

表示消息队列服务器实体

3.2 AMQP中的消息路由

AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

在这里插入图片描述

Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers

1.direct:完全匹配

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配!

在这里插入图片描述

2.fanout:

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。

在这里插入图片描述

3.topic

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配不多不少一个单词。
在这里插入图片描述

3.3 java客户端访问实例

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

消息生产者

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        //设置 RabbitMQ 地址
        factory.setHost("localhost");
        //建立到代理服务器到连接
        Connection conn = factory.newConnection();
        //获得信道
        Channel channel = conn.createChannel();
        //声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);

        String routingKey = "hola";
        //发布消息
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();
        conn.close();
    }
}

消息消费者

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        //建立到代理服务器到连接
        Connection conn = factory.newConnection();
        //获得信道
        final Channel channel = conn.createChannel();
        //声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        //声明队列
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "hola";
        //绑定队列,通过键 hola 将队列和交换器绑定起来
        channel.queueBind(queueName, exchangeName, routingKey);
        while(true) {
            //消费消息
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("消费的路由键:" + routingKey);
                    System.out.println("消费的内容类型:" + contentType);
                    long deliveryTag = envelope.getDeliveryTag();
                    //确认消息
                    channel.basicAck(deliveryTag, false);
                    System.out.println("消费的消息体内容:");
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println(bodyStr);

                }
            });
        }
    }
}

3.4 RabbitMQ集群

RabbitMQ 内部利用 Erlang 提供的分布式通信框架 OTP 来满足上述需求,使客户端在失去一个 RabbitMQ 节点连接的情况下,还是能够重新连接到集群中的任何其他节点继续生产、消费消息。

RabbitMQ 会始终记录以下四种类型的内部元数据:

  1. 队列元数据
    包括队列名称和它们的属性,比如是否可持久化,是否自动删除
  2. 交换器元数据
    交换器名称、类型、属性
  3. 绑定元数据
    内部是一张表格记录如何将消息路由到队列
  4. vhost 元数据
    为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性

在单一节点中,RabbitMQ 会将所有这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上可以确保队列和交换器在节点重启后能够重建。而在集群模式下同样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。

镜像队列:如果在集群中创建队列,集群只会在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容)。结果是只有队列的所有者节点知道有关队列的所有信息,因此当集群节点崩溃时,该节点的队列和绑定就消失了,并且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0之后提供了镜像队列以避免集群节点故障导致的队列内容不可用

RabbitMQ 集群中可以共享 user、vhost、exchange等,所有的数据和状态都是必须在所有节点上复制的。当在集群中声明队列、交换器、绑定的时候,这些操作会直到所有集群节点都成功提交元数据变更后才返回。

磁盘节点和内存节点:集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,但是它的执行比磁盘节点要好。内存节点可以提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这两者呢?

RabbitMQ 只要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入火离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但知道该节点恢复,否则无法更改任何东西。

3.5 常见问题

  • 如何保证消息尽量发送成功?

1.生产者确认:

? 首先,我们要确保生产者能成功地将消息发送到RabbitMQ服务器。
? 默认情况下生产者发送消息并不会返回任何状态信息,也就是它并不知道消息有没有正确地到达服务器。

? 针对这个问题,RabbitMQ提供了两种解决方案:

? 1.事务机制:事务机制是阻塞形式的,一条消息发送之后会使消息端阻塞,以等待RabbitMQ的回应,才能发送下一个消息。使用事务机制会影响RabbitMQ的性能,因此还是推荐使用发送方确认机制。

? 事务机制相关的方法主要有三个:

? 1.channel.txSelect:用于将当前的channel设置成事务模式;

? 2.channel.txCommit:用于提交事务;

? 3.channel.txRollback:用于回滚事务.

? 2.通过发送方确认机制(publisher confirm):发送方确认机制是指生产者将channel设置成confirm模式,所有在该信道上发布的消息都会被指派一个唯一ID(从1开始),一旦消息被投递到RabbitMQ服务器之后,RabbitMQ就会发送一个包含了消息唯一ID的确认(Basic.Ack)给生产者,使生产者知道消息已经正确到达了目的地。如果RabbitMQ因为内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者可以在回调方法中处理该nack命令。这个过程是异步的,不需要等待确认信息回来,因此效率更高。

? 相关的方法:

? 1.channel.confirmSelect();

? 2.channel.waitForConfirms;

? 3.channel.addConfirmListener;

注意:事务机制和生产者确认机制是互斥的,不能共存!

2.持久化

? RabbitMQ持久化分为:交换机持久化;队列持久化;消息持久化;

  • ? 如何保证消息被正确消费

这部分要处理的场景是: 当消费者接收到消息后,还没处理完业务逻辑,消费者挂掉了,此时消息等同于丢失了。

为了确保消息被消费者成功消费,RabbitMQ提供了消息确认机制,主要通过显示Ack模式来实现。

默认情况下,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或磁盘)删除,但是我们在使用时可以手动设置autoAck为False的,当然具体做法各个语言都不一样。

需要注意的时,如果设置autoAck为false,也就意味者每条消息需要我们自己发送ack确认,RabbitMQ才能正确标识消息的状态

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-14 23:05:55  更:2021-07-14 23:06:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/3 10:22:05-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码