跨集群拷贝Distcp性能测试
DistCp跨集群拷贝hive表测试方案一
1. DistCp是是什么?
? distcp是一个hdfs提供的工具。看官网关于distcp工具的描述: distCp官网教程链接
distcp(分布式复制)是一种用于大型集群间/集群内复制的工具,且支持不同hadoop版本间的数据传输复制。 它使用 MapReduce 来实现其分布、错误处理和恢复以及报告。它将文件和目录列表扩展为map任务的输入(一个只有map没有reduce的mapreduce任务),每个任务将复制源列表中指定的文件的一个分区。
2. DistCp的生产使用
? 注意distcp支持大型集群间/集群内部数据的复制,对于集群间的数据复制要注意源集群和目标集群的hadoop版本是否一致,如果不一致,复制的方式是不一致的,这个版本主要针对大版本的一致性性,如hadoop1.x,hadoop2.x,hadoop3.x之间的区别,对于小版本没有关系,本质是看版本之间使用的协议是否一致。 对于集群内的复制就没有什么区别了,用法跟hadoop cp差不多。因为Distcp实际开发中也主要是针对跨集群之间的数据传输。
2.1 Distcp语法与参数解析
2.1.1.distcp标准格式与参数
#将10.90.48.127上的文件传输到指定10.90.50.53上同名目录
hadoop distcp hdfs://10.90.48.127:8020/user/hive/warehouse/dm_smssdk_master.db/log_device_phone_dedup/
hdfs://10.90.50.53:8020/user/hive/warehouse/dm_smssdk_master.db/log_device_phone_dedup/
- 1. 注意,上面 10.90.48.127 和 10.90.50.53 都是两个集群的主namenode的ip,故distcp源ip和目标ip要和对应actvie的namenode的ip保一致,因为它是复制指定namespace空间下的文件所以要去找namenode.
-
- 端口这块要和namenode配置的通信端口一致fs.default.name, fs.defaultFS,不同集群可能配置的端口不一致,以实际为准。
-
- 源集群和目标集群在执行传输数据时,要有同名的hadoop用户,并且具有对应的权限。
-
- 相关distcp参数可以互相搭配使用
-
- 注意,使用hdfs协议的distcp即可以在源集群也可以在目标集群执行,在哪个集群执行使用哪个集群的资源。
distcp核心参数 | 参数含义 | 补充 |
---|
**-m ** | 同时启动的最大map数 -m 50,启动50个map | 指定要复制数据的map数。请注意,更多的map数不一定会提高吞吐量。因为DistCp 的最小工作单位是文件。即,一个文件仅由一个map处理。将map数量增加到超过文件数量的值不会产生性能优势。启动的Map数量将等于文件数量。 | -overwrite | 覆盖目的地的数据 | 跟hive overwrite用法一样 | -update | 如果源数据和目标数据的大小、块大小或校验和不同,则覆盖它,使用-update仅复制更改的文件。 | 这种主要用来更新复制,如果源数据和目标数不一致,则更新复制数据,将目标数据覆盖了。这种非常使用比如传输中途失败了,然后重传,不需要重头再传,有点类似断点续传。 | -bandwidth | 指定传输过程每个map的带宽,以 MB/秒为单位。-bandwidth 100,每个map最多使用100Mb带宽。 | 主要用来限制复制时节点使用的带宽,防止影响生产正常任务的执行,尤其对于带宽紧张的集群。每个Map将被限制为仅消耗指定的带宽。 | -numListstatusThreads | 用于构建文件列表的线程数 | 最多 40 个线程,一般不指定。 | -skipcrccheck | 是否跳过源路径和目标路径之间的 CRC 检查。 | 一般跳过,不然复制很慢。但是跳过会存在数据安全性降低的情况。 | -direct | 直接写入目标路径 | 当目标是对象存储时,可用于避免可能非常昂贵的临时文件重命名操作。一般不建议使用。一般实际传输都是先创建临时文件,最后mv重命名一下即可。 |
其他相关参数,参考官方文档:disctcp 官网使用介绍
2.2 跨集群不跨版本distcp使用hdfs协议传输
1.控制map个数,不校验,更新模式传输。注意这里设置了用户使用队列。
hadoop distcp -Dmapreduce.job.queuename=root.yarn_data_compliance \
-update -skipcrccheck -m 100 \
hdfs://10.90.48.127:8020/user/hive/warehouse/dm_smssdk_master.db/log_device_phone_dedup/ \
hdfs://10.90.50.53:8020/user/hive/warehouse/dm_smssdk_master.db/log_device_phone_dedup/
2.3跨集群跨HDFS版本的distcp使用webhdfs传输
为了在 Hadoop 的两个不同主要版本集群之间进行复制(例如在 2.X 和 3.X 之间),通常会使用 WebHdfsFileSystem。与之前的 HftpFileSystem 不同,由于 webhdfs 可用于读写操作,因此 DistCp 可以在源集群和目标集群上运行(HftpFileSystem。 这是一个只读文件系统,所以DistCp必须运行在目标端集群上,Hadoop3.x 已经对其废弃使用,使用webhdfs替代)
注意使用webhdfs时远程集群的URI格式为为webhdfs://<namenode_hostname>:<http_port>。在相同主要版本的 Hadoop 集群之间进行复制时(例如在 2.X 和 2.X 之间),使用 hdfs 协议以获得更好的性能。
案例1:cdh集群hadoop2.6往cdh hadoop3.3集群传输文件
hadoop distcp -update -skipcrccheck -m 50 \
webhdfs://10.90.48.127:50070/user/hive/warehouse/dm_smssdk_master.db/log_device_phone_dedup/day=20210701 \
webhdfs://10.90.50.54:9870/user/hive/warehouse/dm_smssdk_master.db/log_device_phone_dedup/day=20210701
如上,跨集群跨版本从10.90.28.127的hadoop2.6集群往10.90.50.54的hadoop3.3集群上传输文件,使用webhdfs协议。脚本运行在目标集群10.90.50.54上执行。当然也可以运行10.90.28.127上执行。注意这里两个集群的端口不一致,因为不同版本的端口不一致,以实际配置端口为准。 实际执行过程:
3. Distcp的其他介绍
3.1 distcp支持同时配置多个数据源的传输复制
3.2 distcp的架构
新的hadoop3.x DistCp 的组件可以分为以下几类:基于此我们可以自定义distcp jar包,定制化文件传输使用。
- DistCp 驱动程序
- 复制列表生成器
- 输入格式和 Map-Reduce 组件
本地集群测试
单个文件或hive表的跨集群复制
大集群3个节点:
- nameNode节点(dataNode节点):192.168.191.111
- dataNode节点:192.168.191.112
- dataNode节点:192.168.191.113
小集群1个节点:192.168.191.114—> servers:9820(active) nameNode节点
启动两个集群:
分别再两个集群上执行一下命令:
在大集群上创建一个库
create database db_hive;
源服务器 192.168.191.114: 9820==>hive数仓在hdfs上的存储路径下的db_hive.db下的一张表copy到 ==> 目标服务器 :192.168.191.111:8020/hive/warehouse/db_hive.db下面
执行distcp命令:
hadoop distcp hdfs://192.168.191.114:9820/user/hive/warehouse/db_hive.db/business hdfs://192.168.191.111:8020/hive/warehouse/db_hive.db
数据跨集群拷贝成功 在对应节点创建相同的数据库和表:目标端集群nameNode上使用Hue或者连接工具创建数据库和表
create database db_hive;
CREATE TABLE `business`(
`name` string,
`orderdate` string,
`cost` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'=',',
'serialization.format'=',')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://node01:8020/hive/warehouse/db_hive.db/business'
TBLPROPERTIES (
'bucketing_version'='2',
'transient_lastDdlTime'='1650619281')
如下图所示:
注意:
可能出现的错误:当源服务器和目标服务器的hdfs数据路径不同是,如果直接拷贝源端的建表数据则会出现数据加载失败!空表无数据!!
但是发现表和数据并未关联起来:表中并没有数据,这是为什么呢?但是从111节点的hdfs-web页面访问查看,对应目录有数据文件。那为什么表中没有数据呢?
通过Navicat连接mysql数据库,查看hive的目标服务器111节点的hive元数据库metastore发现db_hive这个库对应的元数据记录与源服务器节点114的元数据库的对应数据不同,DB_ID目标端是32,源端是26,OWNER_TYPE源端是root,目标端是anonymous
-
源端节点元数据库信息 -
目标端节点元数据库信息
如果是多个表和数据文件或者数据文件较大时又该怎么办? 难道要一张表一张表的手动创建吗?这样数据迁移效率岂不是很低下?
这里可以使用Shell脚本,参照下面这个使用案例。
DistCp跨集群拷贝hive表测试方案二
CDH不同集群之间hive/impala数据迁移(hadoop distcp)
CDH管理界面可直接配置进行自动化的迁移(可通过备份-复制计划来定制hive的迁移);本文介绍使用hadoop distcp命令批量迁移hive数据。
迁移步骤:
hadoop distcp迁移分为三个步骤:
- 1、批量导表
- 2、hive数据文件的迁移
- 3、分区恢复
批量导表脚本
批量获取表结构在目标集群建表==
where_src_table_info="/home/hadoop/hive_db_tables"
hive -e " show databases; exit ;" | grep -v default | grep -v test > ${where_src_table_info}/databases.txt
if [ ! -d "${where_src_table_info}/tables" ] ; then
mkdir "${where_src_table_info}/tables"
fi
if [ ! -d "${where_src_table_info}/desc_table" ] ; then
mkdir "${where_src_table_info}/desc_table"
fi
for database in `cat ${where_src_table_info}/databases.txt`
do
{
echo "database:${database}"
hive -e " use $database ; show tables ; exit ;" > ${where_src_table_info}/tables/$database
sed -i '1d' ${where_src_table_info}/tables/$database
sed -i "/WARN:/d" ${where_src_table_info}/tables/$database
for table in `cat ${where_src_table_info}/tables/$database `
do
if [ ! -d "${where_src_table_info}/desc_table/$database" ] ; then
mkdir "${where_src_table_info}/desc_table/$database"
fi
echo "table:${database}.${table}"
hive -e "use $database ; show create table $table ;" >> ${where_src_table_info}/desc_table/$database/${table}.sql
echo >> ${where_src_table_info}/desc_table/$database/${table}.sql
sed -i "/WARN:/d" `grep -rl WARN: ${where_src_table_info}/desc_table/$database/${table}.sql`
sed -i "/createtab_stmt/d" `grep -rl createtab_stmt ${where_src_table_info}/desc_table/$database/${table}.sql`
sed -i "s/\`//g" ${where_src_table_info}/desc_table/$database/${table}.sql
echo ";" >> ${where_src_table_info}/desc_table/$database/${table}.sql
v_create_sql=`cat ${where_src_table_info}/desc_table/$database/${table}.sql`
echo $v_create_sql
ssh username@xx.xx.xx.xx " hive -e \"use $database; $v_create_sql \" ; impala-shell -q \"invalidate metadata ${database}.${table} \" "
done
}
done
跨集群拷贝数据文件脚本
集群之间数据文件移动==
#!/bin/bash
where_src_table_info="/home/hadoop/hive_db_tables"
v_database='db_name1
db_name2'
for database in $v_database
do
ret=`ssh user_name@xx.xx.xx.xx " cat ${where_src_table_info}/tables/$database "`
echo "开始移动数据"${database}-${ret}
for tem in $ret;
do
echo "开始移动数据"${database}-${tem}
starttime=`date +%s`
hadoop distcp -bandwidth 300 -m 50 hdfs://xx.xx.xx.xx:8020/user/hive/warehouse/${database}.db/${tem}/* hdfs://xx.xx.xx.xx:8020/user/hive/warehouse/${database}.db/${tem}
endtime=`date +%s`
echo "结束distcp,本次移动"${database}"."$tem"花费时间"$[$(($endtime - $starttime))/60]"分钟"
done
done
为分区表添加分区脚本
=添加分区
#!/bin/bash
v_table_names='databasename.tab_name'
for table_name in $v_table_names
do
v_start_data_str=`hdfs dfs -ls /user/hive/warehouse/${table_name%.*}.db/${table_name`
lvv_start_timekey=${v_start_data_str##*date_timekey=}
v_end_data_str=`hdfs dfs -ls /user/hive/warehouse/${table_name%.*}.db/${table_name`
lvv_end_timekey=${v_end_data_str##*date_timekey=}
lvn_start_sec=`date -d "$lvv_start_timekey" "+%s"`
lvn_end_sec=`date -d "$lvv_end_timekey" "+%s"`
echo "==============================开始${table_name}分区恢复,恢复开始时间${lvv_start_timekey}=========================================="
for((i=$lvn_start_sec;i<$lvn_end_sec;i+=86400))
do
lvv_data_start=`date -d "@$i" "+%Y%m%d"`
echo $lvv_data_start
j=$[ i + 86400 ]
lvv_data_end=`date -d "@$j" "+%Y%m%d"`
echo $lvv_data_end
impala-shell -q "ALTER TABLE ${table_name} ADD IF not EXISTS PARTITION (date_timekey='$lvv_data_start');"
done
echo "==============================结束${table_name}分区恢复=========================================="
done
迁移的过程中可能遇见到的问题
? 1、目标集群用root用户执行 hadoop distcp迁移的时候没有有报错,但是日志显示任务提交后一直没有反应,在 mapreduce.Job: Running job: 之后日志就卡着不动,map任务就是不run:
21/08/22 13:12:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1629427129196_0056
21/08/22 13:12:45 INFO mapreduce.JobSubmitter: Executing with tokens: []
21/08/22 13:12:45 INFO conf.Configuration: resource-types.xml not found
21/08/22 13:12:45 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/08/22 13:12:45 INFO impl.YarnClientImpl: Submitted application application_1629427129196_0056
21/08/22 13:12:45 INFO mapreduce.Job: The url to track the job: http://t4hadoopap01:8088/proxy/application_1629427129196_0056/
21/08/22 13:12:45 INFO tools.DistCp: DistCp job-id: job_1629427129196_0056
21/08/22 13:12:45 INFO mapreduce.Job: Running job: job_1629427129196_0056
由于用的测试集群,性能比较差,查看cdh界面的yarn任务有很多卡在那里,导致任务无法执行。清除卡死的任务。
? 2、ERROR tools.DistCp: Exception encountered 报错
ERROR tools.DistCp: Exception encountered
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/.staging/_distcp-683351060/fileList.seq could only be written to 0 of the 1 minReplication nodes. There are 4 datanode(s) running and 4 node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2102)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2673)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:872)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:550)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)
3、ERROR tools.DistCp: Exception encountered File /user/hive/.staging/_distcp-683351060/fileList.seq could only be written to 0
4、ERROR tools.DistCp: Invalid arguments
ERROR tools.DistCp: Invalid arguments:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:88)
at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1962)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1421)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3055)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1151)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:940)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499)
at org.apache.hadoop.ipc.Client.call(Client.java:1445)
at org.apache.hadoop.ipc.Client.call(Client.java:1355)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:875)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1630)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1496)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1493)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1508)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1617)
at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:241)
at org.apache.hadoop.tools.DistCp.run(DistCp.java:143)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.tools.DistCp.main(DistCp.java:432)
Invalid arguments: Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:88)
at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1962)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1421)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3055)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1151)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:940)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)
大概意思参数不对,提示在待机状态下不支持读取操作类别。
使用相同的 hadoop distcp 命令,第一次成功迁移,第二次执行失败。 最后查证由于修改了目标集群的参数,重启了HDFS,导致活动namenode的ip发生变更,之前namenode的ip成为备用节点,处于待机状态。修改目标主机的namemode的ip即可。
|