IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 实现 kafka 消费者的动态订阅 -> 正文阅读

[大数据]实现 kafka 消费者的动态订阅

一、问题描述

因业务需要,需要实现在不停止 kafka 消费者的情况下修改订阅的主题,在实现这个需求之前先给一个正常的 kafka 的生产消费 demo,个人一直在使用这个模板

二、代码模板

2.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tech.kpretty</groupId>
    <artifactId>kafka-example</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- kafka 客户端 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.17.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 打包插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>tech.kpretty.Application</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2.2 工具类

package tech.kpretty.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class AllUtils {
    // 默认配置文件根目录
    public static final String DEFAULT_BASE_CONF_PATH = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
    // 默认消费者配置文件
    public static final String DEFAULT_CONSUMER_CONF_PATH = DEFAULT_BASE_CONF_PATH + "mqs.sdk.consumer.properties";
    // 默认生产者配置文件
    public static final String DEFAULT_PRODUCER_CONF_PATH = DEFAULT_BASE_CONF_PATH + "mqs.sdk.producer.properties";
    // 默认项目其他配置
    private static final String DEFAULT_CONF_PATH = DEFAULT_BASE_CONF_PATH + "conf.properties";

    private static final Logger LOG = LoggerFactory.getLogger(AllUtils.class);

    private static final Properties properties = new Properties();

    static {
        try {
            properties.load(new FileInputStream(DEFAULT_CONF_PATH));
        } catch (IOException e) {
            LOG.error("读取项目配置文件错误,错误代码-2,错误原因:", e);
            System.exit(-2);
        }
    }

    public static List<String> getConsumerTopics() {
        return Arrays.asList(properties.getProperty("consumer.topics").split(","));
    }

    public static String getProducerTopic(){
        return properties.getProperty("producer.topic");
    }
}

2.3 生产者

package tech.kpretty.producer;

import tech.kpretty.util.AllUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;

/**
 * @author wjun
 * @date 2022/4/8 11:37 am
 */
public class ServerProducer implements Runnable {
    private final KafkaProducer<String, String> producer;
    private volatile boolean isRunning = true;
    private static final Logger LOG = LoggerFactory.getLogger(ServerProducer.class);


    public ServerProducer(String path) {
        Properties properties = new Properties();
        try {
            properties.load(new FileInputStream(path));
        } catch (IOException e) {
            LOG.error("读取生产者配置文件错误,错误代码-3,错误原因:", e);
            System.exit(-3);
        }
        producer = new KafkaProducer<>(properties);
        // 注册 hook,监听 ctrl + c
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    public ServerProducer() {
        this(AllUtils.DEFAULT_PRODUCER_CONF_PATH);
    }

    @Override
    public void run() {
        while (isRunning) {
            send();
        }
    }

    public void send() {
        // todo 这里修改发送的业务逻辑
        send(UUID.randomUUID().toString());
    }

    public void send(String message) {
        send(message, null);
    }

    public void send(String message, String key) {
        send(message, key, null);
    }

    public void send(String message, String key, Integer partitionNum) {
        ProducerRecord<String, String> record = new ProducerRecord<>(AllUtils.getProducerTopic(), partitionNum, key, message);
        producer.send(record, (metadata, exception) -> {
            if (null == exception) {
                LOG.info("消息发送成功{},元数据 {}", record, metadata);
            } else {
                LOG.error("消息发送失败,错误原因", exception);
            }
        });
    }

    public void close() {
        isRunning = false;
    }
}

2.3 消费者

package tech.kpretty.consumer;

import tech.kpretty.util.AllUtils;
import org.apache.kafka.clients.consumer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author wjun
 * @date 2022/4/8 11:37 am
 */
public class ServerConsumer implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private volatile boolean isRunning = true;
    private volatile boolean flag;

    private static final Logger LOG = LoggerFactory.getLogger(ServerConsumer.class);


