IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 学习笔记day05(消息队列) -> 正文阅读

[大数据]学习笔记day05(消息队列)

一、消息队列

1、消息队列概述

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

2、消息队列应用场景

1)、异步处理

传统的串行或者并行处理方式并发量、吞吐量、响应时间会有瓶颈,引入消息队列后,将不是必须的业务逻辑,异步处理,如发短信

2)、应用解耦

比如借据查询的订单状态为成功后需要将订单信息同步给资产管理平台或者产品层,此时如果传统的接口调用方式使得借据与这两个系统耦合性较高,此时如果还款计划需要这个订单状态,则借据又需要调用还款计划的 接口,不易于拓展,引入消息队列kafka之后,可以将消息队列发到kafka队列中,产品层等应用需要此消息只需要去订阅消费它即可

3)、流量肖峰

秒杀活动等,可以将用户的请求写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面 秒杀业务根据消息队列中的请求信息 再做后续处理

3、JMS消息服务

JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

4.常用MQ选择

中小型公司?技术实力较为一般?技术挑战不是特别高?用RabbitMQ是不错的选择

大型公司?基础架构研发实力较强?用RocketMQ是很好的选择

如果是大数据领域的实时计算?日志采集等场景?用Kafka是业内标准的? 绝对没问题? 社区活跃度?很高

可靠性(少量丢数据)要求稍低的场景使用

5、如何保证消息队列的高可用性

RabbitMQ:基于主从做高可用性的 普通集群模式 镜像集群模式

Kafka :replica副本机制

6、如何保证消息不被重复消费

以kafka为例 kafka实际上有个offset的概念 就是每个消息写进去 都有一个offset 代表它的序号 然后消费者消费了数据之后

每隔一段时间 会把自己消费过的消息的offset提交一下 代表我已经消费过了 下次我要是重启啥的 你就让我继续从上次消费到的Offset来继续消费吧

但是凡事总有意外 比如我们之前生产经常遇到的 就是有时候重启系统 看你怎么重启了 如果遇到点着急的 直接kill进程了 再重启

这回导致consumer有些消息处理了 但是还没来得及提交offset 重启之后 少数消息会再次消费一次

其实重复消费不可怕 可怕的是你没考虑到重复消费之后 怎么保证幂等性

举个例子 假设有个系统 消费一条数据库插入一条 要么你一个消息重复两次 你不就插入了两条 这数据就错了

但是你要是消费到第二次的时候 自己判断一下已经消费过了 直接扔了 就保留了一条数据

一条数据重复出现两次 数据库里就只有一条数据 这就保证了系统的幂等性

幂等性:通俗点说 就一个数据或者一个请求 给你重复来多次 你得保证对应的数据是不会改变的 不嫩出错

7、如何保证消息队列消费的幂等性

比如你拿个数据要写库 你先根据主键查一下 如果这个数据都有了 你就别插入了 update 一下

比如你是写redis 那没问题 反正每次都是set 天然幂等性

如果不是上面两个场景 那做的稍微复杂一点 你需要让生产者发生每条数据的时候 里面加一个全局唯一的id 类似订单id址类的东西 然后你这里消费了之后 先根据这个id取比如redis里查一下 之前消费过吗 如果没有消费过 你就处理 然后这个id写redis

如果消费过了 那你就别处理可 保证别重复处理相同的消息即可

8、如何处理消息丢失的问题

以rabbitmq为例

1)生产者弄丢了数据

生产者将数据发送到rabbitmq的时候,可能数据就在半路给搞丢了 因为网络啥的问题都有可能

此时可以选择用rabbitmq提供的事务给你 就是生产者发送数据之前开启rabbitmq事务 然后发送消息 如果消息没有成功

被rabbitmq接受到,那么生产者会收到异常报错 此时可以回滚事务 然后重试发送消息 如果收到了消息 那么可以提交事务

但是问题是 rabbitmq事务机制一搞 基本上吞吐量会下来 因为太耗性能

所以一般来说 如果你要确保说写rabbitmq的消息别丢 可以开启confirm模式 再生产者哪里设置开启confirm模式之后

你每次写的消息都会分配一个唯一的Id 然后如果写入了rabbitmq中 rabbitmq会给你回传一个ack消息 告诉你说这个消息Ok了

如果rabbitmq没能处理这个消息 会回调你一个nack接口 告诉你这个消息接收失败你可以重试

