我们经常会遇到这样的需求:在一个微服务里需要同时监听多台服务器(非集群)上的 Kafka。这时候仅靠 application.properties 里的常规配置可能就无法支撑了,需要通过原始的 Java 配置方式手动进行配置:
/**
 * Programmatic Kafka listener configuration.
 *
 * <p>Declares a listener container factory bean (exposed under the method name
 * {@code consumerFactory}) built from the {@code brokerAddress} and {@code groupId}
 * properties, so that a {@code @KafkaListener} can select it by name through its
 * {@code containerFactory} attribute.
 */
@Configuration
@EnableKafka
public class KafkaConfiguration {

    /** Bootstrap server address, injected from the {@code brokerAddress} property. */
    @Value("${brokerAddress}")
    private String brokerAddress;

    /** Consumer group id, injected from the {@code groupId} property. */
    @Value("${groupId}")
    private String groupId;

    /**
     * Builds the listener container factory used by listeners that reference
     * {@code "consumerFactory"}.
     *
     * @return a container factory for String keys and values with concurrency set to 1
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> consumerFactory() {
        Map<String, Object> consumerSettings = factoryProperties(groupId, brokerAddress);
        ConsumerFactory<String, String> kafkaConsumers =
                new DefaultKafkaConsumerFactory<>(consumerSettings);

        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConcurrency(1);
        containerFactory.setConsumerFactory(kafkaConsumers);
        return containerFactory;
    }

    /**
     * Assembles the Kafka consumer configuration map.
     *
     * @param groupId       value stored under {@code ConsumerConfig.GROUP_ID_CONFIG}
     * @param brokerAddress value stored under {@code ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}
     * @return a new mutable map holding the consumer settings
     */
    private Map<String, Object> factoryProperties(String groupId, String brokerAddress) {
        Map<String, Object> settings = new HashMap<>();

        // Connection and identity.
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // NOTE(review): no SASL mechanism or JAAS settings are supplied in this class;
        // presumably they are provided elsewhere - confirm.
        settings.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        // Key and value are both read as Strings.
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Offset handling: client-side auto-commit is switched off.
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // NOTE(review): the Kafka docs describe this interval as applying only when
        // auto-commit is enabled; kept as in the original - confirm whether it is needed.
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Timeouts, in milliseconds.
        settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        settings.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");

        return settings;
    }
}
/**
 * Example listener bound to the topic {@code "topicName"}.
 *
 * <p>The {@code containerFactory} attribute names the bean {@code "consumerFactory"},
 * so this listener uses that container factory rather than a default one.
 * The method body is elided ({@code ...}) in this snippet.
 *
 * @param message the received message, as a String
 */
@KafkaListener(topics = "topicName",containerFactory =
"consumerFactory")
public void kafkaTest(String message){
...
}
|