IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka-connect增量query模式的深入解读 -> 正文阅读

[大数据]Kafka-connect增量query模式的深入解读

1、Kafka- connect增量query模式出现的问题

不得不说Kafka- connect真是个好东西,许多之前需要写程序或用其他组件进行的数据汇集、数据同步工作都可以用Kafka- connect搞定。但是Kafka- connect在进行query增量同步的时候是不支持谓词下推查询的,也就是query语句里面是不能带where查询。

因为当使用增量query查询时候保证查询语句中包含增量字段(incrementing或timestamp)的时候,从任务拉取的日志可以看出来,增量条件是以where谓词的形式拼接在查询语句之后的。
例如:

curl -X POST http://ip:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_test_04",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://ip:3306/test",
                "connection.user": "user",
                "connection.password": "pwd",
                "topic.prefix": "test-query",
                "mode":"timestamp",
                "query":"select a.gxsj a.id,a.name,a.sex,a.birth,b.course,b.sum from studnt as a left join course as b on a.s_id=b.s_id"
                "timestamp.column.name": "gxsj",
                "validate.non.null": false
                }
        }'

如上的源连接器配置最后走增量拉取数据的时候,增量条件以where形式拼接在查询语句之后,日志query语句会如下:

select a.gxsj a.id,a.name,a.sex,a.birth,b.course,b.sum from studnt as a left join course as b on a.s_id=b.s_id WHERE "a.gxsj" > ?  and "a.gxsj" < ? ORDER BY "a.gxsj ASC"

其中,第一个占位符?,代表的时间戳值就是存储的偏移量,第二个占位符?代表的时间戳值是当前时间戳。
若在query语句中再加上where查询条件,任务执行会如下:

select a.gxsj a.id,a.name,a.sex,a.birth,b.course,b.sum from studnt as a left join course as b on a.s_id=b.s_id WHERE a.gxsj > '2021-01-01 00:00:00' WHERE "a.gxsj" > ?  and "a.gxsj" < ? ORDER BY "a.gxsj ASC"

两个谓词where,任务执行并将报错。

2、针对带谓词查询的初级解决方案

对于如上问题,官方文档未给出解决方案,也就是说当前增量query模式并不支持谓词下推的查询(也希望官方早点支持此功能,如此kafka-conenct应用场景将更广泛)。
当然广大Kafka社区网友们充分发挥聪明才智,也出现了一个解决办法,使用嵌套查询。经实际测试,可以解决大部分带谓词的查询,例如,当同步的数据有条件限制的时候,可用如下方法配置connector:

curl -X POST http://ip:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_test_04",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://ip:3306/test",
                "connection.user": "user",
                "connection.password": "pwd",
                "topic.prefix": "test-query",
                "mode":"timestamp",
                "query":"select * from (select a.gxsj a.id,a.name,a.sex,a.birth,b.course,b.sum from studnt as a left join course as b on a.s_id=b.s_id where a.type='2' and a.year='2021') as tmp"
                "timestamp.column.name": "gxsj",
                "validate.non.null": false
                }
        }'

将条件查询作为临时表,毕竟SQL里面一切查询出来的东西都可以当作表,如此执行任务不会出现问题。

3、当只想从当前时间点同步数据,并保持增量同步

针对此问题,用如上的方案也可解决:

curl -X POST http://ip:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_test_04",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://ip:3306/test",
                "connection.user": "user",
                "connection.password": "pwd",
                "topic.prefix": "test-query",
                "mode":"timestamp",
                "query":"select * from (select id,name,type,year,gxsj from test where gxsj > '2021-01-01 00;00:00') as tmp"
                "timestamp.column.name": "gxsj",
                "validate.non.null": false
                }
        }'

每次执行任务时候,query语句其实如下:

select * from (select id,name,type,year,gxsj from test where gxsj > '2021-01-01 00;00:00') as tmp WHERE "a.gxsj" > ? AND "a.gxsj" < ? ORDER BY "a.gxsj" ASC

如此执行不会报错,但嵌套了多次gxsj的判断,拉取数据的效率会变慢。

4、更改增量偏移量的方法:

当使用增量模式时候其实每一个任务都会把增量字段的偏移量写到connect-offsets主题里面,我们可以通过更改任务增量字段偏移量的方法,实现重置读取数据的点从当前时间点去读数据

  • 第一步:查看当前connector任务的偏移量情况
  • 第二步:造一条偏移量数据写到connect- offsets里面
  • 第三步:connector下一次拉取数据便会从造出的偏移量位置开始读区数据

1)、对于第一步,可在启动connector以后,消费connect-offsets主题,看到当前任务的偏移量情况。

注意:消费打印的数据你只会看到类似这种的:{“timestamp_nanos”:0,“timestamp”:1626340289149},并不知道消费到的偏移量对应着哪一个connector任务,对此,你需要把消费数据时候打印消息key的开关打开,如下:

bin/kafka-console-consumer.sh --bootstrap-server hdp01:9092,hdp02:9092,hdp03:9092 --topic connect-offsets --from-beginning --property print.key=true

使用如上消费命令即可看出消费的偏移量消息是属于哪一个connector任务,具体消费的消息可能如下:

["mysql_source_test",{"query":"query"}]		{"timestamp_nanos":0,"timestamp":1626340289149}

2)、对于第二步,当你看那到第一步所消费出来的增量字段偏移量(我这里以timestamp模式为例,incrementing模式同理),你可能已心生办法去处理它。当然,我们的办法就是造一条和当前任务偏移量一样的数据,可使用kafkacat(可查找社区资料看如何使用)等工具或手写程序写一条数据到connect-offsets,覆盖之前的偏移量消息。
我这里基于之前的生产者demo,使用手写程序的方式,具体如下:

package thread.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ProducerThread implements Runnable {

    private final KafkaProducer<String, String> producer;

    private final String topic_name;

    public ProducerThread(String topic_name) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hdp01:9092,hdp02:9092,hdp03:9092");
        props.put("acks", "all");
        //If the request fails, the producer can automatically retry,
        props.put("retries", 3);
        //Specify buffer size in config
        props.put("batch.size", 16384);
        //Reduce the no of requests less than 0
        props.put("linger.ms", 0);
        //The buffer.memory controls the total amount of memory available to the producer for buffering.
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pwd");
        this.producer = new KafkaProducer<>(props);
        this.topic_name = topic_name;
    }

    @Override
    public void run() {
        String key = "[\"mysql_source_test\",{\"query\":\"query\"}]";
        String value = "{\"timestamp_nanos\":0,\"timestamp\":1626336453008}";

        Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(topic_name, key, value));
        try {
            RecordMetadata recordMetadata = metadataFuture.get();
            System.out.println("topic=>" + recordMetadata.topic() + "---" + "key=>" + recordMetadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        ProducerThread test = new ProducerThread("connect-offsets");
        Thread thread = new Thread(test);
        thread.start();
    }
}

使用生产者发送消息到connect-offsets主题以后,再重启connector任务

curl -i -X POST -H "Accept:application/json" \
        -H "Content-Type:application/json" http://ip:8083/connectors/mysql_source_test/tasks/0/restart

当前任务便会以最新更改的时间偏移量为增量基准去同步数据,到此便实现了重置源连接器读取数据的点

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-17 11:59:21  更:2021-07-17 12:01:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/28 9:46:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码