而且你可以结合这个机制自己再内存里维护每个消息Id的状态 如果超过一段时间还没接收到这个消息的回调 那么你可以重发

事务机制和confirm机制最大不同在于 事务机制是同步的 你提交一个事务之后会阻塞i再哪儿 但是confirm机制是以部的 你发送个消息之后 可以发送下一个消息 然后那个消息rabbitmq接受了之后会异步回调你一个接口通知你这个消息接收到了

所以一般在生产者这块避免数据丢失 都是用confirm机制的

2)rabbitmq弄丢了数据

就是rabbitmq自己弄丢了数据 这个你必须开启rabbitmq的持久化 就是消息写入之后 会持久化到磁盘中 哪怕是rabbitmq自己挂了 恢复之后会自动读取之前存储的数据 一般数据不会丢 除非极其罕见的是 rabbitmq还没持久化 自己就挂了 可能导致少量数据会丢失的 但是这个概率较小

持久化可以跟生产者那边的confirm机制配合起来 只有消息被持久化到磁盘之后 才会通知生产者ack了 所以哪怕是在持久化到磁盘之前rabbitmq挂了数据丢了 生产者收不打破ack 你也是可以自己重发的

哪怕是你给rabbitmq开启了持久化机制 也有一种可能 就是这个消息写道rabbitmq中 但是还没来得及持久化到磁盘中 结果不巧 此时rabbitmq挂了 就会导致内存里的一点数据会丢失

3)消费端弄丢了数据

rabbitmq如果丢失了数据 主要是因为你消费的时候,刚消费到 还没处理 结果进程挂了 比如重启了 rabbitmq认为你都消费了 这数据就丢了

这个时候得用rabbitmq提供得ack机制 简单来说就是你关闭rabbitmq得时候 自动ack 可以通过调用一个api来调用就行 然后每次你自己代码里确保处理完得时候 在程序里ack一把 这样的话 如果你还没处理完 不就没有ack? 那rabbitmq就认为你还没处理完

这个时候 rabbitmq会把消费分配给别的consumer取处理 消息是不会丢的

9、如果让你写一个消息队列,该如何进行架构设计?

1)首先这个mq得支持可缩性?就是需要的时候快速扩容?就可以增加吞吐量和容量? 那怎么搞?设计一个分布式的系统

参照一下kafka的设计理念 broker->topic->partition? 每个partition放一个机器?就存一部分数据?如果资源不够?给topic

增加Partition?然后做数据迁移?增加机器?就可以存放更多数据?提供更高的吞吐量

2)其次你得考虑一下这个mq的数据要不要落地磁盘,落磁盘?才能保证别进程挂了数据就丢了? 那落磁盘的时候怎么落?

顺序写?这样就没有磁盘随机读写的寻址开销? 磁盘顺序读写的性能是很高的?这就是Kafka的思路

3)然后在考虑一下mq的可用性? ?多副本->leader&follower->broker?挂了重新选举leader即可对外服务

4)能不能支持数据0丢失

二、kafka的使用

1、pom

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

<version>1.3.0.RELEASE</version>

</dependency>

2、配置文件

<!-- 创建kafkatemplate需要使用的producerfactory bean -->
<bean id="notifyProducerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
        <ref bean="notifyProducerProperties" />
    </constructor-arg>
</bean>

<!-- 授信\借钱\还款通知队列,开始 -->
<!-- 定义producer的参数 -->
<bean id="notifyProducerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="0.11-kafka-n1.bestpay.hg-mid:9092,0.11-kafka-n2.bestpay.hg-mid:9092,0.11-kafka-n3.bestpay.hg-mid:9092" />
            <!-- acks=all表示所有同步节点都返回确认,可靠性最高 -->
            <entry key="acks" value="${kafka-hx.producer.acks}" />
            <entry key="retries" value="${kafka-hx.producer.retries}" />
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
        </map>
    </constructor-arg>
</bean>

bootstrap.servers

一组host和port用于初始化连接,不管这里配置了多少台server 都只是用作发现整个集群全部server信息 这个配置不需要包含集群所有的机器信息 但是最好多于一个防止服务器挂掉

key.serializer

用来序列化key的serializer接口的实现类

value.serializer

用来序列化value的Serializer接口的实现类

acks

