通过命令行工具连接服务器本地操作(kafka-console-producer.sh和kafka-console-consumer.sh)是能够相互通信的,producer发布的信息consumer能够接收到。
但是 java通过kafka-client的API写的代码始终不能跟kafka通信:java producer的消息发不出去, java comsumer也收不到任何消息。 仔细检查了下代码中IP、端口都没有写错。 服务器的防火墙也是关闭的。
解决办法
将kafka/config/server.properties文件中advertised.listeners改为如下属性。 192.168.32.XXX是我虚拟机的IP。
advertised.listeners=PLAINTEXT://192.168.32.XXX:9092
修改后消费者的命令监控的ip也要从loacalhost改为192.168.32.XXX
kafka-console-consumer.sh --bootstrap-server 192.168.32.XXX:9092 --topic topic_1 --from-beginning
不然也无法接收到消息`在
{
Map<String,Object> configs = new HashMap<>();
configs.put("bootstrap.servers","192.168.32.129:9092");
configs.put("key.serializer", IntegerSerializer.class);
configs.put("value.serializer", StringSerializer.class);
KafkaProducer<Integer,String> producer=new KafkaProducer<Integer, String>(configs);
List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("biz.name","producer.demo".getBytes()));
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
"topic_1",
0,
0,
"message 0",
headers
);
final Future<RecordMetadata> future = producer.send(record);
final RecordMetadata metadata = future.get();
System.out.println("主题"+ metadata.topic());
System.out.println("分区" + metadata.partition());
System.out.println("偏移量" + metadata.offset());
producer.close();
}
成功后
idea返回结果
消费者接受发了两次message 0
|