| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> kafka -> 正文阅读 |
|
[大数据]kafka |
apache的kafka是一个分布式的发送并发消息,可以发送海量数据 一、kafka概述
1、概念详解: 1.1 apache官网:Welcome to The Apache Software Foundation! kafka官网:Apache Kafka 1.2、producer:生产者即数据的发布者,将消息发送到kafka的brokers中;consumer消费者可以中brokers中读取数据,消费者可以消费多个kafka中的数据. topic:使用类别属性,划分数据的所属类(可以理解为数据库中的表) brocker:一个topc有多个分区partion,每个分区存储在一个broker中,其余分区下的partion数据是不会存储到这个brocker中;brocker的数量,一定要大于topics中partions的数量. ?
(1)kafka需要java环境,下载jdk:Java Downloads | Oracle (2)zookeeper安装Apache ZooKeeper ?下载http格式的即可
?(3)kafka的安装和配置 kafka是根据scala开发的,所以选择scala 2.12或者2.13均可 ?实操
(4)kafka的命令测试消息生产与消费 ? 实操 (1)cd kafka安装目录下 (2)bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic huahua --partitions 2 --replication-factor 1 报错:?? ?报错原因:服务没有起来,之前被自己给ctrl+c关掉了 输入如下命令重启后,重新执行上述创建操作:
创建成功了 (2)验证是否创建成功 bin/kafka-topics.sh --zookeeper localhost:2181 --list
list执行结束之后只有当前这个主题,则创建成功了 (3)查看当前创建主题的详情 bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic huahua
?bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic huahua ?启动成功 (5)消费端终端启动之后,重新打开一个,创建生产端发送消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic huahua ?输入的字符串可以在消费者端展示,说明被监听到了 二、 java第一个程序 1、创建springboot工程:new-project-java initialize-改名字-web-java spring 2、导入jar包 3、写生产者 运行,查看之前终端启动的客户端能否收到消息
telnet 本地ip,连接成功 如果是开启的状态,执行如下命令关闭即可
5、优化与总结:将key和value的一些序列化和反序列化,都采用了kafka下的静态常量 改producer
1、消息发送流程解析 ?2、发送类型 eg:
异步发送:当前可以进行别的操作,消息发送成功之后,会进行一个回调;相当于重新启动了一个线程,不会阻塞当前线程的处理 消费端收到了消息
序列化器必须要实现serialize接口 (2)、自定义序列化器:重写serializer接口,再重写下面这几个方法
自定义分区器:重写接口partioner和方法如下,一般情况下采用默认的分区策略即可
使用场景:(1)@test:测试的时候使用的类,一般上线后不会使用 (2)修改消息的内容:比如需要统一给消息加一些前缀这样的场景 (3)统计类需求:发送消息时,经常需要统计发送量,或者是发送的成功率 eg:
?执行结果:发送成功,且客户端加上了一个拦截器 ?6、发送原理解析
1、概念入门 ?(1)消费者和消费组 eg:不置顶groupid会抛出如下异常
(1)必要参数设置 (2)订阅主题和分区? ?(3) 反序列化
消息丢失:当新的消费者加入之前,某一个消费者已经拉取了这一部分的消息,进行消息处理钱,提交了位移未处理,但是此时宕机了,新的消费者负责此分区的提交,并且提交位移,处理这部分的位移,就很有可能出现了丢失 提交方式 a、自动提交 b、同步提交
?c、异步提交 (5)指定位移消费 ?eg:
增加判断是否获取到了分区,如果没有 ,则继续获取 3、再均衡? ?一个消费者发生了变更,变更成了另一个消费者,叫做再均衡,一旦出现再均衡,进行的操作:(1)同步位移的尽量避免重复消费(2)拉取消息处理完(3)进行异步的提交
如上场景:给消息设置某一些属性,如果消息再既定的时间内无法到达,则做一些处理?
运行消费端和发送端,发送端时间的第二个参数,倒退了一分钟,分别运行消费端和发送端,同时发送两条消息,一条消息是正常的,倒退一分钟的没有展示? ? 四、主题 1、主题管理
(2)查看主题
(3)修改主题? ?(4)删除主题
标记为删除后重启一下kafka 结果:之前创建的heima主题被删除掉了?
增加分区:修改分区的时候只能增加分区数,不能减少 eg:
一般用于中台 eg:
?五、分区 1、副本机制 ? 上述绿色的是副本 ?2、leader选举
主题中的源信息都是保存在zookeeper上面,当leader挂掉,其他flower回去获取leader角色,谁先获取到谁就是新leader ? 3、分区重新分配001:一般添加集群机器就是因为当前的集群机器满足不了业务的需求,所以需要进行添加,添加之后又不能立即分担工作,所以采用重新分配分区这种方式
eg:
vim config.properties? ??修改log的地址 (2)同样的步骤修改第二和三个集群的节点? (3)分别启动三个节点
?因为之前是把目录拷贝了一下,所以需要,把之前的log文件删除,再重新启动 ?抛异常:配置文件里面集群里面要写成localhost不写成ip,修改之后,重新启动即可 ?相当于一台机器,挂了三个集群,所以端口号应该不同,修改三个集群的端口号分别为不同的再重新启动 ?目前为止就搭建了一个kafka集群,一台主机上启动了三个实例 ?4、分区重新分配002 (1)结合上面的集群创建主题 (2)查看主题的详细信息 :三个分区,三个副本,三个分区分配在了三个节点中,实现了负载均衡 (3)给主题添加分区 (4)查看详情变成了四个分区:节点2承担的压力更大,负载不均衡 ?(5)再启用一个终端,进入kafka,目录,复制kafka01到kafka04,在增加一个节点,修改配置信息,如之前所讲;然后清理log文件,启动kafka04的集群 (6)回到之前的终端再次查看详情,和之前没有变化;说明新增加的节点不参与数据的保存,新增加的节点只有当新创建的主题时才能承担数据,所以需要做重新分配,把新增加的节点重新分配到broker 1-4节点上 ?(7)重新分配
? ?eg: 放到kafka01下 (1)新建一个json文件 ?(2)json串,指定接下来要对哪个主题进行重新分配
?(3)生成可行性方案,并没有从新执行分配的动作 执行结束之后会生成两个json串,表示当前执行的计划会保存到哪个json串中? ?(4)把生成的方案重新保存到一个文件中
?(5)执行分配策略 ?(6)重新分配需要时间,查看当前分配进度
5、分区分配策略 ,了解即可 ?
1、存储结构
? 七、稳定性 1、幂等性 ?2、事务? eg:
打开之前的一个消费端,运行,再运行上述发送端,报错: ?端口号有问题,加上端口号
都是message1,因为trycatch中写的不正确? ? 修改后:
模拟异常的情况 ?结果: 因为抛出异常回滚了,所以消费端一条消息都没有 3、控制器? 4、可靠性保证
? 数据丢失场景:
?
? 八、高级 1、消费组管理
eg: 消息的发送方: 没有@PathVariable,最后的展示会发现后面的值为null,而不是kafka 运行展示: 消息的消费者?
配置文件中增加consumer 运行展示: (2)springboot中使用事务操作 a、添加配置文件? ?b、生产者 运行展示(说明发挥了事务的特性)
?(3)事务的控制方式二:注解的方式
说明上述方式也实现了事务的控制? 总结: ?
4、spark+kafka spak和kafka的整合
spark客户端会收到那几条消息? ?九、集群管理 1、集群概述
kafka集群依赖于zookeeper集群
? ? ? ?eg:
dataDir修改
?myid中的实例写为0 配置文件中增加配置接口
启动之前把其他几个节点拷贝过去,重复01的操作
同一台主机上搭建多个集群和多台主机搭建多个集群,不同的地方在于,同一个主机ip相同,端口号不同;多台机器的ip不同,但是端口号相同. 3、集群搭建kafka,之前搭建过,方法同
启动zookeeper实例 ,目前三个实例已经启动?
打开另一个终端查看,发现kafka已经启动 kafka03和03方法同01的即可? 4、集群同步总结(了解即可)
1、JMX
启动之前的kafka节点 ?打开zookeeper的管理工具,可以看到kafka集群的三个节点及对应的内容 2、编程获取指标 找jdk目录下的jconsole 打开jconsole并连接
?结果: 和下图一致(上述方法按照下图获取属性)
|
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年3日历 | -2025/3/4 21:02:30- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |