AWS Glue in Practice
Glue Job Configuration
P.S.: You will have to work out the IAM permissions yourself; the rule of thumb is to grant whatever permission the job complains about. If this is only a test, granting everything (both action and resource set to *) settles it once and for all.
Job Parameters
Key | Value
---|---
--conf | spark.serializer=org.apache.spark.serializer.KryoSerializer
--enable-glue-datacatalog | |
Dependent jars path
s3://gavin-test2/dependency_jars/hudi/spark-avro_2.11-2.4.3.jar,s3://gavin-test2/dependency_jars/hudi/hudi-spark-bundle_2.11-0.8.0.jar
Download links for the jars:
Jar | Download link
---|---
hudi-spark-bundle_2.11-0.8.0.jar | https://search.maven.org/remotecontent?filepath=org/apache/hudi/hudi-spark-bundle_2.11/0.8.0/hudi-spark-bundle_2.11-0.8.0.jar
spark-avro_2.11-2.4.3.jar | https://search.maven.org/remotecontent?filepath=org/apache/spark/spark-avro_2.11/2.4.3/spark-avro_2.11-2.4.3.jar
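The same settings can also be applied programmatically instead of through the console. Below is a minimal sketch using boto3; the job name, IAM role, and script location are hypothetical placeholders, and the console's "Dependent jars path" field corresponds to the --extra-jars default argument.

```python
import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="hudi-demo-job",        # hypothetical job name
    Role="GlueHudiDemoRole",     # hypothetical IAM role with the required permissions
    GlueVersion="2.0",           # Spark 2.4, matching the Scala 2.11 / Spark 2.4.3 jars above
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://gavin-test2/scripts/hudi_demo.py",  # hypothetical script path
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--conf": "spark.serializer=org.apache.spark.serializer.KryoSerializer",
        "--enable-glue-datacatalog": "",
        # "Dependent jars path" in the console maps to --extra-jars
        "--extra-jars": ",".join([
            "s3://gavin-test2/dependency_jars/hudi/spark-avro_2.11-2.4.3.jar",
            "s3://gavin-test2/dependency_jars/hudi/hudi-spark-bundle_2.11-0.8.0.jar",
        ]),
    },
)
```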
Writing a Non-Partitioned Table
Python Code
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Target S3 location and table/database names for the Hudi table
basePath = 's3://gavin-test2/tables/hudi/table1/'
table_name = 'table1'
database = 'default'

# Build a small test DataFrame; the third column holds a "yyyy/mm/dd" string
data = [('Alice', 1, '2022/02/28'), ('Jhone', 2, '2022/03/01')]
rdd = sc.parallelize(data)
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("partitioin_path", StringType(), True),
    ]
)
src_df = spark.createDataFrame(rdd, schema)

# Hudi write options for a non-partitioned table:
# - the record key is "name"
# - the partition field is left empty, with NonpartitionedKeyGenerator /
#   NonPartitionedExtractor
# - hive_sync registers the table in the Glue Data Catalog (use_jdbc=false)
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.recordkey.field': 'name',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': database,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.write.partitionpath.field': '',
    'hoodie.datasource.hive_sync.partition_fields': '',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor'
}

src_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
```
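Before moving on to the catalog and Athena, the write can also be verified inside the same script by reading the table back through the Hudi datasource. This is only a sketch: with the Hudi 0.8.0 bundle used here a glob suffix on the load path (for example basePath + "/*") may be required, while newer Hudi releases accept the base path directly.

```python
# Quick sanity check in the same Spark session, right after the write above.
# Note: older Hudi releases may need a glob such as basePath + "/*".
check_df = spark.read.format("hudi").load(basePath)
check_df.select("name", "age", "partitioin_path").show(truncate=False)
```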
AWS Glue Catalog
Query in Athena
_hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | name | age | partitioin_path
---|---|---|---|---|---|---|---
20220307080836 | 20220307080836_0_1 | Jhone | | 8965ef34-4048-4420-8e69-562a478c3989-0_0-13-481_20220307080836.parquet | Jhone | 2 | 2022/03/01
20220307080836 | 20220307080836_0_2 | Alice | | 8965ef34-4048-4420-8e69-562a478c3989-0_0-13-481_20220307080836.parquet | Alice | 1 | 2022/02/28
Files in S3 Bucket
Writing a Partitioned Table
A partitioned table can use either a time field or a non-time field as the partition key; this article uses a time string in the form "yyyy/mm/dd" as the partition field.
Python Code
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Same test data as before, written to a second table
basePath = 's3://gavin-test2/tables/hudi/table2/'
table_name = 'table2'
database = 'default'

data = [('Alice', 1, '2022/02/28'), ('Jhone', 2, '2022/03/01')]
rdd = sc.parallelize(data)
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("partitioin_path", StringType(), True),
    ]
)
src_df = spark.createDataFrame(rdd, schema)

# Hudi write options for a partitioned table:
# - "partitioin_path" (a "yyyy/mm/dd" string) is the partition field
# - SimpleKeyGenerator builds the partition path from that single field
# - SlashEncodedDayPartitionValueExtractor parses the "yyyy/mm/dd" path during hive sync
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.recordkey.field': 'name',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': database,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.write.partitionpath.field': 'partitioin_path',
    'hoodie.datasource.hive_sync.partition_fields': 'partitioin_path',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.SimpleKeyGenerator',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor'
}

src_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
```
AWS Glue Catalog
Query in Athena
_hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | name | age | partitioin_path
---|---|---|---|---|---|---|---
20220307084134 | 20220307084134_0_1 | Jhone | 2022/03/01 | afb8d8e3-5f3e-4420-b390-1eeb20a59165-0_0-8-322_20220307084134.parquet | Jhone | 2 | 2022-03-01
20220307084134 | 20220307084134_1_1 | Alice | 2022/02/28 | 351c3c19-3c69-4661-8051-11a90c031112-0_1-8-323_20220307084134.parquet | Alice | 1 | 2022-02-28
Files in S3 Bucket
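Because the record key is name, a subsequent run can update existing rows in place rather than append duplicates by switching the write operation to upsert. The sketch below reuses hudi_options and schema from the script above; the extra rows and the choice of age as the precombine field are purely illustrative assumptions.

```python
# Hypothetical follow-up batch: Alice's age changes and one new person is added.
update_data = [('Alice', 3, '2022/02/28'), ('Bob', 4, '2022/03/02')]
update_df = spark.createDataFrame(sc.parallelize(update_data), schema)

upsert_options = dict(hudi_options)
upsert_options['hoodie.datasource.write.operation'] = 'upsert'
# The precombine field decides which record wins when the same key appears twice
# in one batch; "age" is only an illustrative choice here.
upsert_options['hoodie.datasource.write.precombine.field'] = 'age'

# mode("append") keeps the existing table and lets Hudi merge rows by record key.
update_df.write.format("hudi").options(**upsert_options).mode("append").save(basePath)
```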
FAQ
ClassNotFoundException: org.apache.calcite.rel.type.RelDataTypeSystem
Error Info
With DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY -> "false" set in Hudi, the job fails with:
java.lang.ClassNotFoundException: org.apache.calcite.rel.type.RelDataTypeSystem
This is because Hive 3 / Spark 3 removed their dependency on the calcite package.
Solution:
I took the lazy way out and dropped Spark back to 2.x (on Glue, that means choosing Glue version 2.0, which runs Spark 2.4, rather than Glue 3.0).
IllegalArgumentException: Partition path default is not in the form yyyy/mm/dd
Error Info
Caused by: java.lang.IllegalArgumentException: Partition path default is not in the form yyyy/mm/dd
at org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor.extractPartitionValuesInPath(SlashEncodedDayPartitionValueExtractor.java:55)
at org.apache.hudi.hive.HoodieHiveClient.getPartitionEvents(HoodieHiveClient.java:220)
at org.apache.hudi.hive.HiveSyncTool.syncPartitions(HiveSyncTool.java:221)
... 42 more
Solution
This is because 'hoodie.datasource.hive_sync.partition_extractor_class' is set to 'org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor', and that extractor requires the partition path to be exactly in the yyyy/mm/dd form; see org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor#extractPartitionValuesInPath.
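The partition path in the message is literally "default", which Hudi typically uses when the partition value is empty or null, so the write side and the hive-sync side were configured inconsistently. Two hedged fixes, sketched against the hudi_options dictionary from the examples above:

```python
# Sketch 1: the table is really non-partitioned -> keep the key generator and
# extractor consistent, exactly as in the non-partitioned example above.
hudi_options.update({
    'hoodie.datasource.write.partitionpath.field': '',
    'hoodie.datasource.hive_sync.partition_fields': '',
    'hoodie.datasource.write.keygenerator.class':
        'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.NonPartitionedExtractor',
})

# Sketch 2: the table is partitioned, but the values are not yyyy/mm/dd dates ->
# use an extractor that does not enforce a date format, e.g.
# MultiPartKeysValueExtractor (it splits the path on "/", so the number of path
# levels must match the declared partition fields).
hudi_options.update({
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.MultiPartKeysValueExtractor',
})
```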