| |
|
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
| -> 大数据 -> kafka -> 正文阅读 |
|
|
[大数据]kafka |
|
apache的kafka是一个分布式的发送并发消息,可以发送海量数据 一、kafka概述
1、概念详解: 1.1 apache官网:Welcome to The Apache Software Foundation! kafka官网:Apache Kafka 1.2、producer:生产者即数据的发布者,将消息发送到kafka的brokers中;consumer消费者可以中brokers中读取数据,消费者可以消费多个kafka中的数据. topic:使用类别属性,划分数据的所属类(可以理解为数据库中的表) brocker:一个topc有多个分区partion,每个分区存储在一个broker中,其余分区下的partion数据是不会存储到这个brocker中;brocker的数量,一定要大于topics中partions的数量.
?
(1)kafka需要java环境,下载jdk:Java Downloads | Oracle (2)zookeeper安装Apache ZooKeeper
?下载http格式的即可
?(3)kafka的安装和配置
kafka是根据scala开发的,所以选择scala 2.12或者2.13均可
?实操
(4)kafka的命令测试消息生产与消费
? 实操 (1)cd kafka安装目录下 (2)bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic huahua --partitions 2 --replication-factor 1 报错:?? ?报错原因:服务没有起来,之前被自己给ctrl+c关掉了 输入如下命令重启后,重新执行上述创建操作:
创建成功了
(2)验证是否创建成功 bin/kafka-topics.sh --zookeeper localhost:2181 --list
list执行结束之后只有当前这个主题,则创建成功了 (3)查看当前创建主题的详情 bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic huahua
?bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic huahua
?启动成功 (5)消费端终端启动之后,重新打开一个,创建生产端发送消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic huahua
?输入的字符串可以在消费者端展示,说明被监听到了 二、 java第一个程序 1、创建springboot工程:new-project-java initialize-改名字-web-java spring 2、导入jar包 3、写生产者
运行,查看之前终端启动的客户端能否收到消息
telnet 本地ip,连接成功
如果是开启的状态,执行如下命令关闭即可
5、优化与总结:将key和value的一些序列化和反序列化,都采用了kafka下的静态常量 改producer
1、消息发送流程解析
?2、发送类型
eg:
异步发送:当前可以进行别的操作,消息发送成功之后,会进行一个回调;相当于重新启动了一个线程,不会阻塞当前线程的处理
消费端收到了消息
序列化器必须要实现serialize接口 (2)、自定义序列化器:重写serializer接口,再重写下面这几个方法
自定义分区器:重写接口partioner和方法如下,一般情况下采用默认的分区策略即可
使用场景:(1)@test:测试的时候使用的类,一般上线后不会使用 (2)修改消息的内容:比如需要统一给消息加一些前缀这样的场景 (3)统计类需求:发送消息时,经常需要统计发送量,或者是发送的成功率 eg:
?执行结果:发送成功,且客户端加上了一个拦截器
?6、发送原理解析
1、概念入门 ?(1)消费者和消费组
eg:不置顶groupid会抛出如下异常
(1)必要参数设置
(2)订阅主题和分区?
?(3) 反序列化
消息丢失:当新的消费者加入之前,某一个消费者已经拉取了这一部分的消息,进行消息处理钱,提交了位移未处理,但是此时宕机了,新的消费者负责此分区的提交,并且提交位移,处理这部分的位移,就很有可能出现了丢失 提交方式 a、自动提交
b、同步提交
?c、异步提交
(5)指定位移消费
?eg:
增加判断是否获取到了分区,如果没有 ,则继续获取
3、再均衡? ?一个消费者发生了变更,变更成了另一个消费者,叫做再均衡,一旦出现再均衡,进行的操作:(1)同步位移的尽量避免重复消费(2)拉取消息处理完(3)进行异步的提交
如上场景:给消息设置某一些属性,如果消息再既定的时间内无法到达,则做一些处理?
运行消费端和发送端,发送端时间的第二个参数,倒退了一分钟,分别运行消费端和发送端,同时发送两条消息,一条消息是正常的,倒退一分钟的没有展示? ? 四、主题 1、主题管理
(2)查看主题
(3)修改主题?
?(4)删除主题
标记为删除后重启一下kafka 结果:之前创建的heima主题被删除掉了?
增加分区:修改分区的时候只能增加分区数,不能减少
eg:
一般用于中台
eg:
?五、分区 1、副本机制 ?
上述绿色的是副本 ?2、leader选举
主题中的源信息都是保存在zookeeper上面,当leader挂掉,其他flower回去获取leader角色,谁先获取到谁就是新leader
? 3、分区重新分配001:一般添加集群机器就是因为当前的集群机器满足不了业务的需求,所以需要进行添加,添加之后又不能立即分担工作,所以采用重新分配分区这种方式
eg:
vim config.properties?
??修改log的地址
(2)同样的步骤修改第二和三个集群的节点? (3)分别启动三个节点
?因为之前是把目录拷贝了一下,所以需要,把之前的log文件删除,再重新启动
?抛异常:配置文件里面集群里面要写成localhost不写成ip,修改之后,重新启动即可
?相当于一台机器,挂了三个集群,所以端口号应该不同,修改三个集群的端口号分别为不同的再重新启动
?目前为止就搭建了一个kafka集群,一台主机上启动了三个实例 ?4、分区重新分配002 (1)结合上面的集群创建主题
(2)查看主题的详细信息 :三个分区,三个副本,三个分区分配在了三个节点中,实现了负载均衡
(3)给主题添加分区
(4)查看详情变成了四个分区:节点2承担的压力更大,负载不均衡
?(5)再启用一个终端,进入kafka,目录,复制kafka01到kafka04,在增加一个节点,修改配置信息,如之前所讲;然后清理log文件,启动kafka04的集群
(6)回到之前的终端再次查看详情,和之前没有变化;说明新增加的节点不参与数据的保存,新增加的节点只有当新创建的主题时才能承担数据,所以需要做重新分配,把新增加的节点重新分配到broker 1-4节点上
?(7)重新分配
? ?eg: 放到kafka01下 (1)新建一个json文件
?(2)json串,指定接下来要对哪个主题进行重新分配
?(3)生成可行性方案,并没有从新执行分配的动作 执行结束之后会生成两个json串,表示当前执行的计划会保存到哪个json串中? ?(4)把生成的方案重新保存到一个文件中
?(5)执行分配策略
?(6)重新分配需要时间,查看当前分配进度
5、分区分配策略 ,了解即可 ?
1、存储结构
? 七、稳定性 1、幂等性
?2、事务?
eg:
打开之前的一个消费端,运行,再运行上述发送端,报错:
?端口号有问题,加上端口号
都是message1,因为trycatch中写的不正确? ? 修改后:
模拟异常的情况
?结果: 因为抛出异常回滚了,所以消费端一条消息都没有
3、控制器?
4、可靠性保证
? 数据丢失场景:
?
? 八、高级 1、消费组管理
eg: 消息的发送方: 没有@PathVariable,最后的展示会发现后面的值为null,而不是kafka
运行展示:
消息的消费者?
配置文件中增加consumer
运行展示:
(2)springboot中使用事务操作 a、添加配置文件?
?b、生产者
运行展示(说明发挥了事务的特性)
?(3)事务的控制方式二:注解的方式
说明上述方式也实现了事务的控制? 总结: ?
4、spark+kafka
spak和kafka的整合
spark客户端会收到那几条消息? ?九、集群管理 1、集群概述
kafka集群依赖于zookeeper集群
? ? ? ?eg:
dataDir修改
?myid中的实例写为0 配置文件中增加配置接口
启动之前把其他几个节点拷贝过去,重复01的操作
同一台主机上搭建多个集群和多台主机搭建多个集群,不同的地方在于,同一个主机ip相同,端口号不同;多台机器的ip不同,但是端口号相同. 3、集群搭建kafka,之前搭建过,方法同
启动zookeeper实例 ,目前三个实例已经启动?
打开另一个终端查看,发现kafka已经启动
kafka03和03方法同01的即可? 4、集群同步总结(了解即可)
1、JMX
启动之前的kafka节点
?打开zookeeper的管理工具,可以看到kafka集群的三个节点及对应的内容
2、编程获取指标 找jdk目录下的jconsole
打开jconsole并连接
?结果:
和下图一致(上述方法按照下图获取属性)
|
|
|
|
|
| 上一篇文章 下一篇文章 查看所有文章 |
|
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
| 360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年11日历 | -2025/11/30 3:00:51- |
|
| 网站联系: qq:121756557 email:121756557@qq.com IT数码 |