
[Big Data] Spark 3: Reading and Writing S3 Parquet, Hive, and Hudi

Spark reads S3 Parquet and writes to a Hudi table

Contents

Spark reads S3 Parquet and writes to a Hudi table

References

Differences and relationships between S3, S3N, and S3A

Spark reading and writing S3 Parquet files

Test code

pom.xml

Configuration files

Submitting Spark jobs on EMR

spark-shell

spark-submit

Spark reading and writing Hudi

Local testing

Code

Testing on the cluster

spark-shell

spark-sql

spark-submit

Testing in Hive

Troubleshooting


References

Hadoop-aws

Apache Hadoop Amazon Web Services support – Hadoop-AWS module: Integration with Amazon Web Services

EMR release versions

Amazon EMR release 6.5.0 - Amazon EMR

EMR Spark

Apache Spark - Amazon EMR

EMR Hudi

Hudi - Amazon EMR

Differences and relationships between S3, S3N, and S3A

First, the three schemes differ in the object sizes they can handle. Second, s3 is block-based, while s3n/s3a are object-based. Finally, S3A is the access method recommended by Apache: the s3 scheme is gradually being phased out and AWS discourages its use, and S3A is more stable, secure, and efficient.

S3 Block FileSystem (URI scheme: s3) A block-based filesystem backed by S3. Files are stored as blocks, just like they are in HDFS. This permits efficient implementation of renames. This filesystem requires you to dedicate a bucket for the filesystem - you should not use an existing bucket containing files, or write other files to the same bucket. The files stored by this filesystem can be larger than 5GB, but they are not interoperable with other S3 tools.

S3A (URI scheme: s3a) A successor to the S3 Native, s3n fs, the S3a: system uses Amazon's libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for /successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.

S3 Native FileSystem (URI scheme: s3n) A native filesystem for reading and writing regular files on S3. The advantage of this filesystem is that you can access files on S3 that were written with other tools. Conversely, other tools can access files written using Hadoop. The disadvantage is the 5GB limit on file size imposed by S3.
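
As a small illustration of the scheme swap described above (the bucket and prefix here are placeholders, not paths used elsewhere in this article), an existing s3n:// URI only needs its scheme rewritten to s3a:// to be read through the S3A connector, provided hadoop-aws is on the classpath and the fs.s3a.* settings from the next section are in place:

// Hypothetical legacy URI; only the scheme changes, the bucket and key stay the same.
val legacyPath = "s3n://my-bucket/warehouse/events"
val s3aPath = legacyPath.replaceFirst("^s3n://", "s3a://")
// With fs.s3a.access.key / fs.s3a.secret.key / fs.s3a.endpoint configured (see below):
// val df = spark.read.parquet(s3aPath)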

Spark reading and writing S3 Parquet files

Test code

package org.zero

import com.amazonaws.auth.{ClasspathPropertiesFileCredentialsProvider, DefaultAWSCredentialsProviderChain}
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
import org.utils.SparkUtils
import software.amazon.awssdk.auth.credentials.{EnvironmentVariableCredentialsProvider, ProfileCredentialsProvider}

object SparkS2Test {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO)
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.spark_project.jetty").setLevel(Level.WARN)

    val start = System.currentTimeMillis()
    logger.warn(s"=================== Spark reads S3 ===================")

    val spark = SparkUtils.getSparkSession(this.getClass.getSimpleName, "local[*]")
    val sc = spark.sparkContext
    sc.hadoopConfiguration.set("fs.s3a.access.key", "AKIA4ZNT6QH3L45V45VY")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "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")
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

    val dataframe = spark
      .read
      .parquet("s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09")

    val tmpCache = dataframe.cache()
    tmpCache.createOrReplaceTempView("parquet_tmp_view")

    val dataFrame2 = spark.sql("select * from parquet_tmp_view limit 10")

    dataFrame2.show

//    dataFrame2.write.parquet("F:\\tmp\\output")

    spark.stop()

    val end = System.currentTimeMillis()
    logger.warn(s"=================== Elapsed: ${(end - start) / 1000} s ===================")
  }
}
package org.utils

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}

object SparkUtils {
  private val sparkConf: SparkConf = new SparkConf()

  def getSparkConf(appName: String, master: String): SparkConf = {
    sparkConf.setMaster(master).setAppName(appName)
  }

  def getSparkSession(appName: String, master: String): SparkSession = {
    sparkConf.setMaster(master).setAppName(appName)
    sparkSessionInit
  }

  lazy val sparkSessionInit: SparkSession = SparkSession.builder()
    .config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.io.compression.codec", "snappy")
    .config("spark.rdd.compress", "true")
    .config("spark.hadoop.parquet.writer.version", "v2")
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .config("spark.sql.parquet.compression.codec", "snappy")
    .config("spark.sql.parquet.filterPushdown", "true")
    .config("spark.sql.parquet.mergeSchema", "true")
    .config("spark.sql.parquet.binaryAsString", "true")
    .getOrCreate()
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-s3-hudi-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>spark-s3-hudi-test</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.maven.plugin.version>4.3.0</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <scala.version>2.12.13</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.1.2</spark.version>
        <hadoop.version>3.2.1</hadoop.version>
        <fasterxml.jackson.version>2.10.0</fasterxml.jackson.version>
        <project.build.scope>compile</project.build.scope>
    </properties>

    <repositories>
        <repository>
            <id>emr-6.5.0-artifacts</id>
            <name>EMR 6.5.0 Releases Repository</name>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <url>https://s3.us-west-1.amazonaws.com/us-west-1-emr-artifacts/emr-6.5.0/repos/maven/</url>
        </repository>
    </repositories>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.17.186</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>kms</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3control</artifactId>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${fasterxml.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${fasterxml.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${fasterxml.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala.maven.plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.assembly.plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Configuration files

Create a resources directory and add the following configuration files.

core-site.xml

<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>
            org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
        </value>
    </property>
<!--    <property>-->
<!--        <name>fs.s3a.access.key</name>-->
<!--        <description>AWS access key ID.-->
<!--            Omit for IAM role-based or provider-based authentication.</description>-->
<!--        <value>AKIA4ZNT6QH3L45V45VY</value>-->
<!--    </property>-->
<!--    <property>-->
<!--        <name>fs.s3a.secret.key</name>-->
<!--        <description>AWS secret key.-->
<!--            Omit for IAM role-based or provider-based authentication.</description>-->
<!--        <value>og8I6vB52vDhhb/So/r9ioHMvtbJ4EI2xdGPQIce</value>-->
<!--    </property>-->
</configuration>

log4j.properties

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
# ? ?  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
log4j.rootLogger=info, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p - %m%n
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-10c %x - %m%n

Submitting Spark jobs on EMR

spark-shell

spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

sc.hadoopConfiguration.set("fs.s3a.access.key", "AKIA4ZNT6QH3L45V45VY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "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")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

val df = spark.read.parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-11")
df.show

spark-submit

Package the code and upload it; the dependencies do not need to be bundled.
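Presumably this is because the EMR cluster already provides the Spark and Hadoop/S3 libraries on its classpath, so the thin project jar is enough for the runs below and the jar-with-dependencies assembly produced by the maven-assembly-plugin above is not required.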

spark-submit \
--master local[2] \
--class org.zero.SparkS2Test \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar

spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--total-executor-cores 2 \
--class org.zero.SparkS2Test \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar

spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-cores 2 \
--executor-memory 1g \
--class org.zero.SparkS2Test \
s3://s3-datafacts-poc-001/spark-s3-hudi-test-1.0-SNAPSHOT.jar

Spark reading and writing Hudi

Local testing

To use the AWS (EMR) Hudi bundle, download it from the master node and install it into the local Maven repository:

/usr/lib/hudi/hudi-spark3-bundle_2.12-0.9.0-amzn-1.jar

mvn install:install-file -Dfile=D:\soft\hudi-spark3-bundle_2.12-0.9.0-amzn-1.jar -DgroupId=org.apache.hudi -DartifactId=hudi-spark3-bundle_2.12 -Dversion=0.9.0-amzn-1 -Dpackaging=jar

Add the dependency to the project:

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_${scala.binary.version}</artifactId>
            <version>0.9.0-amzn-1</version>
        </dependency>

Code

package org.zero

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{current_date, lit}
import org.slf4j.LoggerFactory
import org.utils.SparkUtils


object SparkS3HudiTest {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO)
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.spark_project.jetty").setLevel(Level.DEBUG)

    val start = System.currentTimeMillis()

    val spark = SparkUtils.getSparkSession(this.getClass.getSimpleName, "local[1]")
    val sc = spark.sparkContext
    sc.hadoopConfiguration.set("fs.s3a.access.key", "AKIA4ZNT6QH3L45V45VY")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "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")
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
//    sc.hadoopConfiguration.set("fs.s3.connection.ssl.enabled", "false")

    // Read from S3
//    val dataframe = spark
//      .read
//      .parquet("s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09")

//    val tmpCache = dataframe.cache()
//    tmpCache.createOrReplaceTempView("parquet_tmp_view")
//
//    val dataFrame2 = spark.sql("select * from parquet_tmp_view limit 10")
//
//    dataFrame2.show

    // Write to S3
//    val dataframe = spark
//      .read
//      .parquet("F:\\tmp\\test_table.parquet")
//
//    dataframe
//      .write
//      .mode(SaveMode.Overwrite)
//      .save("s3a://s3-datafacts-poc-001/test/test_parquet")

    // Read multiple S3 objects
//    logger.warn(s"=================== Spark reads S3 Parquet ===================")
//    val dataframe2 = spark.read
//      .parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-1[1-3]")

//    dataframe2.printSchema()
//    dataframe2.show()

//    val dataframe3 = dataframe2.withColumn("dt", current_date())
//    dataframe2.withColumn("last_update_time", unix_timestamp())

    // Spark DataFrame writes to Hudi
//    logger.warn(s"=================== Hudi options ===================")
    // Hudi option settings
//    val hudiOptions = Map[String, String](
//      HoodieWriteConfig.TBL_NAME.key() -> "hudi_test_table",
//      TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
//      RECORDKEY_FIELD.key() -> "id",
//      PARTITIONPATH_FIELD.key() -> "dt",
//      PRECOMBINE_FIELD.key() -> "time",
//      BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true",
//      INDEX_TYPE.key() -> IndexType.BLOOM.name(),
//      META_SYNC_ENABLED.key() -> "true",
//      HIVE_SYNC_ENABLED.key() -> "true",
//      HIVE_USER.key() -> "hive",
//      HIVE_PASS.key() -> "hive",
//      HIVE_DATABASE.key() -> "ods",
//      HIVE_TABLE.key() -> "hudi_test_table",
//      HIVE_URL.key() -> "jdbc:hive2://52.82.123.13:10000",
//      // HIVE_URL.key() -> "jdbc:hive2://172.31.194.132:10000",
//      HIVE_PARTITION_FIELDS.key() -> "dt",
//      HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
//      HIVE_AUTO_CREATE_DATABASE.key() -> "true"
//    )
//    println("===> \n" + hudiOptions)

//    logger.warn(s"=================== Spark writes Hudi to S3 ===================")
//    dataframe3.write
//      .format("org.apache.hudi")
//      .option(OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
//      .options(hudiOptions)
//      .mode(SaveMode.Overwrite)
//      .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")

    // Spark reads Hudi
//    logger.warn(s"=================== Spark reads Hudi from S3 ===================")
//    val readDF = spark.read
//      .format("org.apache.hudi")
//      .load("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
//      .select("*")
//      .sort("creation_date", "id")
//      .show(10)

    // Spark SQL: read Hive, write Hudi
    logger.warn(s"=================== Spark SQL creates the Hive table ===================")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set spark.hadoop.hive.exec.compress.output=true")
    spark.sql("set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec")
    spark.sql("set mapreduce.output.fileoutputformat.compress.type=BLOCK")

//    val hiveSourceDDL =
//      """
//        |create external table if not exists ods.test_parquet_to_hive_table(
//        | id int,
//        | name string,
//        | age int,
//        | job string,
//        | address string,
//        | company string,
//        | email string,
//        | url string,
//        | phone string,
//        | sfzh string,
//        | chrome string,
//        | ipv4 string,
//        | ipv6 string,
//        | `date` bigint,
//        | `time` bigint,
//        | mac_address string,
//        | col_tinyint int,
//        | col_smallint int,
//        | col_mediumint int,
//        | col_bigint bigint,
//        | col_decimal double,
//        | col_double double,
//        | col_float double,
//        | col_time bigint,
//        | col_blob string,
//        | col_text string
//        |) partitioned by(dt string)
//        |stored as parquet
//        |location "s3://s3-datafacts-poc-001/dw/ods/test_parquet_to_hive_table"
//        |""".stripMargin
//    spark.sql(hiveSourceDDL)

//    spark.sql("show databases").show
    spark.sql("use ods")
//    spark.sql("show tables").show
//    spark.sql("show create table test_parquet_to_hive_table").show

    logger.warn(s"=================== Spark SQL creates the Hudi sink table ===================")

    val hudiSinkDDL =
      """
        |create table if not exists ods.ods_hudi_sink_table (
        | id int,
        | name string,
        | age int,
        | job string,
        | address string,
        | company string,
        | email string,
        | url string,
        | phone string,
        | sfzh string,
        | chrome string,
        | ipv4 string,
        | ipv6 string,
        | `date` bigint,
        | `time` bigint,
        | mac_address string,
        | col_tinyint int,
        | col_smallint int,
        | col_mediumint int,
        | col_bigint bigint,
        | col_decimal double,
        | col_double double,
        | col_float double,
        | col_time bigint,
        | col_blob string,
        | col_text string,
        | dt date
        |) using hudi
        |partitioned by (dt)
        |tblproperties (
        |  type = 'cow',
        |  primaryKey = 'id',
        |  preCombineField = 'time',
        |  hoodie.index.type = 'GLOBAL_BLOOM',
        |  hiveSyncEnabled = 'true',
        |  hiveDatabase = 'ods',
        |  hiveUser = 'hive',
        |  hivePass = 'hive',
        |  hiveTable = 'ods_hudi_sink_table',
        |  hiveUrl = 'jdbc:hive2://52.82.123.13:10000',
        |  hivePartitionFields = 'dt'
        |)
        |location "s3a://s3-datafacts-poc-001/dw/ods/ods_hudi_sink_table"
        |""".stripMargin

    spark.sql(hudiSinkDDL)
    spark.sql("show tables").show

    logger.warn(s"=================== Spark SQL reads Hive and writes to the Hudi table ===================")
    spark.sql("insert into table ods_hudi_sink_table select * from test_parquet_to_hive_table")

    spark.sql("select * from ods_hudi_sink_table limit 10").show

    spark.stop()

    val end = System.currentTimeMillis()
    logger.warn(s"=================== Elapsed: ${(end - start) / 1000} s ===================")
  }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-s3-hudi-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>spark-s3-hudi-test</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.maven.plugin.version>4.3.0</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <scala.version>2.12.13</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.1.2</spark.version>
        <hadoop.version>3.2.1</hadoop.version>
        <hoodie.version>0.9.0</hoodie.version>
<!--        <hoodie.version>0.9.0-amzn-1</hoodie.version>-->
        <fasterxml.jackson.version>2.10.5</fasterxml.jackson.version>

        <project.build.scope>compile</project.build.scope>
<!--        <project.build.scope>provided</project.build.scope>-->
    </properties>

    <dependencies>
        <!-- s3 -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.12.217</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3control</artifactId>
            <version>1.12.217</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kms</artifactId>
            <version>1.12.217</version>
            <scope>${project.build.scope}</scope>
        </dependency>

        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>

        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>

        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>

        <!-- hudi -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_${scala.binary.version}</artifactId>
            <version>${hoodie.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>

        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${fasterxml.jackson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${fasterxml.jackson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${fasterxml.jackson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>

        <!-- http -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.11</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.9</version>
            <scope>${project.build.scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala.maven.plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.assembly.plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Testing on the cluster

spark-shell

spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Configuration

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

sc.hadoopConfiguration.set("fs.s3a.access.key", "AKIA4ZNT6QH3L45V45VY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "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")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")

Imports

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.hive.MultiPartKeysValueExtractor

Read S3 Parquet files and write to a Hudi table on S3

val df = spark.read.parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-1[1-3]")

val df2 = df.withColumn("creation_date", current_date())

val hudiOptions = Map[String, String](
  HoodieWriteConfig.TBL_NAME.key() -> "hudi_test_table",
  TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
  RECORDKEY_FIELD.key() -> "id",                             // record key
  PARTITIONPATH_FIELD.key() -> "creation_date",              // partition field
  PRECOMBINE_FIELD.key() -> "time",                          // column holding the record update time
  BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true",  // whether a record's partition directory changes when its partition value changes
  INDEX_TYPE.key() -> IndexType.BLOOM.name(),                // index type: HBASE, INMEMORY, BLOOM or GLOBAL_BLOOM; to locate records after a partition change, GLOBAL_BLOOM must be used
  META_SYNC_ENABLED.key() -> "true",                         // sync metadata
  HIVE_SYNC_ENABLED.key() -> "true",                         // sync the table to Hive
  HIVE_USER.key() -> "hive",
  HIVE_PASS.key() -> "hive",
  HIVE_DATABASE.key() -> "ods",
  HIVE_TABLE.key() -> "hudi_test_table",
  HIVE_URL.key() -> "jdbc:hive2://172.31.194.132:10000",
  HIVE_PARTITION_FIELDS.key() -> "creation_date",
  HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
  HIVE_AUTO_CREATE_DATABASE.key() -> "true"
)

(df2.write.format("org.apache.hudi")
  .option(OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .options(hudiOptions)
  .mode(SaveMode.Overwrite)
  .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table"))

val df3 = spark.read
  .format("org.apache.hudi")
  .load("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
  .select("*")
  .sort("creation_date", "id")

df3.show(10)

Update

val updateDF = df2.limit(1).withColumn("creation_date", lit("date"))

(updateDF.write
  .format("org.apache.hudi")
  .option(OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table"))

updateDF.show()

Deleting S3 files

aws s3 rm s3://s3-datafacts-poc-001/dw/ods/hudi_test_table --recursive

emrfs sync s3://s3-datafacts-poc-001/dw/ods
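
Note (assuming EMRFS consistent view is enabled on the cluster): emrfs sync re-synchronizes the EMRFS metadata with the actual contents of S3, which is needed here because the objects were removed outside EMRFS with the AWS CLI; otherwise jobs on the cluster may still see the deleted Hudi files listed in the metadata store.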

Using Spark SQL in spark-shell

spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hive/lib/mysql-connector-java-5.1.49.jar

Configuration

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

sc.hadoopConfiguration.set("fs.s3a.access.key", "AKIA4ZNT6QH3L45V45VY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "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")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")

Importing S3 Parquet files into Hive

spark.sql("show databases").show

+---------+
|namespace|
+---------+
|  default|
| dhw_demo|
|      ods|
|    test1|
|  test_db|
|     yszy|
+---------+

Parquet data source

val df = spark.read.parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-1[1-3]")

val df2 = df.withColumn("dt", current_date())

df2.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- address: string (nullable = true)
 |-- company: string (nullable = true)
 |-- email: string (nullable = true)
 |-- url: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- sfzh: string (nullable = true)
 |-- chrome: string (nullable = true)
 |-- ipv4: string (nullable = true)
 |-- ipv6: string (nullable = true)
 |-- date: long (nullable = true)
 |-- time: long (nullable = true)
 |-- mac_address: string (nullable = true)
 |-- col_tinyint: integer (nullable = true)
 |-- col_smallint: integer (nullable = true)
 |-- col_mediumint: integer (nullable = true)
 |-- col_bigint: long (nullable = true)
 |-- col_decimal: double (nullable = true)
 |-- col_double: double (nullable = true)
 |-- col_float: double (nullable = true)
 |-- col_time: long (nullable = true)
 |-- col_blob: string (nullable = true)
 |-- col_text: string (nullable = true)
 |-- dt: date (nullable = false)
 
spark.sql("set spark.sql.debug.maxToStringFields=100")
df2.createOrReplaceTempView("parquet_tmp_table")

spark.sql("use ods")
spark.sql("show tables").show

spark.sql("desc formatted parquet_tmp_table").show

spark.sql("select * from parquet_tmp_table limit 10").show

Create the sink table in Hive; it can of course also be created in spark-sql.

beeline -n hive -u jdbc:hive2://52.82.123.13:10000/default

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

# alter table test_parquet_to_hive_table set tblproperties('external'='true');
drop table if exists test_parquet_to_hive_table;
create external table if not exists ods.test_parquet_to_hive_table(
 id int,
 name string,
 age int,
 job string,
 address string,
 company string,
 email string,
 url string,
 phone string,
 sfzh string,
 chrome string,
 ipv4 string,
 ipv6 string,
 `date` bigint,
 `time` bigint,
 mac_address string,
 col_tinyint int,
 col_smallint int,
 col_mediumint int,
 col_bigint bigint,
 col_decimal double,
 col_double double,
 col_float double,
 col_time bigint,
 col_blob string,
 col_text string
) partitioned by(dt string)
stored as parquet
location "s3://s3-datafacts-poc-001/dw/ods/test_parquet_to_hive_table";

Write into the Hive table

spark.sql("show create table ods.test_parquet_to_hive_table").show

spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict")

spark.sql("insert overwrite ods.test_parquet_to_hive_table select * from parquet_tmp_table")
spark.sql("select * from ods.test_parquet_to_hive_table limit 10").show

Read the Hive table and write to Hudi

spark-sql

spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hive/lib/mysql-connector-java-5.1.49.jar

set spark.sql.debug.maxToStringFields=100;
set fs.s3a.access.key=xxxx;
set fs.s3a.secret.key=xxxx;
set fs.s3a.endpoint=s3.cn-northwest-1.amazonaws.com.cn;

Create the Hudi table

create table if not exists ods.ods_hudi_sink_table (
 id int,
 name string,
 age int,
 job string,
 address string,
 company string,
 email string,
 url string,
 phone string,
 sfzh string,
 chrome string,
 ipv4 string,
 ipv6 string,
 `date` bigint,
 `time` bigint,
 mac_address string,
 col_tinyint int,
 col_smallint int,
 col_mediumint int,
 col_bigint bigint,
 col_decimal double,
 col_double double,
 col_float double,
 col_time bigint,
 col_blob string,
 col_text string,
 dt date
) using hudi
partitioned by (dt)
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'time',
  hoodie.index.type = 'GLOBAL_BLOOM',
  hiveSyncEnabled = 'true',
  hiveDatabase = 'ods',
  hiveUser = 'hive',
  hivePass = 'hive',
  hiveTable = 'ods_hudi_sink_table',
  hiveUrl = 'jdbc:hive2://52.82.123.13:10000',
  hivePartitionFields = 'dt'
)
location "s3://s3-datafacts-poc-001/dw/ods/ods_hudi_sink_table";

Write into Hudi

show tables;

# ods    hudi_test_table             false
# ods    ods_hudi_sink_table         false
# ods    test_parquet_to_hive_table  false
# ods    test_tb_02                  false
# Time taken: 0.021 seconds, Fetched 4 row(s)

insert into table ods_hudi_sink_table select * from test_parquet_to_hive_table;

select * from ods_hudi_sink_table limit 10;

Spark-submit

spark-submit \
--master local[2] \
--class org.zero.SparkS3HudiTest \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar


spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--total-executor-cores 2 \
--class org.zero.SparkS2Test \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar

Testing in Hive

beeline -n hive -u jdbc:hive2://52.82.123.13:10000/default

use ods;

show tables;
+-----------------------------+
|          tab_name           |
+-----------------------------+
| hudi_test_table             |
| ods_hudi_sink_table         |
| test_parquet_to_hive_table  |
+-----------------------------+

show create table hudi_test_table;
CREATE EXTERNAL TABLE `hudi_test_table`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` int,
  `name` string,
  `age` int,
  `job` string,
  `address` string,
  `company` string,
  `email` string,
  `url` string,
  `phone` string,
  `sfzh` string,
  `chrome` string,
  `ipv4` string,
  `ipv6` string,
  `date` bigint,
  `time` bigint,
  `mac_address` string,
  `col_tinyint` int,
  `col_smallint` int,
  `col_mediumint` int,
  `col_bigint` bigint,
  `col_decimal` double,
  `col_double` double,
  `col_float` double,
  `col_time` bigint,
  `col_blob` string,
  `col_text` string)
PARTITIONED BY (
  `creation_date` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='false',
  'path'='s3://s3-datafacts-poc-001/dw/ods/hudi_test_table')
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://s3-datafacts-poc-001/dw/ods/hudi_test_table'
TBLPROPERTIES (
  'bucketing_version'='2',
  'last_commit_time_sync'='20220513101907',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numPartCols'='1',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"job","type":"string","nullable":true,"metadata":{}},{"name":"address","type":"string","nullable":true,"metadata":{}},{"name":"company","type":"string","nullable":true,"metadata":{}},{"name":"email","type":"string","nullable":true,"metadata":{}},{"name":"url","type":"string","nullable":true,"metadata":{}},{"name":"phone","type":"string","nullable":true,"metadata":{}},{"name":"sfzh","type":"string","nullable":true,"metadata":{}},{"name":"chrome","type":"string","nullable":true,"metadata":{}},{"name":"ipv4","type":"string","nullable":true,"metadata":{}},{"name":"ipv6","type":"string","nullable":true,"metadata":{}},{"name":"date","type":"long","nullable":true,"metadata":{}},{"name":"time","type":"long","nullable":true,"metadata":{}},{"name":"mac_address","type":"string","nullable":true,"metadata":{}},{"name":"col_tinyint","type":"integer","nullable":true,"metadata":{}},{"name":"col_smallint","type":"integer","nullable":true,"metadata":{}},{"name":"col_mediumint","type":"integer","nullable":true,"metadata":{}},{"name":"col_bigint","type":"long","nullable":true,"metadata":{}},{"name":"col_decimal","type":"double","nullable":true,"metadata":{}},{"name":"col_double","type":"double","nullable":true,"metadata":{}},{"name":"col_float","type":"double","nullable":true,"metadata":{}},{"name":"col_time","type":"long","nullable":true,"metadata":{}},{"name":"col_blob","type":"string","nullable":true,"metadata":{}},{"name":"col_text","type":"string","nullable":true,"metadata":{}},{"name":"creation_date","type":"date","nullable":false,"metadata":{}}]}',
  'spark.sql.sources.schema.partCol.0'='creation_date',
  'transient_lastDdlTime'='1652437162')

select * from hudi_test_table limit 10;

Troubleshooting

Problem 1

2022-05-10 14:27:07,535 DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList - No credentials provided by SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.CredentialInitializationException: Access key or secret key is unset org.apache.hadoop.fs.s3a.CredentialInitializationException: Access key or secret key is unset at org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider.getCredentials(SimpleAWSCredentialsProvider.java:68) at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1166) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762) at ...2022-05-10 14:27:07,535 DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList - No credentials provided by EnvironmentVariableCredentialsProvider: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)) com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)) at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50) at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1166) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) at ...

Method 1:

Configuration

    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>com.amazonaws.auth.profile.ProfileCredentialsProvider</value>
    </property>

Linux

~/.aws/credentials
[default]
aws_access_key_id = your_access_key_id
aws_secret_access_key = your_secret_access_key

export AWS_ACCESS_KEY_ID=your_access_key_id
export AWS_SECRET_ACCESS_KEY=your_secret_access_key

Windows

C:\Users\LJH\.aws\credentials
[default]
aws_access_key_id = your_access_key_id
aws_secret_access_key = your_secret_access_key

set AWS_ACCESS_KEY_ID=AKIA4ZNT6QH3L45V45VY
set AWS_SECRET_ACCESS_KEY=og8I6vB52vDhhb/So/r9ioHMvtbJ4EI2xdGPQIce

Method 2:

Create a core-site.xml configuration file under resources

<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <description>AWS access key ID.
            Omit for IAM role-based or provider-based authentication.</description>
        <value>AKIA4ZNT6QH3L45V45VY</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <description>AWS secret key.
            Omit for IAM role-based or provider-based authentication.</description>
        <value>og8I6vB52vDhhb/So/r9ioHMvtbJ4EI2xdGPQIce</value>
    </property>
</configuration>

Method 3:

<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
</configuration>

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.access.key", "AKIA4ZNT6QH3L45V45VY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "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")

Problem 2

Exception in thread "main" org.apache.hadoop.fs.s3a.AWSRedirectException: getFileStatus on s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09: com.amazonaws.services.s3.model.AmazonS3Exception: The bucket is in this region: cn-northwest-1. Please use this region to retry the request (Service: Amazon S3; Status Code: 301; Error Code: 301 Moved Permanently; Request ID: C1AV4REN3P1VGWY7; S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=), S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=:301 Moved Permanently: The bucket is in this region: cn-northwest-1. Please use this region to retry the request (Service: Amazon S3; Status Code: 301; Error Code: 301 Moved Permanently; Request ID: C1AV4REN3P1VGWY7; S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=) at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:217) at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:151) at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2239) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143) at ...Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The bucket is in this region: cn-northwest-1. Please use this region to retry the request (Service: Amazon S3; Status Code: 301; Error Code: 301 Moved Permanently; Request ID: C1AV4REN3P1VGWY7; S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=), S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE= at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1640) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058) at ...

Set up AWS Credentials and Region for Development - AWS SDK for Java

Get started with the AWS SDK for Java 2.x - AWS SDK for Java

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

Or

Linux

~/.aws/config

[default]
region = cn-northwest-1

export AWS_REGION=cn-northwest-1

Windows

C:\Users\USERNAME\.aws\config

[default]
region = cn-northwest-1

set AWS_REGION=cn-northwest-1

Problem 3

Exception in thread "main" java.nio.file.AccessDeniedException: s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09: getFileStatus on s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: CB7P31ZHFVCTBZWM; S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=), S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=:403 Forbidden at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:230) at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:151) at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2239) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143) at a:47) ...?Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: CB7P31ZHFVCTBZWM; S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=), S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY= at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1640) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) at ...

sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
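
As a hedged note, a 403 at getFileStatus usually means the request is either signed against the wrong endpoint/region for the bucket or made with credentials that lack permission on it, so pointing fs.s3a.endpoint at the bucket's regional endpoint (as above) and double-checking the fs.s3a.access.key / fs.s3a.secret.key pair are the first things to verify.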

Problem 4

2022-05-10 14:38:15,669 DEBUG org.apache.hadoop.metrics2.impl.MetricsConfig - Could not locate file hadoop-metrics2.properties org.apache.commons.configuration2.ex.ConfigurationException: Could not locate: org.apache.commons.configuration2.io.FileLocator@75de29c0[fileName=hadoop-metrics2.properties,basePath=<null>,sourceURL=,encoding=<null>,fileSystem=<null>,locationStrategy=<null>] at org.apache.commons.configuration2.io.FileLocatorUtils.locateOrThrow(FileLocatorUtils.java:346) at org.apache.commons.configuration2.io.FileHandler.load(FileHandler.java:972) at org.apache.commons.configuration2.io.FileHandler.load(FileHandler.java:702) at org.apache.hadoop.metrics2.impl.MetricsConfig.loadFirst(MetricsConfig.java:116) at org.apache.hadoop.metrics2.impl.MetricsConfig.create(MetricsConfig.java:96) at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.configure(MetricsSystemImpl.java:478) at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.start(MetricsSystemImpl.java:188) at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.init(MetricsSystemImpl.java:163) at org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem(S3AInstrumentation.java:253)?

Create a core-site.xml configuration file under resources

<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
</configuration>

Problem 5

Caused by: java.lang.UnsupportedOperationException: Unsupported encoding: DELTA_BINARY_PACKED at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.initDataReader(VectorizedColumnReader.java:783) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV2(VectorizedColumnReader.java:833) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.access$100(VectorizedColumnReader.java:54) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:756) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:742) at org.apache.parquet.column.page.DataPageV2.accept(DataPageV2.java:141) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPage(VectorizedColumnReader.java:742) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:261) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:181) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93) at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755) at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Solution

Spark's vectorized Parquet reader does not support the DELTA_BINARY_PACKED encoding (used by Parquet V2 data pages), so disable vectorized reading when building the session:

val spark: SparkSession = SparkSession.builder()
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .getOrCreate()
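The same flag can also be flipped at runtime on an existing session, which is handy in spark-shell (a minimal sketch; it assumes a SparkSession named spark already exists):

// Fall back to the non-vectorized Parquet reader for subsequent reads in this session
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")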

Problem 6

Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: Q3SD2NMRY1JHEGHE; S3 Extended Request ID: YW7hSvgBhRgUkInd9coS/BFfnO61SFclCZj4sVYPbY8YYy6zH1n5d8tHFU8I0f1AE7L4YfOZH18=; Proxy: null) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1862) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1415) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1384) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1154) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:811) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:779) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:753) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:713) at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:695) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:539) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5437) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5384) at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1445) at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1381) at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:381) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) ... 95 more

Solution

Set the S3A access key, secret key, and the regional endpoint explicitly on the Hadoop configuration:

sc.hadoopConfiguration.set("fs.s3a.access.key", "AKIA4ZNT6QH3L45V45VY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "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")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

Problem 7

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://s3-datafacts-poc-001/dw/ods/myhudidataset/.hoodie/hoodie.properties at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143) at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:716) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906) at org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:462) at org.apache.hudi.common.config.DFSPropertiesConfiguration.visitFile(DFSPropertiesConfiguration.java:106) ... 91 more

Solution

Delete the directory of the Hudi table that was not created successfully on S3, then rerun the write.
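The cleanup can be done from the AWS console or with aws s3 rm --recursive; a programmatic alternative is a small Hadoop FileSystem call (a sketch only: the path below is taken from the error message as an example, and an existing SparkSession named spark is assumed):

import org.apache.hadoop.fs.{FileSystem, Path}

val tablePath = new Path("s3a://s3-datafacts-poc-001/dw/ods/myhudidataset")
val fs: FileSystem = tablePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
// Recursively remove the half-created table directory so Hudi can re-initialize it
if (fs.exists(tablePath)) fs.delete(tablePath, true)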

Also make sure a spark-avro dependency matching the Spark version is on the classpath, since Hudi's Spark integration depends on it:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>

Problem 8

Caused by: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on s3-datafacts-poc-001: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: S38N9ZTM2GHJSAY7; S3 Extended Request ID: gBLBCPfbJjdFYrRPkG6qCEHu3al+LQPr8dQ3TBhocD/E3Cq3Nc5KWZE9ub9vtk7dt1mWJRgQJmc=; Proxy: null), S3 Extended Request ID: gBLBCPfbJjdFYrRPkG6qCEHu3al+LQPr8dQ3TBhocD/E3Cq3Nc5KWZE9ub9vtk7dt1mWJRgQJmc=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: S38N9ZTM2GHJSAY7; S3 Extended Request ID: gBLBCPfbJjdFYrRPkG6qCEHu3al+LQPr8dQ3TBhocD/E3Cq3Nc5KWZE9ub9vtk7dt1mWJRgQJmc=; Proxy: null) at org.apache.hive.jdbc.HiveStatement.waitForOperationToComplete(HiveStatement.java:385) at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:254) at org.apache.hudi.hive.ddl.JDBCExecutor.runSQL(JDBCExecutor.java:57) ... 55 more

Solution

Hudi could not sync the table to Hive; the Hive sync needs an explicit Hive user:

HIVE_USER.key() -> "hive",
HIVE_PASS.key() -> "",

Also use the s3 scheme for the save path (df below is the DataFrame being written):

df.write
    ...
    .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")

Problem 9

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://s3-datafacts-poc-001/dw/ods/hudi_test_table/.hoodie/.temp/20220513161709/2022-05-13 at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListFiles(S3AFileSystem.java:3148) at org.apache.hadoop.fs.s3a.S3AFileSystem.listFiles(S3AFileSystem.java:3129) at org.apache.hudi.table.marker.DirectWriteMarkers.lambda$createdAndMergedDataPaths$69cdea3b$1(DirectWriteMarkers.java:136) at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:78) at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)

...

Cause

Running locally on Windows, the marker directory created on S3 is:

s3-datafacts-poc-001/dw/ods/hudi_test_table/.hoodie\.temp/20220513161709/2022-05-13/

Windows writes the path separator as a backslash (\), so the resulting S3 key is wrong and the job cannot be executed locally on Windows.

Problem 10

Caused by: java.lang.IllegalArgumentException: BlockAlignedAvroParquetWriter does not support scheme s3n at org.apache.hudi.common.fs.HoodieWrapperFileSystem.getHoodieScheme(HoodieWrapperFileSystem.java:159) at org.apache.hudi.common.fs.HoodieWrapperFileSystem.convertToHoodiePath(HoodieWrapperFileSystem.java:132) at org.apache.hudi.io.storage.HoodieParquetWriter.<init>(HoodieParquetWriter.java:56) at org.apache.hudi.io.storage.HoodieFileWriterFactory.newParquetFileWriter(HoodieFileWriterFactory.java:76) at org.apache.hudi.io.storage.HoodieFileWriterFactory.newParquetFileWriter(HoodieFileWriterFactory.java:63) at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriter(HoodieFileWriterFactory.java:49) at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:101) at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:74) at org.apache.hudi.io.CreateHandleFactory.create(CreateHandleFactory.java:44) at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:83) at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:40) at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121) at java.util.concurrent.FutureTask.run(FutureTask.java:266)

Solution

Hudi's HoodieWrapperFileSystem does not support the s3n scheme, so save the table with the s3 scheme instead:

df.write
    ...
    .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
