Kafka (4): Kafka & Java Basic APIs
1. Environment
The Java API tests below run against the cluster built in Kafka (3).
The test machine (Windows) needs these host entries:
192.168.141.131 CentOSA
192.168.141.132 CentOSB
192.168.141.133 CentOSC
Maven configuration:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
</dependency>
2. Listing, Creating, Deleting & Describing Topics
public class KafkaTopicDML {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        AdminClient adminClient = AdminClient.create(properties);

        // List all topics
        ListTopicsResult listTopics = adminClient.listTopics();
        Set<String> names = listTopics.names().get();
        for (String name : names) {
            System.out.println("topic: " + name);
        }

        // Create a topic with 3 partitions and replication factor 3;
        // all().get() blocks until the broker confirms the operation
        adminClient.createTopics(Arrays.asList(new NewTopic("topic03", 3, (short) 3))).all().get();

        // Delete topics (asynchronous here; append .all().get() to wait,
        // which throws if a topic does not exist)
        adminClient.deleteTopics(Arrays.asList("topic05", "topic06"));

        // Describe the topics listed above
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(names);
        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
        System.out.println(stringTopicDescriptionMap);

        adminClient.close();
    }
}
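As a hedged aside, a NewTopic can also carry per-topic configs. A minimal sketch, assuming the adminClient above is still open (the name "topic08" and the compact cleanup policy are illustrative values, not from the original test):
// Sketch: create a log-compacted topic; "topic08" is an illustrative name
NewTopic compacted = new NewTopic("topic08", 3, (short) 3)
        .configs(Collections.singletonMap("cleanup.policy", "compact"));
adminClient.createTopics(Collections.singletonList(compacted)).all().get();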
3. Producer & Consumer
public class KafkaProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic02", "key" + i, "value" + i);
            kafkaProducer.send(record);   // asynchronous, fire-and-forget
        }
        kafkaProducer.close();            // flushes pending records before closing
    }
}
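send() returns immediately, so the loop above never learns whether a record arrived. A hedged variant of the same loop with a Callback makes the landing partition and offset visible, which is handy for the partition tests below:
// Sketch: same send loop, but with a Callback to observe where each record lands
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("topic02", "key" + i, "value" + i);
    kafkaProducer.send(record, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();   // the send failed
        } else {
            System.out.println(metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
        }
    });
}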
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Subscribe to every topic whose name matches the pattern
        kafkaConsumer.subscribe(Pattern.compile("^topic02.*"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                while (iterator.hasNext()) {
                    ConsumerRecord<String, String> next = iterator.next();
                    System.out.println(next);
                }
            }
        }
    }
}
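The consumer above relies on the default automatic offset commit. As a sketch (assuming the same properties plus enable.auto.commit set to false, which is not in the original), offsets can instead be committed manually after processing:
// Sketch: manual offset commit; assumes
// properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    if (!records.isEmpty()) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);   // process the record
        }
        kafkaConsumer.commitSync();       // commit only after processing succeeded
    }
}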
Test verification, with no partition assigned manually:
### Start one consumer S1: it is assigned all three partitions
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-0, topic02-1, topic02-2])
### Start a second consumer S2 ====> one of S1's partitions is handed over to S2 in the same group
##S1 log
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Finished assignment for group at generation 8: {consumer-g2-1-58d640e4-0269-4c1c-a212-5f21bc761cca=Assignment(partitions=[topic02-0, topic02-1]), consumer-g2-1-81b66c7f-6c84-4095-809c-ce7afb4100a9=Assignment(partitions=[topic02-2])}
=======================================================================
##S2 log
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-2])
### Start a third consumer S3 ====> another of S1's partitions is handed over to S3 in the same group
##S1 log
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Finished assignment for group at generation 9: {consumer-g2-1-27f9cab4-cacc-425c-81c8-6e9fc141580c=Assignment(partitions=[topic02-0]), consumer-g2-1-58d640e4-0269-4c1c-a212-5f21bc761cca=Assignment(partitions=[topic02-1]), consumer-g2-1-81b66c7f-6c84-4095-809c-ce7afb4100a9=Assignment(partitions=[topic02-2])}
###S3 log
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-0])
=======================================================================
### Start a fourth consumer S4 ====> there are no partitions left to assign to a new consumer in this group
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[])
=======================================================================
## Shut down S2: standby S4 takes over the partition S2 was listening on
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[])
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Adding newly assigned partitions:
INFO - AbstractCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Attempt to heartbeat failed since group is rebalancing
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Revoke previously assigned partitions
INFO - AbstractCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] (Re-)joining group
INFO - AbstractCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Successfully joined group with generation Generation{generationId=11, memberId='consumer-g2-1-8f55c39e-9bf4-45a5-a35b-42ab9eb64d4f', protocol='range'}
INFO - AbstractCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Successfully synced group in generation Generation{generationId=11, memberId='consumer-g2-1-8f55c39e-9bf4-45a5-a35b-42ab9eb64d4f', protocol='range'}
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-2])
INFO - ConsumerCoordinator - [Consumer clientId=consumer-g2-1, groupId=g2] Adding newly assigned partitions: topic02-2
Conclusion: every time a consumer joins the group, the group runs a simple rebalance to spread the partitions across its members. Once the group has more consumers than partitions, the extra consumers receive no partitions and wait on standby; only when an assigned consumer goes down does the most recently added standby take over its partitions.
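To watch these reassignments from code rather than from the INFO logs, subscribe() also accepts a ConsumerRebalanceListener. A minimal sketch against the consumer above:
// Sketch: log partitions as they are revoked and reassigned during a rebalance
kafkaConsumer.subscribe(Pattern.compile("^topic02.*"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("assigned: " + partitions);
    }
});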
With three consumers S1, S2, S3 running, start a producer and send 10 records:
=======================================================================
##S1
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 7, CreateTime = 1632753328402, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key0, value = value0)
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 8, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key3, value = value3)
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 9, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key5, value = value5)
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 10, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key8, value = value8)
=======================================================================
##S2
ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 5, CreateTime = 1632753328415, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key1, value = value1)
ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 6, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key2, value = value2)
ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 7, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key9, value = value9)
=======================================================================
##S3
ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key4, value = value4)
ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key6, value = value6)
ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key7, value = value7)
Conclusion: records are ordered within a single partition; across partitions there is no ordering guarantee.
4. Specifying Which Partition to Consume
The starting offset can be specified as well. Manually assigning partitions gives up the consumer-group features: the consumer instances no longer coordinate with one another.
public class KafkaConsumerDemo_1 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // No GROUP_ID_CONFIG: with assign() the consumer does not join a group
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Manually take partition 0 of topic02 ...
        List<TopicPartition> topics = Arrays.asList(new TopicPartition("topic02", 0));
        kafkaConsumer.assign(topics);
        // ... and start consuming from offset 3
        kafkaConsumer.seek(new TopicPartition("topic02", 0), 3);
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                while (iterator.hasNext()) {
                    ConsumerRecord<String, String> next = iterator.next();
                    System.out.println(next);
                }
            }
        }
    }
}
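Besides a fixed offset, the consumer API can seek to the beginning or end of a partition, or to the first offset at or after a timestamp. A sketch under the same setup (the one-hour lookback is an arbitrary illustrative value):
// Sketch: alternative starting positions for a manually assigned partition
TopicPartition tp = new TopicPartition("topic02", 0);
kafkaConsumer.assign(Arrays.asList(tp));

kafkaConsumer.seekToBeginning(Arrays.asList(tp));   // replay everything
kafkaConsumer.seekToEnd(Arrays.asList(tp));         // only new records

// First offset whose timestamp is >= one hour ago
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(
        Collections.singletonMap(tp, System.currentTimeMillis() - 3600_000L));
OffsetAndTimestamp ot = offsets.get(tp);
if (ot != null) {
    kafkaConsumer.seek(tp, ot.offset());
}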
4.1 Load Balancing
With three consumers running, send 30 records. The keys each consumer received (one block per consumer below) show that the default partitioner is not round-robin but a hash of the key:
key4
key6
key7
key10
key13
key14
key23
key24
key25
key26
key29
=======================================================================
key1
key2
key9
key11
key12
key19
key20
key21
key22
=======================================================================
key0
key3
key5
key8
key15
key16
key17
key18
key27
key28
4.2 Writing a Simple Round-Robin Partitioner
Records can be routed to chosen partitions by implementing the Partitioner interface and registering it on the KafkaProducer:
public class KafkaProducerDemo_2 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic02", "key" + i, "value" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Use the topic's actual partition count rather than a hard-coded 3
        int numPartitions = cluster.partitionCountForTopic(topic);
        try {
            // Keys look like "keyN": strip the prefix and spread records round-robin
            int keyNumber = Integer.parseInt(key.toString().replace("key", ""));
            return keyNumber % numPartitions;
        } catch (NumberFormatException e) {
            // Fallback for keys that don't match the "keyN" pattern
            return 1;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
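Note that this partitioner only balances keys of the form keyN. For key-independent round-robin, recent Kafka clients (2.4+) ship org.apache.kafka.clients.producer.RoundRobinPartitioner, registered the same way:
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());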
5. Sending Serialized Objects
An off-the-shelf serialization library can be used:
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
</dependency>
Object serializer:
public class ObjectSerializer implements Serializer<Object> {
    @Override
    public byte[] serialize(String topic, Object data) {
        // commons-lang3 SerializationUtils requires a Serializable payload
        return SerializationUtils.serialize((Serializable) data);
    }
}
Object deserializer:
public class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }
}
Test object:
public class DemoObj implements Serializable {
    private Integer f1;
    private String f2;
    private Date f3;
}
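The producer below calls a three-argument constructor and the log at the end shows a custom toString(), so the class presumably looks roughly like this sketch:
// Sketch of the members implied by the usage and output below
public class DemoObj implements Serializable {
    private Integer f1;
    private String f2;
    private Date f3;

    public DemoObj(Integer f1, String f2, Date f3) {
        this.f1 = f1;
        this.f2 = f2;
        this.f3 = f3;
    }

    @Override
    public String toString() {
        return "DemoObj{f1=" + f1 + ", f2='" + f2 + "', f3=" + f3 + "}";
    }
}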
Now run the test.
Producer:
public class KafkaProducerObjectDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Serialize the value with the custom ObjectSerializer
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());
        KafkaProducer<String, DemoObj> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, DemoObj> record =
                    new ProducerRecord<>("topic04", "key" + i, new DemoObj(i, UUID.randomUUID().toString(), new Date()));
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}
Consumer:
public class KafkaConsumerObjectDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Deserialize the value with the custom ObjectDeserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");
        KafkaConsumer<String, DemoObj> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, DemoObj> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                Iterator<ConsumerRecord<String, DemoObj>> iterator = records.iterator();
                while (iterator.hasNext()) {
                    ConsumerRecord<String, DemoObj> next = iterator.next();
                    System.out.println(next.value());
                }
            }
        }
    }
}
Test result: the objects deserialize successfully
DemoObj{f1=0, f2='22c16c50-f703-4b78-ba1e-8fb2f58d9801', f3=Tue Sep 28 23:06:38 CST 2021}
DemoObj{f1=3, f2='abd6089a-3ac8-4952-b184-d09f94336893', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=5, f2='4732aafa-8a24-42b1-941d-b5efd72395e9', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=8, f2='08a56491-f238-4bba-afa5-008a39577756', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=4, f2='7bf2cb8b-e898-4581-b3ea-fecda46ba899', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=6, f2='0c14cf73-96e3-4f89-999b-5662e86900bf', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=7, f2='aa88582b-59ef-449c-a86d-c147aa1815e7', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=1, f2='b94e35a0-69ce-4190-b111-b9a65e9d60ed', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=2, f2='57c54244-9522-4689-97b3-411bd2fc587c', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=9, f2='6a3c5526-c015-425b-bdda-e242b1fda780', f3=Tue Sep 28 23:06:39 CST 2021}
6. Custom Interceptors
Interceptors can hook into the send path, for example to post-process records or handle send failures.
Define the interceptor:
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Called before serialization: rewrap the record, appending a suffix to the value
        return new ProducerRecord<>(record.topic(), record.key(), record.value() + "sffffffffff");
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called after the send is acknowledged or fails; exception is null on success
        System.out.println(metadata);
        System.out.println(exception);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
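INTERCEPTOR_CLASSES_CONFIG accepts a comma-separated list, so several interceptors can be chained and run in the order listed; a sketch (AuditInterceptor is a hypothetical second class, not defined here):
// AuditInterceptor is hypothetical, shown only to illustrate chaining
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        KafkaProducerInterceptor.class.getName() + "," + AuditInterceptor.class.getName());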
Producer configuration:
public class KafkaProducerObjectInterceptorDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the interceptor
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaProducerInterceptor.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic07", "key" + i, "value" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}
The consumer needs no changes.
Consumer log:
// each value was rewrapped by the interceptor's onSend above;
// the printed message values:
value0sffffffffff
value3sffffffffff
value5sffffffffff
value8sffffffffff
value1sffffffffff
value2sffffffffff
value9sffffffffff
Producer log:
// output of the interceptor's onAcknowledgement callback after each send: the RecordMetadata, then the (null) exception
topic07-0@10
null
topic07-0@11
null
topic07-0@12
null