    public ServerConsumer(String path) {
        Properties properties = new Properties();
        try {
            properties.load(new FileInputStream(path));
        } catch (IOException e) {
            LOG.error("读取消费者配置文件错误,错误代码-1,错误原因:", e);
            System.exit(-1);
        }
        consumer = new KafkaConsumer<>(properties);
        // 订阅 topics
        consumer.subscribe(AllUtils.getConsumerTopics());
        // 注册 hook,监听 ctrl + c
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    public ServerConsumer() {
        this(AllUtils.DEFAULT_CONSUMER_CONF_PATH);
    }

    @Override
    public void run() {
        while (isRunning) {
            receive();
        }
    }

    public void receive() {
        flag = true;
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        if (!records.isEmpty()) {
            for (ConsumerRecord<String, String> record : records) {
                // todo 这里为业务逻辑处理
                LOG.info("mqs message:{}", record);
            }
            // 建议手动提交偏移量
            // 若自动提交,请将consumer配置文件中enable.auto.commit删除,并注释下面代码
            commitOffset();
        }
        flag = false;
    }

    /**
     * 异步提交偏移量
     */
    private void commitOffset() {
        consumer.commitAsync((offsets, exception) -> {
            if (null == exception) {
                LOG.info("偏移量提交成功,元数据:{}", offsets);
            } else {
                LOG.error("偏移量提交失败,错误信息:", exception);
            }
        });
    }

    /**
     * 安全关闭消费者
     */
    public void close() {
        isRunning = false;
        // 关闭之前同步提交一次
        do {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while (flag);
        consumer.commitSync();
        consumer.close();
    }
}

2.4 启动类

package tech.kpretty;

import tech.kpretty.consumer.ServerConsumer;
import tech.kpretty.producer.ServerProducer;

/**
 * @author wjun
 * @date 2022/4/8 11:29 am
 */
public class Application {
    public static void main(String[] args) {
        // 启动消费者
        ServerConsumer consumer = new ServerConsumer();
        new Thread(consumer).start();

        // 启动生产者
        ServerProducer producer = new ServerProducer();
        new Thread(producer).start();
    }
}

2.5 日志配置

log4j.rootLogger=INFO,console,daily
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.Threshold=DEBUG
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%5p][%d{yyyy-MM-dd HH:mm:ss}][%10t][%c]%m%n

#log4j.appender.file=org.apache.log4j.RollingFileAppender
#log4j.appender.file.File=./log/xxx.log
#log4j.appender.file.MaxFileSize=100mb
#log4j.appender.file.Threshold=DEBUG
#log4j.appender.file.MaxBackupIndex=100
#log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=[%5p][%d{yyyy-MM-dd HH:mm:ss}][%10t][%c]%m%n

log4j.appender.daily=org.apache.log4j.DailyRollingFileAppender
log4j.appender.daily.File=./log/xxx.log
log4j.appender.daily.DatePattern='.'yyyy-MM-dd
log4j.appender.daily.layout=org.apache.log4j.PatternLayout
log4j.appender.daily.layout.ConversionPattern=[%5p][%d{yyyy-MM-dd HH:mm:ss}][%10t][%c]%m%n

到这里个人认为一个很标准的 kafka example demo 就完成,喜欢的可以拿去直接用,只需要在根目录创建 conf 文件夹,生产者配置为:mqs.sdk.producer.properties,消费者配置为:mqs.sdk.producer.properties,项目其他配置为:conf.properties

三、需求实现

最简单的实现就是每次 poll 后或者 poll 前重新订阅一次 topics 如下:

public void receive() {
  flag = true;
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
  if (!records.isEmpty()) {
    for (ConsumerRecord<String, String> record : records) {
      // todo 这里为业务逻辑处理
      LOG.info("mqs message:{}", record);
    }
    // 建议手动提交偏移量
    // 若自动提交,请将consumer配置文件中enable.auto.commit删除,并注释下面代码
    commitOffset();
  }
  // 订阅 topics
  consumer.subscribe(AllUtils.getConsumerTopics());
  flag = false;  
}

但是通常情况下我们的订阅不会这么频繁的修改,而且每次 poll 重新订阅也带来了不必要的开销(新的订阅会发送 re-join 请求),因此我们希望是当项目的配置文件发生改变的时候我们再去修改订阅,因此思路就明确了:新增一个线程用来实时监听配置文件,当配置文件发生修改时调用消费者的subscribe,为了不影响消费者不建议在业务端添加这个逻辑

需要加一个消费者重新订阅的方法,用来给新增的线程调用

public void update(List<String> topics) {
  consumer.subscribe(topics);
  LOG.info("配置信息已更新");
  LOG.info("任务重新启动");
}

这里我通过判断配置文件的修改时间来确定文件是否发生了修改,代码如下:

package tech.kpretty.util;

import tech.kpretty.consumer.ServerConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author wjun
 * @date 2022/4/11 17:29 am
 */
public class AutoLoadTopic implements Runnable {
    private final ServerConsumer consumer;
    private final Properties properties;
    private final File file;
    private long lastModifiedTime;
    private static final Logger LOG = LoggerFactory.getLogger(AutoLoadTopic.class);

