| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> HUDI(搭建详细记录附加jar) -> 正文阅读 |
|
[大数据]HUDI(搭建详细记录附加jar) |
???????前言Apache Hudi(发音为“hoodie”)是下一代流数据湖平台。Apache Hudi 将核心仓库和数据库功能直接引入数据湖。Hudi 提供表、?事务、高效的 upserts/deletes、高级索引、?流式摄取服务、数据集群/压缩优化和并发,同时将您的数据保持为开源文件格式。 Apache Hudi 不仅非常适合流式工作负载,而且还允许您创建高效的增量批处理管道。阅读文档以获取更多用例描述并查看谁在使用 Hudi,了解世界上一些最大的数据湖,包括Uber、Amazon、?ByteDance、?Robinhood等如何使用 Hudi 转变他们的生产数据湖。 Apache Hudi 可以轻松地在任何云存储平台上使用。Hudi 的高级性能优化,使用任何流行的查询引擎(包括 Apache Spark、Flink、Presto、Trino、Hive 等)使分析工作负载更快。 一、Hudi核心概念如果您对 Apache Hudi 比较陌生,熟悉一些核心概念很重要:
二、使用步骤1.拉取code 并编译。# Checkout code and build git clone https://gitee.com/apache/Hudi.git && cd Hudi mvn clean package -DskipTests 如果编译失败可以切换分支 编译 完成进入cd packaging/ hadoop,hive,spark,等等的部署就不贴了。
|
Maven build options | Expected Spark bundle jar name | Notes |
---|---|---|
(empty) | hudi-spark-bundle_2.11 (legacy bundle name) | For Spark 2.4.4 and Scala 2.11 (default options) |
-Dspark2.4 | hudi-spark2.4-bundle_2.11 | For Spark 2.4.4 and Scala 2.11 (same as default) |
-Dspark2.4 -Dscala-2.12 | hudi-spark2.4-bundle_2.12 | For Spark 2.4.4 and Scala 2.12 |
-Dspark3.1 -Dscala-2.12 | hudi-spark3.1-bundle_2.12 | For Spark 3.1.x and Scala 2.12 |
-Dspark3.2 -Dscala-2.12 | hudi-spark3.2-bundle_2.12 | For Spark 3.2.x and Scala 2.12 |
-Dspark3 | hudi-spark3-bundle_2.12 (legacy bundle name) | For Spark 3.2.x and Scala 2.12 |
-Dscala-2.12 | hudi-spark-bundle_2.12 (legacy bundle name) | For Spark 2.4.4 and Scala 2.12 |
?例如
# 针对 Spark 3.2.x 构建
mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12
# 针对 Spark 3.1.x 构建
mvn clean package -DskipTests -Dspark3.1 -Dscala-2.12
# 针对 Spark 2.4.4 和 Scala 2.12 构建
mvn clean package -DskipTests -Dspark2.4 -Dscala-2.12
那么“spark-avro”模块呢?
从 0.11 版本开始,不再需要spark-avro
使用指定Hudi--packages
支持的默认 Flink 版本是 1.14。有关使用不同 Flink 和 Scala 版本进行构建的信息,请参阅下表。
Maven build options | Expected Flink bundle jar name | Notes |
---|---|---|
(empty) | hudi-flink1.14-bundle_2.11 | For Flink 1.14 and Scala 2.11 (default options) |
-Dflink1.14 | hudi-flink1.14-bundle_2.11 | For Flink 1.14 and Scala 2.11 (same as default) |
-Dflink1.14 -Dscala-2.12 | hudi-flink1.14-bundle_2.12 | For Flink 1.14 and Scala 2.12 |
-Dflink1.13 | hudi-flink1.13-bundle_2.11 | For Flink 1.13 and Scala 2.11 |
-Dflink1.13 -Dscala-2.12 | hudi-flink1.13-bundle_2.12 | For Flink 1.13 and Scala 2.12 |
cp /apps/Hudi/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.0.jar /apps/hive/lib
nohup hive --service metastore >> metastore.log 2>&1 &
nohup hive --service hiveserver2 >> hiveserver2.log 2>&1 &
正常的查询hive?
Hudi 支持使用 Spark SQL 通过HoodieSparkSessionExtension?sql 扩展来写入和读取数据。从提取的目录中使用 Hudi 运行 Spark SQL:
代码如下(示例):
# Spark 3.2
spark-sql --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
运行中
出现这个情况是因为包的配置--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1
可以把jar包提前拷贝到spark/jars目录下这样可以把这个参数取消如下
spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
?# Spark 3.1
spark-shell \
? --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.11.1 \
? --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'# Spark 2.4
spark-shell \
? --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.11.1 \
? --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
请注意以下事项
- 对于 Spark 3.2,需要额外的 spark_catalog 配置:--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
- 我们使用了为 scala 2.12 构建的 hudi-spark-bundle,因为使用的 spark-avro 模块也可以依赖于 2.12。
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
? options(getQuickstartWriteConfigs).
? option(PRECOMBINE_FIELD_OPT_KEY, "ts").
? option(RECORDKEY_FIELD_OPT_KEY, "uuid").
? option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
? option(TABLE_NAME, tableName).
? mode(Overwrite).
? save(basePath)
// spark-shell
val tripsSnapshotDF = spark.
? read.
? format("hudi").
? load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")spark.sql("select fare, begin_lon, begin_lat, ts from ?hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from ?hudi_trips_snapshot").show()
spark.read.
? format("hudi").
? option("as.of.instant", "20210728141108100").
? load(basePath)spark.read.
? format("hudi").
? option("as.of.instant", "2021-07-28 14:11:08.200").
? load(basePath)// It is equal to "as.of.instant = 2021-07-28 00:00:00"
spark.read.
? format("hudi").
? option("as.of.instant", "2021-07-28").
? load(basePath)
?
// spark-shell
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
? options(getQuickstartWriteConfigs).
? option(PRECOMBINE_FIELD_OPT_KEY, "ts").
? option(RECORDKEY_FIELD_OPT_KEY, "uuid").
? option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
? option(TABLE_NAME, tableName).
? mode(Append).
? save(basePath)
?
// spark-shell
// reload data
spark.
? read.
? format("hudi").
? load(basePath).
? createOrReplaceTempView("hudi_trips_snapshot")val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from ?hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
? option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
? option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
? load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from ?hudi_trips_incremental where fare > 20.0").show()
?
// spark-shell
val beginTime = "000" // Represents all commits > this time.
val endTime = commits(commits.length - 2) // commit time we are interested in//incrementally query data
val tripsPointInTimeDF = spark.read.format("hudi").
? option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
? option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
? option(END_INSTANTTIME_OPT_KEY, endTime).
? load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
// spark-shell
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// fetch two records to be deleted
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))df.write.format("hudi").
? options(getQuickstartWriteConfigs).
? option(OPERATION_OPT_KEY,"delete").
? option(PRECOMBINE_FIELD_OPT_KEY, "ts").
? option(RECORDKEY_FIELD_OPT_KEY, "uuid").
? option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
? option(TABLE_NAME, tableName).
? mode(Append).
? save(basePath)// run the same read query as above.
val roAfterDeleteViewDF = spark.
? read.
? format("hudi").
? load(basePath)roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
?
// spark-shell
spark.
? read.format("hudi").
? load(basePath).
? select("uuid","partitionpath").
? sort("partitionpath","uuid").
? show(100, false)val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
? read.json(spark.sparkContext.parallelize(inserts, 2)).
? filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi").
? options(getQuickstartWriteConfigs).
? option(OPERATION.key(),"insert_overwrite").
? option(PRECOMBINE_FIELD.key(), "ts").
? option(RECORDKEY_FIELD.key(), "uuid").
? option(PARTITIONPATH_FIELD.key(), "partitionpath").
? option(TBL_NAME.key(), tableName).
? mode(Append).
? save(basePath)// Should have different keys now for San Francisco alone, from query before.
spark.
? read.format("hudi").
? load(basePath).
? select("uuid","partitionpath").
? sort("partitionpath","uuid").
? show(100, false)
?
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/16 1:39:22- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |