初识Kafka
消息
Kafka 的数据单元成为一个消息。一条消息可以看作数据库中的一个"数据行"或一条记录。消息由字节组成,其中消息有个可选的元数据,也就是键,当一个消息以可控的方式发送到不同的分区,就会用到键。
批次
为了提高效率,消息分批次写入kafka。批次 就是一组消息,这一组消息数据同一分区和主题。分批次传输会减少网络开销。不过需要在时间延迟和吞吐量做出权衡。
模式
对于kafka来说,一条消息无非是晦涩难懂的字节数组。因此为了更好的定义一个消息内容增加可读性,引入了 消息模式,通常使用 Apache Avro (一般为hadoop)的一款数据序列化框架,其中模式和数据是分开的。
数据格式的一致性,对于kafka很重要,消除了读写操作之间的耦合性。
主题与分区
Kafka的消息是以主题来分类的,一个主题相当于数据库中的一个表或文件系统的一个文件夹。一个主题可以分为若干个分区,一个分区就是一个提交日志,消息都是以追加写入的方式写入分区中,然后以先入先出的方式的顺序读取。
但无法保证在整个主题范围内保证消息的顺序,可以保证消息在每个分区中的顺序。
Kafka通过分区实现数据的冗余和伸缩性。分区可以分布在不同的节点上。
我们通常会使用 流 这个词来描述Kafka中的消息。
生产者和消费者
Kafka的客户端就是Kafka系统的用户,它们分为两种基本类型:生产者和消费者。
当然还有高级客户端API——用于数据集成的Kafka Connect API 和用于流式处理的 Kafka Streams。
生产者创建消息。默认情况下,一个消息发布到一个特定的主题上,然后把消息均衡地分布到主题的所有分区上,而并不关心特定的消息写入到哪个分区。如果生产者会把消息直接写入到指定分区上,通常是通过消息键生成散列值和分区器根据不同的业务规则来实现的。
消费者读取消息。消费者会订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者会通过检查消息的偏移量来区分已经读过的消息。偏移量是另一种元数据,在创建消息时,Kakfa会添加到消息里,在给定分区中,每个消息的偏移量是唯一的。消费者会把每个分区最后读到的消息偏移量保存到Zookeeper或Kafka上,如果消费者关闭或重启,它的读取消息不会改变。
消费者是消费者群组的一部分,也就是说,会有一个或多个消费者订阅读取同一个主题,群组保证每个分区只能被一个消费者读取。
broker和集群
一个独立的Kafka服务器被称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘上的消息。
broker是集群的组成部分。每个集群都有一个broker同时充当 集群控制器 的角色(通过集群的活跃成员中选举出来),一个分区可以分配给多个broker,这个时候会发生分区复制,这个复制机制为分区提供了消息冗余,如果有一个broker失效,其他broker会接管领导权。
保留消息
保留消息是Kafka的一个重要的特征,broker的默认保留策略:要么保留7天,要么保留消息到达一定的字节数(比如1GB),达到这些上限就会过期删除。当然可以通过配置参数,保留到不使用它为止。
多集群
使用多集群的原因:
Kafka提供了一个叫做 MirrorMaker 的工具,可以用它来实现集群间进行消息复制(默认只能单个集群)。
Kafka数据生态
安装与配置
安装 Java
安装 Zookeeper
Kafka使用Zookeeper保存集群元数据信息和消费者元数据(分区偏移量)。
Zookeeper 群组(Ensemble)
Zookeeper 集群被称为群组。使用的是一致性协议,建议每个群组应该包含奇数个节点(因为只有当群组里的大多数节点处于可用状态,Zookeeper才能处理外界的请求)
配置文件
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=192.168.10.102:2888:3888
server.2=192.168.10.103:2888:3888
server.3=192.168.10.104:2888:3888
initLimit:用于在从节点与主节点之间建立初始化连接的时间上限
syncLimit:用于从节点与主节点处于不同步的状态的时间上限
? 这两个值都应该是 tickTime 的倍数。
服务器地址遵循:server.X = hostname:peerPort:leaderPort 格式
X:服务器ID,不一定是从0开始,也不一定是连续
hostname:服务器IP地址
peerPort:用于节点间通信的TCP端口
leaderPort:用于首领选举的TCP端口
客户端只要通过 clientPort 就能连接到群组,而群组之间的节点的通信同时需要三个端口。
除了公共配置文件外,datadir目录下还应该创建叫做 myid 的文件来包含服务器ID。
安装 Kafka
启动kafka
$ /kafka/bin/kafka-server-start.sh -daemon /config/server.properties
测试:
创建一个测试主题:
$ /bin/kafka-topic.sh --create --zookeeper hadoop102:2181 --replication-factor 1 --partitions 1 --topic test
$/bin/kafka-topic.sh --create --zookeeper hadoop102:2181 --desc --topic test
发布消息:
$ /kafka/bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test
读取消息:
$ /kafka/bin/kafka-console-consumer.sh --broker-list hadoop102:9092 --topic test --from-beginning
相关重要参数:
-
broker.id -
zookeeper.connect -
log.dirs -
num.partitions -
log.retention.ms -
log.retention.bytes -
log.segment.ms -
log.segment.bytes -
message.max.bytes
硬件影响
生产者客户端的性能直接受到了服务器端 磁盘吞吐量 的影响。
需要多大的磁盘容量取决于需要保留的消息数量。
磁盘性能影响着生产者,而内存影响着消费者。
网络影响
网络吞吐量决定着Kafka能够处理的最大数据流量。
它和磁盘存储是制约Kafka扩展规模的主要因素。
不仅生产消费,如集群复制,镜像也会占用到网络流量。
Kafka集群
使用集群的最大好处是可以跨服务器进行负载均衡,再则就是可以使用复制功能来避免单点故障造成的数据丢失,可以确保为客户端提供高可用性。
那需要多少个broker
首先,考虑需要多少磁盘空间来保留数据,如果启用了数据复制,那么至少还要需要一倍的空间。
第二考虑的因素是集群处理请求的能力。这通常与网络接口处理客户端流量的能力有关,特别是高峰时段。因磁盘吞吐量低和系统内存不足造成了性能问题,可以通过扩展多个broker来解决。
共享Zookeeper
Kafka使用Zookeeper来保存broker、主题和分区的元数据信息。实际上,在大多数情况下,会让多个Kafka集群共享一个Zookeeper群组(每个集群使用一个chroot路径)。
在Kafka0.9版本之前,Zookeeper上也会保存着消费者群组、主题和消费分区的偏移量。到了0.9版本以后,Kafka引入了一个新的消费者接口,允许broker来直接维护这些信息。
|