FlinkCDC的2.2.0版本怎么监控库中的所有表,增加新表到已有任务?
一、监控全表
? 千呼万唤始出来,之前预告FlinkCDC的2.2.0支持Flink1.14和添加新表,满怀希望!今天一看略显失望,添加新表,不支持动态添加,需要修改tableList之后,从ck中重启,倒是不用重新写新代码了,但是不满足我们目前的需求,失望之一。
二是,api改得有点随意了。
2.0版本监控全表,tableList不设置就行了
DebeziumSourceFunction<String> mySQLSource = MySqlSource.<String>builder()
.hostname(parameterTool.get("source1.mysql.jdbc.host"))
.port(parameterTool.getInt("source1.mysql.jdbc.port"))
.username(parameterTool.get("source1.mysql.jdbc.username"))
.password(parameterTool.get("source1.mysql.jdbc.password"))
.databaseList(parameterTool.get("source1.mysql.jdbc.database"))
//可选配置,如果不指定该参数,则会读取上一个配置下的所有表数据
//指定的时候需要使用db.table的方式明确指定
//.tableList("reported2.epidemic_report_entty")
.startupOptions(StartupOptions.latest())
.deserializer(new MyDeseriallizationFun())
.build();
到了2.2版本源码中MySqlSourceConfig类对tableList做了校验,不能为null
this.tableList = checkNotNull(tableList);
对于如何监控所有表,文档中也没有说明。
通过尝试发现,传空串是监控全表,这你。。。行吧,api改得有点随意了,也不考虑版本兼容。。。
正确得写法是
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(parameterTool.get("source1.mysql.jdbc.host"))
.port(parameterTool.getInt("source1.mysql.jdbc.port"))
.databaseList(parameterTool.get("source1.mysql.jdbc.database"))
// .scanNewlyAddedTableEnabled(true)
.connectTimeout(Duration.ofSeconds(60))
//必选配置,空串的时候为监控全表
//指定的时候需要使用db.table的方式明确指定
// .tableList("")
.tableList("")
.username(parameterTool.get("source1.mysql.jdbc.username"))
.password(parameterTool.get("source1.mysql.jdbc.password"))
.startupOptions(StartupOptions.latest())
.deserializer(new MyDeseriallizationFun())
.build();
二、增加新表到已有的任务
另外想要加新的表,到已有的任务中,需要设置scanNewlyAddedTableEnabled(true)
1.设置savepoint
$ ./bin/flink stop $Existing_Flink_JOB_ID
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
2.修改tablelist,增加表
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.scanNewlyAddedTableEnabled(true)
.databaseList("db")
.tableList("db.product, db.user, db.address, db.order, db.custom") // set captured tables [product, user, address ,order, custom]
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
// your business code
3.从savepoint启动
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./FlinkCDCExample.jar
|