1、读取与保存文件
1.1、读取文本文件
读取以下文本文件
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班
1500100004,葛德曜,24,男,理科三班
1500100005,宣谷芹,22,女,理科五班
1500100006,边昂雄,21,男,理科二班
1500100007,尚孤风,23,女,文科六班
1500100008,符半双,22,女,理科六班
1500100009,沈德昌,21,男,理科一班
1500100010,羿彦昌,23,男,理科六班
例:
val spark: SparkSession = SparkSession
.builder()
.appName("readTextFile")
.master("local")
.getOrCreate()
val studentsDF: DataFrame = spark
.read
.schema("id INT,name STRING,age INT,gender STRING,clazz STRING")
.option("seq", ",")
.format("csv")
.load("spark/data/stu/students.txt")
studentsDF.show(10)
1.2、读取MySQL中的数据
例:
val spark: SparkSession = SparkSession
.builder()
.appName("readMySQL")
.master("local")
.getOrCreate()
val stuJDBC: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://master:3306/student")
.option("dbtable", "student")
.option("user", "root")
.option("password", "123456")
.load()
stuJDBC.show(10)
1.3、将数据保存为orc格式
将上述从MySQL中读取的数据保存为orc格式的文件 例:
stuJDBC.write
.mode(SaveMode.Overwrite)
.format("orc")
.save("spark/data/stuORC/")
2、SparkSQL SQL语法
SparkSQL SQL语法中可以执行SQL语句,并且支持开窗函数,可以将写好的SQL语句放入spark.sql()中执行。在此就不多赘述。 下面将举例一个简单的例子
需求:统计每个班级有多少学生 例:
val spark: SparkSession = SparkSession
.builder()
.appName("readTextFile")
.master("local")
.getOrCreate()
val studentsDF: DataFrame = spark
.read
.schema("id INT,name STRING,age INT,gender STRING,clazz STRING")
.option("seq", ",")
.format("csv")
.load("spark/data/stu/students.txt")
studentsDF.createOrReplaceTempView("students")
spark.sql(
"""
|select clazz
| ,count(id) as cnt
|from students
|group by clazz
""".stripMargin).show()
3、SparkSQL DSL语法
DSL含有
- select
- filter
- where
- join
- order by
等等操作
val spark: SparkSession = SparkSession
.builder()
.appName("readTextFile")
.master("local")
.getOrCreate()
val studentsDF: DataFrame = spark
.read
.schema("id INT,name STRING,age INT,gender STRING,clazz STRING")
.option("seq", ",")
.format("csv")
.load("spark/data/stu/students.txt")
import spark.implicits._
studentsDF
.select($"id", $"name", $"age", $"clazz")
.where($"clazz" like "理科%")
.filter(row => {
val age: Int = row.getAs[Int]("age")
age > 23
})
.show()
需求:统计每个班级的人数
val spark: SparkSession = SparkSession
.builder()
.appName("readTextFile")
.master("local")
.getOrCreate()
val studentsDF: DataFrame = spark
.read
.schema("id INT,name STRING,age INT,gender STRING,clazz STRING")
.option("seq", ",")
.format("csv")
.load("spark/data/stu/students.txt")
import spark.implicits._
studentsDF
.groupBy($"clazz")
.count()
.show()
|