1、Kafka- connect增量query模式出现的问题
不得不说Kafka- connect真是个好东西,许多之前需要写程序或用其他组件进行的数据汇集、数据同步工作都可以用Kafka- connect搞定。但是Kafka- connect在进行query增量同步的时候是不支持谓词下推查询的,也就是query语句里面是不能带where查询。
因为当使用增量query查询时候保证查询语句中包含增量字段(incrementing或timestamp)的时候,从任务拉取的日志可以看出来,增量条件是以where谓词的形式拼接在查询语句之后的。 例如:
curl -X POST http://ip:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_test_04",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ip:3306/test",
"connection.user": "user",
"connection.password": "pwd",
"topic.prefix": "test-query",
"mode":"timestamp",
"query":"select a.gxsj a.id,a.name,a.sex,a.birth,b.course,b.sum from studnt as a left join course as b on a.s_id=b.s_id"
"timestamp.column.name": "gxsj",
"validate.non.null": false
}
}'
如上的源连接器配置最后走增量拉取数据的时候,增量条件以where形式拼接在查询语句之后,日志query语句会如下:
select a.gxsj a.id,a.name,a.sex,a.birth,b.course,b.sum from studnt as a left join course as b on a.s_id=b.s_id WHERE "a.gxsj" > ? and "a.gxsj" < ? ORDER BY "a.gxsj ASC"
其中,第一个占位符?,代表的时间戳值就是存储的偏移量,第二个占位符?代表的时间戳值是当前时间戳。 若在query语句中再加上where查询条件,任务执行会如下:
select a.gxsj a.id,a.name,a.sex,a.birth,b.course,b.sum from studnt as a left join course as b on a.s_id=b.s_id WHERE a.gxsj > '2021-01-01 00:00:00' WHERE "a.gxsj" > ? and "a.gxsj" < ? ORDER BY "a.gxsj ASC"
两个谓词where,任务执行并将报错。
2、针对带谓词查询的初级解决方案
对于如上问题,官方文档未给出解决方案,也就是说当前增量query模式并不支持谓词下推的查询(也希望官方早点支持此功能,如此kafka-conenct应用场景将更广泛)。 当然广大Kafka社区网友们充分发挥聪明才智,也出现了一个解决办法,使用嵌套查询。经实际测试,可以解决大部分带谓词的查询,例如,当同步的数据有条件限制的时候,可用如下方法配置connector:
curl -X POST http://ip:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_test_04",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ip:3306/test",
"connection.user": "user",
"connection.password": "pwd",
"topic.prefix": "test-query",
"mode":"timestamp",
"query":"select * from (select a.gxsj a.id,a.name,a.sex,a.birth,b.course,b.sum from studnt as a left join course as b on a.s_id=b.s_id where a.type='2' and a.year='2021') as tmp"
"timestamp.column.name": "gxsj",
"validate.non.null": false
}
}'
将条件查询作为临时表,毕竟SQL里面一切查询出来的东西都可以当作表,如此执行任务不会出现问题。
3、当只想从当前时间点同步数据,并保持增量同步
针对此问题,用如上的方案也可解决:
curl -X POST http://ip:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_test_04",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ip:3306/test",
"connection.user": "user",
"connection.password": "pwd",
"topic.prefix": "test-query",
"mode":"timestamp",
"query":"select * from (select id,name,type,year,gxsj from test where gxsj > '2021-01-01 00;00:00') as tmp"
"timestamp.column.name": "gxsj",
"validate.non.null": false
}
}'
每次执行任务时候,query语句其实如下:
select * from (select id,name,type,year,gxsj from test where gxsj > '2021-01-01 00;00:00') as tmp WHERE "a.gxsj" > ? AND "a.gxsj" < ? ORDER BY "a.gxsj" ASC
如此执行不会报错,但嵌套了多次gxsj的判断,拉取数据的效率会变慢。
4、更改增量偏移量的方法:
当使用增量模式时候其实每一个任务都会把增量字段的偏移量写到connect-offsets主题里面,我们可以通过更改任务增量字段偏移量的方法,实现重置读取数据的点或从当前时间点去读数据
- 第一步:查看当前connector任务的偏移量情况
- 第二步:造一条偏移量数据写到connect- offsets里面
- 第三步:connector下一次拉取数据便会从造出的偏移量位置开始读区数据
1)、对于第一步,可在启动connector以后,消费connect-offsets主题,看到当前任务的偏移量情况。
注意:消费打印的数据你只会看到类似这种的:{“timestamp_nanos”:0,“timestamp”:1626340289149},并不知道消费到的偏移量对应着哪一个connector任务,对此,你需要把消费数据时候打印消息key的开关打开,如下:
bin/kafka-console-consumer.sh --bootstrap-server hdp01:9092,hdp02:9092,hdp03:9092 --topic connect-offsets --from-beginning --property print.key=true
使用如上消费命令即可看出消费的偏移量消息是属于哪一个connector任务,具体消费的消息可能如下:
["mysql_source_test",{"query":"query"}] {"timestamp_nanos":0,"timestamp":1626340289149}
2)、对于第二步,当你看那到第一步所消费出来的增量字段偏移量(我这里以timestamp模式为例,incrementing模式同理),你可能已心生办法去处理它。当然,我们的办法就是造一条和当前任务偏移量一样的数据,可使用kafkacat(可查找社区资料看如何使用)等工具或手写程序写一条数据到connect-offsets,覆盖之前的偏移量消息。 我这里基于之前的生产者demo,使用手写程序的方式,具体如下:
package thread.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class ProducerThread implements Runnable {
private final KafkaProducer<String, String> producer;
private final String topic_name;
public ProducerThread(String topic_name) {
Properties props = new Properties();
props.put("bootstrap.servers", "hdp01:9092,hdp02:9092,hdp03:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pwd");
this.producer = new KafkaProducer<>(props);
this.topic_name = topic_name;
}
@Override
public void run() {
String key = "[\"mysql_source_test\",{\"query\":\"query\"}]";
String value = "{\"timestamp_nanos\":0,\"timestamp\":1626336453008}";
Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(topic_name, key, value));
try {
RecordMetadata recordMetadata = metadataFuture.get();
System.out.println("topic=>" + recordMetadata.topic() + "---" + "key=>" + recordMetadata.offset());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ProducerThread test = new ProducerThread("connect-offsets");
Thread thread = new Thread(test);
thread.start();
}
}
使用生产者发送消息到connect-offsets主题以后,再重启connector任务
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://ip:8083/connectors/mysql_source_test/tasks/0/restart
当前任务便会以最新更改的时间偏移量为增量基准去同步数据,到此便实现了重置源连接器读取数据的点。
|