简介
Flink CDC:
? 解决了传统数据库实时同步的痛点, 该技术抛弃了其他第三方组件(例如Kafka等),能够实时读取Mysql master节点全量和增量数据,能够捕获所有数据的变化,同时它完全与业务解耦,运维也及其简单。具体介绍请参考:Flink_CDC搭建及简单使用 及 flink-cdc-connectors。
Apache Doris:
它是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris的分布式架构非常简洁,易于运维,并且可以支持10PB以上的超大数据集。
Apache Doris可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!目前Apache Doris已成为Apache顶级项目,成为许多一线大厂实时数据分析数据仓库的不二选择。具体请参考:Apache Doris 。
技术选型
? 本次采用Flink-CDC和flink-doris-connector技术利用FLink DataStream的方式实现Mysql全量或增量数据同步至分析性数据仓库Doris中。
版本选择:
Flink | Flink-doris-connector | Flink-CDC | Doris |
---|
1.11.x | 1.11.6-2.12-xx | 1.0.0/1.1.0 | 0.13.+ | 1.12.x | 1.12.7-2.12-xx | 1.2.0/1.3.0 | 0.13.+ | 1.13.x | 1.13.5-2.12-xx | 1.4.0 / 2.0.* / 2.1.* / 2.2.* | 0.13.+ | 1.14.x | 1.14.4-2.12-xx | 2.2.* | 0.13.+ |
注:Flink-doris-connector版本号解读:例如1.13.5-2.12-1.0.1 表示flink 版本 1.13.5,scala 版本 2.12,connector 版本 1.0.1。
具体实现
本次代码构建选择的版本号为:
Flink | Flink-doris-connector | Flink-CDC | Doris |
---|
1.13.6 | 1.0.3 | 2.2.0 | 0.15.13 |
导入依赖:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.13_2.12</artifactId>
<version>1.0.3</version>
</dependency>
代码实现:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("127.0.0.1")
.port(3306)
.databaseList("test")
.tableList("test.test_cdc")
.username("bigdata")
.password("123456")
.deserializer(new JsonDebeziumDeserializationSchema()).build();
Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("strip_outer_array", "true");
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql source").map(line ->JSONObject.parseObject(line).getString("after")).addSink(DorisSink.sink(DorisExecutionOptions.builder()
.setBatchSize(3)
.setBatchIntervalMs(10L)
.setMaxRetries(3)
.setStreamLoadProp(pro)
.build(),
DorisOptions.builder()
.setFenodes("FE_IP:8030")
.setTableIdentifier("test.test_cdc_sink")
.setUsername("root")
.setPassword("").build()));
env.execute("MYSQL TO Doris Job");
}
总结:
? 通过Flink CDC和flink doris connector 技术,我们可以很简单的将Mysql中增量或全量数据同步至Doris中,链路短,时效性高,简化了传统数据同步的方式,便于维护。
|