A Kafka Source Code Reading Journey (Part 3): Starting from a Demo
I. Producer
1. The Producer class
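import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// KafkaProperties and DemoCallBack are other classes of the demo that are not shown in this post.
public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        // Broker address used to bootstrap the connection to the cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        // Keys are Integers and values are Strings, so pick the matching serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) {
                // Asynchronous send: return immediately, DemoCallBack runs once the send completes
                producer.send(new ProducerRecord<>(topic,
                    messageNo,
                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else {
                try {
                    // Synchronous send: get() blocks until the send has completed
                    producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}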
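The two branches of run() differ only in how the result of send() is consumed: the synchronous branch blocks on get(), while the asynchronous branch registers a callback and moves on. The following is a minimal sketch of that difference using only the JDK. SendModesSketch, fakeSend, sendSync and sendAsync are hypothetical names made up for illustration, and fakeSend is a stand-in for producer.send(), not Kafka code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

// Hypothetical JDK-only model of the two send modes; no Kafka classes are used.
class SendModesSketch {
    // Stand-in for producer.send(record): the "acknowledgement" is produced on another thread.
    static CompletableFuture<String> fakeSend(int messageNo, String messageStr) {
        return CompletableFuture.supplyAsync(() -> "ack(" + messageNo + ", " + messageStr + ")");
    }

    // Synchronous mode: the caller blocks on get() until the result is available.
    static String sendSync(int messageNo, String messageStr) {
        try {
            return fakeSend(messageNo, messageStr).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Asynchronous mode: the caller registers a callback and gets the future back immediately.
    static CompletableFuture<String> sendAsync(int messageNo, String messageStr,
                                               BiConsumer<String, Throwable> callback) {
        return fakeSend(messageNo, messageStr).whenComplete(callback);
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + sendSync(1, "Message_1"));
        // join() is only here so the demo does not exit before the callback has run
        sendAsync(2, "Message_2", (ack, err) -> System.out.println("async: " + ack)).join();
    }
}
```

In the real demo the callback role is played by DemoCallBack, which receives the start time so it can presumably report how long the send took.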
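II. The core Producer flow
- ProducerInterceptors is an interceptor that intercepts and processes the data being sent
- Serializer serializes the key and value of each message
- The partitioner is applied to every message and decides which partition of the topic the message goes to
- RecordAccumulator buffers messages so that they can be sent in batches
- Sender fetches messages from the RecordAccumulator
- A ClientRequest object is built
- The ClientRequest is handed to NetworkClient to be prepared for sending
- NetworkClient places the request into the KafkaChannel's buffer
- The request is sent to the Kafka cluster
- The Sender thread receives the response sent back by the server
- The bound callback function is executed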
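To make the order of these steps easier to follow, here is a toy, single-threaded model of the flow written against the JDK only. It is a sketch under heavy simplifications, not Kafka's implementation: ProducerPipelineSketch and all of its members are hypothetical names, the partitioner is a plain modulo on the key, and the network round trip is replaced by invoking the callback directly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of the producer send path. None of these types are Kafka classes.
class ProducerPipelineSketch {
    // A serialized record waiting in a batch, together with its completion callback.
    static final class PendingRecord {
        final byte[] key;
        final byte[] value;
        final Consumer<String> callback;

        PendingRecord(byte[] key, byte[] value, Consumer<String> callback) {
            this.key = key;
            this.value = value;
            this.callback = callback;
        }
    }

    static final int NUM_PARTITIONS = 3; // assumed partition count for the sketch

    // Stand-in for RecordAccumulator: one batch (a list) per partition.
    private final Map<Integer, List<PendingRecord>> accumulator = new HashMap<>();

    // Interceptor step: here it only tags the value so the effect is visible.
    static String intercept(String value) {
        return value + "|intercepted";
    }

    // Serializer step: Integer key to 4 bytes, String value to UTF-8 bytes.
    static byte[] serializeKey(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    static byte[] serializeValue(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Partitioner step: simplified to key modulo partition count.
    static int partition(int key) {
        return Math.floorMod(key, NUM_PARTITIONS);
    }

    // Caller side: intercept, serialize, partition, then append to the accumulator.
    void send(int key, String value, Consumer<String> callback) {
        String intercepted = intercept(value);
        PendingRecord record =
            new PendingRecord(serializeKey(key), serializeValue(intercepted), callback);
        accumulator.computeIfAbsent(partition(key), p -> new ArrayList<>()).add(record);
    }

    // Sender side: drain every batch, pretend the broker acknowledged it,
    // and run the callbacks. Returns the number of records drained.
    int runSenderOnce() {
        int sent = 0;
        for (Map.Entry<Integer, List<PendingRecord>> batch : accumulator.entrySet()) {
            for (PendingRecord record : batch.getValue()) {
                String payload = new String(record.value, StandardCharsets.UTF_8);
                record.callback.accept("partition-" + batch.getKey() + ":" + payload);
                sent++;
            }
        }
        accumulator.clear();
        return sent;
    }

    public static void main(String[] args) {
        ProducerPipelineSketch pipeline = new ProducerPipelineSketch();
        List<String> acks = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            pipeline.send(i, "Message_" + i, acks::add);
        }
        System.out.println("sent=" + pipeline.runSenderOnce() + " acks=" + acks);
    }
}
```

The point of the split between send() and runSenderOnce() is that the calling thread only appends to the buffer, while a separate sender drains it in batches; in the real producer those two halves run on different threads.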
III. Producer initialization
1. Initializing the KafkaProducer object
private final KafkaProducer<Integer, String> producer;
producer = new KafkaProducer<>(props);
2. Initializing the important KafkaProducer parameters
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
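For orientation, the four ProducerConfig constants used here are plain string keys, so the same configuration can be written out by hand as in the sketch below. ProducerConfigSketch and demoProducerProps are hypothetical names, and the host localhost and port 9092 are assumptions, since the values behind KafkaProperties.KAFKA_SERVER_URL and KafkaProperties.KAFKA_SERVER_PORT are not shown in this post.

```java
import java.util.Properties;

// Hypothetical helper that builds the same four settings with literal string keys.
class ProducerConfigSketch {
    static Properties demoProducerProps(String host, int port) {
        Properties props = new Properties();
        // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: broker(s) used for the initial connection
        props.put("bootstrap.servers", host + ":" + port);
        // ProducerConfig.CLIENT_ID_CONFIG: logical name of this client
        props.put("client.id", "DemoProducer");
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: class that turns the Integer key into bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: class that turns the String value into bytes
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Host and port are assumed values for illustration only
        Properties props = demoProducerProps("localhost", 9092);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Using the constants, as the demo does, is usually preferable to literal strings because a typo in a key then fails at compile time.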