kafka动态添加topic,动态添加消费者
依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
写个Kafka帮助类
@Slf4j
@Component
public class KafkaHelper {
@Value("${spring.kafka.num.partitions:4}")
private Integer partitions;
@Value("${spring.kafka.num.replica.fetchers:1}")
private short fetchers;
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private AdminClient adminClient;
@Autowired
private ConsumerContainer consumerContainer;
public boolean createTopic(String name){
log.info("kafka创建topic:{}",name);
NewTopic topic = new NewTopic(name, partitions, fetchers);
CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(topic));
try {
topics.all().get();
} catch (Exception e) {
log.error("kafka创建topic失败",e);
return false;
}
return true;
}
public List<String> list() throws Exception {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<String> names = listTopicsResult.names().get();
return new ArrayList<>(names);
}
public boolean deleteTopic(String name){
log.info("kafka删除topic:{}",name);
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(name));
try {
deleteTopicsResult.all().get();
} catch (Exception e) {
log.error("kafka删除topic失败",e);
return false;
}
return true;
}
public TopicDescription describeTopic(String name){
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(name));
try {
Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
if(stringTopicDescriptionMap.get(name)!=null){
return stringTopicDescriptionMap.get(name);
}
} catch (Exception e) {
log.error(" 获取topic详情异常:",e);
}
return null;
}
public void pushEvent(IEvent e) {
if(StrUtil.isEmpty(e.getTopic())) {
return;
}
log.info("发送kafka消息: topic = {}, info = {}", e.getTopic(), e.getInfo());
kafkaTemplate.send(e.getTopic(),JSONObject.toJSONString(e.getInfo()));
}
public void addConsumer(String topic, Consumer<String> consumer){
log.info("将为topic:{} 创建消费者",topic);
consumerContainer.addConsumer(topic,consumer);
}
public void deleteConsumer(String topic){
log.info("将删除topic:{} 消费者",topic);
consumerContainer.deleteConsumer(topic);
}
}
新建一个消费者容器,用来存放动态创建的消费者
@Component
@Slf4j
public class ConsumerContainer {
private Map<String, KafkaConsumerThread> kafkaConsumerThreadMap = new HashMap<>();
@Resource(name = "kafkaProps")
private Map<String,Object> props;
public synchronized void addConsumer(String topic, Consumer<String> consumer){
if(kafkaConsumerThreadMap.get(topic)!=null){
log.warn("重复创建消费者:{}",topic);
}
KafkaConsumer<String, String> stringStringKafkaConsumer = new KafkaConsumer<>(props);
stringStringKafkaConsumer.subscribe(Arrays.asList(topic));
KafkaConsumerThread kafkaConsumerThread = new KafkaConsumerThread(stringStringKafkaConsumer,consumer);
kafkaConsumerThread.start();
kafkaConsumerThreadMap.put(topic,kafkaConsumerThread);
log.info("创建消费者成功:{}",topic);
}
public synchronized void deleteConsumer(String topic){
KafkaConsumerThread kafkaConsumerThread = kafkaConsumerThreadMap.get(topic);
if (kafkaConsumerThread == null) {
log.warn("该消费者已经被删除:{}",topic);
return;
}
kafkaConsumerThread.interrupt();
kafkaConsumerThreadMap.remove(topic);
log.info("消费者删除成功:{}",topic);
}
}
消费者线程
@Slf4j
public class KafkaConsumerThread extends Thread {
private KafkaConsumer<String,String> kafkaConsumer;
private Consumer<String> consumer;
KafkaConsumerThread(KafkaConsumer<String,String> kafkaConsumer, Consumer<String> consumer){
this.kafkaConsumer=kafkaConsumer;
this.consumer=consumer;
}
@Override
public void run() {
try {
while (true){
if (isInterrupted()) {
throw new InterruptedException();
}
ConsumerRecords<String, String> poll = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
consumer.accept(stringStringConsumerRecord.value());
}
}
} catch (InterruptedException e) {
Set<String> subscription = kafkaConsumer.subscription();
log.info("topic:{} 已停止监听,线程停止!", StringUtils.join(subscription,","),e);
}catch (Exception e){
Set<String> subscription = kafkaConsumer.subscription();
log.info("topic:{} 消费者运行异常!", StringUtils.join(subscription,","),e);
}finally {
try {
kafkaConsumer.close();
} catch (Exception ex) {
}
}
}
}
事件接口,方便发布事件
public interface IEvent {
default String getTopic(){
return null;
};
<T> T getInfo();
}
消费者示例
@Slf4j
public class PTVMateConsumer implements Consumer<String> {
@Override
public void accept(String command) {
log.info("监听元数据, command = {}", command);;
}
}
使用示例
kafkaHelper.addConsumer("topic",new PTVMateConsumer ());
|