Spring-kafka injects its configuration from properties prefixed with spring.kafka: settings defined in this form are read automatically at startup and applied to the Kafka instances. You can also give the configuration entries custom names, in which case you have to assemble the property map yourself and create the corresponding factory and KafkaTemplate beans manually.
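For comparison, a minimal sketch of the auto-configured form, using Spring Boot's standard spring.kafka property names:

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=kafka-demo-group
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer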
This article takes the custom-name route instead. Add the basic configuration Kafka needs to start:
# kafka producer
kafka.servers=127.0.0.1:9092
kafka.producer.topic=kafka-demo-queue
kafka.producer.sasl.username=demo
kafka.producer.sasl.password=demo
# kafka consumer
kafka.consumer.topic=kafka-demo-queue
kafka.consumer.group.id=kafka-demo-group
kafka.consumer.sasl.username=demo
kafka.consumer.sasl.password=demo
In this demo the producer and consumer connect to the same Kafka server (cluster), so they share one server address, and since the application consumes its own output they also share one topic. The group id is what the consumer side uses to select its target messages; the SASL username and password are provisioned on the server side to control each client's permissions.
Next come the startup configurations for the Kafka producer and consumer. Since most options have sensible defaults, only the required entries are shown here:
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.servers}")
    private String servers;
    @Value("${kafka.producer.sasl.username}")
    private String userName;
    @Value("${kafka.producer.sasl.password}")
    private String password;

    @Bean
    public KafkaProducer<String, String> initKafkaProducer() {
        return new KafkaProducer<>(kafkaProducerConfig());
    }

    private Map<String, Object> kafkaProducerConfig() {
        Map<String, Object> props = new HashMap<>(8);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // SASL/SCRAM authentication against the broker
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + userName + "\" password=\"" + password + "\";");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        return props;
    }
}
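The sending examples further down go through a KafkaTemplate rather than the raw KafkaProducer bean above, so a template bean is needed as well. A minimal sketch built on the same property map, added inside KafkaProducerConfig (the bean names are illustrative; the APIs are spring-kafka's standard ones):

@Bean
public ProducerFactory<String, String> producerFactory() {
    // wraps the plain client properties so spring-kafka can manage producer instances
    return new DefaultKafkaProducerFactory<>(kafkaProducerConfig());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}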
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.servers}")
    private String servers;
    @Value("${kafka.consumer.topic}")
    private String topic;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.sasl.username}")
    private String userName;
    @Value("${kafka.consumer.sasl.password}")
    private String password;

    @Bean
    public KafkaConsumer<String, String> initKafkaConsumer() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig());
        // subscribe() expects a collection of topic names
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    private Map<String, Object> kafkaConsumerConfig() {
        Map<String, Object> props = new HashMap<>(8);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // the consumer side needs deserializers, not serializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // SASL/SCRAM authentication, same as the producer
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + userName + "\" password=\"" + password + "\";");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        return props;
    }
}
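The @KafkaListener examples below also reference a container factory named demoKafkaListenerContainerFactory, which the configuration above does not define. A minimal sketch, added inside KafkaConsumerConfig, under the assumption that it simply reuses kafkaConsumerConfig() (the bean name matches the later references; the rest is illustrative):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> demoKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // the factory builds one consumer per listener container from the same property map
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig()));
    return factory;
}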
Producer sending
template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable throwable) {
......
}
@Override
public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
....
}
});
ListenableFuture<SendResult<Object,Object>> future = template.send("demo-topic","sync message send!");
try {
SendResult<Object,Object> result = future.get();
}catch (Throwable e){
e.printStackTrace();
}
Consumer listening
@KafkaListener(id = "demo_group_id", topics = "demo_topic")
public void consume(String msg) {
logger.info("Consume message is: {}" , msg);
}
@KafkaListener(id = "demo_group_id", topics = "demo_topic")
public String consume(String msg, Acknowledgment ack) {
logger.info("input value: {}", msg);
ack.acknowledge();
return "successful";
}
@KafkaListener(topics = "demo_topic", containerFactory = "demoKafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, String> record) {
logger.info("Consume record: partition-[{}], offset-[{}]", record.partition(), record.offset());
logger.info("Consume message is {}: {}.", record.key(), record.value());
}
@KafkaListener(topics = "demo_topic", containerFactory = "demoKafkaListenerContainerFactory", concurrency = "3")
public void consume(ConsumerRecord<String, String> record,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer<String, String> consumer) {
logger.info("Consume record: partition-[{}], offset-[{}]", record.partition(), record.offset());
logger.info("Consume message is {}: {}.", record.key(), record.value());
consumer.commitSync();
}
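The Acknowledgment and commitSync() variants above only take effect when the container commits offsets manually; with the default ack mode the container commits on its own and the Acknowledgment argument is not even provided. A sketch of the two extra settings, applied to the demoKafkaListenerContainerFactory sketched earlier (AckMode here is spring-kafka's ContainerProperties.AckMode):

// in demoKafkaListenerContainerFactory():
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// and in kafkaConsumerConfig(), keep the client from auto-committing on its own:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);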
Producer sending with reply
ReplyingKafkaTemplate extends KafkaTemplate and adds a sendAndReceive method on top, which sends a message and receives the consumer's reply in the same call. The method is defined as:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
Its producer side builds on the basic producer template configuration, supplemented with the following:
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        // this container listens on the reply topic; the replying template starts it itself
        ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("send_and_reply");
        repliesContainer.getContainerProperties().setGroupId("reply_group");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> factory, ConcurrentMessageListenerContainer<String, String> container) {
        return new ReplyingKafkaTemplate<>(factory, container);
    }
}
With this configuration in place, a simple demo verifies the feature:
@RestController
public class KafkaController {

    private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);

    @Autowired
    private ReplyingKafkaTemplate<String, String, String> template;

    @GetMapping("/kafka/send_and_receive")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendAndReceive(String input) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", input);
        // send the request and block until the listener's reply arrives
        RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        System.out.println("Reply message: " + consumerRecord.value());
    }

    @KafkaListener(id = "reply_group", topics = "demo-topic")
    @SendTo
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}
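Under the hood, ReplyingKafkaTemplate stamps a KafkaHeaders.REPLY_TOPIC header on the outgoing record (here the repliesContainer's topic, send_and_reply), and a parameter-less @SendTo sends the listener's return value back to the topic named in that header, which is what ultimately completes replyFuture.get().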
Kafka transactional messages
By default, the producer and consumer services that Spring-kafka bootstraps through configuration leave the KafkaTemplate instance without transactional capabilities. Transactions can be activated with an extra configuration entry, but note that once they are enabled, every send must run inside a transactional method, otherwise a no-transaction-in-process exception is thrown.
Spring-kafka transactions are built on the Kafka client's transactional messaging, and the feature can be switched on through configuration:
kafka.producer.transaction-id-prefix=kafka_tx
A transaction counts as successful only when every step in the flow completes; if any step throws along the way, everything done so far is rolled back. For Kafka this means that consumers (reading with isolation.level=read_committed) only see the messages once every send inside the transaction has been committed.
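With Spring Boot auto-configuration the property above is enough; since this article wires the producer factory by hand, the same effect can also be achieved programmatically. A minimal sketch, building on the producerFactory sketched earlier (bean names are illustrative):

@Bean
public ProducerFactory<String, String> producerFactory() {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProducerConfig());
    // equivalent of kafka.producer.transaction-id-prefix: turns on transactions for this factory
    factory.setTransactionIdPrefix("kafka_tx");
    return factory;
}

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> factory) {
    // lets @Transactional methods participate in Kafka transactions
    return new KafkaTransactionManager<>(factory);
}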
A producer can use transactional messages in two ways, shown below:
@Autowired
private KafkaTemplate<String, Object> template;

// Variant 1: a local transaction, opened and committed around the callback
public void sendTransactionDemo(String input) {
    template.executeInTransaction(t -> {
        t.send("demo-topic", input);
        if (input == null || "".equals(input)) {
            throw new RuntimeException("invalid kafka input!");
        }
        t.send("demo-topic", "Second time sending: " + input);
        // the callback must return a value; it becomes executeInTransaction's result
        return true;
    });
}
// Variant 2: a Spring-managed transaction (requires a KafkaTransactionManager bean)
@Transactional(rollbackFor = RuntimeException.class)
public void sendTransactionDemo(String input) {
    template.send("demo-topic", input);
    if (input == null || "".equals(input)) {
        throw new RuntimeException("invalid kafka input!");
    }
    template.send("demo-topic", "Second time sending: " + input);
}
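The two variants differ in scope: executeInTransaction begins and commits (or aborts) a purely local transaction around the callback by itself, while the @Transactional variant joins a Spring-managed transaction and therefore needs a KafkaTransactionManager, as sketched earlier, registered in the context.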