    public AutoLoadTopic(ServerConsumer consumer) {
        this.consumer = consumer;
        this.properties = new Properties();
        this.file = new File(AllUtils.DEFAULT_CONF_PATH);
        this.lastModifiedTime = file.lastModified();
    }

    @Override
    public void run() {
        while (true) {
            // 获取配置文件路径
            long currentLastModifiedTime = file.lastModified();
            // 如果当前文件的修改时间大于上一次文件修改时间,则代表配置文件已经更新
            if (currentLastModifiedTime > lastModifiedTime) {
                // 更新配置
                consumer.update(getTopics());
                // 修改时间
                lastModifiedTime = currentLastModifiedTime;
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private List<String> getTopics() {
        try {
            // 清空配置
            properties.clear();
            properties.load(new FileInputStream(file));
        } catch (IOException e) {
            LOG.error("更新配置信息发生致命错误,错误代码-5,错误原因", e);
            System.exit(-5);
        }
        return Arrays.asList(properties.getProperty("consumer.topics").split(","));
    }
}

随即在 Application 中启动这个线程即可

public class Application {
    public static void main(String[] args) {
        // 启动消费者
        ServerConsumer consumer = new ServerConsumer();
        new Thread(consumer).start();

        // 启动生产者
        //ServerProducer producer = new ServerProducer();
        //new Thread(producer).start();

        // 启动动态配置线程
        AutoLoadTopic autoLoadTopic = new AutoLoadTopic(consumer);
        new Thread(autoLoadTopic).start();
    }
}

看起来很完美,但是消费者一定会报错!!!因为 kafka 的 Consumer 是线程不完全的,不允许多线程同时去操作,从源码可以看出

private void acquire() {
  long threadId = Thread.currentThread().getId();
  if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
    throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
  refcount.incrementAndGet();
}

这个方法在 Consumer 多个常用的方法中被调用,如:subscribe、poll、assignment、commitAsync 等等,因此我们在重新订阅的时候一定要保证消费者处于“空闲状态”,这里采用 CAS 的思想,修改部分代码:

// 是否更新标记
private volatile boolean isUpdate = false;
@Override
public void run() {
  while (isRunning) {
    if (!isUpdate) {
      receive();
    }
  }
}

需要更新的时候跳过 receive 进行一段时间的空轮序,但可能发生当修改 isUpdate 是消费者正在处理数据,因此必须等到 receive 结束才可以修改,因此修改 update 逻辑

public void update(List<String> topics) {
  isUpdate = true;
  do {
    try {
      TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  } while (flag);
  consumer.subscribe(topics);
  LOG.info("配置信息已更新");
  isUpdate = false;
  LOG.info("任务重新启动");
}

因为消费者在消费的时候来修改 flag,当开始消费将 flag 置为 true,结束消费将 flag 置为 false,这里的等待可有可无。当配置更新完记得将 isUpdate 置为 false 让消费者开始 poll,这样就实现了这个需求。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 17:49:30  更:2022-04-18 17:52:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:40:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码