| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Kafka最全知识总结(建议收藏) -> 正文阅读 |
|
[大数据]Kafka最全知识总结(建议收藏) |
1、为什么有消息系统
2、Kafka核心概念生产者:Producer 往Kafka集群生成数据消费者:Consumer 往Kafka里面去获取数据,处理数据、消费数据Kafka的数据是由消费者自己去拉去Kafka里面的数据主题:topic分区:partition 默认一个topic有一个分区(partition),自己可设置多个分区(分区分散存储在服务器不同节点上) 3、Kafka的集群架构Kafka集群中,一个kafka服务器就是一个broker Topic只是逻辑上的概念,partition在磁盘上就体现为一个目录Consumer Group:消费组 消费数据的时候,都必须指定一个group id,指定一个组的id假定程序A和程序B指定的group id号一样,那么两个程序就属于同一个消费组特殊: 比如,有一个主题topicA程序A去消费了这个topicA,那么程序B就不能再去消费topicA(程序A和程序B属于一个消费组) 再比如程序A已经消费了topicA里面的数据,现在还是重新再次消费topicA的数据,是不可以的,但是重新指定一个group id号以后,可以消费。不同消费组之间没有影响。消费组需自定义,消费者名称程序自动生成(独一无二)。Controller:Kafka节点里面的一个主节点。借助zookeeper 4、Kafka磁盘顺序写保证写数据性能kafka写数据:顺序写,往磁盘上写数据时,就是追加数据,没有随机写的操作。经验: 如果一个服务器磁盘达到一定的个数,磁盘也达到一定转数,往磁盘里面顺序写(追加写)数据的速度和写内存的速度差不多 5、Kafka零拷贝机制保证读数据高性能消费者读取数据流程:
kafka linux sendfile技术 — 零拷贝 1.消费者发送请求给kafka服务 2.kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据) 3.从磁盘读取了数据到os cache缓存中 4.os cache直接将数据发送给网卡 5.通过网卡将数据传输给消费者 6、Kafka日志分段保存Kafka中一个主题,一般会设置分区;比如创建了一个 7、Kafka二分查找定位数据???????? Kafka里面每一条消息,都有自己的offset(相对偏移量),存在物理磁盘上面,在position Position:物理位置(磁盘上面哪个地方)也就是说一条消息就有两个位置:offset:相对偏移量(相对位置)position:磁盘物理位置稀疏索引:???????? Kafka中采用了稀疏索引的方式读取索引,kafka每当写入了4k大小的日志(.log),就往index里写入一个记录索引。其中会采用二分查找 8、高并发网络设计(先了解NIO)???????? 网络设计部分是kafka中设计最好的一个部分,这也是保证Kafka高并发、高性能的原因,对kafka进行调优,就得对kafka原理比较了解,尤其是网络设计部分 Reactor网络设计模式1:Reactor网络设计模式2:Reactor网络设计模式3:Kafka超高并发网络设计: 9、Kafka冗余副本保证高可用在kafka里面分区是有副本的,注:0.8以前是没有副本机制的。创建主题时,可以指定分区,也可以指定副本个数。副本是有角色的:leader partition:1、写数据、读数据操作都是从leader partition去操作的。2、会维护一个ISR(in-sync- replica )列表,但是会根据一定的规则删除ISR列表里面的值 生产者发送来一个消息,消息首先要写入到leader partition中 写完了以后,还要把消息写入到ISR列表里面的其它分区,写完后才算这个消息提交 follower partition:从leader partition同步数据。 10、优秀架构思考-总结Kafka — 高并发、高可用、高性能 高可用:多副本机制 高并发:网络架构设计 三层架构:多selector -> 多线程 -> 队列的设计(NIO) 高性能:写数据:
读数据:
11、Kafka生产环境搭建11.1 需求场景分析
也就是说6亿的数据是靠3个小时处理完的。我们简单的算一下高峰期时候的qps
一般情况下,我们都会设置两个副本?46T * 2 = 92T??Kafka里面的数据是有保留的时间周期,保留最近3天的数据。92T * 3天 = 276T我这儿说的是50kb不是说一条消息就是50kb不是(把日志合并了,多条日志合并在一起),通常情况下,一条消息就几b,也有可能就是几百字节。 11.2 物理机数量评估1)首先分析一下是需要虚拟机还是物理机 像Kafka mysql hadoop这些集群搭建的时候,我们生产里面都是使用物理机。2)高峰期需要处理的请求总的请求每秒5.5万个,其实一两台物理机绝对是可以抗住的。一般情况下,我们评估机器的时候,是按照高峰期的4倍的去评估。如果是4倍的话,大概我们集群的能力要准备到 20万qps。这样子的集群才是比较安全的集群。大概就需要5台物理机。每台承受4万请求。 场景总结: 11.3 磁盘选择搞定10亿请求,高峰期5.5万的qps,276T的数据,需要5台物理机。1)SSD固态硬盘,还是需要普通的机械硬盘SSD硬盘:性能比较好,但是价格贵 SAS盘:某方面性能不是很好,但是比较便宜。SSD硬盘性能比较好,指的是它随机读写的性能比较好。适合MySQL这样集群。但是其实他的顺序写的性能跟SAS盘差不多。kafka的理解:就是用的顺序写。所以我们就用普通的【 2)需要我们评估每台服务器需要多少块磁盘 5台服务器,一共需要276T ,大约每台服务器 需要存储60T的数据。我们公司里面服务器的配置用的是 11块硬盘,每个硬盘 7T。11 * 7T = 77T
场景总结:
11.4 内存评估搞定10亿请求,需要5台物理机,11(SAS) * 7T 我们发现kafka读写数据的流程 都是基于os cache,换句话说假设咱们的os cashe无限大那么整个kafka是不是相当于就是基于内存去操作,如果是基于内存去操作,性能肯定很好。内存是有限的。1) 尽可能多的内存资源要给 os cache 2) Kafka的代码用 核心的代码用的是scala写的,客户端的代码java写的。都是基于jvm。所以我们还要给一部分的内存给jvm。Kafka的设计,没有把很多数据结构都放在jvm里面。所以我们的这个jvm不需要太大的内存。根据经验,给个10G就可以了。
假设我们这个10请求的这个项目,一共会有100个topic。100 topic * 5 partition * 2 = 1000 partition 一个partition其实就是物理机上面的一个目录,这个目录下面会有很多个.log的文件。.log就是存储数据文件,默认情况下一个.log文件的大小是1G。我们如果要保证 1000个partition 的最新的.log 文件的数据 如果都在内存里面,这个时候性能就是最好。1000 * 1G = 1000G内存. 我们只需要把当前最新的这个log 保证里面的25%的最新的数据在内存里面。250M * 1000 = 0.25 G* 1000 =250G的内存。 250内存 / 5 = 50G内存 50G+10G = 60G内存 64G的内存,另外的4G,操作系统本生是不是也需要内存。其实Kafka的jvm也可以不用给到10G这么多。评估出来64G是可以的。当然如果能给到128G的内存的服务器,那就最好。 我刚刚评估的时候用的都是一个topic是5个partition,但是如果是数据量比较大的topic,可能会有10个partition。 总结: 11.5 CPU压力评估评估一下每台服务器需要多少cpu core(资源很有限) 我们评估需要多少个cpu ,依据就是看我们的服务里面有多少线程去跑。线程就是依托cpu 去运行的。如果我们的线程比较多,但是cpu core比较少,这样的话,我们的机器负载就会很高,性能不就不好。 评估一下,kafka的一台服务器 启动以后会有多少线程? Acceptor线程 1 processor线程 3 6~9个线程 处理请求线程 8个 32个线程 定时清理的线程,拉取数据的线程,定时检查ISR列表的机制 等等。所以大概一个Kafka的服务启动起来以后,会有一百多个线程。 cpu core = 4个,一遍来说,几十个线程,就肯定把cpu 打满了。cpu core = 8个,应该很轻松的能支持几十个线程。如果我们的线程是100多个,或者差不多200个,那么8 个 cpu core是搞不定的。所以我们这儿建议:CPU core = 16个。如果可以的话,能有32个cpu core 那就最好。 结论:kafka集群,最低也要给16个cpu core,如果能给到32 cpu core那就更好。2cpu * 8 =16 cpu core 4cpu * 8 = 32 cpu core 总结: 11.6 网络需求评估评估我们需要什么样网卡?一般要么是千兆的网卡(1G/s),还有的就是万兆的网卡(10G/s)
11.7 集群规划请求量 规划物理机的个数 分析磁盘的个数,选择使用什么样的磁盘 内存 cpu core 网卡就是告诉大家,以后要是公司里面有什么需求,进行资源的评估,服务器的评估,大家按照我的思路去评估 一条消息的大小 50kb -> 1kb 500byte 1Mip 主机名 192.168.0.100 hadoop1 192.168.0.101 hadoop2 192.168.0.102 hadoop3 主机的规划:kafka集群架构的时候:主从式的架构:controller -> 通过zk集群来管理整个集群的元数据。
12、kafka运维12.1 常见运维工具介绍KafkaManager — 页面管理工具 12.2 常见运维命令场景一:topic数据量太大,要增加topic数 一开始创建主题的时候,数据量不大,给的分区数不多。
broker id: hadoop1:0 hadoop2:1 hadoop3:2 假设一个partition有三个副本:partition0:a,b,c a:leader partition b,c:follower partition ISR:{a,b,c} 场景二:核心topic增加副本因子 如果对核心业务数据需要增加副本因子 vim test.json脚本,将下面一行json脚本保存
执行上面json脚本:
场景三:负载不均衡的topic,手动迁移vi topics-to-move.json
???????? ?把你所有的包括新加入的broker机器都写在这里,就会说是把所有的partition均匀的分散在各个broker上,包括新进来的broker此时会生成一个迁移方案,可以保存到一个文件里去:expand-cluster-reassignment.json
场景四:如果某个broker leader partition过多 正常情况下,我们的leader partition在服务器之间是负载均衡。hadoop1 4 hadoop2 1 hadoop3 1 现在各个业务方可以自行申请创建topic,分区数量都是自动分配和后续动态调整的, kafka本身会自动把leader partition均匀分散在各个机器上,这样可以保证每台机器的读写吞吐量都是均匀的 但是也有例外,那就是如果某些broker宕机,会导致leader partition过于集中在其他少部分几台broker上, 这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是folloer partition,读写请求很低, 造成集群负载不均衡有一个参数,auto.leader.rebalance.enable,默认是true, 每隔300秒(leader.imbalance.check.interval.seconds)检查leader负载是否平衡 如果一台broker上的不均衡的leader超过了10%,leader.imbalance.per.broker.percentage, 就会对这个broker进行选举 配置参数:auto.leader.rebalance.enable 默认是true leader.imbalance.per.broker.percentage: 每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。这个值表示百分比。10% leader.imbalance.check.interval.seconds:默认值300秒 13、Kafka生产者13.1 生产者发送消息原理13.2 生产者发送消息原理—基础案例演示13.3 如何提升吞吐量如何提升吞吐量:参数一: 13.4 如何处理异常
13.5 重试机制重试会带来一些问题:
13.6 ACK参数详解producer端设置的 request.required.acks=0;只要请求已发送出去,就算是发送完了,不关心有没有写成功。性能很好,如果是对一些日志进行分析,可以承受丢数据的情况,用这个参数,性能会很好。request.required.acks=1;发送一条消息,当leader partition写入成功以后,才算写入成功。不过这种方式也有丢数据的可能。request.required.acks=-1;需要ISR列表里面,所有副本都写完以后,这条消息才算写入成功。ISR:1个副本。1 leader partition 1 follower partition kafka服务端:min.insync.replicas:1, 如果我们不设置的话,默认这个值是1 一个leader partition会维护一个ISR列表,这个值就是限制ISR列表里面 至少得有几个副本,比如这个值是2,那么当ISR列表里面只有一个副本的时候。往这个分区插入数据的时候会报错。设计一个不丢数据的方案:数据不丢失的方案:1)分区副本 >=2 2)acks = -1 3)min.insync.replicas >=2 还有可能就是发送有异常:对异常进行处理 13.7 自定义分区分区:1、没有设置key我们的消息就会被轮训的发送到不同的分区。2、设置了keykafka自带的分区器,会根据key计算出来一个hash值,这个hash值会对应某一个分区。如果key相同的,那么hash值必然相同,key相同的值,必然是会被发送到同一个分区。但是有些比较特殊的时候,我们就需要自定义分区
如何使用:配置上这个类即可:props.put(”partitioner.class”, “com.zhss.HotDataPartitioner”); 13.8 综合案例演示14.1 消费组概念 groupid相同就属于同一个消费组 1)每个consumer都要属于一个consumer.group,就是一个消费组,topic的一个分区只会分配给 一个消费组下的一个consumer来处理,每个consumer可能会分配多个分区,也有可能某个consumer没有分配到任何分区 2)如果想要实现一个广播的效果,那只需要使用不同的group id去消费就可以。topicA: partition0、partition1 groupA:consumer1:消费 partition0 consuemr2:消费 partition1 consuemr3:消费不到数据 groupB: consuemr3:消费到partition0和partition1 3)如果consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他 14、Kafka消费者14.1 消费组概念groupid相同就属于同一个消费组 1)每个consumer都要属于一个consumer.group,就是一个消费组,topic的一个分区只会分配给 一个消费组下的一个consumer来处理,每个consumer可能会分配多个分区,也有可能某个consumer没有分配到任何分区 2)如果想要实现一个广播的效果,那只需要使用不同的group id去消费就可以。topicA: partition0、partition1 groupA:consumer1:消费 partition0 consuemr2:消费 partition1 consuemr3:消费不到数据 groupB: consuemr3:消费到partition0和partition1 3)如果consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他 14.2 基础案例演示14.3 偏移量管理
14.4 偏移量监控工具介绍
14.5 消费异常感知heartbeat.interval.ms:consumer心跳时间间隔,必须得与coordinator保持心跳才能知道consumer是否故障了, 然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作 session.timeout.ms:kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒 max.poll.interval.ms:如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一般来说结合业务处理的性能来设置就可以了。 14.6 核心参数解释fetch.max.bytes:获取一条消息最大的字节数,一般建议设置大一些,默认是1M 其实我们在之前多个地方都见到过这个类似的参数,意思就是说一条信息最大能多大?
14.7 综合案例演示引入案例:二手电商平台(欢乐送),根据用户消费的金额,对用户星星进行累计。订单系统(生产者) -> Kafka集群里面发送了消息。会员系统(消费者) -> Kafak集群里面消费消息,对消息进行处理。 14.8 group coordinator原理面试题:消费者是如何实现rebalance的?— 根据coordinator实现
14.9 rebalance策略consumer group靠coordinator实现了Rebalance 这里有三种rebalance的策略:range、round-robin、sticky 比如我们消费的一个主题有12个分区:p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11 假设我们的消费者组里面有三个消费者
假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3 这样的话,原本在consumer2上的的p6,p7分区就被分配到了 consumer3上。
consumer1:0-3 consumer2: 4-7 consumer3: 8-11 假设consumer3挂了 consumer1:0-3,+8,9 consumer2: 4-7,+10,11 15、Broker管理15.1 Leo、hw含义
核心的概念:LEO,HW LEO:是跟offset偏移量有关系。 LEO:在kafka里面,无论leader partition还是follower partition统一都称作副本(replica)。
HW:高水位 LEO有一个很重要的功能就是更新HW,如果follower和leader的LEO同步了,此时HW就可以更新 HW之前的数据对消费者是可见,消息属于commit状态。HW之后的消息消费者消费不到。 15.2 Leo更新15.3 hw更新15.4 controller如何管理整个集群1: 竞争controller的 /controller/id 2:controller服务监听的目录:/broker/ids/ 用来感知 broker上下线 /broker/topics/ 创建主题,我们当时创建主题命令,提供的参数,ZK地址。/admin/reassign_partitions 分区重分配 …… 15.5 延时任务kafka的延迟调度机制(扩展知识) 我们先看一下kafka里面哪些地方需要有任务要进行延迟调度。第一类延时的任务:比如说producer的acks=-1,必须等待leader和follower都写完才能返回响应。有一个超时时间,默认是30秒(request.timeout.ms)。所以需要在写入一条数据到leader磁盘之后,就必须有一个延时任务,到期时间是30秒延时任务 放到DelayedOperationPurgatory(延时管理器)中。假如在30秒之前如果所有follower都写入副本到本地磁盘了,那么这个任务就会被自动触发苏醒,就可以返回响应结果给客户端了, 否则的话,这个延时任务自己指定了最多是30秒到期,如果到了超时时间都没等到,就直接超时返回异常。第二类延时的任务:follower往leader拉取消息的时候,如果发现是空的,此时会创建一个延时拉取任务 延时时间到了之后(比如到了100ms),就给follower返回一个空的数据,然后follower再次发送请求读取消息, 但是如果延时的过程中(还没到100ms),leader写入了消息,这个任务就会自动苏醒,自动执行拉取任务。 海量的延时任务,需要去调度。 15.6?时间轮机制
|
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/18 16:51:09- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |