关于增量更新 DataX 支持多种数据库的读写, json 格式配置文件很容易编写, 同步性能很好, 通常可以达到每秒钟 1 万条记录或者更高, 可以说是相当优秀的产品, 但是缺乏对增量更新的内置支持。
其实增量更新非常简单, 只要从目标数据库读取一个最大值的记录, 可能是 DateTime 或者 RowVersion 类型, 然后根据这个最大值对源数据库要同步的表进行过滤, 然后再进行同步即可。
由于 DataX 支持多种数据库的读写, 一种相对简单并且可靠的思路就是:
1.利用 DataX 的 DataReader 去目标数据库读取一个最大值; 2.将这个最大值用 TextFileWriter 写入到一个 CSV 文件; 3.用 Shell 脚本来读取 CSV 文件, 并动态修改全部同步的配置文件; 4.执行修改后的配置文件, 进行增量同步。 5.将shell脚本加入crontab命令中
源端数据库192.168.56.100,目标端192.168.56.113
要实现增量更新, 首先要 oraclereader 从目标数据库读取最大日期, 并用 TextFileWriter 写入到一个 csv 文件,
[root@db01 job]# cat oracletocsv.json
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"username": "hbhe",
"password": "wwwwww",
"column": [{"index": "0","type": "substring"}],
"splitPk": "",
"connection": [
{
"querySql": [
"SELECT max(update_time) FROM t1"
],
"jdbcUrl": [
"jdbc:oracle:thin:@192.168.56.113:1521:orcl"
]
}
]
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss.SSSSSS",
"fileName": "minute_data_max_time_result",
"fileFormat": "csv",
"path": "/datax/",
"writeMode": "truncate"
}
}
}
]
}
}
[root@db01 job]# cat oracletooracle.json
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"username": "c##hbhe",
"password": "wwwwww",
"column": [{"index": "0","type": "TIMESTAMP"}],
"splitPk": "",
"connection": [
{
"querySql": [
"select * from t1 where 1=1"
],
"jdbcUrl": [
"jdbc:oracle:thin:@192.168.56.100:1521:orcl"
]
}
]
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"username": "hbhe",
"password": "wwwwww",
"column": ["*"],
"preSql": [
"truncate table t1"
],
"connection": [
{
"table": [
"t1"
],
"jdbcUrl": "jdbc:oracle:thin:@//192.168.56.113:1521/orcl"
}
]
}
}
}
]
}
}
使用脚本实现增量同步功能
[root@db01 bin]# cat fullSync.sh
#!/bin/sh
###############################################################################################################
#Script name: fullSync.sh
#Script description: Sync data from oracle to oracle
#Current Release Version: 1.0.0
#Script Owner: He ,Haibo
#Latest editor: He, Haibo
#Support platform: Linux OS for redhat and centos.
#Change log:
#Descript:
#
#
###############################################################################################################
python /datax/bin/datax.py /datax/job/oracletocsv.json
if [[ $? -ne 0 ]];then
echo "fullSync.sh error, can not get max_time from target db!"
exit 1
fi
RESULT_FILE=`ls /datax/minute_data_max_time_result_*`
MAX_TIME=`cat $RESULT_FILE`
if [[ "$MAX_TIME" != "null" ]];then
WHERE="to_char(update_time,'yyyy-MM-dd HH:mi:ss') > '$MAX_TIME'"
sed "s/1=1/$WHERE/g" /datax/job/oracletooracle.json > /datax/job/oracletooracle_tmp.json
sed '41d' /datax/job/oracletooracle_tmp.json > /datax/job/oracletooracle_inc.json
python /datax/bin/datax.py /datax/job/oracletooracle_inc.json
if [[ -f /datax/job/oracletooracle_inc.json ]];then
rm -rf /datax/job/oracletooracle_inc.json
fi
if [[ -f /datax/job/oracletooracle_tmp.json ]];then
rm -rf /datax/job/oracletooracle_tmp.json
fi
else
python /datax/bin/datax.py /datax/job/oracletooracle.json
fi
insert into t1 values(to_timestamp(‘2021-10-22 15:23:23.123456’,’yyyy-mm-dd hh24:mi:ss.ff’));
select to_char(update_time,’yyyy-MM-dd hh24:mi:ss’) from t1;
|