深入浅出理解kafka原理系列之:java实现kafka生产者
一、引入pom.xml依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.2</version>
</dependency>
二、java实现kafka生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MyProducer {
private final static String TOPIC_NAME = "optics-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username=\"debezium\" password=\"NGFlM2I1NTJlNmFk\";");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
props.put("acks", "-1");
props.put("retries", 0);
props.put("batch.size", 10);
props.put("linger.ms", 10000);
props.put("buffer.memory", 10240);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 10; i++) {
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, Integer.toString(i), "dd:" + i)).get();
System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + " | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset());
}
}
}
输出如下所示:
同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:6
同步方式发送消息结果:topic名称:optics-topic | partition分区:2 | offset偏移量:8
同步方式发送消息结果:topic名称:optics-topic | partition分区:2 | offset偏移量:9
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:6
同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:7
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:7
同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:8
同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:9
同步方式发送消息结果:topic名称:optics-topic | partition分区:2 | offset偏移量:10
三、发送消息到指定分区上
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 10; i++) {
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, 1,Integer.toString(i), "dd:" + i)).get();
System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + " | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset());
}
输出如下所示:
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:9
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:10
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:11
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:12
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:13
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:14
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:15
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:16
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:17
|