@Bean @Primary //kafka消费者默认关联该容器 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ? ? ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); ? ? factory.setConsumerFactory(consumerFactory()); ? ? factory.getContainerProperties().setMissingTopicsFatal(false); ? ?//topic缺失时不报错 ? ? return factory; }
//kafka消息配置 public Map<String, Object> consumerConfigs() { ? ? Map<String, Object> props = new HashMap<>(); ? ? props.put(ConsumerConfig.GROUP_ID_CONFIG, "XXX"); ? ? props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); ? ?//最新从消费 ? ? props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); ? ? props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); ? ? return props; }
public ConsumerFactory<String, String> consumerFactory() { ? ? Map<String, Object> map = consumerConfigs(); ? ? map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092");? ?//kafka集群 ? ? return new DefaultKafkaConsumerFactory<>(map); }
|