Writing data to Hudi
1. Add the Maven dependencies
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.12.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark-bundle -->
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark-bundle_2.11</artifactId>
    <version>0.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
2. Write the insert program
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConversions._

val spark = SparkSession.builder().master("local").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

// Generate 10 sample trip records and load them as a DataFrame
val insert = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(insert, 2))
df.show() // inspect the schema and the generated data

df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
Running the program generates the corresponding directory structure under the base path (described in section III below).

I. Structure of the generated data

This example uses taxi trip data with the following fields:
begin_lat, begin_lon, end_lat, end_lon are the start and end coordinates of a trip
driver and rider are the driver ID and the passenger ID
fare is the trip fare, partitionpath carries the country/region information used for partitioning, and uuid is the primary key
II. Options set in the test program
hoodie.datasource.write.precombine.field: the field used for pre-combining before the actual write. When two records have the same record key, Hudi compares the precombine field with Object.compareTo(..) and keeps the record with the largest value. This value behaves much like an HBase version: reads return the latest value, so Hudi does not immediately overwrite the existing value but adds a new record instead.
hoodie.datasource.write.recordkey.field: the record key (primary key) field
hoodie.datasource.write.partitionpath.field: the partition field; the test program partitions by the country information. A write using these raw keys directly is sketched below.
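For reference, the write from step 2 can also be expressed with these raw configuration keys instead of the PRECOMBINE_FIELD_OPT_KEY/RECORDKEY_FIELD_OPT_KEY/PARTITIONPATH_FIELD_OPT_KEY/TABLE_NAME constants. A minimal sketch, reusing df, tableName and basePath from the insert program above:

// Same write as step 2, expressed with the raw Hudi configuration keys
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  option("hoodie.table.name", tableName).
  mode(Overwrite).
  save(basePath)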
III. Directory structure
.hoodie holds the table's metadata and archive: hoodie.properties defines the table-level properties, and archive is the table's archive directory.
.hoodie_partition_metadata defines each partition's attributes and its commit time.
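A quick way to see this layout without leaving the Scala program is to list the base path with the Hadoop FileSystem API that ships with Spark. A small sketch (not part of the original example), reusing spark and basePath from above:

import org.apache.hadoop.fs.{FileSystem, Path}

// List the metadata directory Hudi created under the base path;
// expect hoodie.properties plus the commit and archive entries described above
val fs = FileSystem.get(new java.net.URI(basePath), spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(basePath, ".hoodie")).foreach(s => println(s.getPath.getName))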
IV. Query, insert and update
Query
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()
val basePath = "file:///tmp/hudi_trips_cow"

// Load the copy-on-write table; the glob walks the partition directories down to the data files
val roViewDF = spark.
  read.
  format("org.apache.hudi").
  load(basePath + "/*/*/*/*")

roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select * from hudi_ro_table").show()
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show()
spark.sql("select count(1) from hudi_ro_table").show()
Insert and update
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConversions._

val spark = SparkSession.builder().master("local").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

// Initial insert: overwrite the table with 10 generated records
val insert = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(insert, 2))
df.show()
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// Update: generateUpdates produces new values for keys that were already
// inserted above, and Append mode upserts them into the existing table
val updates = convertToStringList(dataGen.generateUpdates(10))
val updateDf = spark.read.json(spark.sparkContext.parallelize(updates, 2))
updateDf.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
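To verify that the Append pass updated existing records rather than adding new ones, the table can be re-read afterwards: upserted rows carry the newer _hoodie_commit_time while the row count stays the same. A minimal sketch, reusing spark and basePath from above:

// Re-read the table after the upsert and inspect the commit times
val afterUpdateDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
afterUpdateDF.select("_hoodie_commit_time", "uuid", "fare").show(false)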
Reference: https://hudi.apachecn.org/docs/0.5.0/comparison.html