IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka -> 正文阅读

[大数据]kafka

apache的kafka是一个分布式的发送并发消息,可以发送海量数据

一、kafka概述

?

?

1、概念详解:

1.1 apache官网:Welcome to The Apache Software Foundation!

kafka官网:Apache Kafka

1.2、producer:生产者即数据的发布者,将消息发送到kafka的brokers中;consumer消费者可以中brokers中读取数据,消费者可以消费多个kafka中的数据.

topic:使用类别属性,划分数据的所属类(可以理解为数据库中的表)

brocker:一个topc有多个分区partion,每个分区存储在一个broker中,其余分区下的partion数据是不会存储到这个brocker中;brocker的数量,一定要大于topics中partions的数量.

?

?

?1.2 kafka的安装与配置

(1)kafka需要java环境,下载jdk:Java Downloads | Oracle

(2)zookeeper安装Apache ZooKeeper

?下载http格式的即可

?实操:

cd zookerper安装目录下

cd conf?

mv zoo_sample.cfg zoo.cfg

bin/zkServer.sh

bin/zkServer.sh start

?(3)kafka的安装和配置

kafka是根据scala开发的,所以选择scala 2.12或者2.13均可

?实操

cd kafka安装目录

cd config

vim?server.properties修改配置文件

bin/kafka-server-start.sh config/server.properties启动kafka

(4)kafka的命令测试消息生产与消费

?

实操

(1)cd kafka安装目录下

(2)bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic huahua --partitions 2 --replication-factor 1

报错:??

?报错原因:服务没有起来,之前被自己给ctrl+c关掉了

输入如下命令重启后,重新执行上述创建操作:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

创建成功了

(2)验证是否创建成功

bin/kafka-topics.sh --zookeeper localhost:2181 --list

?

list执行结束之后只有当前这个主题,则创建成功了

(3)查看当前创建主题的详情

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic huahua

?(4)启动消费端接收消息

?bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic huahua

?启动成功

(5)消费端终端启动之后,重新打开一个,创建生产端发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic huahua

?输入的字符串可以在消费者端展示,说明被监听到了

二、 java第一个程序

1、创建springboot工程:new-project-java initialize-改名字-web-java spring

2、导入jar包

3、写生产者

运行,查看之前终端启动的客户端能否收到消息

?注意:java客户端和kafka连接成功,需要将防火墙关闭,可以执行如下操作检查

telnet 本地ip,连接成功

如果是开启的状态,执行如下命令关闭即可

4、消费端

?运行,发送消息

5、优化与总结:将key和value的一些序列化和反序列化,都采用了kafka下的静态常量

改producer

?改consumer

?二、

1、消息发送流程解析

?2、发送类型

eg:

?消费端可以收到

异步发送:当前可以进行别的操作,消息发送成功之后,会进行一个回调;相当于重新启动了一个线程,不会阻塞当前线程的处理

消费端收到了消息

?3、序列化器

序列化器必须要实现serialize接口

(2)、自定义序列化器:重写serializer接口,再重写下面这几个方法

?4、分区器

自定义分区器:重写接口partioner和方法如下,一般情况下采用默认的分区策略即可

?5、拦截器

使用场景:(1)@test:测试的时候使用的类,一般上线后不会使用 (2)修改消息的内容:比如需要统一给消息加一些前缀这样的场景 (3)统计类需求:发送消息时,经常需要统计发送量,或者是发送的成功率

eg:

?将当前主机发送的消息加一个前缀

?

?执行结果:发送成功,且客户端加上了一个拦截器

?6、发送原理解析

?

?三、消费者详解

1、概念入门

?(1)消费者和消费组

eg:不置顶groupid会抛出如下异常

?

?2、消息接收

(1)必要参数设置

(2)订阅主题和分区?

?(3) 反序列化

?(4)位移提交 offset指定了消息在分区当中的位置

?重复消费,第一个消息已经消费的地方,第二个消费者又消费了一遍

消息丢失:当新的消费者加入之前,某一个消费者已经拉取了这一部分的消息,进行消息处理钱,提交了位移未处理,但是此时宕机了,新的消费者负责此分区的提交,并且提交位移,处理这部分的位移,就很有可能出现了丢失

提交方式

a、自动提交

b、同步提交

?手动提交会增加消息重复的概率

?c、异步提交

(5)指定位移消费

?eg:

?运行

?

增加判断是否获取到了分区,如果没有 ,则继续获取

3、再均衡?

?一个消费者发生了变更,变更成了另一个消费者,叫做再均衡,一旦出现再均衡,进行的操作:(1)同步位移的尽量避免重复消费(2)拉取消息处理完(3)进行异步的提交

?4、拦截器

?

如上场景:给消息设置某一些属性,如果消息再既定的时间内无法到达,则做一些处理?

?

运行消费端和发送端,发送端时间的第二个参数,倒退了一分钟,分别运行消费端和发送端,同时发送两条消息,一条消息是正常的,倒退一分钟的没有展示?

?

四、主题

1、主题管理

?

(2)查看主题

?

(3)修改主题?

?(4)删除主题

eg:

标记为删除后重启一下kafka

结果:之前创建的heima主题被删除掉了?

2、 分区

增加分区:修改分区的时候只能增加分区数,不能减少

