1.定义(作用)
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
2.消息队列引入
-
使用消息队列的好处
-
解耦 -
可恢复性 当系统的一部分组件失效时,不会影响到整个系统,例如处理消息的某一进程挂掉时,因为有一个队列可以存储消息,所以系统还可以运行,只需要将消息放在队列中等待处理就可以了 -
缓冲 有助于调节前后端处理速度不同 -
灵活性&峰值处理能力 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 -
异步通信 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。 -
两种消息队列模式
- 点对点
一对一,customer主动拉取数据,消息收到后就删除 - 发布/订阅者模式
producer将消息发布到topic中,以供多个订阅者消费 而且消息在被消费后不会删除,会保留一定的时间后在清理。
3.基础架构
-
**producer:**消息生成者,就是向 kafka broker发送消息的客户端 -
consumer:消息消费者,向kafka broker取消息的客户端 -
consumer group: 消费者组,一个消费者组相当于一个订阅者 消费者组内的各个consumer相当于一起消费消息(用一点少一点)。一般情况下,consumer都是以一个partition为单位分的,(就相当于,consumer1消费partition0,同一组的其他consumer就不能消费partition0了,除非consumer1挂了) 所有consumer都有对应的消费者组 注:在一个消费者组内,消费者是相当于竞争关系,消费一点资源就消失一点资源 -
broker: 一台Kafka就是一个broker,一个集群有多个broker组成,一个broker可以有多个topic. -
topic: 可以理解为一个队列, 生产者和消费者面向的都是一个topic 注:这是逻辑上的划分,物理上的基本单位是partition -
partition:为了实现扩展性,一个非常大的topic可以分布到多个broker上(类似文件的切分)一个topic可以分为多个partition,每个partition都是有序队列 -
replica:副本: 为了提供数据的可靠性,有副本策略(类似hdfs中的副本机制),提供topic的每个partition都有n个副本,有一个是leader,其他是follower -
leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。(说白了,就是主要工作对象) -
follow:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader(备胎,再次选leader的策略kafka已经定义好了,不是选举或在争抢策略)
4.运行原理
1.工作流程及文件存储机制
- 工作流程
以两条信息为例
- produce 生产消息a发送到topic中
- topic 根据分区策略将消息a放到特定的leader partition 中,
- 将消息追加写入partition的segment的log文件并维护该信息的 offset。
- 等 leader partition 以及 他的ISR中 follow partition 全部落盘完成后,给 produce 发送ack 确认报文,
- produce 发送下一条信息
- emmmm,还是说的不是很清楚,
- 文件存储机制
topic 是逻辑上的划分 partition 是 物理上的划分
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。
例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。(一个分区一个文件夹)
first-0
00000000000000000000.index 00000000000000000000.log
first-1
00000000000000170410.index 00000000000000170410.log
first-2
00000000000000239430.index 00000000000000239430.log
index和log文件以当前segment的第一条消息的offset命名。下图为index文件和log文件的结构示意图。 “.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。
2.生产者
1.分区策略(往哪放)
- 分区原因:
1. 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了; 2. 可以提高并发,因为可以以Partition为单位读写了。 - 分区原则:
分了3种情况
- 指明了partition,直接进入partition
- 没有指明partition,但指明了key值
将key取hash值再对partitionNumber取余就是对应的partition了 - 既没有指明partition又没有key,那么kafka会采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用.(就是保证一定数量的消息进入同一partition,比轮询好)
2.数据可靠性保证
通过发送ack来确保数据没丢失
2.1ack报文确认机制
- 发送数据的可靠性保证(通过ack确认)
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
2.2副本策略
有两种方案
方案 | 优点 | 缺点 |
---|
半数以上同步完成,就发ack | 延迟低 | 当有n台故障时需要2n+1副本才能工作 | 全部同步完成,发送ack | 当有n台故障时需要n+1副本才能工作 | 延迟高 |
kafka选择了第二种方案,原因:
- Kafka本身存储数据就多,如果副本过多的话,会造成大量数据冗余
- Kafka的优势就在于读写性能块,所以网络延迟影响较低
1.ISR含义
采用了第二种方案,带来了这样一个问题,当集群中有一台机器发生故障时,无法完成同步,那么leader就要一致等待下去,直到它完成同步才能发送ack。 解决方案: Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。
2.ack应答级别
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。 所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。 ack参数配置: acks: 0:延迟最低,不管leader有没有完成数据落盘,生产者一直发送消息。当leader故障,会丢失数据 1:当leader完成数据落盘,返回ack,但当follower同步完成之前leader故障,会丢失数据 -1(all):只有等到所有的节点落盘数据成功后,返回ack,但当所有的节点落盘数据成功后,发生故障由于未返回ack,所以会造成数据重复
3.leader与follower故障处理
引进概念 LEO 与 HW
LEO:指的是每个副本最大的offset; HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。
(1)follower故障 follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。 (2)leader故障 leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
3.3种语义
为了解决数据的重复或丢失
1.At least once
不丢失但会重复
服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义
2.At most once
不重复但会丢失
将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。
3.Exactly once(重点)
不丢失也不重复
At Least Once + 幂等性 = Exactly Once 要启用幂等性,只需要将Producer的参数中enable.idempotence设置为true即可
原理:开启幂等性后,Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence number。而Broker端会对 <PID, Partition, SeqNumber> 做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。 但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
解释:其实就是为每一条信息做一个全局ID,然后当要写到partition前,先检查broker是否有记录这条信息,没有则写入,并当写入成功并发送ack确认后,将这条信息录入缓存.
同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。 这句话,我不理解
3.消费者
0.消费规则
一个消费者组相当于一个订阅者,每个订阅者可以消费相同的内容,互不冲突,但同一消费者组内的消费者消费内容却不可以一样,是竞争关系
1.消费模式(拉与推的选择)
- push:用考虑到每一个订阅者的消费速度,这样传输速度就会因为一个订阅者导致整体的速度降低了
- pull:消费速度取决于订阅者,但是如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
注:flume 种的sink也是采取 pull 模式
2. 分区分配策略
前言:一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
解决同一个消费者组的消费问题 同一个消费者组的消费者属于竞争关系
- RoundRobin(轮询)
以分区为单位一个一个轮着来分 - range
按范围来分(n份之一是一个订阅者的分区是连续的)
3.offset的维护
记录消费位置
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。 Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。
4.消费者组案例
4.高效读写数据原因
前言:顺序写磁盘 (省去了磁头移动的时间) Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
Kafka数据持久化是直接持久化到Pagecache中,这样会产生以下几个好处:
- I/O Scheduler 会将连续的小块写组装成大块的物理写从而提高性能
- I/O Scheduler 会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
- 充分利用所有空闲内存(非 JVM 内存)。如果使用应用层 Cache(即 JVM 堆内存),会增加 GC 负担
- 读操作可直接在 Page Cache 内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过 Page Cache)交换数据
- 如果进程重启,JVM 内的 Cache 会失效,但 Page Cache 仍然可用
尽管持久化到Pagecache上可能会造成宕机丢失数据的情况,但这可以被Kafka的Replication机制解决。如果为了保证这种情况下数据不丢失而强制将 Page Cache 中的数据 Flush 到磁盘,反而会降低性能。
3)零复制技术(了解)
其实我也没看懂是啥
5.zookeeper在kafka中的作用
概念区分
borker 的 Master 是叫 Controller partition的 Master 是叫 Leader
- zookeeper作用
选出kafka集群中的其中一个broker作为controller负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作(争抢选举) - controller作用
负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
6.事务
Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么`全部成功,要么全部失败。
1.producer 事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。 为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
2.consumer 事务(待完成)
5.shell编程
都要先开启zookeeper和kafka
1.topic
这里使用连接方式为 broker连接,zookeeper已经过时了
- 帮助命令
kafka-topics.sh --help - 查看所有topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --list - 查看一个topic详细情况
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic topic_name - 创建topic
kafka-kafka-topics.sh --bootstrap-server hadoop102:9092 –create --topic topic_name --replication-factor 3 --partitions 2
注: –topic: 指定topic名称 –replication-factor : 副本数 –partitions:分区数 一定要指定 5. 修改topic
partition分区 自能增加不能减少 副本数可以调节
-
修改分区数(只能增) kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic hello --partitions 3 -
修改副本数(有点麻烦) -
删除topic kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
2.producer
kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
注:生产数据 连接时 --broker-list
3.consumer
-
消费数据(在开始之前的数据无法消费) kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first -
从头开始消费数据 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --from-beginning -
指定消费者组 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first –group group_name
注:group_name 任意取名 默认组id名称在 ./config/consumer.properties查看
5.API使用
API使用实现
6.kafka与flume对接
须知:source 一般命名为 r1,r2
channel 一般命名为 c1,c2
sink 一般命名为 k1,k2
1. kafka source
分析: source channel sink
kafka memory logger
配置文件:kafka-memory-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = flume
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/review/kafka-memory-logger.conf -n a1 -Dflume.root.logger=INFO,console
注:flume-ng agent 启动一个事务
-c flume的配置文件路径
-f 自己写的配置文件
-n 事务名称
-D.. 只是把消息打印到控制台,可有可无
2. kafka sink
分析: source channel sink
netcat memory Kafka
配置文件:netcat-memory-kafka.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.useFlumeEventFormat = true
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/review/netcat-memory-kafka.conf -n a1 -Dflume.root.logger=INFO,console
注:nc ip(地址) port(端口号) 向指定机器的端口号发送信息
3. kafka sink 在source 端 设立拦截链 实现给不同 kafka 的topic 发送消息
分析: source channel sink
netcat memory kafka 外加 拦截链
配置文件:netcat-memory-kafka-interceptor.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = review.self_defiend.FlumeInterceptor$MyBuild
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = third
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = -1
a1.sinks.k1.useFlumeEventFormat = true
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/review/netcat-memory-kafka-interceptor.conf -n a1 -Dflume.root.logger=INFO,console
注:当上传jar包到flume的lib目录时,要把原来同一模块的jar删除,也就是要更新,不能保存,会有冲突.
4. kafka channel 因为Kafka 本身会将信息存储一段时间
分析: source channel sink
无 kafka logger
配置文件:xxx-kafka-logger.conf
a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = aa
a1.channels.c1.kafka.consumer.group.id = flume
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.auto.offset.reset = latest
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/review/xxx-kafka-logger.conf -n a1 -Dflume.root.logger=INFO,console
注: kafka channel 消费信息
5. kafka channel 因为Kafka 本身会将信息存储一段时间
分析: source channel sink
netcat kafka
配置文件:netcat-kafka-xxx.conf
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = first
a1.channels.c1.parseAsFlumeEvent = false
a1.sources.r1.channels = c1
启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/review/netcat-kafka-xxx.conf -n a1 -Dflume.root.logger=INFO,console
注:相当于 kafka channel 生产信息
- 在kafka当source时 设置拦截链
package review.self_defiend;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class FlumeInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
String message = new String(event.getBody(), StandardCharsets.UTF_8);
if (message.startsWith("aaa")){
headers.put("topic", "hello");
}else if (message.startsWith("bbb")){
headers.put("topic", "first");
}else {
headers.put("topic", "aa");
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
public static class MyBuild implements Builder{
@Override
public Interceptor build() {
return new FlumeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
|