Reading S3 Parquet with Spark and Writing to a Hudi Table
References
Hadoop-aws
Apache Hadoop Amazon Web Services support – Hadoop-AWS module: Integration with Amazon Web Services
EMR release version
Amazon EMR release 6.5.0 - Amazon EMR
EMR Spark
Apache Spark - Amazon EMR
EMR Hudi
Hudi - Amazon EMR
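Differences and relationships between S3, S3N, and S3A
First, the three schemes differ in the maximum file size they support. Second, s3 is block-based, whereas s3n and s3a are object-based. Finally, S3A is the access method recommended by Apache: the Hadoop s3 block filesystem is gradually being replaced and its use is discouraged, while S3A is more stable, secure, and efficient. Note that on Amazon EMR, s3:// URIs are handled by EMRFS rather than by the legacy Hadoop block filesystem described here, which is why the cluster examples later in this article still use s3:// paths.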
S3 Block FileSystem (URI scheme: s3) A block-based filesystem backed by S3. Files are stored as blocks, just like they are in HDFS. This permits efficient implementation of renames. This filesystem requires you to dedicate a bucket for the filesystem - you should not use an existing bucket containing files, or write other files to the same bucket. The files stored by this filesystem can be larger than 5GB, but they are not interoperable with other S3 tools.
S3A (URI scheme: s3a) A successor to the S3 Native, s3n fs, the S3a: system uses Amazon's libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for /successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.
S3 Native FileSystem (URI scheme: s3n) A native filesystem for reading and writing regular files on S3. The advantage of this filesystem is that you can access files on S3 that were written with other tools. Conversely, other tools can access files written using Hadoop. The disadvantage is the 5GB limit on file size imposed by S3.
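The descriptions above state that objects reachable through s3n:// URLs should also be reachable through s3a simply by swapping the URL scheme. A minimal sketch of that rewrite is shown here; the object name S3Schemes and the method toS3a are hypothetical helpers introduced only for illustration, not part of Hadoop or Spark.

```scala
object S3Schemes {
  /** Rewrites an s3n:// URL to s3a://; any other URL is returned unchanged. */
  def toS3a(url: String): String =
    if (url.startsWith("s3n://")) "s3a://" + url.stripPrefix("s3n://")
    else url
}
```

Plain string handling is used instead of java.net.URI so that paths containing glob characters such as [1-3] pass through without a syntax error.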
Reading and writing S3 Parquet files with Spark
Test code
package org.zero

import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
import org.utils.SparkUtils

object SparkS2Test {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO)
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.spark_project.jetty").setLevel(Level.WARN)

    val start = System.currentTimeMillis()
    logger.warn("=================== Spark reading S3 ===================")

    val spark = SparkUtils.getSparkSession(this.getClass.getSimpleName, "local[*]")
    val sc = spark.sparkContext
    // Credentials are read from the environment instead of being hard-coded in source.
    sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

    // Read one partition directory of Parquet files from S3.
    val dataframe = spark
      .read
      .parquet("s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09")

    val tmpCache = dataframe.cache()
    tmpCache.createOrReplaceTempView("parquet_tmp_view")

    val dataFrame2 = spark.sql("select * from parquet_tmp_view limit 10")

    dataFrame2.show

    // dataFrame2.write.parquet("F:\\tmp\\output")

    spark.stop()

    val end = System.currentTimeMillis()
    logger.warn(s"=================== Elapsed: ${(end - start) / 1000} s ===================")
  }
}
package org.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkUtils {
  private val sparkConf: SparkConf = new SparkConf()

  def getSparkConf(appName: String, master: String): SparkConf = {
    sparkConf.setMaster(master).setAppName(appName)
  }

  def getSparkSession(appName: String, master: String): SparkSession = {
    sparkConf.setMaster(master).setAppName(appName)
    sparkSessionInit
  }

  // Built lazily so that the master and application name set above are picked up.
  lazy val sparkSessionInit: SparkSession = SparkSession.builder()
    .config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.io.compression.codec", "snappy")
    .config("spark.rdd.compress", "true")
    .config("spark.hadoop.parquet.writer.version", "v2")
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .config("spark.sql.parquet.compression.codec", "snappy")
    .config("spark.sql.parquet.filterPushdown", "true")
    .config("spark.sql.parquet.mergeSchema", "true")
    .config("spark.sql.parquet.binaryAsString", "true")
    .getOrCreate()
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-s3-hudi-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>spark-s3-hudi-test</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.maven.plugin.version>4.3.0</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <scala.version>2.12.13</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.1.2</spark.version>
        <hadoop.version>3.2.1</hadoop.version>
        <fasterxml.jackson.version>2.10.0</fasterxml.jackson.version>
        <project.build.scope>compile</project.build.scope>
    </properties>

    <repositories>
        <repository>
            <id>emr-6.5.0-artifacts</id>
            <name>EMR 6.5.0 Releases Repository</name>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <url>https://s3.us-west-1.amazonaws.com/us-west-1-emr-artifacts/emr-6.5.0/repos/maven/</url>
        </repository>
    </repositories>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.17.186</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>kms</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3control</artifactId>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!-- The original referenced an undefined ${spark.pom.scope} property. -->
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${fasterxml.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${fasterxml.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${fasterxml.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala.maven.plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.assembly.plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Configuration files
Create a resources directory and add the following configuration files.
core-site.xml
<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <!-- <property> -->
    <!--     <name>fs.s3a.access.key</name> -->
    <!--     <description>AWS access key ID. -->
    <!--         Omit for IAM role-based or provider-based authentication.</description> -->
    <!--     <value>YOUR_ACCESS_KEY_ID</value> -->
    <!-- </property> -->
    <!-- <property> -->
    <!--     <name>fs.s3a.secret.key</name> -->
    <!--     <description>AWS secret key. -->
    <!--         Omit for IAM role-based or provider-based authentication.</description> -->
    <!--     <value>YOUR_SECRET_ACCESS_KEY</value> -->
    <!-- </property> -->
</configuration>
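The commented-out properties above note that the access key and secret key can be omitted when authentication comes from an IAM role or another provider. One way to follow that advice in code, sketched here under the assumption that keys are supplied through the standard AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, is to set the fs.s3a.* keys only when both variables are present. The object S3Credentials and its methods are hypothetical names used for illustration.

```scala
object S3Credentials {
  /** Returns (accessKey, secretKey) only if both variables are set and non-empty. */
  def fromEnv(env: Map[String, String] = sys.env): Option[(String, String)] =
    for {
      access <- env.get("AWS_ACCESS_KEY_ID").filter(_.nonEmpty)
      secret <- env.get("AWS_SECRET_ACCESS_KEY").filter(_.nonEmpty)
    } yield (access, secret)

  /** The fs.s3a.* settings to apply; empty when no keys are found,
    * so that role-based or provider-based authentication is left in charge. */
  def s3aSettings(env: Map[String, String] = sys.env): Map[String, String] =
    fromEnv(env) match {
      case Some((access, secret)) =>
        Map("fs.s3a.access.key" -> access, "fs.s3a.secret.key" -> secret)
      case None => Map.empty
    }
}
```

In a Spark job the result could be applied with S3Credentials.s3aSettings().foreach { case (k, v) => sc.hadoopConfiguration.set(k, v) }, which keeps secrets out of both source code and core-site.xml.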
log4j.properties
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
log4j.rootLogger=info, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p - %m%n
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-10c %x - %m%n
Submitting Spark jobs on EMR
spark-shell
spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

// Credentials are read from the environment instead of being hard-coded.
sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
val df = spark.read.parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-11")
df.show
spark-submit
Package the code and upload it; the dependencies do not need to be bundled into the jar.
spark-submit \
--master local[2] \
--class org.zero.SparkS2Test \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar
spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--total-executor-cores 2 \
--class org.zero.SparkS2Test \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar
spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-cores 2 \
--executor-memory 1g \
--class org.zero.SparkS2Test \
s3://s3-datafacts-poc-001/spark-s3-hudi-test-1.0-SNAPSHOT.jar
Reading and writing Hudi with Spark
Local testing
If you use the AWS build of the Hudi jar, download the Hudi bundle from the master node and install it into the local Maven repository.
/usr/lib/hudi/hudi-spark3-bundle_2.12-0.9.0-amzn-1.jar

mvn install:install-file -Dfile=D:\soft\hudi-spark3-bundle_2.12-0.9.0-amzn-1.jar -DgroupId=org.apache.hudi -DartifactId=hudi-spark3-bundle_2.12 -Dversion=0.9.0-amzn-1 -Dpackaging=jar
Add the dependency to the project (the groupId must match the one used in the install command above):
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_${scala.binary.version}</artifactId>
            <version>0.9.0-amzn-1</version>
        </dependency>
Code
package org.zero

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{current_date, lit}
import org.slf4j.LoggerFactory
import org.utils.SparkUtils

object SparkS3HudiTest {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO)
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.spark_project.jetty").setLevel(Level.DEBUG)

    val start = System.currentTimeMillis()

    val spark = SparkUtils.getSparkSession(this.getClass.getSimpleName, "local[1]")
    val sc = spark.sparkContext
    // Credentials are read from the environment instead of being hard-coded in source.
    sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
    // sc.hadoopConfiguration.set("fs.s3.connection.ssl.enabled", "false")

    // Read from S3
    // val dataframe = spark
    //   .read
    //   .parquet("s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09")

    // val tmpCache = dataframe.cache()
    // tmpCache.createOrReplaceTempView("parquet_tmp_view")
    //
    // val dataFrame2 = spark.sql("select * from parquet_tmp_view limit 10")
    //
    // dataFrame2.show

    // Write to S3
    // val dataframe = spark
    //   .read
    //   .parquet("F:\\tmp\\test_table.parquet")
    //
    // dataframe
    //   .write
    //   .mode(SaveMode.Overwrite)
    //   .save("s3a://s3-datafacts-poc-001/test/test_parquet")

    // Read multiple S3 objects
    // logger.warn("=================== Spark reading S3 Parquet ===================")
    // val dataframe2 = spark.read
    //   .parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-1[1-3]")

    // dataframe2.printSchema()
    // dataframe2.show()

    // val dataframe3 = dataframe2.withColumn("dt", current_date())
    // dataframe2.withColumn("last_update_time", unix_timestamp())

    // Write a Spark DataFrame to Hudi
    // logger.warn("=================== Hudi options ===================")
    // Hudi options
    // val hudiOptions = Map[String, String](
    //   HoodieWriteConfig.TBL_NAME.key() -> "hudi_test_table",
    //   TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
    //   RECORDKEY_FIELD.key() -> "id",
    //   PARTITIONPATH_FIELD.key() -> "dt",
    //   PRECOMBINE_FIELD.key() -> "time",
    //   BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true",
    //   INDEX_TYPE.key() -> IndexType.BLOOM.name(),
    //   META_SYNC_ENABLED.key() -> "true",
    //   HIVE_SYNC_ENABLED.key() -> "true",
    //   HIVE_USER.key() -> "hive",
    //   HIVE_PASS.key() -> "hive",
    //   HIVE_DATABASE.key() -> "ods",
    //   HIVE_TABLE.key() -> "hudi_test_table",
    //   HIVE_URL.key() -> "jdbc:hive2://52.82.123.13:10000",
    //   // HIVE_URL.key() -> "jdbc:hive2://172.31.194.132:10000",
    //   HIVE_PARTITION_FIELDS.key() -> "dt",
    //   HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
    //   HIVE_AUTO_CREATE_DATABASE.key() -> "true"
    // )
    // println("===> \n" + hudiOptions)

    // logger.warn("=================== Spark writing S3 Hudi ===================")
    // dataframe3.write
    //   .format("org.apache.hudi")
    //   .option(OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    //   .options(hudiOptions)
    //   .mode(SaveMode.Overwrite)
    //   .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")

    // Read Hudi with Spark
    // logger.warn("=================== Spark reading S3 Hudi ===================")
    // val readDF = spark.read
    //   .format("org.apache.hudi")
    //   .load("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
    //   .select("*")
    //   .sort("creation_date", "id")
    //   .show(10)

    // Spark SQL: read Hive, write Hudi
    logger.warn("=================== Spark SQL: create Hive table ===================")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set spark.hadoop.hive.exec.compress.output=true")
    spark.sql("set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec")
    spark.sql("set mapreduce.output.fileoutputformat.compress.type=BLOCK")

    // val hiveSourceDDL =
    //   """
    //     |create external table if not exists ods.test_parquet_to_hive_table(
    //     | id int,
    //     | name string,
    //     | age int,
    //     | job string,
    //     | address string,
    //     | company string,
    //     | email string,
    //     | url string,
    //     | phone string,
    //     | sfzh string,
    //     | chrome string,
    //     | ipv4 string,
    //     | ipv6 string,
    //     | `date` bigint,
    //     | `time` bigint,
    //     | mac_address string,
    //     | col_tinyint int,
    //     | col_smallint int,
    //     | col_mediumint int,
    //     | col_bigint bigint,
    //     | col_decimal double,
    //     | col_double double,
    //     | col_float double,
    //     | col_time bigint,
    //     | col_blob string,
    //     | col_text string
    //     |) partitioned by(dt string)
    //     |stored as parquet
    //     |location "s3://s3-datafacts-poc-001/dw/ods/test_parquet_to_hive_table"
    //     |""".stripMargin
    // spark.sql(hiveSourceDDL)

    // spark.sql("show databases").show
    spark.sql("use ods")
    // spark.sql("show tables").show
    // spark.sql("show create table test_parquet_to_hive_table").show

    logger.warn("=================== Spark SQL: create Hudi sink table ===================")

    val hudiSinkDDL =
      """
        |create table if not exists ods.ods_hudi_sink_table (
        | id int,
        | name string,
        | age int,
        | job string,
        | address string,
        | company string,
        | email string,
        | url string,
        | phone string,
        | sfzh string,
        | chrome string,
        | ipv4 string,
        | ipv6 string,
        | `date` bigint,
        | `time` bigint,
        | mac_address string,
        | col_tinyint int,
        | col_smallint int,
        | col_mediumint int,
        | col_bigint bigint,
        | col_decimal double,
        | col_double double,
        | col_float double,
        | col_time bigint,
        | col_blob string,
        | col_text string,
        | dt date
        |) using hudi
        |partitioned by (dt)
        |tblproperties (
        | type = 'cow',
        | primaryKey = 'id',
        | preCombineField = 'time',
        | hoodie.index.type = 'GLOBAL_BLOOM',
        | hiveSyncEnabled = 'true',
        | hiveDatabase = 'ods',
        | hiveUser = 'hive',
        | hivePass = 'hive',
        | hiveTable = 'ods_hudi_sink_table',
        | hiveUrl = 'jdbc:hive2://52.82.123.13:10000',
        | hivePartitionFields = 'dt'
        |)
        |location "s3a://s3-datafacts-poc-001/dw/ods/ods_hudi_sink_table"
        |""".stripMargin

    spark.sql(hudiSinkDDL)
    spark.sql("show tables").show

    logger.warn("=================== Spark SQL: read Hive and write to Hudi table ===================")
    spark.sql("insert into table ods_hudi_sink_table select * from test_parquet_to_hive_table")

    spark.sql("select * from ods_hudi_sink_table limit 10").show

    spark.stop()

    val end = System.currentTimeMillis()
    logger.warn(s"=================== Elapsed: ${(end - start) / 1000} s ===================")
  }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spark-s3-hudi-test</artifactId>
<version>1.0-SNAPSHOT</version>
<name>spark-s3-hudi-test</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.maven.plugin.version>4.3.0</scala.maven.plugin.version>
<maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
<maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
<scala.version>2.12.13</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.1.2</spark.version>
<hadoop.version>3.2.1</hadoop.version>
<hoodie.version>0.9.0</hoodie.version>
<!-- <hoodie.version>0.9.0-amzn-1</hoodie.version>-->
<fasterxml.jackson.version>2.10.5</fasterxml.jackson.version>
<project.build.scope>compile</project.build.scope>
<!-- <project.build.scope>provided</project.build.scope>-->
</properties>
<dependencies>
<!-- s3 -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.217</version>
<scope>${project.build.scope}</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3control</artifactId>
<version>1.12.217</version>
<scope>${project.build.scope}</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kms</artifactId>
<version>1.12.217</version>
<scope>${project.build.scope}</scope>
</dependency>
<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<!-- spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<!-- hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3-bundle_${scala.binary.version}</artifactId>
<version>${hoodie.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${fasterxml.jackson.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${fasterxml.jackson.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${fasterxml.jackson.version}</version>
<scope>${project.build.scope}</scope>
</dependency>
<!-- http -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.11</version>
<scope>${project.build.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.9</version>
<scope>${project.build.scope}</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${scala.maven.plugin.version}</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven.assembly.plugin.version}</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Testing on the cluster
spark-shell
spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Configuration
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

// Credentials are read from the environment instead of being hard-coded.
sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")
Import packages
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex.IndexType
Read S3 Parquet files and write them to an S3 Hudi table
val df = spark.read.parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-1[1-3]")

val df2 = df.withColumn("creation_date", current_date())

val hudiOptions = Map[String, String](
  HoodieWriteConfig.TBL_NAME.key() -> "hudi_test_table",
  TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
  RECORDKEY_FIELD.key() -> "id", // record key (primary key)
  PARTITIONPATH_FIELD.key() -> "creation_date", // partition column
  PRECOMBINE_FIELD.key() -> "time", // column holding the record's update time
  BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true", // whether a record moves to the new partition directory when its partition value changes
  INDEX_TYPE.key() -> IndexType.BLOOM.name(), // index type; HBASE, INMEMORY, BLOOM and GLOBAL_BLOOM are available. GLOBAL_BLOOM is required for a record to be found after its partition changes; this example uses BLOOM
  META_SYNC_ENABLED.key() -> "true", // sync metadata
  HIVE_SYNC_ENABLED.key() -> "true", // sync the table to Hive
  HIVE_USER.key() -> "hive",
  HIVE_PASS.key() -> "hive",
  HIVE_DATABASE.key() -> "ods",
  HIVE_TABLE.key() -> "hudi_test_table",
  HIVE_URL.key() -> "jdbc:hive2://172.31.194.132:10000",
  HIVE_PARTITION_FIELDS.key() -> "creation_date",
  HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
  HIVE_AUTO_CREATE_DATABASE.key() -> "true"
)

(df2.write.format("org.apache.hudi")
  .option(OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .options(hudiOptions)
  .mode(SaveMode.Overwrite)
  .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table"))

// Wrapped in parentheses so the shell reads the whole chain as one expression.
val df3 = (spark.read
  .format("org.apache.hudi")
  .load("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
  .select("*")
  .sort("creation_date", "id"))
df3.show(10)
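The read path above ends in dt=2022-05-1[1-3], a glob whose bracket range selects several partition directories in one call. The following sketch illustrates which directory names such a pattern selects. It uses the JDK's own glob matcher as a stand-in, on the assumption that its bracket-range behaviour matches Hadoop's for this simple pattern; it is not the code path Spark or Hadoop actually runs, and PartitionGlob is a hypothetical name.

```scala
import java.nio.file.{FileSystems, Paths}

object PartitionGlob {
  /** True if the directory name matches the glob pattern (JDK glob syntax). */
  def matches(glob: String, dirName: String): Boolean = {
    val matcher = FileSystems.getDefault.getPathMatcher("glob:" + glob)
    matcher.matches(Paths.get(dirName))
  }

  /** Keeps only the directory names selected by the pattern. */
  def select(glob: String, dirNames: Seq[String]): Seq[String] =
    dirNames.filter(matches(glob, _))
}
```

For example, PartitionGlob.select("dt=2022-05-1[1-3]", Seq("dt=2022-05-09", "dt=2022-05-11", "dt=2022-05-13", "dt=2022-05-14")) keeps only the 11th and 13th.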
Update
val updateDF = df2.limit(1).withColumn("creation_date", lit("date"))

(updateDF.write
  .format("org.apache.hudi")
  .option(OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table"))

// Display the row that was upserted.
updateDF.show()
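The upsert above depends on two of the configured options: the record key (id) decides which existing row an incoming row replaces, and the precombine field (time) decides which row wins when one batch contains several rows with the same key. The sketch below is a deliberately simplified in-memory model of that behaviour, assuming an overwrite-with-latest style of merge; it is not Hudi's implementation, and Rec and PrecombineSketch are hypothetical names.

```scala
final case class Rec(id: Int, time: Long, name: String)

object PrecombineSketch {
  /** Models an upsert: de-duplicate the incoming batch by key, keeping the
    * row with the largest precombine value, then let it replace any stored
    * row that has the same key. */
  def upsert(existing: Seq[Rec], incoming: Seq[Rec]): Seq[Rec] = {
    val deduped: Map[Int, Rec] =
      incoming.groupBy(_.id).map { case (id, rows) => id -> rows.maxBy(_.time) }
    val merged: Map[Int, Rec] =
      existing.map(r => r.id -> r).toMap ++ deduped
    merged.values.toSeq.sortBy(_.id)
  }
}
```

Rows whose key does not yet exist are simply inserted, which is why the same write call can serve both inserts and updates.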
Deleting S3 files
aws s3 rm s3://s3-datafacts-poc-001/dw/ods/hudi_test_table --recursive
emrfs sync s3://s3-datafacts-poc-001/dw/ods
Using Spark SQL in spark-shell
spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hive/lib/mysql-connector-java-5.1.49.jar
Configuration
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

// Credentials are read from the environment instead of being hard-coded.
sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")
Importing S3 Parquet files into Hive
spark.sql("show databases").show

+---------+
|namespace|
+---------+
|  default|
| dhw_demo|
|      ods|
|    test1|
|  test_db|
|     yszy|
+---------+
Parquet data source
val df = spark.read.parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-1[1-3]")

val df2 = df.withColumn("dt", current_date())

df2.printSchema

root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- job: string (nullable = true)
|-- address: string (nullable = true)
|-- company: string (nullable = true)
|-- email: string (nullable = true)
|-- url: string (nullable = true)
|-- phone: string (nullable = true)
|-- sfzh: string (nullable = true)
|-- chrome: string (nullable = true)
|-- ipv4: string (nullable = true)
|-- ipv6: string (nullable = true)
|-- date: long (nullable = true)
|-- time: long (nullable = true)
|-- mac_address: string (nullable = true)
|-- col_tinyint: integer (nullable = true)
|-- col_smallint: integer (nullable = true)
|-- col_mediumint: integer (nullable = true)
|-- col_bigint: long (nullable = true)
|-- col_decimal: double (nullable = true)
|-- col_double: double (nullable = true)
|-- col_float: double (nullable = true)
|-- col_time: long (nullable = true)
|-- col_blob: string (nullable = true)
|-- col_text: string (nullable = true)
|-- dt: date (nullable = false)
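The path passed to `spark.read.parquet` above ends in the glob `dt=2022-05-1[1-3]`, which selects the partition directories for 11, 12 and 13 May 2022. A small sketch for checking which directory names such a pattern matches. It uses the JDK's glob matcher as a stand-in for Hadoop's own glob code (an assumption that holds for a simple character range like this one), and `PartitionGlob` is a hypothetical name.

```scala
import java.nio.file.{FileSystems, Paths}

// Illustrative only: the JDK glob matcher stands in for Hadoop's glob handling.
object PartitionGlob {
  // True when the directory name matches the glob pattern.
  def matches(glob: String, partitionDir: String): Boolean =
    FileSystems.getDefault
      .getPathMatcher(s"glob:$glob")
      .matches(Paths.get(partitionDir))
}

// PartitionGlob.matches("dt=2022-05-1[1-3]", "dt=2022-05-12") is true,
// PartitionGlob.matches("dt=2022-05-1[1-3]", "dt=2022-05-14") is false.
```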
spark.sql("set spark.sql.debug.maxToStringFields=100")
df2.createOrReplaceTempView("parquet_tmp_table")
spark.sql("use ods")
spark.sql("show tables").show

spark.sql("desc formatted parquet_tmp_table").show
spark.sql("select * from parquet_tmp_table limit 10").show
Create the sink table in Hive (it can also be created in spark-sql)
beeline -n hive -u jdbc:hive2://52.82.123.13:10000/default

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

-- alter table test_parquet_to_hive_table set tblproperties('external'='true');
drop table if exists ods.test_parquet_to_hive_table;

create external table if not exists ods.test_parquet_to_hive_table(
id int,
name string,
age int,
job string,
address string,
company string,
email string,
url string,
phone string,
sfzh string,
chrome string,
ipv4 string,
ipv6 string,
`date` bigint,
`time` bigint,
mac_address string,
col_tinyint int,
col_smallint int,
col_mediumint int,
col_bigint bigint,
col_decimal double,
col_double double,
col_float double,
col_time bigint,
col_blob string,
col_text string
) partitioned by(dt string)
stored as parquet
location "s3://s3-datafacts-poc-001/dw/ods/test_parquet_to_hive_table";
Writing to the Hive table
spark.sql("show create table ods.test_parquet_to_hive_table").show

spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict")

spark.sql("insert overwrite ods.test_parquet_to_hive_table select * from parquet_tmp_table")
spark.sql("select * from ods.test_parquet_to_hive_table limit 10").show
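The `insert overwrite ... select *` statement maps columns by position, and with dynamic partitioning the partition column (`dt`) has to be the last column of the select list. That is the case here only because `dt` was appended with `withColumn`. A minimal sketch of a guard for sources where the order is not guaranteed; `DynamicPartitionOrder` is a hypothetical helper, not a Spark API.

```scala
// Hypothetical helper: reorders column names so partition columns come last,
// which is the order a positional dynamic-partition insert expects.
object DynamicPartitionOrder {
  def partitionColumnsLast(columns: Seq[String], partitionColumns: Seq[String]): Seq[String] = {
    val missing = partitionColumns.filterNot(p => columns.contains(p))
    require(missing.isEmpty, s"partition columns not found: ${missing.mkString(", ")}")
    // Keep the relative order of the data columns, then append the partition columns.
    columns.filterNot(c => partitionColumns.contains(c)) ++ partitionColumns
  }
}

// Possible use in spark-shell before creating the temp view:
// val ordered = DynamicPartitionOrder.partitionColumnsLast(df2.columns.toSeq, Seq("dt"))
// df2.select(ordered.map(org.apache.spark.sql.functions.col): _*)
```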
Reading the Hive table and writing to Hudi
spark-sql
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hive/lib/mysql-connector-java-5.1.49.jar
set spark.sql.debug.maxToStringFields=100;
set fs.s3a.access.key=xxxx;
set fs.s3a.secret.key=xxxx;
set fs.s3a.endpoint=s3.cn-northwest-1.amazonaws.com.cn;
Creating the Hudi table
create table if not exists ods.ods_hudi_sink_table (
id int,
name string,
age int,
job string,
address string,
company string,
email string,
url string,
phone string,
sfzh string,
chrome string,
ipv4 string,
ipv6 string,
`date` bigint,
`time` bigint,
mac_address string,
col_tinyint int,
col_smallint int,
col_mediumint int,
col_bigint bigint,
col_decimal double,
col_double double,
col_float double,
col_time bigint,
col_blob string,
col_text string,
dt date
) using hudi
partitioned by (dt)
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'time',
hoodie.index.type = 'GLOBAL_BLOOM',
hiveSyncEnabled = 'true',
hiveDatabase = 'ods',
hiveUser = 'hive',
hivePass = 'hive',
hiveTable = 'ods_hudi_sink_table',
hiveUrl = 'jdbc:hive2://52.82.123.13:10000',
hivePartitionFields = 'dt'
)
location "s3://s3-datafacts-poc-001/dw/ods/ods_hudi_sink_table";
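In the table properties above, `primaryKey = 'id'` identifies a record and `preCombineField = 'time'` decides which version wins when several rows share the same key: the row with the largest `time` value is kept. The following is a conceptual model of that rule in plain Scala collections, not Hudi's implementation; `SinkRow` and `PreCombineSketch` are hypothetical names and the row is reduced to three fields.

```scala
// Conceptual model only: Hudi applies this rule inside its own write path.
final case class SinkRow(id: Int, time: Long, name: String)

object PreCombineSketch {
  // For each primary key, keep the row with the largest preCombine value.
  def latestPerKey(rows: Seq[SinkRow]): Seq[SinkRow] =
    rows.groupBy(_.id).values.map(_.maxBy(_.time)).toSeq.sortBy(_.id)
}

// Two versions of id = 1 collapse to the one with the larger time:
// PreCombineSketch.latestPerKey(Seq(SinkRow(1, 10L, "old"), SinkRow(1, 20L, "new")))
```

Choosing a column that grows with each change (an update timestamp, for example) as the preCombine field is what makes this rule pick the newest version.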
Writing to Hudi
show tables;

-- ods     hudi_test_table                 false
-- ods     ods_hudi_sink_table             false
-- ods     test_parquet_to_hive_table      false
-- ods     test_tb_02                      false
-- Time taken: 0.021 seconds, Fetched 4 row(s)

insert into table ods_hudi_sink_table select * from test_parquet_to_hive_table;
select * from ods_hudi_sink_table limit 10;
Spark-submit
spark-submit \
--master local[2] \
--class org.zero.SparkS3HudiTest \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar
spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--total-executor-cores 2 \
--class org.zero.SparkS2Test \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar
Testing in Hive
beeline -n hive -u jdbc:hive2://52.82.123.13:10000/default

use ods;

show tables;
+-----------------------------+
|          tab_name           |
+-----------------------------+
| hudi_test_table             |
| ods_hudi_sink_table         |
| test_parquet_to_hive_table  |
+-----------------------------+

show create table hudi_test_table;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `hudi_test_table`(           |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `age` int,                                       |
|   `job` string,                                    |
|   `address` string,                                |
|   `company` string,                                |
|   `email` string,                                  |
|   `url` string,                                    |
|   `phone` string,                                  |
|   `sfzh` string,                                   |
|   `chrome` string,                                 |
|   `ipv4` string,                                   |
|   `ipv6` string,                                   |
|   `date` bigint,                                   |
|   `time` bigint,                                   |
|   `mac_address` string,                            |
|   `col_tinyint` int,                               |
|   `col_smallint` int,                              |
|   `col_mediumint` int,                             |
|   `col_bigint` bigint,                             |
|   `col_decimal` double,                            |
|   `col_double` double,                             |
|   `col_float` double,                              |
|   `col_time` bigint,                               |
|   `col_blob` string,                               |
|   `col_text` string)                               |
| PARTITIONED BY (                                   |
|   `creation_date` date)                            |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='s3://s3-datafacts-poc-001/dw/ods/hudi_test_table')  |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   's3://s3-datafacts-poc-001/dw/ods/hudi_test_table' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'last_commit_time_sync'='20220513101907',        |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"job","type":"string","nullable":true,"metadata":{}},{"name":"address","type":"string","nullable":true,"metadata":{}},{"name":"company","type":"string","nullable":true,"metadata":{}},{"name":"email","type":"string","nullable":true,"metadata":{}},{"name":"url","type":"string","nullable":true,"metadata":{}},{"name":"phone","type":"string","nullable":true,"metadata":{}},{"name":"sfzh","type":"string","nullable":true,"metadata":{}},{"name":"chrome","type":"string","nullable":true,"metadata":{}},{"name":"ipv4","type":"string","nullable":true,"metadata":{}},{"name":"ipv6","type":"string","nullable":true,"metadata":{}},{"name":"date","type":"long","nullable":true,"metadata":{}},{"name":"time","type":"long","nullable":true,"metadata":{}},{"name":"mac_address","type":"string","nullable":true,"metadata":{}},{"name":"col_tinyint","type":"integer","nullable":true,"metadata":{}},{"name":"col_smallint","type":"integer","nullable":true,"metadata":{}},{"name":"col_mediumint","type":"integer","nullable":true,"metadata":{}},{"name":"col_bigint","type":"long","nullable":true,"metadata":{}},{"name":"col_decimal","type":"double","nullable":true,"metadata":{}},{"name":"col_double","type":"double","nullable":true,"metadata":{}},{"name":"col_float","type":"double","nullable":true,"metadata":{}},{"name":"col_time","type":"long","nullable":true,"metadata":{}},{"name":"col_blob","type":"string","nullable":true,"metadata":{}},{"name":"col_text","type":"string","nullable":true,"metadata":{}},{"name":"creation_date","type":"date","nullable":false,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='creation_date',  |
|   'transient_lastDdlTime'='1652437162')            |
+----------------------------------------------------+

select * from hudi_test_table limit 10;
Troubleshooting
Problem 1
2022-05-10 14:27:07,535 DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList - No credentials provided by SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.CredentialInitializationException: Access key or secret key is unset
org.apache.hadoop.fs.s3a.CredentialInitializationException: Access key or secret key is unset
    at org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider.getCredentials(SimpleAWSCredentialsProvider.java:68)
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1166)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
    at ...
2022-05-10 14:27:07,535 DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList - No credentials provided by EnvironmentVariableCredentialsProvider: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
    at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1166)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at ...
Method 1:
Configuration
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>com.amazonaws.auth.profile.ProfileCredentialsProvider</value>
    </property>
Linux
~/.aws/credentials
[default]
aws_access_key_id = your_access_key_id
aws_secret_access_key = your_secret_access_key

export AWS_ACCESS_KEY_ID=your_access_key_id
export AWS_SECRET_ACCESS_KEY=your_secret_access_key
Windows
C:\Users\USERNAME\.aws\credentials
[default]
aws_access_key_id = your_access_key_id
aws_secret_access_key = your_secret_access_key

set AWS_ACCESS_KEY_ID=your_access_key_id
set AWS_SECRET_ACCESS_KEY=your_secret_access_key
Method 2:
Create a core-site.xml configuration file under resources
<configuration>
<property>
<name>fs.s3a.aws.credentials.provider</name>
<value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<description>AWS access key ID.
Omit for IAM role-based or provider-based authentication.</description>
<value>your_access_key_id</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<description>AWS secret key.
Omit for IAM role-based or provider-based authentication.</description>
<value>your_secret_access_key</value>
</property>
</configuration>
Method 3:
<configuration>
<property>
<name>fs.s3a.aws.credentials.provider</name>
<value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
</property>
</configuration>
val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.access.key", "your_access_key_id")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "your_secret_access_key")
Problem 2
Exception in thread "main" org.apache.hadoop.fs.s3a.AWSRedirectException: getFileStatus on s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09: com.amazonaws.services.s3.model.AmazonS3Exception: The bucket is in this region: cn-northwest-1. Please use this region to retry the request (Service: Amazon S3; Status Code: 301; Error Code: 301 Moved Permanently; Request ID: C1AV4REN3P1VGWY7; S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=), S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=:301 Moved Permanently: The bucket is in this region: cn-northwest-1. Please use this region to retry the request (Service: Amazon S3; Status Code: 301; Error Code: 301 Moved Permanently; Request ID: C1AV4REN3P1VGWY7; S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:217)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:151)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2239)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
    at ...
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The bucket is in this region: cn-northwest-1. Please use this region to retry the request (Service: Amazon S3; Status Code: 301; Error Code: 301 Moved Permanently; Request ID: C1AV4REN3P1VGWY7; S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=), S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1640)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
    at ...
Set up AWS Credentials and Region for Development - AWS SDK for Java
Get started with the AWS SDK for Java 2.x - AWS SDK for Java
val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
Or
Linux
~/.aws/config
[default]
region = cn-northwest-1
export AWS_REGION=cn-northwest-1
Windows
C:\Users\USERNAME\.aws\config
[default]
region = cn-northwest-1
set AWS_REGION=cn-northwest-1
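The endpoint host always begins with `s3.`, even when the filesystem scheme is `s3a`, and regions in the AWS China partition use the `amazonaws.com.cn` domain. A minimal sketch that derives the `fs.s3a.endpoint` value from a region name, assuming the standard regional endpoint pattern; `S3Endpoints` is a hypothetical helper.

```scala
// Hypothetical helper: derives the regional S3 endpoint host from a region name.
object S3Endpoints {
  def endpointFor(region: String): String = {
    require(region.nonEmpty, "region must not be empty")
    // Regions whose name starts with "cn-" belong to the AWS China partition.
    val domain = if (region.startsWith("cn-")) "amazonaws.com.cn" else "amazonaws.com"
    s"s3.$region.$domain"
  }
}

// Possible use in spark-shell:
// sc.hadoopConfiguration.set("fs.s3a.endpoint", S3Endpoints.endpointFor("cn-northwest-1"))
```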
Problem 3
Exception in thread "main" java.nio.file.AccessDeniedException: s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09: getFileStatus on s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: CB7P31ZHFVCTBZWM; S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=), S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=:403 Forbidden
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:230)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:151)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2239)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
    at ...
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: CB7P31ZHFVCTBZWM; S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=), S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1640)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at ...
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
Problem 4
2022-05-10 14:38:15,669 DEBUG org.apache.hadoop.metrics2.impl.MetricsConfig - Could not locate file hadoop-metrics2.properties
org.apache.commons.configuration2.ex.ConfigurationException: Could not locate: org.apache.commons.configuration2.io.FileLocator@75de29c0[fileName=hadoop-metrics2.properties,basePath=<null>,sourceURL=,encoding=<null>,fileSystem=<null>,locationStrategy=<null>]
    at org.apache.commons.configuration2.io.FileLocatorUtils.locateOrThrow(FileLocatorUtils.java:346)
    at org.apache.commons.configuration2.io.FileHandler.load(FileHandler.java:972)
    at org.apache.commons.configuration2.io.FileHandler.load(FileHandler.java:702)
    at org.apache.hadoop.metrics2.impl.MetricsConfig.loadFirst(MetricsConfig.java:116)
    at org.apache.hadoop.metrics2.impl.MetricsConfig.create(MetricsConfig.java:96)
    at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.configure(MetricsSystemImpl.java:478)
    at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.start(MetricsSystemImpl.java:188)
    at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.init(MetricsSystemImpl.java:163)
    at org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem(S3AInstrumentation.java:253)
Create a core-site.xml configuration file under resources
<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
</configuration>
Problem 5
Caused by: java.lang.UnsupportedOperationException: Unsupported encoding: DELTA_BINARY_PACKED
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.initDataReader(VectorizedColumnReader.java:783)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV2(VectorizedColumnReader.java:833)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.access$100(VectorizedColumnReader.java:54)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:756)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:742)
    at org.apache.parquet.column.page.DataPageV2.accept(DataPageV2.java:141)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPage(VectorizedColumnReader.java:742)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:261)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:181)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  // Use the non-vectorized Parquet reader, which can decode DELTA_BINARY_PACKED pages
  .config("spark.sql.parquet.enableVectorizedReader", "false")
  .getOrCreate()
Problem 6
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: Q3SD2NMRY1JHEGHE; S3 Extended Request ID: YW7hSvgBhRgUkInd9coS/BFfnO61SFclCZj4sVYPbY8YYy6zH1n5d8tHFU8I0f1AE7L4YfOZH18=; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1862)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1415)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1384)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1154)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:811)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:779)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:753)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:713)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:695)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:539)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5437)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5384)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1445)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1381)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:381)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 95 more
Solution
sc.hadoopConfiguration.set("fs.s3a.access.key", "your_access_key_id")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "your_secret_access_key")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
Problem 7
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://s3-datafacts-poc-001/dw/ods/myhudidataset/.hoodie/hoodie.properties
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:716)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:462)
    at org.apache.hudi.common.config.DFSPropertiesConfiguration.visitFile(DFSPropertiesConfiguration.java:106)
    ... 91 more
Solution
Delete the directory on S3 left behind by the Hudi table that was not created successfully.
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
Problem 8
Caused by: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on s3-datafacts-poc-001: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: S38N9ZTM2GHJSAY7; S3 Extended Request ID: gBLBCPfbJjdFYrRPkG6qCEHu3al+LQPr8dQ3TBhocD/E3Cq3Nc5KWZE9ub9vtk7dt1mWJRgQJmc=; Proxy: null), S3 Extended Request ID: gBLBCPfbJjdFYrRPkG6qCEHu3al+LQPr8dQ3TBhocD/E3Cq3Nc5KWZE9ub9vtk7dt1mWJRgQJmc=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: S38N9ZTM2GHJSAY7; S3 Extended Request ID: gBLBCPfbJjdFYrRPkG6qCEHu3al+LQPr8dQ3TBhocD/E3Cq3Nc5KWZE9ub9vtk7dt1mWJRgQJmc=; Proxy: null)
    at org.apache.hive.jdbc.HiveStatement.waitForOperationToComplete(HiveStatement.java:385)
    at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:254)
    at org.apache.hudi.hive.ddl.JDBCExecutor.runSQL(JDBCExecutor.java:57)
    ... 55 more
Solution
Hudi could not sync the table to Hive; the Hive user must be specified:
HIVE_USER.key() -> "hive",
HIVE_PASS.key() -> "",
Set the path scheme to s3
df.write
...
.save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
Problem 9
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://s3-datafacts-poc-001/dw/ods/hudi_test_table/.hoodie/.temp/20220513161709/2022-05-13
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListFiles(S3AFileSystem.java:3148)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.listFiles(S3AFileSystem.java:3129)
    at org.apache.hudi.table.marker.DirectWriteMarkers.lambda$createdAndMergedDataPaths$69cdea3b$1(DirectWriteMarkers.java:136)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:78)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    ...
Cause
When the job runs locally on Windows, the directory created on S3 is
s3-datafacts-poc-001/dw/ods/hudi_test_table/.hoodie\.temp/20220513161709/2022-05-13/
On Windows the path is written with the \ separator, which produces an incorrect S3 path, so the job cannot be run locally.
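The mismatch can be reproduced without S3: an object key uses `/` as its only separator, so a backslash is stored as an ordinary character and `.hoodie\.temp` becomes a single, differently named segment. A minimal sketch, assuming the backslash comes from a path joined with the Windows separator; where exactly that happens inside Hudi is not established here, and `S3Keys` is a hypothetical helper.

```scala
// Hypothetical helper illustrating why a backslash in the key breaks the lookup.
object S3Keys {
  // Joins path segments with the given separator, as a local path would be built.
  def joinWith(separator: String, segments: String*): String = segments.mkString(separator)

  // S3 object keys use "/" only; a backslash is just another character in the key.
  def normalize(key: String): String = key.replace('\\', '/')
}

// A key joined with "\\" differs from the key the reader later looks up with "/":
// S3Keys.joinWith("\\", ".hoodie", ".temp") != S3Keys.joinWith("/", ".hoodie", ".temp")
```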
Problem 10
Caused by: java.lang.IllegalArgumentException: BlockAlignedAvroParquetWriter does not support scheme s3n
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.getHoodieScheme(HoodieWrapperFileSystem.java:159)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.convertToHoodiePath(HoodieWrapperFileSystem.java:132)
    at org.apache.hudi.io.storage.HoodieParquetWriter.<init>(HoodieParquetWriter.java:56)
    at org.apache.hudi.io.storage.HoodieFileWriterFactory.newParquetFileWriter(HoodieFileWriterFactory.java:76)
    at org.apache.hudi.io.storage.HoodieFileWriterFactory.newParquetFileWriter(HoodieFileWriterFactory.java:63)
    at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriter(HoodieFileWriterFactory.java:49)
    at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:101)
    at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:74)
    at org.apache.hudi.io.CreateHandleFactory.create(CreateHandleFactory.java:44)
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:83)
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:40)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Solution
Use the s3 scheme in the write path
df.write
...
.save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
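When table paths come from configuration or from other jobs, they may still carry the `s3n://` or `s3a://` scheme. A minimal sketch that rewrites them to `s3://` before they reach the Hudi writer, assuming the job runs on EMR where `s3://` is served by EMRFS; `S3Schemes` is a hypothetical helper.

```scala
// Hypothetical helper: normalizes S3 URIs to the s3:// scheme.
object S3Schemes {
  // Rewrites a leading s3a:// or s3n:// to s3://; any other path is returned unchanged.
  def toS3(path: String): String = path.replaceFirst("^s3[an]://", "s3://")
}

// Possible use with the table path from this section:
// val target = S3Schemes.toS3("s3n://s3-datafacts-poc-001/dw/ods/hudi_test_table")
```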