IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka动态添加topic,动态添加消费者 -> 正文阅读

[大数据]kafka动态添加topic,动态添加消费者

kafka动态添加topic,动态添加消费者

依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

写个Kafka帮助类

@Slf4j
@Component
public class KafkaHelper {

    @Value("${spring.kafka.num.partitions:4}")
    private Integer partitions;

    @Value("${spring.kafka.num.replica.fetchers:1}")
    private short fetchers;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Autowired
    private AdminClient adminClient;

    @Autowired
    private ConsumerContainer consumerContainer;

    /**
     * 创建topic
     * @param name
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public boolean createTopic(String name){
        log.info("kafka创建topic:{}",name);
        NewTopic topic = new NewTopic(name, partitions, fetchers);
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(topic));
        try {
            topics.all().get();
        } catch (Exception e) {
            log.error("kafka创建topic失败",e);
            return false;
        }
        return true;
    }

    /**
     * 查询所有Topic
     * @return
     * @throws Exception
     */
    public List<String> list() throws Exception {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> names = listTopicsResult.names().get();
        return new ArrayList<>(names);
    }

    /**
     * 删除topic
     * @param name
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public boolean deleteTopic(String name){
        log.info("kafka删除topic:{}",name);
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(name));
        try {
            deleteTopicsResult.all().get();
        } catch (Exception e) {
            log.error("kafka删除topic失败",e);
            return false;
        }
        return true;
    }

    /**
     * 获取topic详情
     * @param name
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public TopicDescription describeTopic(String name){
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(name));
        try {
            Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
            if(stringTopicDescriptionMap.get(name)!=null){
                return stringTopicDescriptionMap.get(name);
            }
        } catch (Exception e) {
            log.error(" 获取topic详情异常:",e);
        }
        return null;
    }

    /**
     * 推送事件
     * @param e
     */
    public void pushEvent(IEvent e) {
        if(StrUtil.isEmpty(e.getTopic())) {
            return;
        }
        log.info("发送kafka消息: topic = {}, info = {}", e.getTopic(), e.getInfo());
        kafkaTemplate.send(e.getTopic(),JSONObject.toJSONString(e.getInfo()));
    }

    /**
     * 添加消费者
     * @param topic
     * @param consumer
     */
    public void addConsumer(String topic, Consumer<String> consumer){
        log.info("将为topic:{} 创建消费者",topic);
        consumerContainer.addConsumer(topic,consumer);
    }

    /**
     * 删除消费者
     * @param topic
     */
    public void deleteConsumer(String topic){
        log.info("将删除topic:{} 消费者",topic);
        consumerContainer.deleteConsumer(topic);
    }


}

新建一个消费者容器,用来存放动态创建的消费者

@Component
@Slf4j
public class ConsumerContainer {

    /**
     * 使用map对象,便于根据topic存储对应的KafkaConsumer
     */
    private Map<String, KafkaConsumerThread> kafkaConsumerThreadMap = new HashMap<>();

    @Resource(name = "kafkaProps")
    private Map<String,Object> props;

    /**
     * 添加消费者
     * @param topic
     * @param consumer
     */
    public synchronized void addConsumer(String topic, Consumer<String> consumer){
        if(kafkaConsumerThreadMap.get(topic)!=null){
            log.warn("重复创建消费者:{}",topic);
        }
        KafkaConsumer<String, String> stringStringKafkaConsumer = new KafkaConsumer<>(props);
        stringStringKafkaConsumer.subscribe(Arrays.asList(topic));
        //创建消费者线程
        KafkaConsumerThread kafkaConsumerThread = new KafkaConsumerThread(stringStringKafkaConsumer,consumer);
        kafkaConsumerThread.start();
        kafkaConsumerThreadMap.put(topic,kafkaConsumerThread);
        log.info("创建消费者成功:{}",topic);
    }

    /**
     * 删除消费者
     * @param topic
     */
    public synchronized void deleteConsumer(String topic){
        KafkaConsumerThread kafkaConsumerThread = kafkaConsumerThreadMap.get(topic);
        if (kafkaConsumerThread == null) {
            log.warn("该消费者已经被删除:{}",topic);
            return;
        }
        //打断消费者线程
        kafkaConsumerThread.interrupt();
        kafkaConsumerThreadMap.remove(topic);
        log.info("消费者删除成功:{}",topic);
    }

}

消费者线程

@Slf4j
public class KafkaConsumerThread extends Thread {

    private KafkaConsumer<String,String> kafkaConsumer;

    private Consumer<String> consumer;

    KafkaConsumerThread(KafkaConsumer<String,String> kafkaConsumer, Consumer<String> consumer){
        this.kafkaConsumer=kafkaConsumer;
        this.consumer=consumer;
    }

    @Override
    public void run() {
        try {
            while (true){
                if (isInterrupted()) {
                    throw new InterruptedException();
                }
                //拉取topic消息
                ConsumerRecords<String, String> poll = kafkaConsumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
                    consumer.accept(stringStringConsumerRecord.value());
                }
            }
        } catch (InterruptedException e) {
            Set<String> subscription = kafkaConsumer.subscription();
            log.info("topic:{} 已停止监听,线程停止!", StringUtils.join(subscription,","),e);
        }catch (Exception e){
            Set<String> subscription = kafkaConsumer.subscription();
            log.info("topic:{} 消费者运行异常!", StringUtils.join(subscription,","),e);
        }finally {
            //关闭消费者
            try {
                kafkaConsumer.close();
            } catch (Exception ex) {
            }
        }
    }


}

事件接口,方便发布事件

public interface IEvent {

    default String getTopic(){
        return null;
    };

    /**
     * 每个子事件对象的参数 公共出来
     * @param <T> 泛型
     * @return 子事件里的对象
     */
    <T> T getInfo();
}

消费者示例

@Slf4j
public class PTVMateConsumer implements Consumer<String> {

    @Override
    public void accept(String command) {
        log.info("监听元数据, command = {}", command);;
    }
}

使用示例

                kafkaHelper.addConsumer("topic",new PTVMateConsumer ());
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-27 11:55:47  更:2021-08-27 11:57:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:54:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码