将数据从PolarDB中导入到数仓:
从PolarDB向hive数仓导入数据的json文件
{
"job": {
"setting": { "speed": {"channel":2} }, --设置为2个线程
"content": [
{
"reader": {
"name": "mysqlreader", --从PolarDB中读(PolarDB本质上是mysql)
"parameter": {
"username": "weixin_prod",
"password": "PLVAU6Lf1ioGuZw1",
"connection": [
{
"querySql": [
"
SELECT
-- 这里使用REPLACE 是处理数据文件中的 \r \n 转义字符问题
REPLACE(REPLACE(REPLACE(id,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(campaign_id,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(campaign_name,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(campaign_status,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(begin_time,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(account_name,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(account_id,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(view_count,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(click_count,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(ctr,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(cpc,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(cost,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(product_type_name,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(advert_type,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(req_time,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(day_budget,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(thousand_display_price,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(core_pos_ad_exposure,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(core_pos_ad_download,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(core_pos_ad_download_rate,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(yyb_download_rate,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(add_desktop_pv,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(add_desktop_cost,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(op_status,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(op_status_text,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(mp_advert_type,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(create_time,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(update_time,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(status,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(app_id,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(agency_type,'\r\n',''),'\n',''),'\r',''),
REPLACE(REPLACE(REPLACE(data_time,'\r\n',''),'\n',''),'\r','')
FROM admin_tencent_advert_campaign_day
"
],
"jdbcUrl": ["jdbc:mysql://47.98.254.34:1010/weixin_ad"]
}
]
}
},
"writer": {
"name": "osswriter",
"parameter": {
"maxFileSize":1, --设置生成的文件最小尺寸(这里设置的为1MB但是系统默认最小为10MB,最大的话很大官网有说)
"endpoint": "${oss_endpoint}",
"accessId": "${oss_accessId}",
"accessKey": "${oss_accessKey}",
"bucket": "${oss_bucket}",
"object": "user/hive/warehouse/prod/data_yidai/base/ods/ods_yd_admin_tencent_advert_campaign_day/dt=${dt}/tencentAdvert", --将数据文件写入到具体的路径下,这里的tencentAdvert是跟系统生产的随机串进行拼接(系统一般会生产__开头的随机字符串),如果不进行拼接,数仓中的表可能映射不到数据(即加载不到文件中的数据)
"encoding": "UTF-8", --字符集避免中文乱码
"fieldDelimiter": "\u0001", --设置文件分割符
"writeMode": "truncate" --截断写入避免多次执行侧配置文件,造成数据重复
}
}
}
]
}
}
执行PolarDB中向hive数仓中数据导入的shell脚本
#!/bin/bash
source ~/.bashrc
source "${QY_PROFILE_SCRIPT_PATH}/code/DataWarehouse/conf/datax.cfg"
. ${QY_PROFILE_SCRIPT_PATH}/code/DataWarehouse/common/all.sh
# QY_PROFILE_SCRIPT_PATH = /opt/projects/bigdata_qiyue-profile
#获取日期
data_1="$(date "+%Y%m%d%H%M%S" -d "-1 day")"
year=${data_1:0:4}
month=${data_1:4:2}
day=${data_1:6:2}
dt=${year}-${month}-${day}
echo "dt is:${dt}"
#打印日志到指定位置
#LOG_FILE="/var/log/test.log"
#>"${LOG_FILE}"
#exec &>>${LOG_FILE}
#set -x
hive_db="ods_yd_db"
hive_tablename="ods_yd_admin_tencent_advert_campaign_day"
tb_location="hive数仓表文件存储路径"
# 截至时间设置为当前时间戳
#end_time=$(date "+%s")
# 开始时间设置为一个小时前时间戳
#start_time=$(($end_time-3600))
# 运行命令,如果有多个表就重复写几条就可以
#sql="
#create external table if not exists ${hive_db}.${hive_tablename}(
#id bigint comment '' ,
#campaign_id string comment '' ,
#campaign_name string comment '' ,
#campaign_status int comment '' ,
#begin_time string comment '' ,
#account_name string comment '' ,
#account_id string comment '' ,
#view_count int comment '' ,
#click_count int comment '' ,
#ctr double comment '' ,
#cpc double comment '' ,
#cost double comment '' ,
#product_type_name string comment '' ,
#advert_type int comment '' ,
#req_time string comment '' ,
#day_budget double comment '' ,
#thousand_display_price double comment '' ,
#core_pos_ad_exposure int comment '' ,
#core_pos_ad_download int comment '' ,
#core_pos_ad_download_rate double comment '' ,
#yyb_download_rate double comment '' ,
#add_desktop_pv int comment '' ,
#add_desktop_cost double comment '' ,
#op_status int comment '' ,
#op_status_text string comment '' ,
#mp_advert_type int comment '' ,
#create_time string comment '' ,
#update_time string comment '' ,
#tatus int comment '' ,
#app_id string comment '' ,
#agency_type int comment ' ' ,
#data_time string comment ' '
#)
#row format delimited fields terminated by '\001'
#location '${tb_location}';
#"
cd /opt/datax/bin/
python datax.py ${QY_PROFILE_SCRIPT_PATH}/code/extract/datax/ods_yd_tencent_advert_load-data/ods_yd_admin_tencent_advert_shell_and_json/ods_yd_admin_tencent_advert_campaign_day_data.json -p " -Dqy_mysql_user=${qy_mysql_user} -Dqy_mysql_pwd=${qy_mysql_pwd} -Doss_endpoint=${oss_endpoint} -Doss_accessId=${oss_accessId} -Doss_accessKey=${oss_accessKey} -Doss_bucket=${oss_bucket} -Ddt=${dt}"
exec_hql "${sql}"
# -p "-Dstart_time=${start_time} -Dend_time=${end_time}"
将数据从数仓的ods层的表中导入到clickhouse表中
{
"job": {
"setting": {"speed": {"channel":4 }},
"content": [
{
"reader": {
"name": "ossreader",
"parameter": {
"endpoint": "${oss_endpoint}",
"accessId": "${oss_accessId}",
"accessKey": "${oss_accessKey}",
"bucket": "${oss_bucket}",
"object": ["user/hive/warehouse/prod/data_yidai/base/ods/ods_yd_admin_tencent_advert_campaign_day/dt=${dt}/*"],
"column": [
{
"type": "long",
"index": 0
} ,
{
"type": "long",
"index": "1"
},
{
"type": "string",
"index": "2"
},
{
"type": "string",
"index": "5"
},
{
"type": "long",
"index": "6"
},
{
"type": "string",
"index": "4"
},
{
"type": "date",
"index": "14",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"type": "date",
"index": "26" ,
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"type": "date",
"index": "27" ,
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"type": "string",
"index": "13"
},
{
"type": "string",
"index": "25"
} ,
{
"type": "string",
"index": "3"
},
{
"type": "string",
"index": "11"
},
{
"type": "string",
"index": "7"
},
{
"type": "string",
"index": "8"
},
{
"type": "string",
"index": "9"
},
{
"type": "string",
"index": "21"
}
],
"encoding": "UTF-8",
"fieldDelimiter": "\u0001",
"nullFormat":"\\N"
}
},
"writer": {
"name": "clickhousewriter",
"parameter": {
"username": "testuser",
"password": "1a2s3d4f",
"column": [
"id",
"campaign_id",
"campaign_name",
"account_name",
"account_id",
"begin_time"
"create_time",
"create_time_t",
"update_time_t",
"advert_type"
"mp_advert_type",
"campaign_status",
"cost",
"show",
"click",
"ctr",
"add_desktop_pv"
],
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://121.40.53.178:8123/bigdata",
"table": ["bigdata.ldy_uniform_advert_history_v2"]
}
],
"preSql":["truncate table bigdata.ldy_uniform_advert_history_v2"],
"postSql": ["optimize table bigdata.ldy_uniform_advert_history final"],
"batchSize": 65536,
"batchByteSize": 134217728,
"dryRun": false,
"writeMode": "insert"
}
}
}
]
}
}
执行hive数仓表中数据向hive数仓中导入数据的shell脚本
#!/bin/bash
source ~/.bashrc
source "${QY_PROFILE_SCRIPT_PATH}/code/DataWarehouse/conf/datax.cfg"
. ${QY_PROFILE_SCRIPT_PATH}/code/DataWarehouse/common/all.sh
# QY_PROFILE_SCRIPT_PATH = /opt/projects/bigdata_qiyue-profile
#获取日期
data_1="$(date "+%Y%m%d%H%M%S" -d "-1 day")"
year=${data_1:0:4}
month=${data_1:4:2}
day=${data_1:6:2}
dt=${year}-${month}-${day}
echo "dt is:${dt}"
#打印日志到指定位置
#LOG_FILE="/var/log/test.log"
#>"${LOG_FILE}"
#exec &>>${LOG_FILE}
#set -x
cd /opt/datax/bin/
python datax.py ${QY_PROFILE_SCRIPT_PATH}/code/extract/datax/ods_yd_tencent_advert_load-data/ods_yd_admin_tencent_advert_shell_and_json/ods_ads_yd_admin_tencent_advert.json -p " -Doss_endpoint=${oss_endpoint} -Doss_accessId=${oss_accessId} -Doss_accessKey=${oss_accessKey} -Doss_bucket=${oss_bucket} -Ddt=${dt}"
# -p "-Dstart_time=${start_time} -Dend_time=${end_time}"
|