前置条件
首先需要已经安装了Java环境,并且安装了ZooKeeper服务注册中心。
下载
https://kafka.apache.org/ (当前最新版本为3.1.0)
安装
在Windows下解压后,主要关注的是config文件夹下的server.properties文件,修改以下配置:
broker.id=0
log.dirs = ""
zookeeper.connect=localhost:2181
启动
bin/windows/kafka-server-start.bat ./config/server.properties
Java jar包(Maven依赖)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
生产者
/**
 * Demo producer that publishes a {@code "Hello,<n>"} string message to the
 * {@code "test"} topic every two seconds, with {@code n} counting up from 0.
 */
public class Producer {
    /**
     * Configures a string-keyed, string-valued producer against the broker at
     * {@code 127.0.0.1:9092} and sends messages in an endless loop.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while sleeping between sends
     */
    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // try-with-resources closes the producer on every exit path
        // (for example when the sleep below is interrupted).
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p)) {
            for (int i = 0; ; i++) {
                String msg = "Hello," + i;
                ProducerRecord<String, String> record = new ProducerRecord<>("test", msg);
                // send() is asynchronous: without a callback (or inspecting the returned
                // Future) a failed send would go unnoticed, so report it explicitly.
                kafkaProducer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send \"" + msg + "\": " + exception);
                    }
                });
                Thread.sleep(2000);
            }
        }
    }
}
消费者
/**
 * Demo consumer that subscribes to the {@code "test"} topic as part of the
 * {@code "duanjt_test"} consumer group and prints every record it receives.
 */
public class Consumer {
    /**
     * Configures a string-keyed, string-valued consumer against the broker at
     * {@code 127.0.0.1:9092} and polls in an endless loop, printing the topic,
     * offset and value of each record to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");

        // try-with-resources ensures the consumer is closed if poll() or record
        // handling throws; the original never closed it.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p)) {
            kafkaConsumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // Wait up to one second for new records on each poll.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic:%s,offset:%d,消息:%s",
                            record.topic(), record.offset(), record.value()));
                }
            }
        }
    }
}
|