producer希望leader返回的用于确认请求完成的确认数量. 可选值 all, -1, 0 1. 默认值为1

acks=0 不需要等待服务器的确认. 这是retries设置无效. 响应里来自服务端的offset总是-1. producer只管发不管发送成功与否。延迟低,容易丢失数据。

acks=1 表示leader写入成功(但是并没有刷新到磁盘)后即向producer响应。延迟中等,一旦leader副本挂了,就会丢失数据。

acks=all等待数据完成副本的复制, 等同于-1. 假如需要保证消息不丢失, 需要使用该设置. 同时需要设置unclean.leader.election.enable为true, 保证当ISR列表为空时, 选择其他存活的副本作为新的leader.

buffer.memory

producer可以使用的最大内存来缓存等待发送到server端的消息. 如果消息速度大于producer交付到server端的阻塞时间max.block.ms, 将会抛出异常. 默认值33554432 byte (32m). 这个设置不是一个严格的边界, 因为producer除了用来缓存消息, 还要用来进行压缩.

compression.type

producer压缩数据的类型, 默认为none, 就是不压缩. 可选none, gzip, snappy 和lz4. 压缩整个batch的数据, 因此batch的效果对压缩率也有影响. 更多的批处理意味着更好的压缩

retries

设置大于零的值将导致客户端重新发送其发送失败并发生潜在的瞬时错误的记录. 相当于client在发送失败的时候会重新发行. 如果设置了retries而没有将max.in.flight.request.per.connection设置为1, 在两个batch发送到同一个partition时有可能打乱消息的发送顺序(第一个发送失败, 而第二个发送成功)

2、发送kafka

<!-- 创建kafkatemplate发送模版 -->
<bean id="notifyKafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="notifyProducerFactory" />
    <!-- autoFlush=true表示生产者同步发送到kafka,可靠性高 -->
    <constructor-arg name="autoFlush" value="true" />
    <!-- 队列名 -->
    <property name="defaultTopic" value="${kafka-hx.producer.loan.result.topic.name}" />
    <!-- 发送结果监听器 -->
    <property name="producerListener" ref="notifyProducerListener" />
</bean>


public void sendLoanApplyNotifyKafka(LoanApplyNotifyMessage loanApplyNotifyMessage) {
    try {
        MessageBO messageBO = new MessageBO();
        messageBO.setType(KafkaType.loanNotify.getType());
        messageBO.setLogId(MDC.get(BestpayMarker.TRACE_LOG_ID));
        messageBO.setValue(loanApplyNotifyMessage);
        notifyKafkaTemplate.sendDefault(JSONObject.toJSONString(messageBO));
    } catch (Exception e) {
        log.error("kafka消息发送异常,请求参数:{}异常信息:{}", loanApplyNotifyMessage, e);
    }
}


<!-- 发送结果监听器 继承ProducerListenerAdapter-->
<bean id="notifyProducerListener" class="com.bestpay.clp.debitcore.service.message.NotifyProducerListener" />


public class CostCenterProducerListener extends ProducerListenerAdapter {

    @Autowired
    private CostDetailManager costDetailManager;
    /**
     * 发送消息成功后调用
     */
    @Override
    public void onSuccess(String topic, Integer partition, Object key,
                          Object value, RecordMetadata recordMetadata) {
        super.onSuccess(topic, partition, key, value, recordMetadata);
        log.info("消息发送成功----topic:" + topic + ", partition:" + partition + ", key:" + key + ", value:" + value + ", RecordMetadata:" + recordMetadata.toString() + "----");
        Gson gson = new Gson();
        MessageBO messageBO = gson.fromJson(value.toString(), MessageBO.class);
    }

3、消费者

<!-- 消息监听器,实际执行消息消费的类 -->
<bean id="commonLoanListener" class="com.bestpay.personalfin.loan.biz.kafka.CommonLoanListener" />
<!-- 消费者容器配置信息 -->
<bean id="commonLoanContainerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg value="${kafka.debitcore.topic.name}" />
    <!-- 每接收到一条消息ack一次,可靠性最高 -->
    <property name="ackMode" value="RECORD" />
    <property name="groupId" value="${kafka.debitcore.consumer.group}" />
    <!--  消息监听器  -->
    <property name="messageListener" ref="commonLoanListener" />
</bean>

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 12:48:25  更:2021-07-30 12:49:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 2:00:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码