环境准备
- java环境
- kafka环境
- kafka-clients jar包
或者依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
Kafka API
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。  需要用到的类: KafkaProducer:需要创建一个生产者对象,用来发送数据 ProducerConfig:获取所需的一系列配置参数 ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象
package com.huazai.zookeeper.example;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class CustomProducer {
private final static String TOPIC = "TEST_TOPIC";
private final static Integer COUNT = 100;
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.132:9092,192.168.64.132:9093,192.168.64.132:9094");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
CountDownLatch countDownLatch = new CountDownLatch(COUNT);
for (int i = 0; i < COUNT; i++) {
producer.send(new ProducerRecord<>(TOPIC, "TEST_TOPIC message:" + String.valueOf(i)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
try {
countDownLatch.wait();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
});
}
countDownLatch.await();
System.out.println("发送完毕!");
producer.close();
}
}
启动程序之前先执行消费者命令监控消息消费情况。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEST_TOPIC
启动程序,客户端监控结果如下: 
遇到的问题
连接超时
 解决方案:
- 在kafka/config目录下的server.properties配置advertised.listeners或listeners的ip地址需与kafka所在主机的hostname保持一致)
 - 网络问题。检查/etc/hosts中的主机ip映射的hostname与配置的listeners中的hostname是否保持一致。

|