数据的同步策略有全量同步和增量同步。
全量同步,就是每天都将业务数据库中的全部数据同步一份到数据仓库,这是保证两侧数据同步的最简单的方式。
增量同步,就是每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表,通常需要在首日先进行一次全量同步。
? 全量同步通常使用DataX、Sqoop等基于查询的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具,也可使用Maxwell、Canal等工具。
增量同步方案 | DataX、Sqoop | Maxwell、Canal |
---|
对数据库的要求 | 原理是基于查询,故若想通过select查询获取新增及变化数据,就要求数据表中存在create_time、update_time等字段,然后根据这些字段获取变更数据。 | 要求数据库记录变更操作,例如MySQL需开启binlog。 | 数据的中间状态 | 由于是离线批量同步,故若一条数据在一天中变化多次,该方案只能获取最后一个状态,中间状态无法获取。 | 由于是实时获取所有的数据变更操作,所以可以获取变更数据的所有中间状态。 |
DataX同步业务数据
(1)DataX概述
DataX 是阿里巴巴开源的一个异构数据源离线同步工具,源码地址:https://github.com/alibaba/DataX
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
DataX调度决策思路
举例来说,用户提交了一个DataX作业,并且配置了总的并发度为20,目的是对一个有100张分表的mysql数据源进行同步。
DataX的调度决策思路是:
1)DataX Job根据分库分表切分策略,将同步工作分成100个Task。
2)根据配置的总的并发度20,以及每个Task Group的并发度5,DataX计算共需要分配4个TaskGroup。
3)4个TaskGroup平分100个Task,每一个TaskGroup负责运行25个Task。
功能 | DataX | Sqoop |
---|
运行模式 | 单进程多线程 | MR | 分布式 | 不支持,可以通过调度系统规避 | 支持 | 流控 | 有流控功能 | 需要定制 | 统计信息 | 已有一些统计,上报需定制 | 没有,分布式的数据收集不方便 | 数据校验 | 在core部分有校验功能 | 没有,分布式的数据收集不方便 | 监控 | 需要定制 | 需要定制 |
(2)DataX安装
1)下载DataX安装包并上传到server16的/opt/yyds/apps
下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
2)解压datax.tar.gz到/opt/yyds/apps
[root@server16 apps]
3)自检,执行如下命令
[root@server16 datax]
出现如下内容,则表明安装成功
2022-04-25 21:09:25.133 [job-0] INFO JobContainer -
任务启动时刻 : 2022-04-25 21:09:15
任务结束时刻 : 2022-04-25 21:09:25
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0
(3)DataX使用
DataX的使用十分简单,用户只需根据自己同步数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可。
[root@server16 datax]
第一种方式:查看模板文件
[root@server16 datax]
{
"job":{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"column":[
],
"connection":[
{
"jdbcUrl":[
],
"table":[
]
}
],
"password":"",
"username":"",
"where":""
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"column":[
],
"compress":"",
"defaultFS":"",
"fieldDelimiter":"",
"fileName":"",
"fileType":"",
"path":"",
"writeMode":""
}
}
}
],
"setting":{
"speed":{
"channel":""
}
}
}
}
第二种方式(官方文档):
Reader和Writer的具体参数可参考官方文档,地址如下:
https://github.com/alibaba/DataX/blob/master/README.md
https://gitee.com/mirrors/DataX/blob/master/README.md
例如:
mysqlreader示例如下:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
hdfswriter示例如下:
https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md
1)TableMode模式,从mysql到hdfs
MySQLReader具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。
{
"job":{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"column":[
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://server15:3306/gmall2022"
],
"table":[
"base_province"
]
}
],
"splitPk":"id",
"password":"root",
"username":"root",
"where":"id >= 1"
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"column":[
{
"name":"id",
"type":"bigint"
},
{
"name":"name",
"type":"string"
},
{
"name":"region_id",
"type":"string"
},
{
"name":"area_code",
"type":"string"
},
{
"name":"iso_code",
"type":"string"
},
{
"name":"iso_3166_2",
"type":"string"
}
],
"compress":"gzip",
"defaultFS":"hdfs://server16:8020",
"fieldDelimiter":"\t",
"fileName":"base_province",
"fileType":"text",
"path":"/base_province",
"writeMode":"append"
}
}
}
],
"setting":{
"speed":{
"channel":"1"
}
}
}
}
执行
[root@server16 datax]
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[HdfsWriter-02], Description:[您填写的参数值不合法.]. - 您配置的path: [/base_province] 不存在, 请先在hive端创建对应的数据库和表.
[root@server16 datax]
2022-04-25 21:48:56.178 [job-0] INFO JobContainer -
任务启动时刻 : 2022-04-25 21:48:44
任务结束时刻 : 2022-04-25 21:48:56
任务总计耗时 : 11s
任务平均流量 : 70B/s
记录写入速度 : 3rec/s
读出记录总数 : 34
读写失败总数 : 0
DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
`id` STRING COMMENT '编号',
`name` STRING COMMENT '省份名称',
`region_id` STRING COMMENT '地区ID',
`area_code` STRING COMMENT '地区编码',
`iso_code` STRING COMMENT '旧版ISO-3166-2编码,供可视化使用',
`iso_3166_2` STRING COMMENT '新版IOS-3166-2编码,供可视化使用'
) COMMENT '省份表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/base_province';
注意事项:
HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(‘’),而Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。
解决该问题的方案:
在Hive中建表时指定null值存储格式为空字符串(‘’)
2)MySQLReader之QuerySQLMode模式(不能切分task)
{
"job":{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://server15:3306/gmall2022"
],
"querySql":[
"select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
]
}
],
"password":"root",
"username":"root"
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"column":[
{
"name":"id",
"type":"bigint"
},
{
"name":"name",
"type":"string"
},
{
"name":"region_id",
"type":"string"
},
{
"name":"area_code",
"type":"string"
},
{
"name":"iso_code",
"type":"string"
},
{
"name":"iso_3166_2",
"type":"string"
}
],
"compress":"gzip",
"defaultFS":"hdfs://server16:8020",
"fieldDelimiter":"\t",
"fileName":"base_province",
"fileType":"text",
"path":"/base_province",
"writeMode":"append"
}
}
}
],
"setting":{
"speed":{
"channel":"1"
}
}
}
}
DataX传参的用法如下,
在JSON配置文件中使用${param}引用参数,在提交任务时使用**-p"-Dparam=value"**传入参数值,具体示例如下。
注意:
需要先手动创建该日期目录,否则会报错
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[HdfsWriter-02], Description:[您填写的参数值不合法.]. - 您配置的path: [/base_province/2020-06-14] 不存在, 请先在hive端创建对应的数据库和表.
[root@server16 datax]
{
"job":{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://server15:3306/gmall2022"
],
"querySql":[
"select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
]
}
],
"password":"root",
"username":"root"
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"column":[
{
"name":"id",
"type":"bigint"
},
{
"name":"name",
"type":"string"
},
{
"name":"region_id",
"type":"string"
},
{
"name":"area_code",
"type":"string"
},
{
"name":"iso_code",
"type":"string"
},
{
"name":"iso_3166_2",
"type":"string"
}
],
"compress":"gzip",
"defaultFS":"hdfs://server16:8020",
"fieldDelimiter":"\t",
"fileName":"base_province",
"fileType":"text",
"path":"/base_province/${dt}",
"writeMode":"append"
}
}
}
],
"setting":{
"speed":{
"channel":"1"
}
}
}
}
3) 同步hdfs到mysql
{
"job":{
"content":[
{
"reader":{
"name":"hdfsreader",
"parameter":{
"defaultFS":"hdfs://server16:8020",
"path":"/base_province/2020-06-14",
"column":[
"*"
],
"fileType":"text",
"compress":"gzip",
"encoding":"UTF-8",
"nullFormat":"\\N",
"fieldDelimiter":"\t"
}
},
"writer":{
"name":"mysqlwriter",
"parameter":{
"username":"root",
"password":"root",
"connection":[
{
"table":[
"test_province"
],
"jdbcUrl":"jdbc:mysql://server15:3306/gmall2022?useUnicode=true&characterEncoding=utf-8"
}
],
"column":[
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"writeMode":"replace"
}
}
}
],
"setting":{
"speed":{
"channel":1
}
}
}
}
(4)优化
关键优化参数如下:
参数 | 说明 |
---|
job.setting.speed.channel | 总并发数 | job.setting.speed.record | 总record限速 | job.setting.speed.byte | 总byte限速 | core.transport.channel.speed.record | 单个channel的record限速,默认值为10000(10000条/s) | core.transport.channel.speed.byte | 单个channel的byte限速,默认值1024*1024(1M/s) |
注意事项:
1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channle的byte限速,总record限速/单个channel的record限速)
**配置示例:
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 1048576
}
}
}
},
"job": {
"setting": {
"speed": {
"byte" : 5242880
}
},
...
}
}
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json
|