kakfa的基准测试
-
- 创建一个topic : 在实际上生产中, 可以创建多个 拥有不同数量的分片和副本topic
./kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181
--topic test02 --partitions 3 --replication-factor 1
-
- 测试写入效率:
./kafka-producer-perf-test.sh --topic test02 --num-records 5000000
--throughput -1 --record-size 1000 --producer-props
bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
属性说明:
--num-records : 测试消息的条数
--throughput : 是否需要限流 -1 不指定
--record-size : 每条数据的字节大小
acks : 消息确认方案

 总结
前提: 假设broker数量是无限的
1) 当topic分片的数量越多, 读写效率越高
2) topic的副本数量越多, 对读写效率影响越大
kafka的javaAPI操作
使用java API 将数据生产到 kafka 代码实现:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
// 模拟kafka生产者:
public class KafkaProducerTest {
public static void main(String[] args) {
//1. 创建kafka生产者的核心类对象: KafkaProducer
//1.1: 创建生产者配置对象: 设置相关配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all"); // 消息的确认方案
// key序列化类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化类型
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
//2. 发送数据
for (int i = 0; i < 10; i++) {
//2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01", Integer.toString(i));
producer.send(producerRecord);
}
//3. 释放资源
producer.close();
}
}
使用javaAPI 消费kafka中数据 代码实现:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
// 模拟消费者JAVA API:
public class KafkaConsumerTest {
public static void main(String[] args) {
//1. 创建kafka的消费者的核心对象: KafkaConsumer
//1.1: 创建消费者配置对象, 并设置相关的参数:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "test"); // 消费者组的 id
props.setProperty("enable.auto.commit", "true"); // 是否启动消费者自动提交消费偏移量
props.setProperty("auto.commit.interval.ms", "1000"); // 每间隔多长时间提交一次偏移量: 单位 毫秒
// key 反序列化
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// val 发序列化
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2. 给消费者设置订阅topic:
consumer.subscribe(Arrays.asList("test01"));
//3. 循环获取相关的消息数据
while (true) {
//3.1: 从kafka中获取消息数据: 参数表示等待超时时间
// 注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象,
//一个对象就是一条消息
for (ConsumerRecord<String, String> record : records) {
String massage = record.value();
System.out.println("消息数据为:"+massage);
}
}
}
}
kafka的原理
kafka的分片与副本机制
- topic的分片:
- 描述: topic是一个逻辑架构, 理解为大大的容器, 而分片对这个大容器切割操作, 将其划为多个小的容器, 分别放置在不同的broker中, 进行分布式存储操作 , 分片数量与broker数量没有关系的
- 作用:
- 提高读写的效率, 或者说可以提升承受并发量
- 解决单台节点存储容量有限的问题
- topic的副本:
- 描述: 对topic中每一个分片都可以构建多个副本, 副本的数量最多和broker节点数量是相等, 一般副本为 1~3
- 作用:
- 提升数据的可靠性, 保证数据不丢失
- 会导致占用更多的磁盘空间, 冗余较大
kafka如何保证数据不丢失
生产端如何保证数据不丢失的  生产者是基于ack方案, 确保数据不丢失
生产端是如何保证数据不丢失: ack 确认机制
0 : 当ACK确认级别设置为0时
生产者只管向broker发送数据, 并不去接收或者等待broker响应确认消息
1: 当ACK确认级别设置为1时
生产者向broker发送消息, 需要等待对应topic的对应分片上主副本接收到消息后, 生产者认为消息发送成功了
-1(ALL): 当ACK确认级别设置为-1(ALL)时
生产者向broker发送消息, 需要等待对应topic的对应分片上所有的副本都接收到数据后, 生产者认为数据发送成功了
效率角度: 0 > 1 > -1
安全角度: -1 > 1 > 0
思考: 在实际生产中, 一般设置ack为多少呢? 三种都需要设置
一般会根据数据的重要程度, 以及数据发生的频率确定合适的ack方案如何设置生产者ack方案呢? props.put("acks", "all");
相关面试题:
生产者数据不丢失相关的面试题:
1) 生产者发送一条数据, broker需要给予ack响应, 如果broker迟迟没有给与ack响应, 如何解决呢?
解决方案:
设置超时时间, 如果过了超时时间, 依然没有给与响应, 可以尝试进行重试策略(一般重试3次), 如果依然没有响应呢,
程序直接抛出异常, 通知相关人员进行处理
2) 生产者发送一次消息, 就需要一次ack响应, 请问这样是否会对宽带带宽造成更大的影响呢? 如何解决呢
解决方案: 会产生影响
引入缓存池, 采用异步批量发送数据操作, 一批一批发送, 当缓存池中数据达到一批后, 就会触发执行发送操作
3) 采用一批一批发送操作 如果broker又没有给与响应, 此时缓存池中数据以及满了, 如何解决呢?
解决方案:
程序员可以选择直接清空缓存池或者不清空, 将缓存池中数据存储在临时的容器中, 然后程序抛出异常 通知相关的人员,
重启后, 先加载临时容器中数据重新发送即可
扩展配置信息:
扩展相关的配置信息:
1) 超时时间:
delivery.timeout.ms : 总超时时间 默认 120s
request.timeout.ms : 每次超时时间 默认 30s
2) 重试次数: retries 默认值 2147483647
最终重试次数, 是由 总超时时间和每次超时时间计算得出
3) 一批数据:
数据大小: batch.size 默认值为 16kb
时间阈值(间隔时间): linger.ms 默认值为 0
4) 缓冲池大小: buffer.memory 默认值为: 32M
5) 如何进行同步和异步发送操作
所有的配置参数, 全部都需要设置到生产者的properties对象中
props.put(key,value)
生产者所有配置:
http://kafka.apache.org/24/documentation.html#producerconfigs
实现同步发送消息:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
// 模拟kafka生产者_异步发送方式 (同步发送数据)
public class KafkaProducerTest02 {
public static void main(String[] args) {
//1. 创建kafka生产者的核心类对象: KafkaProducer
//1.1: 创建生产者配置对象: 设置相关配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all"); // 消息的确认方案
// key序列化类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化类型
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
//2. 发送数据
for (int i = 0; i < 10; i++) {
//2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01", Integer.toString(i));
try {
producer.send(producerRecord).get(); // 如果没有抛出异常, 说明消息发送成功了
} catch (Exception e) {
// 如果捕获到了异常 认为消息发送失败了(重试后的失败)
// 在此处, 编写异常后处理业务代码....
e.printStackTrace();
}
}
//3. 释放资源
producer.close();
}
}
实现异步发送消息:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
// 模拟kafka生产者_异步发送方式 (异步有返回值) public class KafkaProducerTest03 {
public static void main(String[] args) {
//1. 创建kafka生产者的核心类对象: KafkaProducer
//1.1: 创建生产者配置对象: 设置相关配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all"); // 消息的确认方案
// key序列化类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化类型
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
//2. 发送数据
for (int i = 0; i < 10; i++) {
//2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01", Integer.toString(i));
producer.send(producerRecord, new Callback() {
// 回调函数: 由于采用异步发送操作, 当触发send方法时候, 并没有直接将消息发送到broker端, 而是将消息
// 存储到缓存池中即可, send方法相当于不断向缓冲池写入数据
// 当缓存池子中数据达到一批数据大小后, 会专门有子线程进行数据发送broker端: 一批一批发送操作
// 发送完一批数据后, 就会调用一次这个回调函数
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null){
// 认为数据发送失败了...
// 编写数据发生失败的业务代码
}
}
});
}
//3. 释放资源
producer.close();
}
}
broker端如何保证数据不丢失
单独通过副本机制, 是否可以保证不丢失呢? 不行的, 因为 如何 生产者设置ack为 0或在 1的时候, 副本再多可能都没用
一般保证不丢失: 多副本 + ack为 -1(ALL)
消费端如何保证数据不丢失 
不丢失机制流程:
1) consumer连接kakfa集群, 开始读取数据进行消费, 首先kafka集群会根据传递过来group.id 查询上一次
消费到哪个偏移量, 如果没有找到, 默认从当前位置开始消费, 如果找到了, 从上一次位置开始消费
2) 消费者接收到消息后,开始消费数据即可
3) 当消费者消费完成后, 将当前消费偏移量提交给kaka集群, 集群更新一下当前这个消费者组消费的偏移量的位置信息
总结: 通过此种模型, 可以确保数据不会丢失, 但是会导致数据重复消费问题
那么偏移量信息是记录在那个位置呢?
在老版本(0.8x以前)的kafka中, 偏移量信息是保持在zookeeper中, 但是在0.8x后, 偏移量数据保存到broker端,
通过一个topic来存储: __consumer_offsets
此topic有50个分区, 每个分区有一个副本
消费者偏移量的提交方式:
自动提交偏移量 (推荐使用)
手动提交偏移量
手动提交偏移量 :
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
// 模拟消费者JAVA API_ 手动提交偏移量
public class KafkaConsumerTest02 {
public static void main(String[] args) {
//1. 创建kafka的消费者的核心对象: KafkaConsumer
//1.1: 创建消费者配置对象, 并设置相关的参数:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "test"); // 消费者组的 id
props.setProperty("enable.auto.commit", "false"); // 是否启动消费者自动提交消费偏移量
//props.setProperty("auto.commit.interval.ms", "1000"); // 每间隔多长时间提交一次偏移量: 单位 毫秒
// key 反序列化
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// val 发序列化
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2. 给消费者设置订阅topic:
consumer.subscribe(Arrays.asList("test01"));
//3. 循环获取相关的消息数据
while (true) {
//3.1: 从kafka中获取消息数据: 参数表示等待超时时间
// 注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象,
// 一个对象就是一条消息
for (ConsumerRecord<String, String> record : records) {
String massage = record.value();
System.out.println("消息数据为:"+massage);
consumer.commitSync(); // 同步提交偏移量
}
}
}
}
kafka的消息存储和查询机制
kafka的消息存储 
如何修改默认168小时呢? 修改server.properties
log.retention.hours=168
如何修改默认1GB log文件大小呢? 修改server.properties
log.segment.bytes=1073741824
kafka的数据查询机制  需求: 假设目前某个分片下有如下的目录结构, 请查询 368776这个偏移量消息:
1) 确定消息在那个segment段中: 在第二个segment段中
2) 根据消息的偏移量到index文件中, 寻找对应消息在log文件的物理偏移范围
3) 读取log文件, 采用顺序查询方式, 找到对应范围下的数据, 直接获取
磁盘: 顺序读写 和 随机读写: 这两种对磁盘的读写操作, 那个效率更高一些? 顺序读写
kafka中生产者的数据分发策略
生产者的数据分发策略:
当生产者将数据生产到对应topic中后, 那么这条数据最终被topic中那个分片所接收, 这就是分发机制
思考: 我们能想到分发策略有那些呢?
1) 轮询
2) hash取模方案
3) 指定分片方式
4) 基于范围分发
那么kafka采用哪种分发策略呢?
1) 轮询(在新版本中:2.4以上 更改为 粘性分区方案)
2) hash取模方案
3) 指定分片方式
4) 自定义分发策略
如何实现分发策略呢?
分区类: DefaultPartitioner (默认分区类)
生产者数据承载对象: ProducerRecord
如何模拟指定分区方案: 与分区类没有任何关系, 不需要采用分区类进行分发
// 参数2: 表示指定往哪个分片上发送数据: 分片编号是0开始
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
如何模拟hash取模方案:
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
此时会基于 DefaultPartitioner 根据key计算发送到那个分片上
// 参数1: topic名称 参数2: key值 参数3: key值字节数据 参数4: value值, 参数5; value字段数组 参数6: 集群对象
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) { // 判断key是否为null
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; // 当key不为null, 基于hash取模计算操作
}
注意: 采用hash取模方式, 一定要确保key是可变的, 否则会出现所有的数据发往同一个分片情况
好处: 相同key 可以发往同一个分片上
如何模拟粘性分区方案:
public ProducerRecord(String topic, V value) {
this(topic, null, null, key, value, null);
}
此时会基于 DefaultPartitioner 根据key计算发送到那个分片上
// 参数1: topic名称 参数2: key值 参数3: key值字节数据 参数4: value值, 参数5; value字段数组 参数6: 集群对象
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) { // 判断key是否为null
return stickyPartitionCache.partition(topic, cluster); // 粘性分区操作
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
粘性分区: 粘性分区是kafka在2.4版本以上新推出一种分发策略, 主要是为了替代轮询方案
描述: 在进行分发数据时候, 首先会先随机选择某一个分片, 然后尽可能黏上这个分区,
将这一批数据全部写入到这个分区上即可, 每次写入一批数据, 都会先随机选择一个分片
轮询: 三个分片 0 1 2
一批数据: 0 1 2 3 4 5 6 7 8 9
0分片: 0 3 6 9
1分片: 1 4 7
2分片: 2 5 8
轮询方式在发送一批数据到broker端, 对数据根据分片数量拆分多个小的批次,
一个批次对应一个分片, 发送到对应分片即可, 此种操作由于需要再次进行划分批次, 导致整个效率相对较低
如何自定义分区策略: 抄 抄DefaultPartitioner
1) 创建一个类, 实现 Partitioner 接口
2) 重写接口下的方法:
int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster); -- 设置分区的方法, 返回值表示要分到那个区
void close(); -- 释放资源
3) 在 partition方法中自定义分区策略, 返回值为对应要分发的分区编号
4) 在生产者的properties对象中, 设置自定义分区类:
key: partitioner.class
value值:
默认值为: org.apache.kafka.clients.producer.internals.DefaultPartitioner
修改为自定义类的 权限类名
kafka的消费者负载均衡的机制—消息积压

负载均衡机制规定:
在一个消费者组内, 消费者数量最多和所监听的topic的分片数量相等, 如果消费者数量大于分片数量,
必然会有某些消费者无法消费数据, 处于闲置状态
思考点:
请问 如何模拟 点对点发送消息模型?
让所有监听这个topic的消费者都在同一个组中即可
请问 如何模拟 发布订阅发送消息模型?
让所有监听这个topic的消费者都在不同组中即可
|