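I. Data Preparation: JSON File
Create a people.json file in the project directory (the code below reads it from input/people.json).
Create a DataFrame from it: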
val df = spark.read.json("input/people.json")
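You can also try the examples without creating a file first. The sketch below builds the same DataFrame from in-memory JSON Lines strings. The three records are an assumption reconstructed from the outputs shown in this section (Michael with no age, Andy aged 30, Justin aged 19). Reading from a Dataset[String] requires Spark 2.2 or later.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.LongType

// Local session; inside spark-shell, getOrCreate() just returns the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("people-json").getOrCreate()
import spark.implicits._

// Assumed contents of input/people.json: one JSON object per line (JSON Lines format).
val peopleLines = Seq(
  """{"name":"Michael"}""",
  """{"name":"Andy", "age":30}""",
  """{"name":"Justin", "age":19}"""
)

// Same JSON reader as spark.read.json(path), fed from a Dataset[String] instead of a file.
val df = spark.read.json(peopleLines.toDS())

// Schema inference orders fields by name and reads JSON integers as long:
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)
df.printSchema()
df.show()
```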
II. Transformations on a DataFrame
1. where
df.where("name='Andy'").show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
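The same filter can also be written with Column expressions instead of a SQL string. This is a minimal sketch that assumes the three people.json records, inlined as JSON strings. The names bySql, byColumn and adults are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session; inside spark-shell, getOrCreate() returns the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("where-demo").getOrCreate()
import spark.implicits._

// Assumed people.json contents, inlined so the sketch runs without the file.
val df = spark.read.json(Seq(
  """{"name":"Michael"}""",
  """{"name":"Andy", "age":30}""",
  """{"name":"Justin", "age":19}"""
).toDS())

// SQL-string predicate, as in the example above.
val bySql = df.where("name = 'Andy'")

// The same predicate as a Column; === builds an equality expression,
// while plain == would just compare the Column object itself.
val byColumn = df.where(col("name") === "Andy")

// filter is an alias of where. Michael's age is null, so age > 18 is null
// (not true) and his row is filtered out.
val adults = df.filter($"age" > 18)

byColumn.show()
adults.show()
```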
2. Selecting columns
(1) select: returns the values of the specified columns
df.select("name","age").show()
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+
df.select(df("age")+1).show()
+---------+
|(age + 1)|
+---------+
|     null|
|       31|
|       20|
+---------+
(2) selectExpr: applies SQL expressions, including functions, to the specified columns
df.selectExpr("name as myname","age","round(age)","concat(name,age)").show()
+-------+----+-------------+---------------------------------+
| myname| age|round(age, 0)|concat(name, CAST(age AS STRING))|
+-------+----+-------------+---------------------------------+
|Michael|null|         null|                             null|
|   Andy|  30|           30|                           Andy30|
| Justin|  19|           19|                         Justin19|
+-------+----+-------------+---------------------------------+
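selectExpr takes SQL expression strings. You can build the same projection with the Column API from org.apache.spark.sql.functions. This sketch assumes the three people.json records. The alias name_age is a hypothetical name added so the two results can be compared. The explicit cast to string mirrors the implicit CAST shown in the column header above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat, round}

val spark = SparkSession.builder().master("local[*]").appName("selectexpr-demo").getOrCreate()
import spark.implicits._

// Assumed people.json contents, inlined so the sketch runs without the file.
val df = spark.read.json(Seq(
  """{"name":"Michael"}""",
  """{"name":"Andy", "age":30}""",
  """{"name":"Justin", "age":19}"""
).toDS())

// SQL expression strings, as in the selectExpr example (name_age is a hypothetical alias).
val viaExpr = df.selectExpr("name as myname", "age", "round(age)", "concat(name, age) as name_age")

// The same projection built from Column objects.
val viaColumns = df.select(
  $"name".as("myname"),
  $"age",
  round($"age"),
  concat($"name", $"age".cast("string")).as("name_age") // a null age makes the result null
)

viaColumns.show()
```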
(3) col: gets the specified column as a Column object (no data is returned)
df.col("name")
(4) apply: gets the specified column, same as col; df("name") is shorthand for df.apply("name")
df.apply("name")
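(5) drop: removes the specified column
Returns a new DataFrame without the dropped column. In Spark 1.x only one column could be dropped per call; since Spark 2.0, drop also accepts several column names at once.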
df.drop("name").show()
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+
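The sketch below drops several columns in one call. It also drops a name that does not exist, which is a no-op rather than an error. It uses a small hypothetical DataFrame with name, age and id columns. The varargs form requires Spark 2.0 or later.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("drop-demo").getOrCreate()
import spark.implicits._

// Hypothetical sample data; None becomes a null age.
val df = Seq[(String, Option[Long], Long)](
  ("Michael", None, 1L),
  ("Andy", Some(30L), 2L),
  ("Justin", Some(19L), 3L)
).toDF("name", "age", "id")

// Several columns dropped in one call (drop(colNames: String*)).
val namesOnly = df.drop("age", "id")

// Dropping a column that does not exist leaves the DataFrame unchanged.
val unchanged = df.drop("no_such_column")

namesOnly.show()
```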
3. limit
df.limit(2).show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
+----+-------+
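limit is a lazy transformation that returns a new DataFrame. take (or head) is an action that returns rows to the driver. Without an explicit ordering, Spark does not promise which rows are kept, so sort first if the result must be deterministic. This sketch assumes the three people.json records.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("limit-demo").getOrCreate()
import spark.implicits._

// Assumed people.json contents, inlined so the sketch runs without the file.
val df = spark.read.json(Seq(
  """{"name":"Michael"}""",
  """{"name":"Andy", "age":30}""",
  """{"name":"Justin", "age":19}"""
).toDS())

val firstTwo = df.limit(2) // transformation: a new DataFrame, evaluated lazily
val twoRows = df.take(2)   // action: an Array[Row] collected to the driver

// Sorting before limit makes the selected rows deterministic (nulls sort last when descending).
val oldestTwo = df.orderBy($"age".desc).limit(2)
oldestTwo.show()
```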
4. Sorting
Sorts by the specified columns, in ascending order by default.
df.orderBy(df("age").desc,df("name").asc).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+
sort() is used exactly like orderBy(); orderBy is an alias of sort.
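Null placement follows Spark's defaults: nulls come first in ascending order and last in descending order. That is why Michael (no age) appears last in the descending example. Since Spark 2.1, Column also offers asc_nulls_last, desc_nulls_first and similar methods to choose explicitly. This sketch assumes the three people.json records. The helper ages is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("sort-demo").getOrCreate()
import spark.implicits._

// Assumed people.json contents, inlined so the sketch runs without the file.
val df = spark.read.json(Seq(
  """{"name":"Michael"}""",
  """{"name":"Andy", "age":30}""",
  """{"name":"Justin", "age":19}"""
).toDS())

// Hypothetical helper: collects the age column in order, mapping null to None.
def ages(d: DataFrame): Seq[Option[Long]] =
  d.select("age").collect().toSeq.map(r => if (r.isNullAt(0)) None else Some(r.getLong(0)))

val ascDefault = ages(df.orderBy($"age".asc))           // ascending: nulls first
val descDefault = ages(df.orderBy($"age".desc))         // descending: nulls last
val ascNullsLast = ages(df.sort($"age".asc_nulls_last)) // explicit null placement

println(ascDefault)
println(ascNullsLast)
```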
5. groupBy
df.groupBy("name").count().show()
+-------+-----+
|   name|count|
+-------+-----+
|Michael|    1|
|   Andy|    1|
| Justin|    1|
+-------+-----+
df.groupBy("name").min("age").show()
+-------+--------+
|   name|min(age)|
+-------+--------+
|Michael|    null|
|   Andy|      30|
| Justin|      19|
+-------+--------+
6. Deduplication
(1) distinct: returns a DataFrame without duplicate rows. It compares all columns; no columns can be specified.
df.distinct().show()
(2) dropDuplicates: removes duplicates based on the specified column or combination of columns
df.dropDuplicates("name","age").show()
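The data above has no duplicate rows, so the difference between the two methods is easier to see on rows that do. This sketch uses hypothetical rows: two Justin records that differ only in id, plus one exact duplicate.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dedup-demo").getOrCreate()
import spark.implicits._

// Hypothetical rows: ids 3 and 4 share name and age; the last row repeats id 4 exactly.
val people = Seq[(Long, String, Long)](
  (3L, "Justin", 19L),
  (4L, "Justin", 19L),
  (4L, "Justin", 19L)
).toDF("id", "name", "age")

val distinctRows = people.distinct()                // compares all columns: 2 rows remain
val byNameAge = people.dropDuplicates("name", "age") // one row per (name, age): 1 row remains
val allColumns = people.dropDuplicates()            // no arguments: same as distinct()

// Which of the duplicate rows dropDuplicates keeps is not guaranteed.
byNameAge.show()
```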
7. Aggregation
df.groupBy("name").agg("age" -> "min","age" -> "mean","age" -> "max").show()
+-------+--------+--------+--------+
|   name|min(age)|avg(age)|max(age)|
+-------+--------+--------+--------+
|Michael|    null|    null|    null|
|   Andy|      30|    30.0|      30|
| Justin|      19|    19.0|      19|
+-------+--------+--------+--------+
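You can write the same aggregation with the functions API, which also lets you name the result columns. This sketch assumes the three people.json records. The aliases min_age, avg_age and max_age are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, max, min}

val spark = SparkSession.builder().master("local[*]").appName("agg-demo").getOrCreate()
import spark.implicits._

// Assumed people.json contents, inlined so the sketch runs without the file.
val df = spark.read.json(Seq(
  """{"name":"Michael"}""",
  """{"name":"Andy", "age":30}""",
  """{"name":"Justin", "age":19}"""
).toDS())

// Several aggregate functions on the same column, each with its own alias.
val stats = df.groupBy("name").agg(
  min("age").as("min_age"),
  avg("age").as("avg_age"), // avg always returns a double
  max("age").as("max_age")
)

stats.show()
```

The tuple form used above is needed when one column gets several functions. The Map overload, for example agg(Map("age" -> "min", "age" -> "max")), keeps only the last entry for a repeated key, because a Scala Map cannot hold duplicate keys.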
8. Union (merging)
df.union(df).show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
|  19| Justin|
|null|Michael|
|  30|   Andy|
|  19| Justin|
|  19| Justin|
+----+-------+
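union keeps duplicate rows (like SQL UNION ALL) and matches columns by position, not by name. Appending distinct() gives SQL UNION semantics. unionByName (Spark 2.3+) aligns columns by name. This sketch uses two hypothetical one-row DataFrames whose columns are in different orders.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("union-demo").getOrCreate()
import spark.implicits._

// Hypothetical inputs: same columns, different column order.
val a = Seq(("Andy", 30L)).toDF("name", "age")
val b = Seq((19L, "Justin")).toDF("age", "name")

// union keeps every row, duplicates included.
val doubled = a.union(a)
val deduped = a.union(a).distinct()

// unionByName matches b's columns to a's by name, so age and name line up correctly.
val byName = a.unionByName(b)
byName.show()
```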
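9. join
The join examples below use these two JSON Lines files (one object per line); people.json now includes an id field.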
info.json
{"id":1,"name":"Michael","sex":"man"}
{"id":2,"name":"Andy", "sex":"man"}
{"id":3,"name":"Justin", "sex":"man"}
{"id":3,"name":"Justin", "sex":"man"}
people.json
{"id":1,"name":"Michael"}
{"id":2,"name":"Andy", "age":30}
{"id":3,"name":"Justin", "age":19}
{"id":4,"name":"Justin", "age":19}
(1) Joining on a single column
val df1 = spark.read.json("input/info.json")
df.join(df1,"name").show()
+-------+----+---+
|   name| age|sex|
+-------+----+---+
|Michael|null|man|
|   Andy|  30|man|
| Justin|  19|man|
| Justin|  19|man|
| Justin|  19|man|
| Justin|  19|man|
+-------+----+---+
(2) Joining on multiple columns
df.join(df1,Seq("name","id")).show()
+-------+---+----+---+
|   name| id| age|sex|
+-------+---+----+---+
|Michael|  1|null|man|
|   Andy|  2|  30|man|
| Justin|  3|  19|man|
| Justin|  3|  19|man|
+-------+---+----+---+
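(3) Specifying the join type
Accepted values include inner, outer, full, left, left_outer, right and right_outer; Spark also accepts others such as full_outer, left_semi, left_anti and cross.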
df.join(df1,Seq("name","id"),"left_outer").show()
+-------+---+----+----+
|   name| id| age| sex|
+-------+---+----+----+
|Michael|  1|null| man|
|   Andy|  2|  30| man|
| Justin|  3|  19| man|
| Justin|  3|  19| man|
| Justin|  4|  19|null|
+-------+---+----+----+
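left_semi and left_anti are useful when you only want to filter the left side rather than combine columns. left_semi keeps left rows that have at least one match, and left_anti keeps left rows that have none. This sketch inlines info.json and people.json exactly as listed earlier in this section. The names people and info stand in for df and df1.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("semi-anti-demo").getOrCreate()
import spark.implicits._

// people.json and info.json as listed above, inlined as JSON Lines strings.
val people = spark.read.json(Seq(
  """{"id":1,"name":"Michael"}""",
  """{"id":2,"name":"Andy", "age":30}""",
  """{"id":3,"name":"Justin", "age":19}""",
  """{"id":4,"name":"Justin", "age":19}"""
).toDS())
val info = spark.read.json(Seq(
  """{"id":1,"name":"Michael","sex":"man"}""",
  """{"id":2,"name":"Andy", "sex":"man"}""",
  """{"id":3,"name":"Justin", "sex":"man"}""",
  """{"id":3,"name":"Justin", "sex":"man"}"""
).toDS())

// Each matching people row appears once, even though info holds id 3 twice; no info columns.
val matched = people.join(info, Seq("name", "id"), "left_semi")

// Only the people row without a match (Justin, id 4).
val unmatched = people.join(info, Seq("name", "id"), "left_anti")

matched.show()
unmatched.show()
```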
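(4) Joining on a Column expression (unlike a join on column names, both name columns and both id columns are kept in the result)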
df.join(df1,df("name")===df1("name"),"left_outer").show()
+----+---+-------+---+-------+---+
| age| id|   name| id|   name|sex|
+----+---+-------+---+-------+---+
|null|  1|Michael|  1|Michael|man|
|  30|  2|   Andy|  2|   Andy|man|
|  19|  3| Justin|  3| Justin|man|
|  19|  3| Justin|  3| Justin|man|
|  19|  4| Justin|  3| Justin|man|
|  19|  4| Justin|  3| Justin|man|
+----+---+-------+---+-------+---+
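The duplicated name and id columns make later references ambiguous. One common remedy is to drop the right-hand copies through the right DataFrame's own column references. This sketch inlines the two files listed earlier. The names people and info stand in for df and df1.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-drop-demo").getOrCreate()
import spark.implicits._

// people.json and info.json as listed above, inlined as JSON Lines strings.
val people = spark.read.json(Seq(
  """{"id":1,"name":"Michael"}""",
  """{"id":2,"name":"Andy", "age":30}""",
  """{"id":3,"name":"Justin", "age":19}""",
  """{"id":4,"name":"Justin", "age":19}"""
).toDS())
val info = spark.read.json(Seq(
  """{"id":1,"name":"Michael","sex":"man"}""",
  """{"id":2,"name":"Andy", "sex":"man"}""",
  """{"id":3,"name":"Justin", "sex":"man"}""",
  """{"id":3,"name":"Justin", "sex":"man"}"""
).toDS())

// Dropping by Column removes only info's copies, because info("name") and info("id")
// refer to info's attributes, not to the same-named columns coming from people.
val joined = people
  .join(info, people("name") === info("name"), "left_outer")
  .drop(info("name"))
  .drop(info("id"))

joined.show()
```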
10. Working with column names
(1) withColumnRenamed: renames the specified column
df.withColumnRenamed("name", "englishname").show()
+----+---+-----------+
| age| id|englishname|
+----+---+-----------+
|null|  1|    Michael|
|  30|  2|       Andy|
|  19|  3|     Justin|
|  19|  4|     Justin|
+----+---+-----------+
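(2) withColumn: adds a new column
Adds a new column to the current DataFrame. The column expression can be derived from this DataFrame's own columns, but not from a column belonging to a different DataFrame.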
df.withColumn("age*5", df("age")*5).show()
+----+---+-------+-----+
| age| id|   name|age*5|
+----+---+-------+-----+
|null|  1|Michael| null|
|  30|  2|   Andy|  150|
|  19|  3| Justin|   95|
|  19|  4| Justin|   95|
+----+---+-------+-----+
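withColumn accepts any Column expression built from the DataFrame's own columns or from constants via lit. This sketch assumes the four-record people.json listed in the join section. The column names source and ageGroup are hypothetical. The last lines show that using a column taken from another DataFrame fails analysis.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, when}
import scala.util.Try

val spark = SparkSession.builder().master("local[*]").appName("withcolumn-demo").getOrCreate()
import spark.implicits._

// Assumed people.json (four-record version), inlined as JSON Lines strings.
val df = spark.read.json(Seq(
  """{"id":1,"name":"Michael"}""",
  """{"id":2,"name":"Andy", "age":30}""",
  """{"id":3,"name":"Justin", "age":19}""",
  """{"id":4,"name":"Justin", "age":19}"""
).toDS())

// New columns from df's own columns, from a constant, and from a conditional expression.
// A null age makes the when() condition null, so those rows fall through to otherwise().
val extended = df
  .withColumn("age*5", $"age" * 5)
  .withColumn("source", lit("people.json"))
  .withColumn("ageGroup", when($"age" >= 20, "20+").otherwise("under 20 or unknown"))

extended.show()

// A column from a different DataFrame cannot be resolved against df, so this fails.
val other = spark.read.json(Seq("""{"age":1}""").toDS())
val foreign = Try(df.withColumn("otherAge", other("age")).collect())
println(foreign.isFailure)
```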
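11. Handling null values
(1) drop: removes rows that contain null values (with no arguments, a row is removed if any of its columns is null)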
df.na.drop().show()
+---+---+------+
|age| id|  name|
+---+---+------+
| 30|  2|  Andy|
| 19|  3|Justin|
| 19|  4|Justin|
+---+---+------+
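na.drop takes optional arguments that control which rows are removed. The mode is "any" (the default) or "all", and you can pass a list of columns to check. This sketch assumes the four-record people.json listed in the join section.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("na-drop-demo").getOrCreate()
import spark.implicits._

// Assumed people.json (four-record version), inlined as JSON Lines strings.
val df = spark.read.json(Seq(
  """{"id":1,"name":"Michael"}""",
  """{"id":2,"name":"Andy", "age":30}""",
  """{"id":3,"name":"Justin", "age":19}""",
  """{"id":4,"name":"Justin", "age":19}"""
).toDS())

val anyNull = df.na.drop()                // "any": drop a row if any column is null
val allNull = df.na.drop("all")           // drop only rows where every column is null
val ageChecked = df.na.drop(Seq("age"))   // only the age column is checked
val nameChecked = df.na.drop(Seq("name")) // name is never null here, so nothing is dropped

anyNull.show()
```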
(2) fill: replaces null values in the specified columns with the given value
df.na.fill(Map(("age","123"))).show()
+---+---+-------+
|age| id|   name|
+---+---+-------+
|123|  1|Michael|
| 30|  2|   Andy|
| 19|  3| Justin|
| 19|  4| Justin|
+---+---+-------+
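A single fill value only replaces nulls in columns of a compatible type: numeric columns for a number, string columns for a string. You can also restrict it to particular columns. This sketch assumes the four-record people.json from the join section. The fill values 0 and "unknown" are illustrative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("na-fill-demo").getOrCreate()
import spark.implicits._

// Assumed people.json (four-record version), inlined as JSON Lines strings.
val df = spark.read.json(Seq(
  """{"id":1,"name":"Michael"}""",
  """{"id":2,"name":"Andy", "age":30}""",
  """{"id":3,"name":"Justin", "age":19}""",
  """{"id":4,"name":"Justin", "age":19}"""
).toDS())

// Numeric value, restricted to the age column.
val ageFilled = df.na.fill(0L, Seq("age"))

// String value: only string columns are filled, so Michael's age stays null.
val stringsFilled = df.na.fill("unknown")

// A Map gives each column its own replacement value.
val mapFilled = df.na.fill(Map("age" -> 0L, "name" -> "unknown"))

ageFilled.show()
```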