同源类算子
算子用途 | 算子 |
---|
数据转换 | map/mapPartitions/flatMap/filter | 数据聚合 | groupByKey/reduce | 数据准备 | union/sample | 数据预处理 | repartition/coalesce | 结构收集 | first/take/collect |
探索类算子
初步了解并认识数据,如数据模式Schema、数据分布等。
算子用途 | 算子 |
---|
查看数据模式 | columns/schema/printSchema | 查看数据 | show | 查看数据分布 | describe | 查看数据的执行计划 | explain |
import org.apache.spark.sql.DataFrame
import spark.implicits._
val employees = Seq((1, "John", 26, "Male"), (2, "Lily", 28, "Female"), (3, "Raymond", 30, "Male"))
val employeesDF: DataFrame = employees.toDF("id", "name", "age", "gender")
employeesDF.printSchema
清洗类算子
算子用途 | 算子 |
---|
drop | 删除列数据 | distinct | 去重 | dropDuplicates | 指定列去重 | na | null值处理,如df.na.drop, df.na.fill(0) |
employeesDF.show
employeesDF.dropDuplicates("gender").show
转换类算子
作用:数据的生成、提取与转换。
算子用途 | 算子 |
---|
select | 按照列名对数据做投影 | selectExpr | 以SQL语句为参数生成、提取数据,比select更灵活。 | where | 以SQL语句为参数做数据过滤 | withColumnRenamed | 字段重命名 | withColumn | 生成新的数据列 | explode | 爆炸,展开数组类的数据列,不会引入Shuffle。 |
employeesDF.select("name", "gender").show
employeesDF.selectExpr("id", "name", "concat(id, '_', name) as id_name").show
employeesDF.withColumnRenamed(“gender”, “sex”)
employeesDF.withColumn("crypto", hash($"age")).show
val seq = Seq( (1, "John", 26, "Male", Seq("Sports", "News")),
(2, "Lily", 28, "Female", Seq("Shopping", "Reading")),
(3, "Raymond", 30, "Male", Seq("Sports", "Reading"))
)
val employeesDF: DataFrame = seq.toDF("id", "name", "age", "gender", "interests")
employeesDF.show
employeesDF.withColumn("interest", explode($"interests")).show
分析类算子
最为关键的算子。
算子用途 | 算子 |
---|
join | 两个DateFrame之间做数据关联,参数分别是:关联表、关联键、关联形式 | groupBy | 按照某列对数据做分组 | agg | 分组后做数据聚合 | sort / orderBy | 按照某列做排序 |
import spark.implicits._
import org.apache.spark.sql.DataFrame
val seq = Seq((1, "Mike", 28, "Male"), (2, "Lily", 30, "Female"), (3, "Raymond", 26, "Male"))
val employees: DataFrame = seq.toDF("id", "name", "age", "gender")
val seq2 = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
val salaries:DataFrame = seq2.toDF("id", "salary")
employees.show
salaries.show
val jointDF: DataFrame = salaries.join(employees, Seq("id"), "inner")
jointDF.show
val aggResult = fullInfo.groupBy("gender").agg(sum("salary").as("sum_salary"), avg("salary").as("avg_salary"))
aggResult.show
aggResult.sort(desc("sum_salary"), asc("gender")).show
aggResult.orderBy(desc("sum_salary"), asc("gender")).show
持久化类算子
类似read API:
sparkSession.read.format(文件格式).option(选项键, 选项值).load(文件路径)
write API:
dataFrame.write.format(文件格式).opton(选项键, 选项值).save(文件路径)
写入模式
模式名称 | 用法 |
---|
Append | 以追加的方式写入数据。 | OverWrite | 以覆盖的方式写入数据。 | ErrorIfExists | 如果目标存储路径已存在,则报异常。 | Ignore | 如果目标存储路径已存在,则放弃数据写入。 |
内置函数
https://spark.apache.org/docs/3.0.1/api/sql/index.html
|