eg:

?3、总结

一般用于中台

eg:

?结果跑出异常:

?将上述代码位置分区改为四个(因为分区超过了三个)

?五、分区

1、副本机制

?

上述绿色的是副本

?2、leader选举

?leader选举:

主题中的源信息都是保存在zookeeper上面,当leader挂掉,其他flower回去获取leader角色,谁先获取到谁就是新leader

?

3、分区重新分配001:一般添加集群机器就是因为当前的集群机器满足不了业务的需求,所以需要进行添加,添加之后又不能立即分担工作,所以采用重新分配分区这种方式

?

eg:

?进入kafka01修改配置:brokerid需要唯一,不能重复

vim config.properties?

??修改log的地址

(2)同样的步骤修改第二和三个集群的节点?

(3)分别启动三个节点

?

?因为之前是把目录拷贝了一下,所以需要,把之前的log文件删除,再重新启动

?抛异常:配置文件里面集群里面要写成localhost不写成ip,修改之后,重新启动即可

?相当于一台机器,挂了三个集群,所以端口号应该不同,修改三个集群的端口号分别为不同的再重新启动

?目前为止就搭建了一个kafka集群,一台主机上启动了三个实例

?4、分区重新分配002

(1)结合上面的集群创建主题

(2)查看主题的详细信息 :三个分区,三个副本,三个分区分配在了三个节点中,实现了负载均衡

(3)给主题添加分区

(4)查看详情变成了四个分区:节点2承担的压力更大,负载不均衡

?(5)再启用一个终端,进入kafka,目录,复制kafka01到kafka04,在增加一个节点,修改配置信息,如之前所讲;然后清理log文件,启动kafka04的集群

(6)回到之前的终端再次查看详情,和之前没有变化;说明新增加的节点不参与数据的保存,新增加的节点只有当新创建的主题时才能承担数据,所以需要做重新分配,把新增加的节点重新分配到broker 1-4节点上

?(7)重新分配

?

?

?eg:

放到kafka01下

(1)新建一个json文件

?(2)json串,指定接下来要对哪个主题进行重新分配

?

?(3)生成可行性方案,并没有从新执行分配的动作

执行结束之后会生成两个json串,表示当前执行的计划会保存到哪个json串中?

?(4)把生成的方案重新保存到一个文件中

?

?(5)执行分配策略

?(6)重新分配需要时间,查看当前分配进度

?(7)重新查看分区,4个分区均匀的分配到了4个节点中,实现了负载均衡

5、分区分配策略 ,了解即可

?

?

?

?六、kafka存储

1、存储结构

?2、日志索引?

?

?

3、日志清理、总结

?

?

七、稳定性

1、幂等性

?2、事务?

eg:

?

打开之前的一个消费端,运行,再运行上述发送端,报错:

?端口号有问题,加上端口号

?结果:

?

都是message1,因为trycatch中写的不正确?

?

修改后:

?

模拟异常的情况

?结果:

因为抛出异常回滚了,所以消费端一条消息都没有

3、控制器?

4、可靠性保证

?

?

5、一致性保证?

?

数据丢失场景:

?

?

6、总结

?

?

?

八、高级

1、消费组管理

2、connect-文件系统

?

3、springboot+kafka

?

eg:

消息的发送方:

没有@PathVariable,最后的展示会发现后面的值为null,而不是kafka

运行展示:

消息的消费者?

?????

配置文件中增加consumer

运行展示:

(2)springboot中使用事务操作

a、添加配置文件?

?b、生产者

运行展示(说明发挥了事务的特性)

?输入为error时,抛出了异常

?(3)事务的控制方式二:注解的方式

?运行展示:

?

说明上述方式也实现了事务的控制?

总结:

?

?

4、spark+kafka

spak和kafka的整合

?启动spark客户端(运行上述代码),启动kafka服务端发送内容

spark客户端会收到那几条消息?

?九、集群管理

1、集群概述

?

?

2、集群搭建zookeeper

kafka集群依赖于zookeeper集群

?

?

?

?

?

?

?eg:

?配置clientport

dataDir修改

?

?myid中的实例写为0

配置文件中增加配置接口

?

启动之前把其他几个节点拷贝过去,重复01的操作

?分别去每个实例下启动验证启动状态:

同一台主机上搭建多个集群和多台主机搭建多个集群,不同的地方在于,同一个主机ip相同,端口号不同;多台机器的ip不同,但是端口号相同.

3、集群搭建kafka,之前搭建过,方法同

?依赖的zookeeper有三个实例,所以需要更换

?

?

启动zookeeper实例

,目前三个实例已经启动?

?启动kafka实例

打开另一个终端查看,发现kafka已经启动

kafka03和03方法同01的即可?

4、集群同步总结(了解即可)

十、监控

1、JMX

?eg:

启动之前的kafka节点

?打开zookeeper的管理工具,可以看到kafka集群的三个节点及对应的内容

2、编程获取指标

找jdk目录下的jconsole

打开jconsole并连接

?

查看kafka相关指标

(1)代码演示kafka监控指标?

?

?结果:

和下图一致(上述方法按照下图获取属性)

3、监控指标了解

?

?

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-19 08:02:49  更:2021-09-19 08:05:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 11:58:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码