| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> kafka -> 正文阅读 |
|
[大数据]kafka |
apache的kafka是一个分布式的发送并发消息,可以发送海量数据 一、kafka概述 ? ? 1、概念详解: 1.1 apache官网:Welcome to The Apache Software Foundation! kafka官网:Apache Kafka 1.2、producer:生产者即数据的发布者,将消息发送到kafka的brokers中;consumer消费者可以中brokers中读取数据,消费者可以消费多个kafka中的数据. topic:使用类别属性,划分数据的所属类(可以理解为数据库中的表) brocker:一个topc有多个分区partion,每个分区存储在一个broker中,其余分区下的partion数据是不会存储到这个brocker中;brocker的数量,一定要大于topics中partions的数量. ? ? ?1.2 kafka的安装与配置 (1)kafka需要java环境,下载jdk:Java Downloads | Oracle (2)zookeeper安装Apache ZooKeeper ?下载http格式的即可 ?实操:
?(3)kafka的安装和配置 kafka是根据scala开发的,所以选择scala 2.12或者2.13均可 ?实操
(4)kafka的命令测试消息生产与消费 ? 实操 (1)cd kafka安装目录下 (2)bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic huahua --partitions 2 --replication-factor 1 报错:?? ?报错原因:服务没有起来,之前被自己给ctrl+c关掉了 输入如下命令重启后,重新执行上述创建操作:
创建成功了 (2)验证是否创建成功 bin/kafka-topics.sh --zookeeper localhost:2181 --list ? list执行结束之后只有当前这个主题,则创建成功了 (3)查看当前创建主题的详情 bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic huahua ?(4)启动消费端接收消息 ?bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic huahua ?启动成功 (5)消费端终端启动之后,重新打开一个,创建生产端发送消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic huahua ?输入的字符串可以在消费者端展示,说明被监听到了 二、 java第一个程序 1、创建springboot工程:new-project-java initialize-改名字-web-java spring 2、导入jar包 3、写生产者 运行,查看之前终端启动的客户端能否收到消息 ?注意:java客户端和kafka连接成功,需要将防火墙关闭,可以执行如下操作检查 telnet 本地ip,连接成功 如果是开启的状态,执行如下命令关闭即可 4、消费端 ?运行,发送消息 5、优化与总结:将key和value的一些序列化和反序列化,都采用了kafka下的静态常量 改producer ?改consumer ?二、 1、消息发送流程解析 ?2、发送类型 eg: ?消费端可以收到 异步发送:当前可以进行别的操作,消息发送成功之后,会进行一个回调;相当于重新启动了一个线程,不会阻塞当前线程的处理 消费端收到了消息 ?3、序列化器 序列化器必须要实现serialize接口 (2)、自定义序列化器:重写serializer接口,再重写下面这几个方法 ?4、分区器 自定义分区器:重写接口partioner和方法如下,一般情况下采用默认的分区策略即可 ?5、拦截器 使用场景:(1)@test:测试的时候使用的类,一般上线后不会使用 (2)修改消息的内容:比如需要统一给消息加一些前缀这样的场景 (3)统计类需求:发送消息时,经常需要统计发送量,或者是发送的成功率 eg: ?将当前主机发送的消息加一个前缀 ? ?执行结果:发送成功,且客户端加上了一个拦截器 ?6、发送原理解析 ? ?三、消费者详解 1、概念入门 ?(1)消费者和消费组 eg:不置顶groupid会抛出如下异常 ? ?2、消息接收 (1)必要参数设置 (2)订阅主题和分区? ?(3) 反序列化 ?(4)位移提交 offset指定了消息在分区当中的位置 ?重复消费,第一个消息已经消费的地方,第二个消费者又消费了一遍 消息丢失:当新的消费者加入之前,某一个消费者已经拉取了这一部分的消息,进行消息处理钱,提交了位移未处理,但是此时宕机了,新的消费者负责此分区的提交,并且提交位移,处理这部分的位移,就很有可能出现了丢失 提交方式 a、自动提交 b、同步提交 ?手动提交会增加消息重复的概率 ?c、异步提交 (5)指定位移消费 ?eg: ?运行 ? 增加判断是否获取到了分区,如果没有 ,则继续获取 3、再均衡? ?一个消费者发生了变更,变更成了另一个消费者,叫做再均衡,一旦出现再均衡,进行的操作:(1)同步位移的尽量避免重复消费(2)拉取消息处理完(3)进行异步的提交 ?4、拦截器 ? 如上场景:给消息设置某一些属性,如果消息再既定的时间内无法到达,则做一些处理? ? 运行消费端和发送端,发送端时间的第二个参数,倒退了一分钟,分别运行消费端和发送端,同时发送两条消息,一条消息是正常的,倒退一分钟的没有展示? ? 四、主题 1、主题管理 ? (2)查看主题 ? (3)修改主题? ?(4)删除主题 eg: 标记为删除后重启一下kafka 结果:之前创建的heima主题被删除掉了? 2、 分区 增加分区:修改分区的时候只能增加分区数,不能减少 eg: ?3、总结 一般用于中台 eg: ?结果跑出异常: ?将上述代码位置分区改为四个(因为分区超过了三个) ?五、分区 1、副本机制 ? 上述绿色的是副本 ?2、leader选举 ?leader选举: 主题中的源信息都是保存在zookeeper上面,当leader挂掉,其他flower回去获取leader角色,谁先获取到谁就是新leader ? 3、分区重新分配001:一般添加集群机器就是因为当前的集群机器满足不了业务的需求,所以需要进行添加,添加之后又不能立即分担工作,所以采用重新分配分区这种方式 ? eg: ?进入kafka01修改配置:brokerid需要唯一,不能重复 vim config.properties? ??修改log的地址 (2)同样的步骤修改第二和三个集群的节点? (3)分别启动三个节点 ? ?因为之前是把目录拷贝了一下,所以需要,把之前的log文件删除,再重新启动 ?抛异常:配置文件里面集群里面要写成localhost不写成ip,修改之后,重新启动即可 ?相当于一台机器,挂了三个集群,所以端口号应该不同,修改三个集群的端口号分别为不同的再重新启动 ?目前为止就搭建了一个kafka集群,一台主机上启动了三个实例 ?4、分区重新分配002 (1)结合上面的集群创建主题 (2)查看主题的详细信息 :三个分区,三个副本,三个分区分配在了三个节点中,实现了负载均衡 (3)给主题添加分区 (4)查看详情变成了四个分区:节点2承担的压力更大,负载不均衡 ?(5)再启用一个终端,进入kafka,目录,复制kafka01到kafka04,在增加一个节点,修改配置信息,如之前所讲;然后清理log文件,启动kafka04的集群 (6)回到之前的终端再次查看详情,和之前没有变化;说明新增加的节点不参与数据的保存,新增加的节点只有当新创建的主题时才能承担数据,所以需要做重新分配,把新增加的节点重新分配到broker 1-4节点上 ?(7)重新分配 ? ? ?eg: 放到kafka01下 (1)新建一个json文件 ?(2)json串,指定接下来要对哪个主题进行重新分配 ? ?(3)生成可行性方案,并没有从新执行分配的动作 执行结束之后会生成两个json串,表示当前执行的计划会保存到哪个json串中? ?(4)把生成的方案重新保存到一个文件中 ? ?(5)执行分配策略 ?(6)重新分配需要时间,查看当前分配进度 ?(7)重新查看分区,4个分区均匀的分配到了4个节点中,实现了负载均衡 5、分区分配策略 ,了解即可 ? ? ? ?六、kafka存储 1、存储结构 ?2、日志索引? ? ? 3、日志清理、总结 ? ? 七、稳定性 1、幂等性 ?2、事务? eg: ? 打开之前的一个消费端,运行,再运行上述发送端,报错: ?端口号有问题,加上端口号 ?结果: ? 都是message1,因为trycatch中写的不正确? ? 修改后: ? 模拟异常的情况 ?结果: 因为抛出异常回滚了,所以消费端一条消息都没有 3、控制器? 4、可靠性保证 ? ? 5、一致性保证? ? 数据丢失场景: ? ? 6、总结 ? ? ? 八、高级 1、消费组管理 2、connect-文件系统 ? 3、springboot+kafka ? eg: 消息的发送方: 没有@PathVariable,最后的展示会发现后面的值为null,而不是kafka 运行展示: 消息的消费者? ????? 配置文件中增加consumer 运行展示: (2)springboot中使用事务操作 a、添加配置文件? ?b、生产者 运行展示(说明发挥了事务的特性) ?输入为error时,抛出了异常 ?(3)事务的控制方式二:注解的方式 ?运行展示: ? 说明上述方式也实现了事务的控制? 总结: ? ? 4、spark+kafka spak和kafka的整合 ?启动spark客户端(运行上述代码),启动kafka服务端发送内容 spark客户端会收到那几条消息? ?九、集群管理 1、集群概述 ? ? 2、集群搭建zookeeper kafka集群依赖于zookeeper集群 ? ? ? ? ? ? ?eg: ?配置clientport dataDir修改 ? ?myid中的实例写为0 配置文件中增加配置接口 ? 启动之前把其他几个节点拷贝过去,重复01的操作 ?分别去每个实例下启动验证启动状态: 同一台主机上搭建多个集群和多台主机搭建多个集群,不同的地方在于,同一个主机ip相同,端口号不同;多台机器的ip不同,但是端口号相同. 3、集群搭建kafka,之前搭建过,方法同 ?依赖的zookeeper有三个实例,所以需要更换 ? ? 启动zookeeper实例 ,目前三个实例已经启动? ?启动kafka实例 打开另一个终端查看,发现kafka已经启动 kafka03和03方法同01的即可? 4、集群同步总结(了解即可) 十、监控 1、JMX ?eg: 启动之前的kafka节点 ?打开zookeeper的管理工具,可以看到kafka集群的三个节点及对应的内容 2、编程获取指标 找jdk目录下的jconsole 打开jconsole并连接 ? 查看kafka相关指标 (1)代码演示kafka监控指标? ? ?结果: 和下图一致(上述方法按照下图获取属性) 3、监控指标了解 ? ? ? |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/18 11:58:08- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |