首先,我们需要梳理清楚SQL Server数据库中SSIS包及表、视图、用户定义函数和存储过程之间的依赖关系。操作上可以使用SQL Server的系统存储过程sp_depends和INFORMATION_SCHEMA架构下面的系统视图来查看数据库对象之间的关系。对于SSIS包,可以将发布在SQL Server服务器上的SSIS包用系统存储过程导出为二进制数据流,然后解析出其中用到数据库对象和dtsx文件,然后将得到的依赖性关系的信息存储到数据库或文件中。迁移ETL程序的时候,可以根据依赖性关系,确定迁移项目的顺序和迁移SSIS包或数据库对象的先后顺序。 然而SQL Server的T-SQL语言不擅长处理二进制数据,故我们可以用SQL CLR存储过程,用C#编程的方法来处理二进制数据。其实,从本质上来说,SSIS包的ispac文件或SQL Server数据库里存储的已经发布的包的格式是Zip格式的压缩包,压缩包里面包含多个Xml格式并以dtsx为扩展名的文件,这些文件里面就包含SQL语句和源表或视图以及目标表的信息,SQL语句又可以通过程序代码的语法分析得到其中的数据库对象名称。同时可以用将表导出为数据文件,比如Csv文件,导出的时候将数据字段全部导出为字符串以保留原始数据的精度,压缩为Zip或Gzip格式文件之后上传到S3,在EMR里跑PySpark程序,将文件解压并转换为Parquet格式。另外我们还可以在公司内网的电脑上安装Spark,接着用PySpark程序直接连到SQL Server数据库,将表的数据保存为Parquet格式数据文件,然后直接上传到S3。然后建立Hive外部表连接到S3上的Parquet格式数据文件,并根据SQL Server数据库里已经存在的视图定义,在Hive里面创建对应的视图。而存储过程和用户定义函数的逻辑可以在公共库里创建PySpark函数来实现。但是由于Parquet格文件不能修改或追加记录,只能删除后重新创建,所以需要对原来的T-SQL里面代码进行等效代码替换。比如,对于更新SQL语句,可以在表创建的时候加入更新SQL语句中的字段,并以更新的条件作为CASE WHEN筛选的条件。而对于删除SQL语句,可以在表创建的时候可以排除删除的条件。对于Merge合并语句,可以使用将需要合并数据的表和用来合并的表、视图或查询FULL JOIN一下,然后取后者的值返回,再通过判断合并目标表和源数据的字段值是否为NULL来确定要不要在合并数据的同时插入或删除数据。当然,SQL Server数据库里的表分区设置还是可以继续应用在Hive表分区中。另外,在编写PySpark代码的过程中,还有一些内存和磁盘缓存以及AQE等其它的技巧来优化运行性能。等数据迁移完毕,并且PySpark程序也开发好之后。接下来我们需要一个调度平台来管理PySpark程序的运行和监控,比如Airflow,用来连接到PySpark程序并按项目将PySpark程序放在不同的目录下分类存储在S3中。
|