1 概述
1.1 CDC 的全称:
Change Data Capture 。在广义的概念上,只要是能捕获数据变更的技术,都可以称之为CDC。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。 Flink 从 1.11 版本开始原生支持 CDC 数据(changelog)的处理,目前已经是非常成熟的变更数据处理方案。 Flink CDC Connectors 是 Flink 的一组 Source 连接器,是 Flink CDC 的核心组件,这些连接器负责从 MySQL、PostgreSQL、Oracle、MongoDB 等数据库读取存量历史数据和增量变更数据。
1.2 数据湖:
数据湖是一个集中式存储库,允许以任意规模存储所有结构化和非结构化数据。可以按原样存储数据(无需先对数据进行结构化处理),并运行不同类型的分析 – 从控制面板和可视化到大数据处理、实时分析和机器学习,以指导做出更好的决策。
1.3 Flink CDC优势
传统数据采集流程,如下图所示:
binlog收集到的数据,需要通过canal、kafka才能进入flink进行处理,如果采用Flink CDC简化这一步骤呢? 如下图所示:
采用flink CDC可以做到简化分析链路,降低维护成本。同时更少的组件也意味着数据时效性能够进一步提高。 canal是同步增量数据的,如果需要同步全量数据,则需要另起一条链路,而Flink CDC可以做到全增量数据同步。
2 Flink CDC 性能测试
阿里巴巴技术专家—伍翀(云邪)对Flink CDC性能进行了测试, 使用数据量为6500万条的customer 表,Flink 版本是 1.13.1, Source 并发为 8,全量读取阶段: CDC 2.0版本 用时13分钟 CDC 1.4版本 用时89分钟 性能提升了6.8倍 平均1s读取数据83333条
3 FLink CDC 如何简化数据入湖
3.1传统数据入湖痛点:
(1) 全增量数据的同步
上面视图是传统的数据入湖架构,分为两个链路,有一个全量同步作业做一次性的全量数据拉取,还有一个增量作业通过 Canal 和处理引 擎将 Binlog 数据准实时地同步到 Hudi 表中。这个架构虽然利用了 Hudi 的更新能力,无需周期性地调度全量合并任务,能做到分钟级延迟。但是全量和增量仍是割裂的两个作业,全量和增量的切换仍需要人工的介入,并且需要指定一个准确的增量启动位点,否则的话就会有丢失数据的风险。可以看到这种架构是流批割裂的,并不是一个统一的整体。 而Flink CDC最大的优势之一就是全增量统一入湖
(2) 手动映射表结构,复杂易出错
用户用了 Flink CDC 后,遇到的第一个痛点就是需要将 MySQL 的 DDL 手工映射成 Flink 的 DDL。手工映射表结构是比较繁琐的,尤其是当表和字段数非常多的时候。而且手工映射也容易 出错,比如 说 MySQL 的 BIGINT UNSINGED,它不能映射成 Flink 的 BIGINT,而是要映射成 DECIMAL(20)。 如果系统能自动帮助用户自动去映射表结构就会简单安全很多。
(3) Schema变更
用户遇到的另一个痛点是表结构的变更导致入湖链路难以维护。例如用户有一张表,原先有 id 和 name 两列,突然增加了一列 Address。新增的这一列数据可能就无法同步到数据湖中,甚至导致 入湖链路的挂掉,影响稳定性。除了加列的变更,还可能会有删列、类型变更等等。国外的 Fivetran 做过一个调研报告,发现 60%的公司,schema 每个月都会变化,30%每周都会变化。这 说明基本每个公司都会面临 schema 变更带来的数据集成上的挑战。
(4) 整库入湖
最后一个是整库入湖的挑战。因为用户主要使用 SQL,这就需要为每个表的数据同步链路定义一个 INSERT INTO 语句。有些用户的 MySQL 实例中甚至有上千张的业务表,用户就要写上千个 INSE RT INTO 语句。更令人望而生却的是,每一个 INSERT INTO 任务都会创建至少一个数据库连接, 读取一次 Binlog 数据。千表入湖的话就需要上千个连接,上千次的 Binlog 重复读取。这就会对 MySQL 和网络造成很大的压力。
3.2全自动化数据集成
针对刚刚的四个痛点,开发团队基于 Flink 打造“全自动化数据集成“。
(1) 首先 Flink CDC 已经具备了全增量自动切换的能力,这也是 Flink CDC 的亮点之一。
(2) 在元信息的自动发现上,可以通过 Flink 的 Catalog 接口无缝对接上,开发人员开发了 MySQL Catalog 来自动发现 MySQL 中的表和 schema,还开发了 Hudi Catalog 自动地去 Hudi 中创建目标表的 元信息。
(3) 在表结构变更的自动同步方面,引入了一个 Schema Evolution 的内核,使得 Flink Job 无需依赖外部服务就能实时同步 schem变更。
(4) 在整库同步方面,引入了 CDAS 语法, 一行 SQL 语句就能完成整库同步作业的定义,并且引入了 source 合并的优化,减轻对源端数据库的压力。还引入了CTAS的数据同步语法。 CDAS 和 CTAS 的数据同步语法的语法非常简单,CDAS 语法就是 create database as database,主要用于整库同步,像这里展示的这行语句就完成了从 MySQL 的 tpc_ds 库,整库同步至 Hudi 的 ods 库中。与之类似的,我们还有一个 CTAS 语法, 可以方便的用来支持表级别的同步,还可以通过正则表达式指定库名和表名,来完成分库分表合并同步。像这里就完成了 MySQL 的 user 分库分表合并到了 Hudi 的 users 表中。CDAS CTAS 的 语法,会自动地去目标端创建目标表,然后启动一个 Flink Job 自动同步全量+增量的数据,并且也会实时同步表结构变更。
之前提到千表入湖时,建立的数据库连接过多,Binlog 重复读取会造成源库的巨大压力。为解决这个问题,引入了 source 合并的优化,尝试合并同一作业中的 source,如果都是读的同一数据源,则会被合并成一个 source 节点,这时数据库只需要建立一个连接,binlog 也只需读取一次,就实现了整库的读取,降低了对数据库的压力。
如果对flink简化数据实时入湖入仓感兴趣的,可以在Flink Forward Asia 2021 峰会上观看官方人员准备的DEMO视频。
4 Flink CDC 使用示例
4.1 版本
hadoop-2.7.5 Flink-1.13.3 scala-2.11.8 Flink CDC 2.1.0
4.2 Flink环境配置
flink集群部署在三台服务器上
4.3 创建mysql数据源
create database test ;
create table test.user_source(
id int(4) PRIMARY KEY,
name varchar(20),
favor varchar(20),
area varchar(20),
ts datetime NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO `user_source` VALUES ('1', '段博士', 'music', 'BigData',curdate());
INSERT INTO `user_source` VALUES ('2', '马博士', 'game', 'MachineLearning',curdate());
DELETE FROM user_source WHERE id = *;
4.4 Flink CDC 下载
进入 https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.0/ 目录下,下载 flink-sql-connector-mysql-cdc-2.1.0.jar 将jar包放在三台服务器的$FLINK_HOME/lib/
4.5 启动Flink
启动Flink本地集群:
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/start-cluster.sh
启动Flink SQL Client
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded shell
在SQL Cli设置分析结果展示模式为:
set sql-client.execution.result-mode=tableau; (flink1.13版本要加sql-client.)
set execution.checkpointing.interval=3sec;
--流处理模式
set execution.runtime-mode = streaming;
4.6 使用FLINK CDC
在Flink SQL Client中创建输入表,通过FLINK CDC连接数据源
CREATE TABLE input1(
id INT PRIMARY KEY NOT ENFORCED,
name STRING,
favor STRING,
area STRING,
ts STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'ip地址',
'port' = '3306',
'username' = '***',
'password' = '***',
'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial',
'database-name' = 'test',
'table-name' = 'user_source'
);
select * from input1;
INSERT INTO `user_source` VALUES ('3', '小红', 'none', 'BigData',curdate());
|