kafka
观察验证ISR弹性
下面是kafka 自己维护得offset
创建分区
kafka-topics.sh --zookeeper server1:2181,server2:2181/kafka --create --topic zjj-items --partitions 2 --replication-factor 3
查看分区
[root@localhost vmuser]# kafka-topics.sh --zookeeper server1:2181,server2:2181/kafka --describe --topic zjj-items
Topic: zjj-items PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: zjj-items Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: zjj-items Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
创建完成之后,可以在目录下查看到创建改分区结构
- index 是他得偏移量、
- log 是存入数据
- timeiindex 时间偏移量索引
从上面看出来 index 与timeindex 都是10M,是因为用了mmap得预分配
jps
lsof -Pnp 25647
格式话文件内容展示
[root@localhost zjj-items-0]# kafka-dump-log.sh --files 00000000000000000000.log
Dumping 00000000000000000000.log
Starting offset: 0
我们从里面打点数据
可以看到里面得偏移量与数据
ack=0 的时候,将node02,指向其他位置,不让其网络通讯
这个时候kafka producer依然可以正常发送数据
ack=1 的时候
producer依然可以正常发送数据,但是在 isr里面 2 已经被提出去了
ack=-1 的时候
producer 客户端会卡10秒左右正常发送数据,irs剔除node2,
注意啊 partition1 里面 isr为什么踢出去2,是因为 partition1 里面 1是leader,而partition0 里面 3是leader,3无法通信2 所以踢出去,但是 1可以正常通信2
时间戳索引,自定义offset偏移
自定义offset 本质是seek
Map<TopicPartition, Long> tts =new HashMap<>();
Set<TopicPartition> as = consumer.assignment();
while(as.size()==0){
consumer.poll(Duration.ofMillis(100));
as = consumer.assignment();
}
for (TopicPartition partition : as) {
tts.put(partition,1610629127300L);
}
Map<TopicPartition, OffsetAndTimestamp> offtime = consumer.offsetsForTimes(tts);
for (TopicPartition partition : as) {
OffsetAndTimestamp offsetAndTimestamp = offtime.get(partition);
long offset = offsetAndTimestamp.offset();
System.out.println(offset);
consumer.seek(partition,offset);
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
|